Skip to main content

sim_lib_server/
transport.rs

1use std::{sync::Arc, thread, time::Duration};
2
3use sim_kernel::{CapabilityName, Cx, Error, Result, Symbol};
4
5use crate::{
6    EvalSite, Server, ServerAddress, ServerRuntime, ThreadMode, pool::default_worker_pool,
7};
8
9mod backends;
10mod framing;
11#[cfg(feature = "server-net-http")]
12mod http_transport;
13mod site;
14mod socket;
15#[cfg(feature = "server-net-http")]
16mod sse_transport;
17#[cfg(test)]
18mod tests;
19#[cfg(feature = "server-net-http")]
20mod ws_transport;
21
22pub use backends::{
23    LocalTransport, LoopbackTransportEndpoint, RegistryTransport, WasmConnectionTransport,
24};
25pub use framing::{decode_transport_frame, encode_transport_frame};
26#[cfg(feature = "server-net-http")]
27pub use http_transport::{HttpConnectionTransport, HttpServerTransport};
28pub use site::TransportEvalSite;
29pub use socket::{TcpConnectionTransport, TcpServerTransport};
30#[cfg(unix)]
31#[allow(unused_imports)]
32pub use socket::{UnixConnectionTransport, UnixServerTransport};
33#[cfg(feature = "server-net-http")]
34pub use sse_transport::{SseConnectionTransport, SseServerTransport};
35#[cfg(feature = "server-net-http")]
36pub use ws_transport::{WsConnectionTransport, WsServerTransport};
37
38pub(crate) use backends::TransportEndpoint;
39use backends::{has_registered_endpoint, register_endpoint, unregister_endpoint};
40use framing::{
41    answer_or_negotiate, error_frame_from_error, io_to_host, is_timeout, read_frame_from,
42    route_frame_bytes, update_negotiated_codec_from_reply, write_frame_to,
43};
44
/// Frame size ceiling in bytes (8 MiB).
// NOTE(review): presumably enforced by the `framing` module — no use is visible in this file.
pub(crate) const MAX_TRANSPORT_FRAME_BYTES: usize = 8 * 1024 * 1024;
/// I/O timeout, in milliseconds, for server-side connections.
// NOTE(review): consumers live outside this file — confirm which reads/writes it bounds.
pub(crate) const SERVER_CONNECTION_IO_TIMEOUT_MS: u64 = 250;
/// Default cap on in-flight frames.
// NOTE(review): scope (per connection vs. per server) is not visible here — confirm.
pub(crate) const DEFAULT_MAX_INFLIGHT_FRAMES: usize = 8;
/// Capability name required to start or connect to Tcp, Unix, Http, Sse and
/// Ws addresses.
pub(crate) const NETWORK_CAPABILITY: &str = "network";
/// Capability name required, in addition to [`NETWORK_CAPABILITY`], to start
/// Http, Sse and Ws servers.
pub(crate) const WEBHOOK_SERVE_CAPABILITY: &str = "webhook-serve";
/// URL path associated with the HTTP frame transport.
// NOTE(review): meaning assumed from the name; the path is consumed outside this file.
#[cfg(feature = "server-net-http")]
pub(crate) const HTTP_TRANSPORT_PATH: &str = "/sim/frame";
/// URL path associated with the SSE transport.
// NOTE(review): meaning assumed from the name; the path is consumed outside this file.
#[cfg(feature = "server-net-http")]
pub(crate) const SSE_TRANSPORT_PATH: &str = "/sim/stream";
/// URL path associated with the WebSocket transport.
// NOTE(review): meaning assumed from the name; the path is consumed outside this file.
#[cfg(feature = "server-net-http")]
pub(crate) const WS_TRANSPORT_PATH: &str = "/sim/ws";
56
57/// Listening side of a transport: binds an address and accepts connections.
58pub trait ServerTransport: Send + Sync {
59    /// Returns the address this transport is bound to.
60    fn address(&self) -> &ServerAddress;
61    /// Blocks until a connection arrives and returns it.
62    fn accept(&self, cx: &mut Cx) -> Result<Box<dyn ConnectionTransport>>;
63    /// Shuts down the listener and releases its resources.
64    fn shutdown(&self, cx: &mut Cx) -> Result<()>;
65
66    /// Accepts a connection, returning `None` if `timeout` elapses first.
67    fn accept_timeout(
68        &self,
69        cx: &mut Cx,
70        timeout: Duration,
71    ) -> Result<Option<Box<dyn ConnectionTransport>>>;
72}
73
74/// One open connection over which server frames are sent and received.
75pub trait ConnectionTransport: Send + Sync {
76    /// Sends one frame over the connection.
77    fn send_frame(&mut self, cx: &mut Cx, frame: crate::ServerFrame) -> Result<()>;
78    /// Receives one frame, returning `None` on timeout or end of stream.
79    fn recv_frame(
80        &mut self,
81        cx: &mut Cx,
82        timeout: Option<Duration>,
83    ) -> Result<Option<crate::ServerFrame>>;
84    /// Closes the connection.
85    fn close(&mut self, cx: &mut Cx) -> Result<()>;
86    /// Returns this connection as `Any` for downcasting.
87    fn as_any(&self) -> &dyn std::any::Any;
88
89    /// Serves the connection server-side against `site`.
90    ///
91    /// The default implementation errors; transports that support server-side
92    /// serving override it.
93    fn serve_connection(
94        &mut self,
95        _runtime: &Arc<ServerRuntime>,
96        _site: &Arc<dyn EvalSite>,
97    ) -> Result<()> {
98        Err(Error::Eval(
99            "transport does not support server-side serving".to_owned(),
100        ))
101    }
102}
103
104pub fn start_server_transport(server: &Server) -> Result<()> {
105    if !server.address().transport_available() {
106        return Err(Error::Eval(format!(
107            "no transport for address kind {}",
108            server.address().kind_symbol()
109        )));
110    }
111    match server.address() {
112        ServerAddress::Local | ServerAddress::Any => Ok(()),
113        ServerAddress::Tcp { .. }
114        | ServerAddress::Unix { .. }
115        | ServerAddress::Http { .. }
116        | ServerAddress::Sse { .. }
117        | ServerAddress::Ws { .. } => {
118            let Some(runtime) = server.runtime().cloned() else {
119                return Ok(());
120            };
121            register_endpoint(TransportEndpoint {
122                address: server.address().clone(),
123                site: server.site().clone(),
124            })?;
125            let site = server.site().clone();
126            match server.thread() {
127                ThreadMode::Main => {
128                    run_accept_loop(runtime, site);
129                    Ok(())
130                }
131                ThreadMode::Coroutine(_) => Ok(()),
132                ThreadMode::Coop | ThreadMode::Spawn | ThreadMode::Pool => {
133                    let accept_runtime = runtime.clone();
134                    let handle = thread::spawn(move || run_accept_loop(accept_runtime, site));
135                    runtime.set_accept_thread(handle)
136                }
137            }
138        }
139        ServerAddress::Wasm { region } => {
140            let _ = crate::wasm::lookup_wasm_region(region)?;
141            Ok(())
142        }
143        _ => register_endpoint(TransportEndpoint {
144            address: server.address().clone(),
145            site: server.site().clone(),
146        }),
147    }
148}
149
150pub fn shutdown_server_transport(server: &Server) -> Result<()> {
151    match server.address() {
152        ServerAddress::Local | ServerAddress::Any => Ok(()),
153        ServerAddress::Tcp { .. }
154        | ServerAddress::Unix { .. }
155        | ServerAddress::Http { .. }
156        | ServerAddress::Sse { .. }
157        | ServerAddress::Ws { .. } => {
158            if let Some(runtime) = server.runtime() {
159                runtime.begin_stop();
160                runtime.join_accept_thread()?;
161                runtime.join_worker_threads()?;
162                runtime.with_cx(|cx| runtime.transport().shutdown(cx))?;
163                runtime.clear_sessions()?;
164            }
165            unregister_endpoint(server.address())?;
166            Ok(())
167        }
168        ServerAddress::Wasm { .. } => Ok(()),
169        _ => unregister_endpoint(server.address()),
170    }
171}
172
173pub fn require_start_capabilities(cx: &Cx, address: &ServerAddress) -> Result<()> {
174    match address {
175        ServerAddress::Tcp { .. } | ServerAddress::Unix { .. } => {
176            cx.require(&CapabilityName::new(NETWORK_CAPABILITY))
177        }
178        ServerAddress::Http { .. } | ServerAddress::Sse { .. } | ServerAddress::Ws { .. } => {
179            cx.require(&CapabilityName::new(NETWORK_CAPABILITY))?;
180            cx.require(&CapabilityName::new(WEBHOOK_SERVE_CAPABILITY))
181        }
182        _ => Ok(()),
183    }
184}
185
186pub fn require_connect_capabilities(cx: &Cx, address: &ServerAddress) -> Result<()> {
187    match address {
188        ServerAddress::Tcp { .. }
189        | ServerAddress::Unix { .. }
190        | ServerAddress::Http { .. }
191        | ServerAddress::Sse { .. }
192        | ServerAddress::Ws { .. } => cx.require(&CapabilityName::new(NETWORK_CAPABILITY)),
193        _ => Ok(()),
194    }
195}
196
197/// Connects to `address` and returns the eval site plus negotiated codec.
198///
199/// Loopback fallback is disabled; see
200/// [`connect_transport_site_with_loopback`].
201pub fn connect_transport_site(
202    cx: &mut Cx,
203    address: ServerAddress,
204    offered_codecs: Vec<Symbol>,
205) -> Result<(Arc<dyn EvalSite>, Symbol)> {
206    connect_transport_site_with_loopback(cx, address, offered_codecs, false)
207}
208
209/// Connects to `address`, returning the eval site plus negotiated codec.
210///
211/// When `allow_loopback` is set, a connection that fails may fall back to a
212/// registered in-process endpoint for the same address.
213pub fn connect_transport_site_with_loopback(
214    cx: &mut Cx,
215    address: ServerAddress,
216    offered_codecs: Vec<Symbol>,
217    allow_loopback: bool,
218) -> Result<(Arc<dyn EvalSite>, Symbol)> {
219    require_connect_capabilities(cx, &address)?;
220    TransportEvalSite::connect_with_loopback(cx, address, offered_codecs, allow_loopback)
221}
222
223/// Registers `site` as the loopback endpoint for `address`.
224///
225/// The returned [`LoopbackTransportEndpoint`] unregisters the endpoint when
226/// dropped.
227pub fn register_loopback_transport_endpoint(
228    address: ServerAddress,
229    site: Arc<dyn EvalSite>,
230) -> Result<LoopbackTransportEndpoint> {
231    backends::register_loopback_endpoint(address, site)
232}
233
234#[cfg(unix)]
235fn open_unix_connection_transport(
236    address: &ServerAddress,
237    allow_loopback: bool,
238) -> Result<Box<dyn ConnectionTransport>> {
239    match socket::UnixConnectionTransport::connect(address) {
240        Ok(transport) => Ok(Box::new(transport)),
241        Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
242            Ok(Box::new(RegistryTransport::new(address.clone())))
243        }
244        Err(error) => Err(error),
245    }
246}
247
248#[cfg(not(unix))]
249fn open_unix_connection_transport(
250    _address: &ServerAddress,
251    _allow_loopback: bool,
252) -> Result<Box<dyn ConnectionTransport>> {
253    Err(Error::Eval(
254        "unix sockets are not available on this target".to_owned(),
255    ))
256}
257
258#[cfg(feature = "server-net-http")]
259fn open_http_connection_transport(
260    address: &ServerAddress,
261    allow_loopback: bool,
262) -> Result<Box<dyn ConnectionTransport>> {
263    match HttpConnectionTransport::connect(address) {
264        Ok(transport) => Ok(Box::new(transport)),
265        Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
266            Ok(Box::new(RegistryTransport::new(address.clone())))
267        }
268        Err(error) => Err(error),
269    }
270}
271
272#[cfg(not(feature = "server-net-http"))]
273fn open_http_connection_transport(
274    _address: &ServerAddress,
275    _allow_loopback: bool,
276) -> Result<Box<dyn ConnectionTransport>> {
277    Err(http_transport_disabled_error())
278}
279
280#[cfg(feature = "server-net-http")]
281fn open_sse_connection_transport(
282    address: &ServerAddress,
283    allow_loopback: bool,
284) -> Result<Box<dyn ConnectionTransport>> {
285    match SseConnectionTransport::connect(address) {
286        Ok(transport) => Ok(Box::new(transport)),
287        Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
288            Ok(Box::new(RegistryTransport::new(address.clone())))
289        }
290        Err(error) => Err(error),
291    }
292}
293
294#[cfg(not(feature = "server-net-http"))]
295fn open_sse_connection_transport(
296    _address: &ServerAddress,
297    _allow_loopback: bool,
298) -> Result<Box<dyn ConnectionTransport>> {
299    Err(http_transport_disabled_error())
300}
301
302#[cfg(feature = "server-net-http")]
303fn open_ws_connection_transport(
304    address: &ServerAddress,
305    allow_loopback: bool,
306) -> Result<Box<dyn ConnectionTransport>> {
307    match WsConnectionTransport::connect(address) {
308        Ok(transport) => Ok(Box::new(transport)),
309        Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
310            Ok(Box::new(RegistryTransport::new(address.clone())))
311        }
312        Err(error) => Err(error),
313    }
314}
315
316#[cfg(not(feature = "server-net-http"))]
317fn open_ws_connection_transport(
318    _address: &ServerAddress,
319    _allow_loopback: bool,
320) -> Result<Box<dyn ConnectionTransport>> {
321    Err(http_transport_disabled_error())
322}
323
324fn open_connection_transport(
325    address: &ServerAddress,
326    allow_loopback: bool,
327) -> Result<Box<dyn ConnectionTransport>> {
328    match address {
329        ServerAddress::Local | ServerAddress::Any => Err(Error::Eval(
330            "local addresses require a direct site or server value".to_owned(),
331        )),
332        ServerAddress::InProcess { .. } | ServerAddress::Coroutine { .. } => {
333            Ok(Box::new(RegistryTransport::new(address.clone())))
334        }
335        ServerAddress::Wasm { .. } => Ok(Box::new(WasmConnectionTransport::connect(address)?)),
336        ServerAddress::Http { .. } => open_http_connection_transport(address, allow_loopback),
337        ServerAddress::Sse { .. } => open_sse_connection_transport(address, allow_loopback),
338        ServerAddress::Ws { .. } => open_ws_connection_transport(address, allow_loopback),
339        ServerAddress::Tcp { .. } => match TcpConnectionTransport::connect(address) {
340            Ok(transport) => Ok(Box::new(transport)),
341            Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
342                Ok(Box::new(RegistryTransport::new(address.clone())))
343            }
344            Err(error) => Err(error),
345        },
346        ServerAddress::Unix { .. } => open_unix_connection_transport(address, allow_loopback),
347        _ => Err(Error::Eval(format!(
348            "no connection transport for address kind {}",
349            address.kind_symbol()
350        ))),
351    }
352}
353
354#[cfg(unix)]
355fn open_unix_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
356    Ok(Some(Arc::new(socket::UnixServerTransport::bind(address)?)))
357}
358
359#[cfg(not(unix))]
360fn open_unix_server_transport(_address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
361    Err(Error::Eval(
362        "unix sockets are not available on this target".to_owned(),
363    ))
364}
365
366#[cfg(feature = "server-net-http")]
367fn open_http_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
368    Ok(Some(Arc::new(HttpServerTransport::bind(address)?)))
369}
370
371#[cfg(not(feature = "server-net-http"))]
372fn open_http_server_transport(_address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
373    Err(http_transport_disabled_error())
374}
375
376#[cfg(feature = "server-net-http")]
377fn open_sse_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
378    Ok(Some(Arc::new(SseServerTransport::bind(address)?)))
379}
380
381#[cfg(not(feature = "server-net-http"))]
382fn open_sse_server_transport(_address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
383    Err(http_transport_disabled_error())
384}
385
386#[cfg(feature = "server-net-http")]
387fn open_ws_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
388    Ok(Some(Arc::new(WsServerTransport::bind(address)?)))
389}
390
391#[cfg(not(feature = "server-net-http"))]
392fn open_ws_server_transport(_address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
393    Err(http_transport_disabled_error())
394}
395
396pub fn open_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
397    match &address {
398        ServerAddress::Local | ServerAddress::Any | ServerAddress::Coroutine { .. } => Ok(None),
399        ServerAddress::Tcp { .. } => Ok(Some(Arc::new(TcpServerTransport::bind(address)?))),
400        ServerAddress::Unix { .. } => open_unix_server_transport(address),
401        ServerAddress::InProcess { .. } => Ok(Some(
402            Arc::new(RegistryTransport::new(address)) as Arc<dyn ServerTransport>
403        )),
404        ServerAddress::Wasm { region } => {
405            let _ = crate::wasm::lookup_wasm_region(region)?;
406            Ok(None)
407        }
408        ServerAddress::Http { .. } => open_http_server_transport(address),
409        ServerAddress::Sse { .. } => open_sse_server_transport(address),
410        ServerAddress::Ws { .. } => open_ws_server_transport(address),
411        _ => Ok(None),
412    }
413}
414
415pub(crate) fn transport_kind(address: &ServerAddress) -> &'static str {
416    match address {
417        ServerAddress::InProcess { .. } => "in-proc",
418        ServerAddress::Coroutine { .. } => "coroutine",
419        ServerAddress::Tcp { .. } => "tcp",
420        ServerAddress::Unix { .. } => "unix",
421        ServerAddress::Wasm { .. } => "wasm-shmem",
422        ServerAddress::Http { .. } => "http",
423        ServerAddress::Sse { .. } => "sse",
424        ServerAddress::Ws { .. } => "ws",
425        _ => "transport",
426    }
427}
428
429#[cfg(not(feature = "server-net-http"))]
430fn http_transport_disabled_error() -> Error {
431    Error::Eval("http transport requires the server-net-http feature".to_owned())
432}
433
434fn run_accept_loop(runtime: Arc<ServerRuntime>, site: Arc<dyn EvalSite>) {
435    while !runtime.is_stopping() {
436        let accepted = match runtime.accept_timeout(Duration::from_millis(25)) {
437            Ok(connection) => connection,
438            Err(_) => break,
439        };
440        let Some(mut connection) = accepted else {
441            thread::sleep(Duration::from_millis(25));
442            continue;
443        };
444        match runtime.thread_mode() {
445            ThreadMode::Main | ThreadMode::Coop => {
446                let _ = connection.serve_connection(&runtime, &site);
447            }
448            ThreadMode::Spawn => {
449                let runtime_for_worker = runtime.clone();
450                let site_for_worker = site.clone();
451                let handle = thread::spawn(move || {
452                    let _ = connection.serve_connection(&runtime_for_worker, &site_for_worker);
453                });
454                if runtime.register_worker_thread(handle).is_err() {
455                    runtime.begin_stop();
456                    break;
457                }
458            }
459            ThreadMode::Pool => {
460                let runtime_for_worker = runtime.clone();
461                let site_for_worker = site.clone();
462                default_worker_pool().execute(move || {
463                    let _ = connection.serve_connection(&runtime_for_worker, &site_for_worker);
464                });
465            }
466            ThreadMode::Coroutine(_) => {}
467        }
468    }
469}