Skip to main content

vox/highlevel/
mod.rs

1use std::future::IntoFuture;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use vox_core::{
7    ConnectionAcceptor, ConnectionRequest, FromVoxSession, NoopClient, PendingConnection,
8    SessionError, TransportMode, initiator,
9};
10use vox_types::{Link, MaybeSend, MaybeSync, Metadata, metadata_into_owned};
11
12mod error;
13pub use error::ServeError;
14
15#[cfg(feature = "transport-tcp")]
16mod tcp;
17
18#[cfg(feature = "transport-local")]
19mod local;
20
21#[cfg(feature = "transport-websocket")]
22mod ws;
23#[cfg(feature = "transport-websocket")]
24pub use ws::WsListener;
25
26#[cfg(feature = "transport-websocket-tls")]
27mod wss;
28#[cfg(feature = "transport-websocket-tls")]
29pub use wss::WssListener;
30
31mod channel;
32pub use channel::{ChannelListener, ChannelListenerSender};
33
34/// A listener that accepts incoming connections for [`serve_listener()`].
35pub trait VoxListener: MaybeSend + 'static {
36    /// The link type produced by this listener.
37    type Link: Link + MaybeSend + 'static;
38
39    /// Accept the next incoming connection.
40    fn accept(
41        &mut self,
42    ) -> impl std::future::Future<Output = std::io::Result<Self::Link>> + MaybeSend + '_;
43}
44
45/// Connect to a remote vox service, returning a typed client.
46///
47/// The address string determines the transport:
48///
49/// - `tcp://host:port` or bare `host:port` — TCP stream transport
50/// - `local://path` — Unix socket / Windows named pipe
51/// - `ws://host:port/path` — WebSocket transport
52///
53/// # Examples
54///
55/// ```no_run
56/// # #[vox::service]
57/// # trait Hello {
58/// #     async fn say_hello(&self) -> String;
59/// # }
60/// # #[tokio::main(flavor = "current_thread")]
61/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
62/// let client: HelloClient = vox::connect("127.0.0.1:9000").await?;
63/// let reply = client.say_hello().await?;
64/// # Ok(())
65/// # }
66/// ```
67// r[impl rpc.session-setup]
68pub fn connect<Client: FromVoxSession>(
69    addr: impl std::fmt::Display,
70) -> ConnectBuilder<'static, Client> {
71    ConnectBuilder::new(addr.to_string())
72}
73
/// A connect address after its scheme has been recognised.
///
/// Produced by `parse_connect_address` and consumed by
/// `ConnectBuilder::establish_once`.
enum ConnectAddress {
    /// Everything after `tcp://`, or the whole address when no scheme was given.
    Tcp(String),
    /// Everything after `local://`.
    Local(String),
    /// The full `ws://…` / `wss://…` URL, scheme included.
    #[cfg(feature = "transport-websocket")]
    Ws(String),
}
80
81fn parse_connect_address(addr: String) -> Result<ConnectAddress, SessionError> {
82    let (scheme, host) = match addr.split_once("://") {
83        Some((scheme, host)) => (scheme.to_string(), host.to_string()),
84        None => ("tcp".to_string(), addr),
85    };
86
87    match scheme.as_str() {
88        #[cfg(feature = "transport-tcp")]
89        "tcp" => Ok(ConnectAddress::Tcp(host)),
90        #[cfg(feature = "transport-local")]
91        "local" => Ok(ConnectAddress::Local(host)),
92        #[cfg(feature = "transport-websocket")]
93        "ws" | "wss" => Ok(ConnectAddress::Ws(format!("{scheme}://{host}"))),
94        _ => Err(SessionError::Protocol(format!(
95            "unknown transport scheme: {scheme:?}"
96        ))),
97    }
98}
99
100pub struct ConnectBuilder<'a, Client> {
101    addr: String,
102    metadata: Metadata<'a>,
103    on_connection: Option<Arc<dyn ConnectionAcceptor>>,
104    connect_timeout: Option<Duration>,
105    resumable: bool,
106    wait_for_service: Option<Duration>,
107    _client: std::marker::PhantomData<Client>,
108}
109
110impl<'a, Client> ConnectBuilder<'a, Client> {
111    fn new(addr: String) -> Self {
112        Self {
113            addr,
114            metadata: vec![],
115            on_connection: None,
116            connect_timeout: Some(Duration::from_secs(5)),
117            resumable: false,
118            wait_for_service: None,
119            _client: std::marker::PhantomData,
120        }
121    }
122
123    // r[impl rpc.virtual-connection.accept]
124    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
125        self.on_connection = Some(Arc::new(acceptor));
126        self
127    }
128
129    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
130        self.metadata = metadata;
131        self
132    }
133
134    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
135        self.connect_timeout = Some(timeout);
136        self
137    }
138
139    pub fn resumable(mut self) -> Self {
140        self.resumable = true;
141        self
142    }
143
144    // r[impl session.initial-connect-waiting]
145    /// Wait for the service to become reachable, retrying for up to `timeout`.
146    ///
147    /// Only transient failures (I/O errors, connect timeouts) are retried.
148    /// Protocol errors, schema incompatibilities, and explicit rejections fail
149    /// immediately without retrying.
150    pub fn wait_for_service(mut self, timeout: Duration) -> Self {
151        self.wait_for_service = Some(timeout);
152        self
153    }
154}
155
/// First pause between connection attempts while waiting for the service.
const INITIAL_CONNECT_BACKOFF_MIN: Duration = Duration::from_millis(100);
/// Upper bound the doubling pause between connection attempts is clamped to.
const INITIAL_CONNECT_BACKOFF_MAX: Duration = Duration::from_secs(5);
158
159impl<'a, Client> ConnectBuilder<'a, Client>
160where
161    Client: FromVoxSession,
162{
163    pub async fn establish(self) -> Result<Client, SessionError> {
164        let ConnectBuilder {
165            addr,
166            metadata,
167            on_connection,
168            connect_timeout,
169            resumable,
170            wait_for_service,
171            _client: _,
172        } = self;
173
174        let parsed = parse_connect_address(addr)?;
175        let metadata = metadata_into_owned(metadata);
176
177        match wait_for_service {
178            // r[impl session.initial-connect-waiting]
179            // r[impl session.initial-connect-waiting.no-session]
180            Some(service_timeout) => {
181                let deadline = Instant::now() + service_timeout;
182                let mut backoff = INITIAL_CONNECT_BACKOFF_MIN;
183
184                loop {
185                    // r[impl session.initial-connect-waiting.timeout]
186                    // Cap each attempt by the remaining waiting budget so a single
187                    // slow attempt cannot exceed the caller-supplied timeout.
188                    let now = Instant::now();
189                    if now >= deadline {
190                        return Err(SessionError::ConnectTimeout);
191                    }
192                    let remaining = deadline - now;
193
194                    let attempt = Self::establish_once(
195                        &parsed,
196                        metadata.clone(),
197                        on_connection.clone(),
198                        connect_timeout,
199                        resumable,
200                    );
201                    let result = match moire::time::timeout(remaining, attempt).await {
202                        Ok(r) => r,
203                        Err(_) => Err(SessionError::ConnectTimeout),
204                    };
205
206                    match result {
207                        Ok(client) => return Ok(client),
208                        // r[impl session.initial-connect-waiting.non-retryable]
209                        Err(e)
210                            if !matches!(e, SessionError::Io(_) | SessionError::ConnectTimeout) =>
211                        {
212                            return Err(e);
213                        }
214                        // r[impl session.initial-connect-waiting.retryable]
215                        // r[impl session.initial-connect-waiting.backoff]
216                        Err(e) => {
217                            let now = Instant::now();
218                            if now >= deadline {
219                                return Err(e);
220                            }
221                            let remaining = deadline - now;
222                            let sleep = backoff.min(remaining);
223                            moire::time::sleep(sleep).await;
224                            backoff = backoff.saturating_mul(2).min(INITIAL_CONNECT_BACKOFF_MAX);
225                        }
226                    }
227                }
228            }
229            None => {
230                Self::establish_once(&parsed, metadata, on_connection, connect_timeout, resumable)
231                    .await
232            }
233        }
234    }
235
236    async fn establish_once(
237        parsed: &ConnectAddress,
238        metadata: vox_types::Metadata<'static>,
239        on_connection: Option<Arc<dyn ConnectionAcceptor>>,
240        connect_timeout: Option<Duration>,
241        resumable: bool,
242    ) -> Result<Client, SessionError> {
243        match parsed {
244            #[cfg(feature = "transport-tcp")]
245            ConnectAddress::Tcp(host) => {
246                let mut builder = initiator(
247                    vox_stream::tcp_link_source(host.clone()),
248                    TransportMode::Bare,
249                );
250                if let Some(acceptor) = on_connection.clone() {
251                    builder = builder.on_connection(AcceptorRef(acceptor));
252                }
253                if let Some(timeout) = connect_timeout {
254                    builder = builder.connect_timeout(timeout);
255                }
256                if resumable {
257                    builder = builder.resumable();
258                }
259                builder.metadata(metadata).establish::<Client>().await
260            }
261            #[cfg(feature = "transport-local")]
262            ConnectAddress::Local(host) => {
263                let mut builder = initiator(
264                    vox_stream::local_link_source(host.clone()),
265                    TransportMode::Bare,
266                );
267                if let Some(acceptor) = on_connection.clone() {
268                    builder = builder.on_connection(AcceptorRef(acceptor));
269                }
270                if let Some(timeout) = connect_timeout {
271                    builder = builder.connect_timeout(timeout);
272                }
273                if resumable {
274                    builder = builder.resumable();
275                }
276                builder.metadata(metadata).establish::<Client>().await
277            }
278            #[cfg(feature = "transport-websocket")]
279            ConnectAddress::Ws(url) => {
280                let mut builder = initiator(
281                    vox_websocket::ws_link_source(url.clone()),
282                    TransportMode::Bare,
283                );
284                if let Some(acceptor) = on_connection {
285                    builder = builder.on_connection(AcceptorRef(acceptor));
286                }
287                if let Some(timeout) = connect_timeout {
288                    builder = builder.connect_timeout(timeout);
289                }
290                if resumable {
291                    builder = builder.resumable();
292                }
293                builder.metadata(metadata).establish::<Client>().await
294            }
295            #[allow(unreachable_patterns)]
296            _ => Err(SessionError::Protocol(
297                "transport not enabled in this vox build".to_string(),
298            )),
299        }
300    }
301}
302
303impl<'a, Client> IntoFuture for ConnectBuilder<'a, Client>
304where
305    Client: FromVoxSession + 'a,
306{
307    type Output = Result<Client, SessionError>;
308    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + 'a>>;
309
310    fn into_future(self) -> Self::IntoFuture {
311        Box::pin(self.establish())
312    }
313}
314
315/// Serve a vox service by address string, accepting connections in a loop.
316///
317/// The address string determines the transport:
318///
319/// - `tcp://host:port` or bare `host:port` — TCP stream transport
320/// - `local://path` — Unix socket / Windows named pipe
321/// - `ws://host:port` — WebSocket (accepts TCP, upgrades to WS)
322/// - `wss://host:port?cert=/path/to/cert.pem&key=/path/to/key.pem` — WebSocket over TLS
323///
324/// This function runs forever (or until an I/O error occurs). Each incoming
325/// connection is handled in a spawned task.
326///
327/// # Examples
328///
329/// ```no_run
330/// # #[vox::service]
331/// # trait Hello {
332/// #     async fn say_hello(&self) -> String;
333/// # }
334/// # #[derive(Clone)]
335/// # struct HelloService;
336/// # impl Hello for HelloService {
337/// #     async fn say_hello(&self) -> String { "hi".into() }
338/// # }
339/// # #[tokio::main(flavor = "current_thread")]
340/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
341/// vox::serve("0.0.0.0:9000", HelloDispatcher::new(HelloService)).await?;
342/// # Ok(())
343/// # }
344/// ```
345pub async fn serve(
346    addr: impl std::fmt::Display,
347    acceptor: impl ConnectionAcceptor,
348) -> Result<(), ServeError> {
349    let addr = addr.to_string();
350    let (scheme, host) = match addr.split_once("://") {
351        Some((scheme, host)) => (scheme.to_string(), host.to_string()),
352        None => ("tcp".to_string(), addr),
353    };
354
355    match scheme.as_str() {
356        #[cfg(feature = "transport-tcp")]
357        "tcp" => {
358            let listener = tokio::net::TcpListener::bind(&host).await?;
359            Ok(serve_listener(listener, acceptor).await?)
360        }
361        #[cfg(feature = "transport-local")]
362        "local" => local::serve_local(&host, acceptor).await,
363        #[cfg(feature = "transport-websocket")]
364        "ws" => {
365            let listener = WsListener::bind(&host).await?;
366            Ok(serve_listener(listener, acceptor).await?)
367        }
368        #[cfg(feature = "transport-websocket-tls")]
369        "wss" => wss::serve_wss(&host, acceptor).await,
370        _ => Err(ServeError::UnsupportedScheme { scheme }),
371    }
372}
373
374/// Serve a vox service on a pre-bound listener.
375///
376/// Takes a [`VoxListener`] (e.g. `TcpListener`) and a [`ConnectionAcceptor`].
377/// Each incoming connection is handled in a spawned task. Runs until an I/O
378/// error occurs on the listener.
379///
380/// # Examples
381///
382/// ```no_run
383/// # #[vox::service]
384/// # trait Hello {
385/// #     async fn say_hello(&self) -> String;
386/// # }
387/// # #[derive(Clone)]
388/// # struct HelloService;
389/// # impl Hello for HelloService {
390/// #     async fn say_hello(&self) -> String { "hi".into() }
391/// # }
392/// # #[tokio::main(flavor = "current_thread")]
393/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
394/// let listener = tokio::net::TcpListener::bind("0.0.0.0:9000").await?;
395/// vox::serve_listener(listener, HelloDispatcher::new(HelloService)).await?;
396/// # Ok(())
397/// # }
398/// ```
399pub async fn serve_listener<L>(
400    mut listener: L,
401    acceptor: impl ConnectionAcceptor,
402) -> Result<(), SessionError>
403where
404    L: VoxListener,
405    <L::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
406    <L::Link as Link>::Rx: MaybeSend + 'static,
407{
408    let acceptor: Arc<dyn ConnectionAcceptor> = Arc::new(acceptor);
409    loop {
410        let link = listener.accept().await.map_err(SessionError::Io)?;
411        let acceptor = acceptor.clone();
412        moire::spawn(async move {
413            let result = vox_core::acceptor_on(link)
414                .on_connection(AcceptorRef(acceptor))
415                .establish::<NoopClient>()
416                .await;
417            if let Ok(client) = result {
418                client.caller.closed().await;
419            }
420        });
421    }
422}
423
424/// Wrapper that implements `ConnectionAcceptor` by delegating to an `Arc<dyn ConnectionAcceptor>`.
425struct AcceptorRef(Arc<dyn ConnectionAcceptor>);
426
427impl ConnectionAcceptor for AcceptorRef {
428    fn accept(
429        &self,
430        request: &ConnectionRequest,
431        connection: PendingConnection,
432    ) -> Result<(), Metadata<'static>> {
433        self.0.accept(request, connection)
434    }
435}