1use std::{sync::Arc, thread, time::Duration};
2
3use sim_kernel::{CapabilityName, Cx, Error, Result, Symbol};
4
5use crate::{
6 EvalSite, Server, ServerAddress, ServerRuntime, ThreadMode, pool::default_worker_pool,
7};
8
9mod backends;
10mod framing;
11#[cfg(feature = "server-net-http")]
12mod http_transport;
13mod site;
14mod socket;
15#[cfg(feature = "server-net-http")]
16mod sse_transport;
17#[cfg(test)]
18mod tests;
19#[cfg(feature = "server-net-http")]
20mod ws_transport;
21
22pub use backends::{
23 LocalTransport, LoopbackTransportEndpoint, RegistryTransport, WasmConnectionTransport,
24};
25pub use framing::{decode_transport_frame, encode_transport_frame};
26#[cfg(feature = "server-net-http")]
27pub use http_transport::{HttpConnectionTransport, HttpServerTransport};
28pub use site::TransportEvalSite;
29pub use socket::{TcpConnectionTransport, TcpServerTransport};
30#[cfg(unix)]
31#[allow(unused_imports)]
32pub use socket::{UnixConnectionTransport, UnixServerTransport};
33#[cfg(feature = "server-net-http")]
34pub use sse_transport::{SseConnectionTransport, SseServerTransport};
35#[cfg(feature = "server-net-http")]
36pub use ws_transport::{WsConnectionTransport, WsServerTransport};
37
38pub(crate) use backends::TransportEndpoint;
39use backends::{has_registered_endpoint, register_endpoint, unregister_endpoint};
40use framing::{
41 answer_or_negotiate, error_frame_from_error, io_to_host, is_timeout, read_frame_from,
42 route_frame_bytes, update_negotiated_codec_from_reply, write_frame_to,
43};
44
45pub(crate) const MAX_TRANSPORT_FRAME_BYTES: usize = 8 * 1024 * 1024;
46pub(crate) const SERVER_CONNECTION_IO_TIMEOUT_MS: u64 = 250;
47pub(crate) const DEFAULT_MAX_INFLIGHT_FRAMES: usize = 8;
48pub(crate) const NETWORK_CAPABILITY: &str = "network";
49pub(crate) const WEBHOOK_SERVE_CAPABILITY: &str = "webhook-serve";
50#[cfg(feature = "server-net-http")]
51pub(crate) const HTTP_TRANSPORT_PATH: &str = "/sim/frame";
52#[cfg(feature = "server-net-http")]
53pub(crate) const SSE_TRANSPORT_PATH: &str = "/sim/stream";
54#[cfg(feature = "server-net-http")]
55pub(crate) const WS_TRANSPORT_PATH: &str = "/sim/ws";
56
57pub trait ServerTransport: Send + Sync {
59 fn address(&self) -> &ServerAddress;
61 fn accept(&self, cx: &mut Cx) -> Result<Box<dyn ConnectionTransport>>;
63 fn shutdown(&self, cx: &mut Cx) -> Result<()>;
65
66 fn accept_timeout(
68 &self,
69 cx: &mut Cx,
70 timeout: Duration,
71 ) -> Result<Option<Box<dyn ConnectionTransport>>>;
72}
73
74pub trait ConnectionTransport: Send + Sync {
76 fn send_frame(&mut self, cx: &mut Cx, frame: crate::ServerFrame) -> Result<()>;
78 fn recv_frame(
80 &mut self,
81 cx: &mut Cx,
82 timeout: Option<Duration>,
83 ) -> Result<Option<crate::ServerFrame>>;
84 fn close(&mut self, cx: &mut Cx) -> Result<()>;
86 fn as_any(&self) -> &dyn std::any::Any;
88
89 fn serve_connection(
94 &mut self,
95 _runtime: &Arc<ServerRuntime>,
96 _site: &Arc<dyn EvalSite>,
97 ) -> Result<()> {
98 Err(Error::Eval(
99 "transport does not support server-side serving".to_owned(),
100 ))
101 }
102}
103
104pub fn start_server_transport(server: &Server) -> Result<()> {
105 if !server.address().transport_available() {
106 return Err(Error::Eval(format!(
107 "no transport for address kind {}",
108 server.address().kind_symbol()
109 )));
110 }
111 match server.address() {
112 ServerAddress::Local | ServerAddress::Any => Ok(()),
113 ServerAddress::Tcp { .. }
114 | ServerAddress::Unix { .. }
115 | ServerAddress::Http { .. }
116 | ServerAddress::Sse { .. }
117 | ServerAddress::Ws { .. } => {
118 let Some(runtime) = server.runtime().cloned() else {
119 return Ok(());
120 };
121 register_endpoint(TransportEndpoint {
122 address: server.address().clone(),
123 site: server.site().clone(),
124 })?;
125 let site = server.site().clone();
126 match server.thread() {
127 ThreadMode::Main => {
128 run_accept_loop(runtime, site);
129 Ok(())
130 }
131 ThreadMode::Coroutine(_) => Ok(()),
132 ThreadMode::Coop | ThreadMode::Spawn | ThreadMode::Pool => {
133 let accept_runtime = runtime.clone();
134 let handle = thread::spawn(move || run_accept_loop(accept_runtime, site));
135 runtime.set_accept_thread(handle)
136 }
137 }
138 }
139 ServerAddress::Wasm { region } => {
140 let _ = crate::wasm::lookup_wasm_region(region)?;
141 Ok(())
142 }
143 _ => register_endpoint(TransportEndpoint {
144 address: server.address().clone(),
145 site: server.site().clone(),
146 }),
147 }
148}
149
150pub fn shutdown_server_transport(server: &Server) -> Result<()> {
151 match server.address() {
152 ServerAddress::Local | ServerAddress::Any => Ok(()),
153 ServerAddress::Tcp { .. }
154 | ServerAddress::Unix { .. }
155 | ServerAddress::Http { .. }
156 | ServerAddress::Sse { .. }
157 | ServerAddress::Ws { .. } => {
158 if let Some(runtime) = server.runtime() {
159 runtime.begin_stop();
160 runtime.join_accept_thread()?;
161 runtime.join_worker_threads()?;
162 runtime.with_cx(|cx| runtime.transport().shutdown(cx))?;
163 runtime.clear_sessions()?;
164 }
165 unregister_endpoint(server.address())?;
166 Ok(())
167 }
168 ServerAddress::Wasm { .. } => Ok(()),
169 _ => unregister_endpoint(server.address()),
170 }
171}
172
173pub fn require_start_capabilities(cx: &Cx, address: &ServerAddress) -> Result<()> {
174 match address {
175 ServerAddress::Tcp { .. } | ServerAddress::Unix { .. } => {
176 cx.require(&CapabilityName::new(NETWORK_CAPABILITY))
177 }
178 ServerAddress::Http { .. } | ServerAddress::Sse { .. } | ServerAddress::Ws { .. } => {
179 cx.require(&CapabilityName::new(NETWORK_CAPABILITY))?;
180 cx.require(&CapabilityName::new(WEBHOOK_SERVE_CAPABILITY))
181 }
182 _ => Ok(()),
183 }
184}
185
186pub fn require_connect_capabilities(cx: &Cx, address: &ServerAddress) -> Result<()> {
187 match address {
188 ServerAddress::Tcp { .. }
189 | ServerAddress::Unix { .. }
190 | ServerAddress::Http { .. }
191 | ServerAddress::Sse { .. }
192 | ServerAddress::Ws { .. } => cx.require(&CapabilityName::new(NETWORK_CAPABILITY)),
193 _ => Ok(()),
194 }
195}
196
197pub fn connect_transport_site(
202 cx: &mut Cx,
203 address: ServerAddress,
204 offered_codecs: Vec<Symbol>,
205) -> Result<(Arc<dyn EvalSite>, Symbol)> {
206 connect_transport_site_with_loopback(cx, address, offered_codecs, false)
207}
208
209pub fn connect_transport_site_with_loopback(
214 cx: &mut Cx,
215 address: ServerAddress,
216 offered_codecs: Vec<Symbol>,
217 allow_loopback: bool,
218) -> Result<(Arc<dyn EvalSite>, Symbol)> {
219 require_connect_capabilities(cx, &address)?;
220 TransportEvalSite::connect_with_loopback(cx, address, offered_codecs, allow_loopback)
221}
222
223pub fn register_loopback_transport_endpoint(
228 address: ServerAddress,
229 site: Arc<dyn EvalSite>,
230) -> Result<LoopbackTransportEndpoint> {
231 backends::register_loopback_endpoint(address, site)
232}
233
234#[cfg(unix)]
235fn open_unix_connection_transport(
236 address: &ServerAddress,
237 allow_loopback: bool,
238) -> Result<Box<dyn ConnectionTransport>> {
239 match socket::UnixConnectionTransport::connect(address) {
240 Ok(transport) => Ok(Box::new(transport)),
241 Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
242 Ok(Box::new(RegistryTransport::new(address.clone())))
243 }
244 Err(error) => Err(error),
245 }
246}
247
248#[cfg(not(unix))]
249fn open_unix_connection_transport(
250 _address: &ServerAddress,
251 _allow_loopback: bool,
252) -> Result<Box<dyn ConnectionTransport>> {
253 Err(Error::Eval(
254 "unix sockets are not available on this target".to_owned(),
255 ))
256}
257
258#[cfg(feature = "server-net-http")]
259fn open_http_connection_transport(
260 address: &ServerAddress,
261 allow_loopback: bool,
262) -> Result<Box<dyn ConnectionTransport>> {
263 match HttpConnectionTransport::connect(address) {
264 Ok(transport) => Ok(Box::new(transport)),
265 Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
266 Ok(Box::new(RegistryTransport::new(address.clone())))
267 }
268 Err(error) => Err(error),
269 }
270}
271
272#[cfg(not(feature = "server-net-http"))]
273fn open_http_connection_transport(
274 _address: &ServerAddress,
275 _allow_loopback: bool,
276) -> Result<Box<dyn ConnectionTransport>> {
277 Err(http_transport_disabled_error())
278}
279
280#[cfg(feature = "server-net-http")]
281fn open_sse_connection_transport(
282 address: &ServerAddress,
283 allow_loopback: bool,
284) -> Result<Box<dyn ConnectionTransport>> {
285 match SseConnectionTransport::connect(address) {
286 Ok(transport) => Ok(Box::new(transport)),
287 Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
288 Ok(Box::new(RegistryTransport::new(address.clone())))
289 }
290 Err(error) => Err(error),
291 }
292}
293
294#[cfg(not(feature = "server-net-http"))]
295fn open_sse_connection_transport(
296 _address: &ServerAddress,
297 _allow_loopback: bool,
298) -> Result<Box<dyn ConnectionTransport>> {
299 Err(http_transport_disabled_error())
300}
301
302#[cfg(feature = "server-net-http")]
303fn open_ws_connection_transport(
304 address: &ServerAddress,
305 allow_loopback: bool,
306) -> Result<Box<dyn ConnectionTransport>> {
307 match WsConnectionTransport::connect(address) {
308 Ok(transport) => Ok(Box::new(transport)),
309 Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
310 Ok(Box::new(RegistryTransport::new(address.clone())))
311 }
312 Err(error) => Err(error),
313 }
314}
315
316#[cfg(not(feature = "server-net-http"))]
317fn open_ws_connection_transport(
318 _address: &ServerAddress,
319 _allow_loopback: bool,
320) -> Result<Box<dyn ConnectionTransport>> {
321 Err(http_transport_disabled_error())
322}
323
324fn open_connection_transport(
325 address: &ServerAddress,
326 allow_loopback: bool,
327) -> Result<Box<dyn ConnectionTransport>> {
328 match address {
329 ServerAddress::Local | ServerAddress::Any => Err(Error::Eval(
330 "local addresses require a direct site or server value".to_owned(),
331 )),
332 ServerAddress::InProcess { .. } | ServerAddress::Coroutine { .. } => {
333 Ok(Box::new(RegistryTransport::new(address.clone())))
334 }
335 ServerAddress::Wasm { .. } => Ok(Box::new(WasmConnectionTransport::connect(address)?)),
336 ServerAddress::Http { .. } => open_http_connection_transport(address, allow_loopback),
337 ServerAddress::Sse { .. } => open_sse_connection_transport(address, allow_loopback),
338 ServerAddress::Ws { .. } => open_ws_connection_transport(address, allow_loopback),
339 ServerAddress::Tcp { .. } => match TcpConnectionTransport::connect(address) {
340 Ok(transport) => Ok(Box::new(transport)),
341 Err(_error) if allow_loopback && has_registered_endpoint(address)? => {
342 Ok(Box::new(RegistryTransport::new(address.clone())))
343 }
344 Err(error) => Err(error),
345 },
346 ServerAddress::Unix { .. } => open_unix_connection_transport(address, allow_loopback),
347 _ => Err(Error::Eval(format!(
348 "no connection transport for address kind {}",
349 address.kind_symbol()
350 ))),
351 }
352}
353
354#[cfg(unix)]
355fn open_unix_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
356 Ok(Some(Arc::new(socket::UnixServerTransport::bind(address)?)))
357}
358
359#[cfg(not(unix))]
360fn open_unix_server_transport(_address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
361 Err(Error::Eval(
362 "unix sockets are not available on this target".to_owned(),
363 ))
364}
365
366#[cfg(feature = "server-net-http")]
367fn open_http_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
368 Ok(Some(Arc::new(HttpServerTransport::bind(address)?)))
369}
370
371#[cfg(not(feature = "server-net-http"))]
372fn open_http_server_transport(_address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
373 Err(http_transport_disabled_error())
374}
375
376#[cfg(feature = "server-net-http")]
377fn open_sse_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
378 Ok(Some(Arc::new(SseServerTransport::bind(address)?)))
379}
380
381#[cfg(not(feature = "server-net-http"))]
382fn open_sse_server_transport(_address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
383 Err(http_transport_disabled_error())
384}
385
386#[cfg(feature = "server-net-http")]
387fn open_ws_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
388 Ok(Some(Arc::new(WsServerTransport::bind(address)?)))
389}
390
391#[cfg(not(feature = "server-net-http"))]
392fn open_ws_server_transport(_address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
393 Err(http_transport_disabled_error())
394}
395
396pub fn open_server_transport(address: ServerAddress) -> Result<Option<Arc<dyn ServerTransport>>> {
397 match &address {
398 ServerAddress::Local | ServerAddress::Any | ServerAddress::Coroutine { .. } => Ok(None),
399 ServerAddress::Tcp { .. } => Ok(Some(Arc::new(TcpServerTransport::bind(address)?))),
400 ServerAddress::Unix { .. } => open_unix_server_transport(address),
401 ServerAddress::InProcess { .. } => Ok(Some(
402 Arc::new(RegistryTransport::new(address)) as Arc<dyn ServerTransport>
403 )),
404 ServerAddress::Wasm { region } => {
405 let _ = crate::wasm::lookup_wasm_region(region)?;
406 Ok(None)
407 }
408 ServerAddress::Http { .. } => open_http_server_transport(address),
409 ServerAddress::Sse { .. } => open_sse_server_transport(address),
410 ServerAddress::Ws { .. } => open_ws_server_transport(address),
411 _ => Ok(None),
412 }
413}
414
415pub(crate) fn transport_kind(address: &ServerAddress) -> &'static str {
416 match address {
417 ServerAddress::InProcess { .. } => "in-proc",
418 ServerAddress::Coroutine { .. } => "coroutine",
419 ServerAddress::Tcp { .. } => "tcp",
420 ServerAddress::Unix { .. } => "unix",
421 ServerAddress::Wasm { .. } => "wasm-shmem",
422 ServerAddress::Http { .. } => "http",
423 ServerAddress::Sse { .. } => "sse",
424 ServerAddress::Ws { .. } => "ws",
425 _ => "transport",
426 }
427}
428
429#[cfg(not(feature = "server-net-http"))]
430fn http_transport_disabled_error() -> Error {
431 Error::Eval("http transport requires the server-net-http feature".to_owned())
432}
433
434fn run_accept_loop(runtime: Arc<ServerRuntime>, site: Arc<dyn EvalSite>) {
435 while !runtime.is_stopping() {
436 let accepted = match runtime.accept_timeout(Duration::from_millis(25)) {
437 Ok(connection) => connection,
438 Err(_) => break,
439 };
440 let Some(mut connection) = accepted else {
441 thread::sleep(Duration::from_millis(25));
442 continue;
443 };
444 match runtime.thread_mode() {
445 ThreadMode::Main | ThreadMode::Coop => {
446 let _ = connection.serve_connection(&runtime, &site);
447 }
448 ThreadMode::Spawn => {
449 let runtime_for_worker = runtime.clone();
450 let site_for_worker = site.clone();
451 let handle = thread::spawn(move || {
452 let _ = connection.serve_connection(&runtime_for_worker, &site_for_worker);
453 });
454 if runtime.register_worker_thread(handle).is_err() {
455 runtime.begin_stop();
456 break;
457 }
458 }
459 ThreadMode::Pool => {
460 let runtime_for_worker = runtime.clone();
461 let site_for_worker = site.clone();
462 default_worker_pool().execute(move || {
463 let _ = connection.serve_connection(&runtime_for_worker, &site_for_worker);
464 });
465 }
466 ThreadMode::Coroutine(_) => {}
467 }
468 }
469}