1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use crate::{buffer::Buff, runtime, Session};
use smol::channel::{Receiver, Sender};
use std::sync::Arc;
mod congestion;
mod multiplex_actor;
pub mod pkt_trace;
mod relconn;
mod structs;
pub use relconn::RelConn;
pub struct Multiplex {
urel_send: Sender<Buff>,
urel_recv: Receiver<Buff>,
conn_open: Sender<(Option<String>, Sender<RelConn>)>,
conn_accept: Receiver<RelConn>,
send_session: Sender<Arc<Session>>,
_task: smol::Task<()>,
}
fn to_ioerror<T: Into<Box<dyn std::error::Error + Send + Sync>>>(val: T) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::ConnectionReset, val)
}
impl Multiplex {
pub fn new(session: Session) -> Self {
let (send_session, recv_session) = smol::channel::unbounded();
let (urel_send, urel_send_recv) = smol::channel::bounded(1024);
let (urel_recv_send, urel_recv) = smol::channel::unbounded();
let (conn_open, conn_open_recv) = smol::channel::unbounded();
let (conn_accept_send, conn_accept) = smol::channel::bounded(100);
let session = Arc::new(session);
send_session.try_send(session).unwrap();
let _task = runtime::spawn(async move {
let retval = multiplex_actor::multiplex(
recv_session,
urel_send_recv,
urel_recv_send,
conn_open_recv,
conn_accept_send,
)
.await;
tracing::debug!("multiplex actor returned {:?}", retval);
});
Multiplex {
urel_send,
urel_recv,
conn_open,
conn_accept,
send_session,
_task,
}
}
#[tracing::instrument(skip(self, msg), level = "trace")]
pub async fn send_urel(&self, msg: impl Into<Buff>) -> std::io::Result<()> {
self.urel_send.send(msg.into()).await.map_err(to_ioerror)
}
#[tracing::instrument(skip(self), level = "trace")]
pub async fn recv_urel(&self) -> std::io::Result<Buff> {
self.urel_recv.recv().await.map_err(to_ioerror)
}
pub fn try_recv_urel(&self) -> std::io::Result<Buff> {
self.urel_recv.try_recv().map_err(to_ioerror)
}
pub async fn replace_session(&self, sess: Session) {
let sess = Arc::new(sess);
let _ = self.send_session.try_send(sess);
}
pub async fn open_conn(&self, additional: Option<String>) -> std::io::Result<RelConn> {
let (send, recv) = smol::channel::unbounded();
self.conn_open
.send((additional.clone(), send))
.await
.map_err(to_ioerror)?;
if let Ok(s) = recv.recv().await {
Ok(s)
} else {
smol::future::pending().await
}
}
pub async fn accept_conn(&self) -> std::io::Result<RelConn> {
self.conn_accept.recv().await.map_err(to_ioerror)
}
}