1use std::{future::Future, pin::Pin};
2
3use moire::sync::mpsc;
4use roam_types::{
5 Conduit, ConduitTx, ConnectionSettings, Handler, MaybeSend, MaybeSync, MessageFamily, Metadata,
6 Parity,
7};
8
9use crate::IntoConduit;
10
11use super::{
12 CloseRequest, ConnectionAcceptor, OpenRequest, Session, SessionError, SessionHandle,
13 SessionKeepaliveConfig,
14};
15use crate::{Driver, DriverCaller, DriverReplySink};
16
/// Boxed, pinned, type-erased future with `()` output, as handed to a spawn
/// function. On non-wasm32 targets the future must also be `Send`.
#[cfg(not(target_arch = "wasm32"))]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// Boxed, pinned, type-erased future with `()` output, as handed to a spawn
/// function. The wasm32 variant carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
23
/// One-shot callback that takes ownership of a [`BoxSessionFuture`] and is
/// responsible for running it. On non-wasm32 targets the callback itself must
/// be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
/// One-shot callback that takes ownership of a [`BoxSessionFuture`] and is
/// responsible for running it. The wasm32 variant carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
28
29#[cfg(not(target_arch = "wasm32"))]
30fn default_spawn_fn() -> SpawnFn {
31 Box::new(|fut| {
32 tokio::spawn(fut);
33 })
34}
35
/// Returns the spawner used when the caller does not supply one: the future is
/// handed to `wasm_bindgen_futures::spawn_local`.
#[cfg(target_arch = "wasm32")]
fn default_spawn_fn() -> SpawnFn {
    Box::new(|session_future: BoxSessionFuture| {
        wasm_bindgen_futures::spawn_local(session_future)
    })
}
42
43pub fn initiator<I: IntoConduit>(into_conduit: I) -> SessionInitiatorBuilder<'static, I::Conduit> {
46 SessionInitiatorBuilder::new(into_conduit.into_conduit())
47}
48
49pub fn acceptor<I: IntoConduit>(into_conduit: I) -> SessionAcceptorBuilder<'static, I::Conduit> {
51 SessionAcceptorBuilder::new(into_conduit.into_conduit())
52}
53
/// Builder for the initiating side of a session running over conduit `C`.
///
/// Created by [`initiator`], configured through the chained setters, and
/// consumed by `establish`.
pub struct SessionInitiatorBuilder<'a, C> {
    /// Conduit the session runs over; `establish` splits it into its tx/rx halves.
    conduit: C,
    /// Settings for the root connection, passed to the initiator handshake.
    root_settings: ConnectionSettings,
    /// Metadata passed to the initiator handshake alongside the root settings.
    metadata: Metadata<'a>,
    /// Optional acceptor handed to the session before the handshake.
    /// NOTE(review): presumably consulted for connections opened by the peer —
    /// confirm against `Session`.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration handed to the session.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Callback used by `establish` to spawn the session's run loop.
    spawn_fn: SpawnFn,
}
62
63impl<'a, C> SessionInitiatorBuilder<'a, C> {
64 fn new(conduit: C) -> Self {
65 Self {
66 conduit,
67 root_settings: ConnectionSettings {
68 parity: Parity::Odd,
69 max_concurrent_requests: 64,
70 },
71 metadata: vec![],
72 on_connection: None,
73 keepalive: None,
74 spawn_fn: default_spawn_fn(),
75 }
76 }
77
78 pub fn parity(mut self, parity: Parity) -> Self {
79 self.root_settings.parity = parity;
80 self
81 }
82
83 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
84 self.root_settings = settings;
85 self
86 }
87
88 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
89 self.root_settings.max_concurrent_requests = max_concurrent_requests;
90 self
91 }
92
93 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
94 self.metadata = metadata;
95 self
96 }
97
98 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
99 self.on_connection = Some(Box::new(acceptor));
100 self
101 }
102
103 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
104 self.keepalive = Some(keepalive);
105 self
106 }
107
108 #[cfg(not(target_arch = "wasm32"))]
111 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
112 self.spawn_fn = Box::new(f);
113 self
114 }
115
116 #[cfg(target_arch = "wasm32")]
119 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
120 self.spawn_fn = Box::new(f);
121 self
122 }
123
124 pub async fn establish<Client: From<DriverCaller>>(
125 self,
126 handler: impl Handler<DriverReplySink> + 'static,
127 ) -> Result<(Client, SessionHandle), SessionError>
128 where
129 C: Conduit<Msg = MessageFamily> + 'static,
130 C::Tx: MaybeSend + MaybeSync + 'static,
131 for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
132 C::Rx: MaybeSend + 'static,
133 {
134 let (tx, rx) = self.conduit.split();
135 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
136 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
137 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
138 let mut session: Session<C> = Session::pre_handshake(
139 tx,
140 rx,
141 self.on_connection,
142 open_rx,
143 close_rx,
144 control_tx.clone(),
145 control_rx,
146 self.keepalive,
147 );
148 let handle = session
149 .establish_as_initiator(self.root_settings, self.metadata)
150 .await?;
151 let session_handle = SessionHandle {
152 open_tx,
153 close_tx,
154 control_tx,
155 };
156 let mut driver = Driver::new(handle, handler);
157 let client = Client::from(driver.caller());
158 (self.spawn_fn)(Box::pin(async move { session.run().await }));
159 #[cfg(not(target_arch = "wasm32"))]
160 tokio::spawn(async move { driver.run().await });
161 #[cfg(target_arch = "wasm32")]
162 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
163 Ok((client, session_handle))
164 }
165}
166
/// Builder for the accepting side of a session running over conduit `C`.
///
/// Created by [`acceptor`], configured through the chained setters, and
/// consumed by `establish`.
pub struct SessionAcceptorBuilder<'a, C> {
    /// Conduit the session runs over; `establish` splits it into its tx/rx halves.
    conduit: C,
    /// Settings for the root connection, passed to the acceptor handshake.
    root_settings: ConnectionSettings,
    /// Metadata passed to the acceptor handshake alongside the root settings.
    metadata: Metadata<'a>,
    /// Optional acceptor handed to the session before the handshake.
    /// NOTE(review): presumably consulted for connections opened by the peer —
    /// confirm against `Session`.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration handed to the session.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Callback used by `establish` to spawn the session's run loop.
    spawn_fn: SpawnFn,
}
175
176impl<'a, C> SessionAcceptorBuilder<'a, C> {
177 fn new(conduit: C) -> Self {
178 Self {
179 conduit,
180 root_settings: ConnectionSettings {
181 parity: Parity::Even,
182 max_concurrent_requests: 64,
183 },
184 metadata: vec![],
185 on_connection: None,
186 keepalive: None,
187 spawn_fn: default_spawn_fn(),
188 }
189 }
190
191 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
192 self.root_settings = settings;
193 self
194 }
195
196 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
197 self.root_settings.max_concurrent_requests = max_concurrent_requests;
198 self
199 }
200
201 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
202 self.metadata = metadata;
203 self
204 }
205
206 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
207 self.on_connection = Some(Box::new(acceptor));
208 self
209 }
210
211 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
212 self.keepalive = Some(keepalive);
213 self
214 }
215
216 #[cfg(not(target_arch = "wasm32"))]
219 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
220 self.spawn_fn = Box::new(f);
221 self
222 }
223
224 #[cfg(target_arch = "wasm32")]
227 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
228 self.spawn_fn = Box::new(f);
229 self
230 }
231
232 #[moire::instrument]
233 pub async fn establish<Client: From<DriverCaller>>(
234 self,
235 handler: impl Handler<DriverReplySink> + 'static,
236 ) -> Result<(Client, SessionHandle), SessionError>
237 where
238 C: Conduit<Msg = MessageFamily> + 'static,
239 C::Tx: MaybeSend + MaybeSync + 'static,
240 for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
241 C::Rx: MaybeSend + 'static,
242 {
243 let (tx, rx) = self.conduit.split();
244 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
245 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
246 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
247 let mut session: Session<C> = Session::pre_handshake(
248 tx,
249 rx,
250 self.on_connection,
251 open_rx,
252 close_rx,
253 control_tx.clone(),
254 control_rx,
255 self.keepalive,
256 );
257 let handle = session
258 .establish_as_acceptor(self.root_settings, self.metadata)
259 .await?;
260 let session_handle = SessionHandle {
261 open_tx,
262 close_tx,
263 control_tx,
264 };
265 let mut driver = Driver::new(handle, handler);
266 let client = Client::from(driver.caller());
267 (self.spawn_fn)(Box::pin(async move { session.run().await }));
268 #[cfg(not(target_arch = "wasm32"))]
269 tokio::spawn(async move { driver.run().await });
270 #[cfg(target_arch = "wasm32")]
271 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
272 Ok((client, session_handle))
273 }
274}