Skip to main content

roam_core/session/
builders.rs

1use std::{future::Future, pin::Pin};
2
3use moire::sync::mpsc;
4use roam_types::{
5    Conduit, ConduitTx, ConnectionSettings, Handler, MaybeSend, MaybeSync, MessageFamily, Metadata,
6    Parity,
7};
8
9use crate::IntoConduit;
10
11use super::{
12    CloseRequest, ConnectionAcceptor, OpenRequest, Session, SessionError, SessionHandle,
13    SessionKeepaliveConfig,
14};
15use crate::{Driver, DriverCaller, DriverReplySink};
16
/// A pinned, boxed, type-erased session future with unit output.
///
/// This is the type handed to the spawn callback. On non-WASM targets the
/// future must be `Send + 'static`.
#[cfg(not(target_arch = "wasm32"))]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// A pinned, boxed, type-erased session future with unit output.
///
/// This is the type handed to the spawn callback. On WASM the future is
/// `'static` only (no `Send` requirement).
#[cfg(target_arch = "wasm32")]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
23
/// Boxed one-shot callback that receives a [`BoxSessionFuture`] and is
/// expected to schedule it. On non-WASM targets the callback itself must be
/// `Send + 'static`.
#[cfg(not(target_arch = "wasm32"))]
type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
/// Boxed one-shot callback that receives a [`BoxSessionFuture`] and is
/// expected to schedule it. On WASM the callback is `'static` only (no `Send`
/// requirement).
#[cfg(target_arch = "wasm32")]
type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
28
29#[cfg(not(target_arch = "wasm32"))]
30fn default_spawn_fn() -> SpawnFn {
31    Box::new(|fut| {
32        tokio::spawn(fut);
33    })
34}
35
/// Returns the default spawner for WASM targets.
///
/// The returned callback passes the session future to
/// `wasm_bindgen_futures::spawn_local`.
#[cfg(target_arch = "wasm32")]
fn default_spawn_fn() -> SpawnFn {
    Box::new(|session_future| wasm_bindgen_futures::spawn_local(session_future))
}
42
43// r[impl rpc.session-setup]
44// r[impl session.role]
45pub fn initiator<I: IntoConduit>(into_conduit: I) -> SessionInitiatorBuilder<'static, I::Conduit> {
46    SessionInitiatorBuilder::new(into_conduit.into_conduit())
47}
48
49// r[impl session.role]
50pub fn acceptor<I: IntoConduit>(into_conduit: I) -> SessionAcceptorBuilder<'static, I::Conduit> {
51    SessionAcceptorBuilder::new(into_conduit.into_conduit())
52}
53
/// Builder for a session established in the initiator role.
///
/// Created by [`initiator`]; consumed by `establish`.
pub struct SessionInitiatorBuilder<'a, C> {
    /// Conduit the session runs over; split into its tx/rx halves in `establish`.
    conduit: C,
    /// Root connection settings passed to the initiator handshake.
    root_settings: ConnectionSettings,
    /// Metadata passed to the initiator handshake alongside the root settings.
    metadata: Metadata<'a>,
    /// Optional connection acceptor handed to the session before the handshake.
    // NOTE(review): presumably invoked for connections opened by the peer — confirm in `Session`.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration handed to the session before the handshake.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Callback used to spawn the session's run future once the handshake succeeds.
    spawn_fn: SpawnFn,
}
62
63impl<'a, C> SessionInitiatorBuilder<'a, C> {
64    fn new(conduit: C) -> Self {
65        Self {
66            conduit,
67            root_settings: ConnectionSettings {
68                parity: Parity::Odd,
69                max_concurrent_requests: 64,
70            },
71            metadata: vec![],
72            on_connection: None,
73            keepalive: None,
74            spawn_fn: default_spawn_fn(),
75        }
76    }
77
78    pub fn parity(mut self, parity: Parity) -> Self {
79        self.root_settings.parity = parity;
80        self
81    }
82
83    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
84        self.root_settings = settings;
85        self
86    }
87
88    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
89        self.root_settings.max_concurrent_requests = max_concurrent_requests;
90        self
91    }
92
93    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
94        self.metadata = metadata;
95        self
96    }
97
98    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
99        self.on_connection = Some(Box::new(acceptor));
100        self
101    }
102
103    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
104        self.keepalive = Some(keepalive);
105        self
106    }
107
108    /// Override the function used to spawn the session background task.
109    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
110    #[cfg(not(target_arch = "wasm32"))]
111    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
112        self.spawn_fn = Box::new(f);
113        self
114    }
115
116    /// Override the function used to spawn the session background task.
117    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
118    #[cfg(target_arch = "wasm32")]
119    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
120        self.spawn_fn = Box::new(f);
121        self
122    }
123
124    pub async fn establish<Client: From<DriverCaller>>(
125        self,
126        handler: impl Handler<DriverReplySink> + 'static,
127    ) -> Result<(Client, SessionHandle), SessionError>
128    where
129        C: Conduit<Msg = MessageFamily> + 'static,
130        C::Tx: MaybeSend + MaybeSync + 'static,
131        for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
132        C::Rx: MaybeSend + 'static,
133    {
134        let (tx, rx) = self.conduit.split();
135        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
136        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
137        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
138        let mut session: Session<C> = Session::pre_handshake(
139            tx,
140            rx,
141            self.on_connection,
142            open_rx,
143            close_rx,
144            control_tx.clone(),
145            control_rx,
146            self.keepalive,
147        );
148        let handle = session
149            .establish_as_initiator(self.root_settings, self.metadata)
150            .await?;
151        let session_handle = SessionHandle {
152            open_tx,
153            close_tx,
154            control_tx,
155        };
156        let mut driver = Driver::new(handle, handler);
157        let client = Client::from(driver.caller());
158        (self.spawn_fn)(Box::pin(async move { session.run().await }));
159        #[cfg(not(target_arch = "wasm32"))]
160        tokio::spawn(async move { driver.run().await });
161        #[cfg(target_arch = "wasm32")]
162        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
163        Ok((client, session_handle))
164    }
165}
166
/// Builder for a session established in the acceptor role.
///
/// Created by [`acceptor`]; consumed by `establish`.
pub struct SessionAcceptorBuilder<'a, C> {
    /// Conduit the session runs over; split into its tx/rx halves in `establish`.
    conduit: C,
    /// Root connection settings passed to the acceptor handshake.
    root_settings: ConnectionSettings,
    /// Metadata passed to the acceptor handshake alongside the root settings.
    metadata: Metadata<'a>,
    /// Optional connection acceptor handed to the session before the handshake.
    // NOTE(review): presumably invoked for connections opened by the peer — confirm in `Session`.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration handed to the session before the handshake.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Callback used to spawn the session's run future once the handshake succeeds.
    spawn_fn: SpawnFn,
}
175
176impl<'a, C> SessionAcceptorBuilder<'a, C> {
177    fn new(conduit: C) -> Self {
178        Self {
179            conduit,
180            root_settings: ConnectionSettings {
181                parity: Parity::Even,
182                max_concurrent_requests: 64,
183            },
184            metadata: vec![],
185            on_connection: None,
186            keepalive: None,
187            spawn_fn: default_spawn_fn(),
188        }
189    }
190
191    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
192        self.root_settings = settings;
193        self
194    }
195
196    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
197        self.root_settings.max_concurrent_requests = max_concurrent_requests;
198        self
199    }
200
201    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
202        self.metadata = metadata;
203        self
204    }
205
206    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
207        self.on_connection = Some(Box::new(acceptor));
208        self
209    }
210
211    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
212        self.keepalive = Some(keepalive);
213        self
214    }
215
216    /// Override the function used to spawn the session background task.
217    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
218    #[cfg(not(target_arch = "wasm32"))]
219    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
220        self.spawn_fn = Box::new(f);
221        self
222    }
223
224    /// Override the function used to spawn the session background task.
225    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
226    #[cfg(target_arch = "wasm32")]
227    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
228        self.spawn_fn = Box::new(f);
229        self
230    }
231
232    #[moire::instrument]
233    pub async fn establish<Client: From<DriverCaller>>(
234        self,
235        handler: impl Handler<DriverReplySink> + 'static,
236    ) -> Result<(Client, SessionHandle), SessionError>
237    where
238        C: Conduit<Msg = MessageFamily> + 'static,
239        C::Tx: MaybeSend + MaybeSync + 'static,
240        for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
241        C::Rx: MaybeSend + 'static,
242    {
243        let (tx, rx) = self.conduit.split();
244        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
245        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
246        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
247        let mut session: Session<C> = Session::pre_handshake(
248            tx,
249            rx,
250            self.on_connection,
251            open_rx,
252            close_rx,
253            control_tx.clone(),
254            control_rx,
255            self.keepalive,
256        );
257        let handle = session
258            .establish_as_acceptor(self.root_settings, self.metadata)
259            .await?;
260        let session_handle = SessionHandle {
261            open_tx,
262            close_tx,
263            control_tx,
264        };
265        let mut driver = Driver::new(handle, handler);
266        let client = Client::from(driver.caller());
267        (self.spawn_fn)(Box::pin(async move { session.run().await }));
268        #[cfg(not(target_arch = "wasm32"))]
269        tokio::spawn(async move { driver.run().await });
270        #[cfg(target_arch = "wasm32")]
271        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
272        Ok((client, session_handle))
273    }
274}