Skip to main content

sim_lib_server/
server.rs

1use std::{
2    sync::{
3        Arc, Mutex,
4        atomic::{AtomicU8, AtomicU64, Ordering},
5    },
6    time::Instant,
7};
8
9use sim_citizen_derive::non_citizen;
10use sim_kernel::{ClassRef, Cx, Expr, Object, Result, Symbol, Value};
11
12use crate::{
13    EvalSite, FrameRouter, IsolationPolicy, ServerAddress, ServerFrame, ServerRuntime,
14    TriggerHandle, symbol_list_value,
15};
16
17static NEXT_SERVER_ID: AtomicU64 = AtomicU64::new(1);
18
19/// Threading strategy a server uses to service connections.
20#[derive(Clone, Debug, PartialEq, Eq)]
21pub enum ThreadMode {
22    /// Run on the calling main thread.
23    Main,
24    /// Run cooperatively, yielding to the host scheduler.
25    Coop,
26    /// Spawn a dedicated thread per connection.
27    Spawn,
28    /// Service connections from a shared worker pool.
29    Pool,
30    /// Run the wrapped mode as a coroutine.
31    Coroutine(Box<ThreadMode>),
32}
33
34impl ThreadMode {
35    /// Parses a thread mode from an expression: a bare symbol (`main`, `coop`, `spawn`,
36    /// `pool`) or a `(coroutine <base>)` list.
37    pub fn from_expr(expr: &Expr) -> Result<Self> {
38        match expr {
39            Expr::Symbol(symbol) => match symbol.name.as_ref() {
40                "main" => Ok(Self::Main),
41                "coop" => Ok(Self::Coop),
42                "spawn" => Ok(Self::Spawn),
43                "pool" => Ok(Self::Pool),
44                other => Err(sim_kernel::Error::Eval(format!(
45                    "unsupported thread mode {other}"
46                ))),
47            },
48            Expr::List(items) | Expr::Vector(items) => {
49                let Some(Expr::Symbol(head)) = items.first() else {
50                    return Err(sim_kernel::Error::TypeMismatch {
51                        expected: "thread mode list starting with a symbol",
52                        found: "non-symbol",
53                    });
54                };
55                if head.name.as_ref() != "coroutine" {
56                    return Err(sim_kernel::Error::Eval(format!(
57                        "unsupported thread mode {}",
58                        head
59                    )));
60                }
61                let base = match items.get(1) {
62                    Some(expr) => Self::from_expr(expr)?,
63                    None => Self::Coop,
64                };
65                Ok(Self::Coroutine(Box::new(base)))
66            }
67            _ => Err(sim_kernel::Error::TypeMismatch {
68                expected: "thread mode expression",
69                found: "non-thread-mode",
70            }),
71        }
72    }
73
74    /// Renders this thread mode back to its expression form.
75    pub fn as_expr(&self) -> Expr {
76        match self {
77            Self::Main => Expr::Symbol(Symbol::new("main")),
78            Self::Coop => Expr::Symbol(Symbol::new("coop")),
79            Self::Spawn => Expr::Symbol(Symbol::new("spawn")),
80            Self::Pool => Expr::Symbol(Symbol::new("pool")),
81            Self::Coroutine(base) => {
82                Expr::List(vec![Expr::Symbol(Symbol::new("coroutine")), base.as_expr()])
83            }
84        }
85    }
86
87    /// Returns whether this thread mode can be used in the current environment.
88    pub fn is_available_now(&self) -> bool {
89        match self {
90            Self::Main | Self::Coop | Self::Spawn | Self::Pool => true,
91            Self::Coroutine(base) => matches!(base.as_ref(), Self::Main | Self::Coop),
92        }
93    }
94
95    /// Returns whether this mode is a coroutine variant.
96    pub fn is_coroutine(&self) -> bool {
97        matches!(self, Self::Coroutine(_))
98    }
99}
100
101/// Lifecycle status of a running server.
102#[derive(Clone, Copy, Debug, PartialEq, Eq)]
103pub enum ServerStatus {
104    /// Server is accepting and servicing connections.
105    Running,
106    /// Server is paused and not servicing connections.
107    Suspended,
108    /// Server has stopped.
109    Stopped,
110}
111
112impl ServerStatus {
113    fn as_u8(self) -> u8 {
114        match self {
115            Self::Running => 0,
116            Self::Suspended => 1,
117            Self::Stopped => 2,
118        }
119    }
120
121    fn from_u8(value: u8) -> Self {
122        match value {
123            0 => Self::Running,
124            1 => Self::Suspended,
125            2 => Self::Stopped,
126            _ => Self::Stopped,
127        }
128    }
129
130    fn as_symbol(self) -> Symbol {
131        Symbol::new(match self {
132            Self::Running => "running",
133            Self::Suspended => "suspended",
134            Self::Stopped => "stopped",
135        })
136    }
137}
138
139#[non_citizen(
140    reason = "live server handle; reconstruct configuration via server/Address descriptor and start ops",
141    kind = "handle"
142)]
143/// Live server handle: an address bound to an [`EvalSite`], its codec and threading
144/// configuration, lifecycle status, triggers, and optional [`ServerRuntime`].
145pub struct Server {
146    id: u64,
147    name: Option<Symbol>,
148    address: ServerAddress,
149    default_codec: Symbol,
150    supported_codecs: Vec<Symbol>,
151    thread: ThreadMode,
152    isolation: IsolationPolicy,
153    status: AtomicU8,
154    site: Arc<dyn EvalSite>,
155    spec: Vec<(Symbol, Expr)>,
156    router: Arc<FrameRouter>,
157    triggers: Arc<Mutex<Vec<Arc<TriggerHandle>>>>,
158    runtime: Option<Arc<ServerRuntime>>,
159    started_at: Instant,
160}
161
162impl Server {
163    /// Builds a server without an attached runtime.
164    #[allow(clippy::too_many_arguments)]
165    pub fn new(
166        address: ServerAddress,
167        default_codec: Symbol,
168        supported_codecs: Vec<Symbol>,
169        thread: ThreadMode,
170        isolation: IsolationPolicy,
171        name: Option<Symbol>,
172        site: Arc<dyn EvalSite>,
173        spec: Vec<(Symbol, Expr)>,
174    ) -> Result<Self> {
175        Self::with_runtime(
176            address,
177            default_codec,
178            supported_codecs,
179            thread,
180            isolation,
181            name,
182            site,
183            spec,
184            None,
185        )
186    }
187
188    /// Builds a server, optionally attaching a [`ServerRuntime`], after verifying the
189    /// address transport is available.
190    #[allow(clippy::too_many_arguments)]
191    pub fn with_runtime(
192        address: ServerAddress,
193        default_codec: Symbol,
194        supported_codecs: Vec<Symbol>,
195        thread: ThreadMode,
196        isolation: IsolationPolicy,
197        name: Option<Symbol>,
198        site: Arc<dyn EvalSite>,
199        spec: Vec<(Symbol, Expr)>,
200        runtime: Option<Arc<ServerRuntime>>,
201    ) -> Result<Self> {
202        address.ensure_transport_available()?;
203        Ok(Self {
204            id: NEXT_SERVER_ID.fetch_add(1, Ordering::Relaxed),
205            name,
206            address,
207            default_codec,
208            supported_codecs,
209            thread,
210            isolation,
211            status: AtomicU8::new(ServerStatus::Running.as_u8()),
212            site,
213            spec,
214            router: Arc::new(FrameRouter::default()),
215            triggers: Arc::new(Mutex::new(Vec::new())),
216            runtime,
217            started_at: Instant::now(),
218        })
219    }
220
221    /// Returns this server's unique id.
222    pub fn id(&self) -> u64 {
223        self.id
224    }
225
226    /// Returns the server's name, if one was assigned.
227    pub fn name(&self) -> Option<&Symbol> {
228        self.name.as_ref()
229    }
230
231    /// Returns the address the server is bound to.
232    pub fn address(&self) -> &ServerAddress {
233        &self.address
234    }
235
236    /// Returns the codec used by default for frames.
237    pub fn default_codec(&self) -> &Symbol {
238        &self.default_codec
239    }
240
241    /// Returns the codecs the server is willing to negotiate.
242    pub fn supported_codecs(&self) -> &[Symbol] {
243        &self.supported_codecs
244    }
245
246    /// Returns the server's threading mode.
247    pub fn thread(&self) -> &ThreadMode {
248        &self.thread
249    }
250
251    /// Returns the eval site that handles incoming frames.
252    pub fn site(&self) -> &Arc<dyn EvalSite> {
253        &self.site
254    }
255
256    /// Returns the isolation policy applied to sessions.
257    pub fn isolation(&self) -> &IsolationPolicy {
258        &self.isolation
259    }
260
261    /// Returns the configuration spec entries the server was started with.
262    pub fn spec(&self) -> &[(Symbol, Expr)] {
263        &self.spec
264    }
265
266    /// Returns the attached runtime, if the server is listening.
267    pub fn runtime(&self) -> Option<&Arc<ServerRuntime>> {
268        self.runtime.as_ref()
269    }
270
271    /// Returns the current lifecycle status.
272    pub fn status(&self) -> ServerStatus {
273        ServerStatus::from_u8(self.status.load(Ordering::Relaxed))
274    }
275
276    /// Sets the lifecycle status.
277    pub fn set_status(&self, status: ServerStatus) {
278        self.status.store(status.as_u8(), Ordering::Relaxed);
279    }
280
281    /// Returns the elapsed time since the server started, in milliseconds.
282    pub fn uptime_millis(&self) -> u64 {
283        self.started_at.elapsed().as_millis() as u64
284    }
285
286    /// Registers a trigger handle to be tracked and stopped with the server.
287    pub fn register_trigger(&self, trigger: Arc<TriggerHandle>) -> Result<()> {
288        self.triggers
289            .lock()
290            .map_err(|_| sim_kernel::Error::PoisonedLock("server triggers"))?
291            .push(trigger);
292        Ok(())
293    }
294
295    /// Stops every registered trigger.
296    pub fn stop_triggers(&self) -> Result<()> {
297        for trigger in self.trigger_snapshots()? {
298            trigger.stop()?;
299        }
300        Ok(())
301    }
302
303    /// Buffers `frame` as inbound and delivers it to the eval site, as if fired by a trigger.
304    pub fn deliver_trigger_frame(&self, cx: &mut Cx, frame: ServerFrame) -> Result<()> {
305        self.router.push_inbound(frame.clone())?;
306        let _ = self.site.answer(cx, frame)?;
307        Ok(())
308    }
309
310    /// Returns a snapshot of the currently registered trigger handles.
311    pub fn trigger_snapshots(&self) -> Result<Vec<Arc<TriggerHandle>>> {
312        Ok(self
313            .triggers
314            .lock()
315            .map_err(|_| sim_kernel::Error::PoisonedLock("server triggers"))?
316            .clone())
317    }
318
319    /// Returns a table value reflecting the server's configuration and live state.
320    pub fn reflect_value(&self, cx: &mut Cx) -> Result<Value> {
321        let mut entries = table_entries(self, cx)?;
322        entries.extend(live_state_entries(self, cx)?);
323        cx.factory().table(entries)
324    }
325
326    /// Returns a table value summarizing health: status, uptime, and session and message counts.
327    pub fn health_value(&self, cx: &mut Cx) -> Result<Value> {
328        let (sessions, connections, messages_sent, messages_received) = self
329            .runtime
330            .as_ref()
331            .map(|runtime| {
332                (
333                    runtime.session_count(),
334                    runtime.connection_count(),
335                    runtime.messages_sent(),
336                    runtime.messages_received(),
337                )
338            })
339            .unwrap_or((0, 0, 0, 0));
340        cx.factory().table(vec![
341            (
342                Symbol::new("status"),
343                cx.factory().symbol(self.status().as_symbol())?,
344            ),
345            (
346                Symbol::new("uptime"),
347                cx.factory().string(self.uptime_millis().to_string())?,
348            ),
349            (
350                Symbol::new("sessions"),
351                cx.factory().string(sessions.to_string())?,
352            ),
353            (
354                Symbol::new("connections"),
355                cx.factory().string(connections.to_string())?,
356            ),
357            (
358                Symbol::new("messages-sent"),
359                cx.factory().string(messages_sent.to_string())?,
360            ),
361            (
362                Symbol::new("messages-received"),
363                cx.factory().string(messages_received.to_string())?,
364            ),
365        ])
366    }
367
368    /// Returns a list value of the runtime's sessions, or an empty list if not listening.
369    pub fn sessions_value(&self, cx: &mut Cx) -> Result<Value> {
370        let Some(runtime) = &self.runtime else {
371            return cx.factory().list(Vec::new());
372        };
373        let sessions = runtime
374            .sessions()?
375            .into_iter()
376            .map(|session| session.as_value(cx))
377            .collect::<Result<Vec<_>>>()?;
378        cx.factory().list(sessions)
379    }
380}
381
382impl Object for Server {
383    fn display(&self, _cx: &mut Cx) -> Result<String> {
384        Ok("#<server>".to_owned())
385    }
386
387    fn as_any(&self) -> &dyn std::any::Any {
388        self
389    }
390}
391
392impl sim_kernel::ObjectCompat for Server {
393    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
394        cx.factory().class_stub(
395            sim_kernel::ClassId(0),
396            Symbol::qualified("server", "Server"),
397        )
398    }
399    fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
400        self.as_table(cx)?.object().as_expr(cx)
401    }
402    fn as_table(&self, cx: &mut Cx) -> Result<Value> {
403        let mut entries = table_entries(self, cx)?;
404        entries.extend(live_state_entries(self, cx)?);
405        cx.factory().table(entries)
406    }
407}
408
409impl Clone for Server {
410    fn clone(&self) -> Self {
411        Self {
412            id: self.id,
413            name: self.name.clone(),
414            address: self.address.clone(),
415            default_codec: self.default_codec.clone(),
416            supported_codecs: self.supported_codecs.clone(),
417            thread: self.thread.clone(),
418            isolation: self.isolation.clone(),
419            status: AtomicU8::new(self.status().as_u8()),
420            site: self.site.clone(),
421            spec: self.spec.clone(),
422            router: self.router.clone(),
423            triggers: self.triggers.clone(),
424            runtime: self.runtime.clone(),
425            started_at: self.started_at,
426        }
427    }
428}
429
430fn table_entries(server: &Server, cx: &mut Cx) -> Result<Vec<(Symbol, Value)>> {
431    let name = match server.name() {
432        Some(name) => cx.factory().symbol(name.clone())?,
433        None => cx.factory().nil()?,
434    };
435    let spec_entries = server
436        .spec()
437        .iter()
438        .map(|(key, value)| {
439            cx.factory()
440                .expr(Expr::List(vec![Expr::Symbol(key.clone()), value.clone()]))
441        })
442        .collect::<Result<Vec<_>>>()?;
443    let spec = cx.factory().list(spec_entries)?;
444    let address = server.address.as_value(cx)?;
445    let default_codec = cx.factory().symbol(server.default_codec.clone())?;
446    let supported_codecs = symbol_list_value(cx, &server.supported_codecs)?;
447    let thread = cx.factory().expr(server.thread.as_expr())?;
448    let site_kind = cx.factory().string(server.site.site_kind().to_owned())?;
449    let site_address = server.site.address().as_value(cx)?;
450    let site_codecs = symbol_list_value(cx, server.site.codecs())?;
451    let isolation = server.isolation.as_value(cx)?;
452    let listening = cx.factory().bool(server.runtime.is_some())?;
453    let next_msg_id = cx
454        .factory()
455        .string(server.router.peek_next_msg_id().to_string())?;
456    Ok(vec![
457        (
458            Symbol::new("kind"),
459            cx.factory().symbol(Symbol::new("server"))?,
460        ),
461        (
462            Symbol::new("id"),
463            cx.factory().string(server.id.to_string())?,
464        ),
465        (Symbol::new("name"), name),
466        (Symbol::new("address"), address),
467        (Symbol::new("default-codec"), default_codec),
468        (Symbol::new("supported-codecs"), supported_codecs),
469        (Symbol::new("thread"), thread),
470        (Symbol::new("site-kind"), site_kind),
471        (Symbol::new("site-address"), site_address),
472        (Symbol::new("site-codecs"), site_codecs),
473        (Symbol::new("isolation"), isolation),
474        (Symbol::new("listening"), listening),
475        (Symbol::new("spec"), spec),
476        (Symbol::new("next-msg-id"), next_msg_id),
477    ])
478}
479
480fn live_state_entries(server: &Server, cx: &mut Cx) -> Result<Vec<(Symbol, Value)>> {
481    let trigger_values = server
482        .trigger_snapshots()?
483        .into_iter()
484        .map(|trigger| trigger.reflect_value(cx))
485        .collect::<Result<Vec<_>>>()?;
486    let triggers = cx.factory().list(trigger_values)?;
487    let (sessions, connections, messages_sent, messages_received) = server
488        .runtime
489        .as_ref()
490        .map(|runtime| {
491            (
492                runtime.session_count(),
493                runtime.connection_count(),
494                runtime.messages_sent(),
495                runtime.messages_received(),
496            )
497        })
498        .unwrap_or((0, 0, 0, 0));
499    Ok(vec![
500        (
501            Symbol::new("status"),
502            cx.factory().symbol(server.status().as_symbol())?,
503        ),
504        (
505            Symbol::new("uptime"),
506            cx.factory().string(server.uptime_millis().to_string())?,
507        ),
508        (
509            Symbol::new("sessions"),
510            cx.factory().string(sessions.to_string())?,
511        ),
512        (
513            Symbol::new("connections"),
514            cx.factory().string(connections.to_string())?,
515        ),
516        (
517            Symbol::new("messages-sent"),
518            cx.factory().string(messages_sent.to_string())?,
519        ),
520        (
521            Symbol::new("messages-received"),
522            cx.factory().string(messages_received.to_string())?,
523        ),
524        (Symbol::new("triggers"), triggers),
525        (Symbol::new("line-driver"), cx.factory().nil()?),
526    ])
527}