use std::{any::Any, sync::Arc};
use crate::{multiplex::multiplex_actor, pipe::Pipe, MuxPublic, MuxSecret};
use concurrent_queue::ConcurrentQueue;
use futures_util::TryFutureExt;
use smol::channel::{Receiver, Sender};
use smol_str::SmolStr;
use super::{structs::PipePool, MuxStream};
pub struct Multiplex {
pipe_pool: Arc<PipePool>,
conn_open: Sender<(SmolStr, Sender<MuxStream>)>,
conn_accept: Receiver<MuxStream>,
friends: ConcurrentQueue<Box<dyn Any + Send>>,
_task: smol::Task<()>,
}
fn to_ioerror<T: Into<Box<dyn std::error::Error + Send + Sync>>>(val: T) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::ConnectionReset, val)
}
impl Multiplex {
pub fn new(my_long_sk: MuxSecret, their_long_sk: Option<MuxPublic>) -> Self {
let pipe_pool = Arc::new(PipePool::new(50)); let (conn_open, conn_open_recv) = smol::channel::unbounded();
let (conn_accept_send, conn_accept) = smol::channel::unbounded();
let _task = smolscale::spawn(
multiplex_actor::multiplex(
pipe_pool.clone(),
conn_open_recv,
conn_accept_send,
my_long_sk.0,
their_long_sk.map(|s| s.0),
)
.unwrap_or_else(|e| {
log::debug!("oh no the multiplex actor RETURNED?! {:?}", e);
}),
);
Multiplex {
pipe_pool, conn_open,
conn_accept,
friends: ConcurrentQueue::unbounded(),
_task,
}
}
pub fn add_drop_friend(&self, friend: impl Any + Send) {
self.friends.push(Box::new(friend)).unwrap()
}
pub fn add_pipe(&self, pipe: impl Pipe) {
self.pipe_pool.add_pipe(pipe)
}
pub fn last_send_pipe(&self) -> Option<impl Pipe> {
self.pipe_pool.last_send_pipe()
}
pub fn last_recv_pipe(&self) -> Option<impl Pipe> {
self.pipe_pool.last_recv_pipe()
}
pub fn iter_pipes(&self) -> impl Iterator<Item = impl Pipe> + '_ {
self.pipe_pool.all_pipes().into_iter()
}
pub async fn open_conn(&self, additional: &str) -> std::io::Result<MuxStream> {
let (send, recv) = smol::channel::unbounded();
self.conn_open
.send((additional.into(), send))
.await
.map_err(to_ioerror)?;
if let Ok(s) = recv.recv().await {
Ok(s)
} else {
smol::future::pending().await
}
}
pub async fn accept_conn(&self) -> std::io::Result<MuxStream> {
self.conn_accept.recv().await.map_err(to_ioerror)
}
}