1use std::future::IntoFuture;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use vox_core::{
7 ConnectionAcceptor, ConnectionRequest, FromVoxSession, NoopClient, PendingConnection,
8 SessionError, TransportMode, initiator,
9};
10use vox_types::{Link, MaybeSend, MaybeSync, Metadata, metadata_into_owned};
11
12mod error;
13pub use error::ServeError;
14
15#[cfg(feature = "transport-tcp")]
16mod tcp;
17
18#[cfg(feature = "transport-local")]
19mod local;
20
21#[cfg(feature = "transport-websocket")]
22mod ws;
23#[cfg(feature = "transport-websocket")]
24pub use ws::WsListener;
25
26#[cfg(feature = "transport-websocket-tls")]
27mod wss;
28#[cfg(feature = "transport-websocket-tls")]
29pub use wss::WssListener;
30
31mod channel;
32pub use channel::{ChannelListener, ChannelListenerSender};
33
34pub trait VoxListener: MaybeSend + 'static {
36 type Link: Link + MaybeSend + 'static;
38
39 fn accept(
41 &mut self,
42 ) -> impl std::future::Future<Output = std::io::Result<Self::Link>> + MaybeSend + '_;
43}
44
45pub fn connect<Client: FromVoxSession>(
69 addr: impl std::fmt::Display,
70) -> ConnectBuilder<'static, Client> {
71 ConnectBuilder::new(addr.to_string())
72}
73
74enum ConnectAddress {
75 Tcp(String),
76 Local(String),
77 #[cfg(feature = "transport-websocket")]
78 Ws(String),
79}
80
81fn parse_connect_address(addr: String) -> Result<ConnectAddress, SessionError> {
82 let (scheme, host) = match addr.split_once("://") {
83 Some((scheme, host)) => (scheme.to_string(), host.to_string()),
84 None => ("tcp".to_string(), addr),
85 };
86
87 match scheme.as_str() {
88 #[cfg(feature = "transport-tcp")]
89 "tcp" => Ok(ConnectAddress::Tcp(host)),
90 #[cfg(feature = "transport-local")]
91 "local" => Ok(ConnectAddress::Local(host)),
92 #[cfg(feature = "transport-websocket")]
93 "ws" | "wss" => Ok(ConnectAddress::Ws(format!("{scheme}://{host}"))),
94 _ => Err(SessionError::Protocol(format!(
95 "unknown transport scheme: {scheme:?}"
96 ))),
97 }
98}
99
100pub struct ConnectBuilder<'a, Client> {
101 addr: String,
102 metadata: Metadata<'a>,
103 on_connection: Option<Arc<dyn ConnectionAcceptor>>,
104 connect_timeout: Option<Duration>,
105 resumable: bool,
106 wait_for_service: Option<Duration>,
107 _client: std::marker::PhantomData<Client>,
108}
109
110impl<'a, Client> ConnectBuilder<'a, Client> {
111 fn new(addr: String) -> Self {
112 Self {
113 addr,
114 metadata: vec![],
115 on_connection: None,
116 connect_timeout: Some(Duration::from_secs(5)),
117 resumable: false,
118 wait_for_service: None,
119 _client: std::marker::PhantomData,
120 }
121 }
122
123 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
125 self.on_connection = Some(Arc::new(acceptor));
126 self
127 }
128
129 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
130 self.metadata = metadata;
131 self
132 }
133
134 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
135 self.connect_timeout = Some(timeout);
136 self
137 }
138
139 pub fn resumable(mut self) -> Self {
140 self.resumable = true;
141 self
142 }
143
144 pub fn wait_for_service(mut self, timeout: Duration) -> Self {
151 self.wait_for_service = Some(timeout);
152 self
153 }
154}
155
156const INITIAL_CONNECT_BACKOFF_MIN: Duration = Duration::from_millis(100);
157const INITIAL_CONNECT_BACKOFF_MAX: Duration = Duration::from_secs(5);
158
159impl<'a, Client> ConnectBuilder<'a, Client>
160where
161 Client: FromVoxSession,
162{
163 pub async fn establish(self) -> Result<Client, SessionError> {
164 let ConnectBuilder {
165 addr,
166 metadata,
167 on_connection,
168 connect_timeout,
169 resumable,
170 wait_for_service,
171 _client: _,
172 } = self;
173
174 let parsed = parse_connect_address(addr)?;
175 let metadata = metadata_into_owned(metadata);
176
177 match wait_for_service {
178 Some(service_timeout) => {
181 let deadline = Instant::now() + service_timeout;
182 let mut backoff = INITIAL_CONNECT_BACKOFF_MIN;
183
184 loop {
185 let now = Instant::now();
189 if now >= deadline {
190 return Err(SessionError::ConnectTimeout);
191 }
192 let remaining = deadline - now;
193
194 let attempt = Self::establish_once(
195 &parsed,
196 metadata.clone(),
197 on_connection.clone(),
198 connect_timeout,
199 resumable,
200 );
201 let result = match moire::time::timeout(remaining, attempt).await {
202 Ok(r) => r,
203 Err(_) => Err(SessionError::ConnectTimeout),
204 };
205
206 match result {
207 Ok(client) => return Ok(client),
208 Err(e)
210 if !matches!(e, SessionError::Io(_) | SessionError::ConnectTimeout) =>
211 {
212 return Err(e);
213 }
214 Err(e) => {
217 let now = Instant::now();
218 if now >= deadline {
219 return Err(e);
220 }
221 let remaining = deadline - now;
222 let sleep = backoff.min(remaining);
223 moire::time::sleep(sleep).await;
224 backoff = backoff.saturating_mul(2).min(INITIAL_CONNECT_BACKOFF_MAX);
225 }
226 }
227 }
228 }
229 None => {
230 Self::establish_once(&parsed, metadata, on_connection, connect_timeout, resumable)
231 .await
232 }
233 }
234 }
235
236 async fn establish_once(
237 parsed: &ConnectAddress,
238 metadata: vox_types::Metadata<'static>,
239 on_connection: Option<Arc<dyn ConnectionAcceptor>>,
240 connect_timeout: Option<Duration>,
241 resumable: bool,
242 ) -> Result<Client, SessionError> {
243 match parsed {
244 #[cfg(feature = "transport-tcp")]
245 ConnectAddress::Tcp(host) => {
246 let mut builder = initiator(
247 vox_stream::tcp_link_source(host.clone()),
248 TransportMode::Bare,
249 );
250 if let Some(acceptor) = on_connection.clone() {
251 builder = builder.on_connection(AcceptorRef(acceptor));
252 }
253 if let Some(timeout) = connect_timeout {
254 builder = builder.connect_timeout(timeout);
255 }
256 if resumable {
257 builder = builder.resumable();
258 }
259 builder.metadata(metadata).establish::<Client>().await
260 }
261 #[cfg(feature = "transport-local")]
262 ConnectAddress::Local(host) => {
263 let mut builder = initiator(
264 vox_stream::local_link_source(host.clone()),
265 TransportMode::Bare,
266 );
267 if let Some(acceptor) = on_connection.clone() {
268 builder = builder.on_connection(AcceptorRef(acceptor));
269 }
270 if let Some(timeout) = connect_timeout {
271 builder = builder.connect_timeout(timeout);
272 }
273 if resumable {
274 builder = builder.resumable();
275 }
276 builder.metadata(metadata).establish::<Client>().await
277 }
278 #[cfg(feature = "transport-websocket")]
279 ConnectAddress::Ws(url) => {
280 let mut builder = initiator(
281 vox_websocket::ws_link_source(url.clone()),
282 TransportMode::Bare,
283 );
284 if let Some(acceptor) = on_connection {
285 builder = builder.on_connection(AcceptorRef(acceptor));
286 }
287 if let Some(timeout) = connect_timeout {
288 builder = builder.connect_timeout(timeout);
289 }
290 if resumable {
291 builder = builder.resumable();
292 }
293 builder.metadata(metadata).establish::<Client>().await
294 }
295 #[allow(unreachable_patterns)]
296 _ => Err(SessionError::Protocol(
297 "transport not enabled in this vox build".to_string(),
298 )),
299 }
300 }
301}
302
303impl<'a, Client> IntoFuture for ConnectBuilder<'a, Client>
304where
305 Client: FromVoxSession + 'a,
306{
307 type Output = Result<Client, SessionError>;
308 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + 'a>>;
309
310 fn into_future(self) -> Self::IntoFuture {
311 Box::pin(self.establish())
312 }
313}
314
315pub async fn serve(
346 addr: impl std::fmt::Display,
347 acceptor: impl ConnectionAcceptor,
348) -> Result<(), ServeError> {
349 let addr = addr.to_string();
350 let (scheme, host) = match addr.split_once("://") {
351 Some((scheme, host)) => (scheme.to_string(), host.to_string()),
352 None => ("tcp".to_string(), addr),
353 };
354
355 match scheme.as_str() {
356 #[cfg(feature = "transport-tcp")]
357 "tcp" => {
358 let listener = tokio::net::TcpListener::bind(&host).await?;
359 Ok(serve_listener(listener, acceptor).await?)
360 }
361 #[cfg(feature = "transport-local")]
362 "local" => local::serve_local(&host, acceptor).await,
363 #[cfg(feature = "transport-websocket")]
364 "ws" => {
365 let listener = WsListener::bind(&host).await?;
366 Ok(serve_listener(listener, acceptor).await?)
367 }
368 #[cfg(feature = "transport-websocket-tls")]
369 "wss" => wss::serve_wss(&host, acceptor).await,
370 _ => Err(ServeError::UnsupportedScheme { scheme }),
371 }
372}
373
374pub async fn serve_listener<L>(
400 mut listener: L,
401 acceptor: impl ConnectionAcceptor,
402) -> Result<(), SessionError>
403where
404 L: VoxListener,
405 <L::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
406 <L::Link as Link>::Rx: MaybeSend + 'static,
407{
408 let acceptor: Arc<dyn ConnectionAcceptor> = Arc::new(acceptor);
409 loop {
410 let link = listener.accept().await.map_err(SessionError::Io)?;
411 let acceptor = acceptor.clone();
412 moire::spawn(async move {
413 let result = vox_core::acceptor_on(link)
414 .on_connection(AcceptorRef(acceptor))
415 .establish::<NoopClient>()
416 .await;
417 if let Ok(client) = result {
418 client.caller.closed().await;
419 }
420 });
421 }
422}
423
424struct AcceptorRef(Arc<dyn ConnectionAcceptor>);
426
427impl ConnectionAcceptor for AcceptorRef {
428 fn accept(
429 &self,
430 request: &ConnectionRequest,
431 connection: PendingConnection,
432 ) -> Result<(), Metadata<'static>> {
433 self.0.accept(request, connection)
434 }
435}