1use std::{
2 sync::{
3 Arc, Mutex,
4 atomic::{AtomicU8, AtomicU64, Ordering},
5 },
6 time::Instant,
7};
8
9use sim_citizen_derive::non_citizen;
10use sim_kernel::{ClassRef, Cx, Expr, Object, Result, Symbol, Value};
11
12use crate::{
13 EvalSite, FrameRouter, IsolationPolicy, ServerAddress, ServerFrame, ServerRuntime,
14 TriggerHandle, symbol_list_value,
15};
16
/// Process-wide counter that hands out [`Server`] ids.
///
/// Starts at 1; `Server::with_runtime` takes the next value with
/// `fetch_add(1, Ordering::Relaxed)` each time a handle is built.
static NEXT_SERVER_ID: AtomicU64 = AtomicU64::new(1);
18
/// Threading mode requested for a server.
///
/// The plain variants are spelled as bare symbols in expression form;
/// `Coroutine` is spelled as a list headed by the symbol `coroutine`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ThreadMode {
    /// Written as the symbol `main`.
    Main,
    /// Written as the symbol `coop`.
    Coop,
    /// Written as the symbol `spawn`.
    Spawn,
    /// Written as the symbol `pool`.
    Pool,
    /// Coroutine wrapper around a base mode, written `(coroutine <base>)`.
    Coroutine(Box<ThreadMode>),
}
33
34impl ThreadMode {
35 pub fn from_expr(expr: &Expr) -> Result<Self> {
38 match expr {
39 Expr::Symbol(symbol) => match symbol.name.as_ref() {
40 "main" => Ok(Self::Main),
41 "coop" => Ok(Self::Coop),
42 "spawn" => Ok(Self::Spawn),
43 "pool" => Ok(Self::Pool),
44 other => Err(sim_kernel::Error::Eval(format!(
45 "unsupported thread mode {other}"
46 ))),
47 },
48 Expr::List(items) | Expr::Vector(items) => {
49 let Some(Expr::Symbol(head)) = items.first() else {
50 return Err(sim_kernel::Error::TypeMismatch {
51 expected: "thread mode list starting with a symbol",
52 found: "non-symbol",
53 });
54 };
55 if head.name.as_ref() != "coroutine" {
56 return Err(sim_kernel::Error::Eval(format!(
57 "unsupported thread mode {}",
58 head
59 )));
60 }
61 let base = match items.get(1) {
62 Some(expr) => Self::from_expr(expr)?,
63 None => Self::Coop,
64 };
65 Ok(Self::Coroutine(Box::new(base)))
66 }
67 _ => Err(sim_kernel::Error::TypeMismatch {
68 expected: "thread mode expression",
69 found: "non-thread-mode",
70 }),
71 }
72 }
73
74 pub fn as_expr(&self) -> Expr {
76 match self {
77 Self::Main => Expr::Symbol(Symbol::new("main")),
78 Self::Coop => Expr::Symbol(Symbol::new("coop")),
79 Self::Spawn => Expr::Symbol(Symbol::new("spawn")),
80 Self::Pool => Expr::Symbol(Symbol::new("pool")),
81 Self::Coroutine(base) => {
82 Expr::List(vec![Expr::Symbol(Symbol::new("coroutine")), base.as_expr()])
83 }
84 }
85 }
86
87 pub fn is_available_now(&self) -> bool {
89 match self {
90 Self::Main | Self::Coop | Self::Spawn | Self::Pool => true,
91 Self::Coroutine(base) => matches!(base.as_ref(), Self::Main | Self::Coop),
92 }
93 }
94
95 pub fn is_coroutine(&self) -> bool {
97 matches!(self, Self::Coroutine(_))
98 }
99}
100
/// Lifecycle state recorded on a server handle.
///
/// Reflected to the language as the symbols `running`, `suspended` and
/// `stopped`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ServerStatus {
    /// Reflected as the symbol `running`.
    Running,
    /// Reflected as the symbol `suspended`.
    Suspended,
    /// Reflected as the symbol `stopped`.
    Stopped,
}
111
112impl ServerStatus {
113 fn as_u8(self) -> u8 {
114 match self {
115 Self::Running => 0,
116 Self::Suspended => 1,
117 Self::Stopped => 2,
118 }
119 }
120
121 fn from_u8(value: u8) -> Self {
122 match value {
123 0 => Self::Running,
124 1 => Self::Suspended,
125 2 => Self::Stopped,
126 _ => Self::Stopped,
127 }
128 }
129
130 fn as_symbol(self) -> Symbol {
131 Symbol::new(match self {
132 Self::Running => "running",
133 Self::Suspended => "suspended",
134 Self::Stopped => "stopped",
135 })
136 }
137}
138
139#[non_citizen(
140 reason = "live server handle; reconstruct configuration via server/Address descriptor and start ops",
141 kind = "handle"
142)]
143pub struct Server {
146 id: u64,
147 name: Option<Symbol>,
148 address: ServerAddress,
149 default_codec: Symbol,
150 supported_codecs: Vec<Symbol>,
151 thread: ThreadMode,
152 isolation: IsolationPolicy,
153 status: AtomicU8,
154 site: Arc<dyn EvalSite>,
155 spec: Vec<(Symbol, Expr)>,
156 router: Arc<FrameRouter>,
157 triggers: Arc<Mutex<Vec<Arc<TriggerHandle>>>>,
158 runtime: Option<Arc<ServerRuntime>>,
159 started_at: Instant,
160}
161
162impl Server {
163 #[allow(clippy::too_many_arguments)]
165 pub fn new(
166 address: ServerAddress,
167 default_codec: Symbol,
168 supported_codecs: Vec<Symbol>,
169 thread: ThreadMode,
170 isolation: IsolationPolicy,
171 name: Option<Symbol>,
172 site: Arc<dyn EvalSite>,
173 spec: Vec<(Symbol, Expr)>,
174 ) -> Result<Self> {
175 Self::with_runtime(
176 address,
177 default_codec,
178 supported_codecs,
179 thread,
180 isolation,
181 name,
182 site,
183 spec,
184 None,
185 )
186 }
187
188 #[allow(clippy::too_many_arguments)]
191 pub fn with_runtime(
192 address: ServerAddress,
193 default_codec: Symbol,
194 supported_codecs: Vec<Symbol>,
195 thread: ThreadMode,
196 isolation: IsolationPolicy,
197 name: Option<Symbol>,
198 site: Arc<dyn EvalSite>,
199 spec: Vec<(Symbol, Expr)>,
200 runtime: Option<Arc<ServerRuntime>>,
201 ) -> Result<Self> {
202 address.ensure_transport_available()?;
203 Ok(Self {
204 id: NEXT_SERVER_ID.fetch_add(1, Ordering::Relaxed),
205 name,
206 address,
207 default_codec,
208 supported_codecs,
209 thread,
210 isolation,
211 status: AtomicU8::new(ServerStatus::Running.as_u8()),
212 site,
213 spec,
214 router: Arc::new(FrameRouter::default()),
215 triggers: Arc::new(Mutex::new(Vec::new())),
216 runtime,
217 started_at: Instant::now(),
218 })
219 }
220
221 pub fn id(&self) -> u64 {
223 self.id
224 }
225
226 pub fn name(&self) -> Option<&Symbol> {
228 self.name.as_ref()
229 }
230
231 pub fn address(&self) -> &ServerAddress {
233 &self.address
234 }
235
236 pub fn default_codec(&self) -> &Symbol {
238 &self.default_codec
239 }
240
241 pub fn supported_codecs(&self) -> &[Symbol] {
243 &self.supported_codecs
244 }
245
246 pub fn thread(&self) -> &ThreadMode {
248 &self.thread
249 }
250
251 pub fn site(&self) -> &Arc<dyn EvalSite> {
253 &self.site
254 }
255
256 pub fn isolation(&self) -> &IsolationPolicy {
258 &self.isolation
259 }
260
261 pub fn spec(&self) -> &[(Symbol, Expr)] {
263 &self.spec
264 }
265
266 pub fn runtime(&self) -> Option<&Arc<ServerRuntime>> {
268 self.runtime.as_ref()
269 }
270
271 pub fn status(&self) -> ServerStatus {
273 ServerStatus::from_u8(self.status.load(Ordering::Relaxed))
274 }
275
276 pub fn set_status(&self, status: ServerStatus) {
278 self.status.store(status.as_u8(), Ordering::Relaxed);
279 }
280
281 pub fn uptime_millis(&self) -> u64 {
283 self.started_at.elapsed().as_millis() as u64
284 }
285
286 pub fn register_trigger(&self, trigger: Arc<TriggerHandle>) -> Result<()> {
288 self.triggers
289 .lock()
290 .map_err(|_| sim_kernel::Error::PoisonedLock("server triggers"))?
291 .push(trigger);
292 Ok(())
293 }
294
295 pub fn stop_triggers(&self) -> Result<()> {
297 for trigger in self.trigger_snapshots()? {
298 trigger.stop()?;
299 }
300 Ok(())
301 }
302
303 pub fn deliver_trigger_frame(&self, cx: &mut Cx, frame: ServerFrame) -> Result<()> {
305 self.router.push_inbound(frame.clone())?;
306 let _ = self.site.answer(cx, frame)?;
307 Ok(())
308 }
309
310 pub fn trigger_snapshots(&self) -> Result<Vec<Arc<TriggerHandle>>> {
312 Ok(self
313 .triggers
314 .lock()
315 .map_err(|_| sim_kernel::Error::PoisonedLock("server triggers"))?
316 .clone())
317 }
318
319 pub fn reflect_value(&self, cx: &mut Cx) -> Result<Value> {
321 let mut entries = table_entries(self, cx)?;
322 entries.extend(live_state_entries(self, cx)?);
323 cx.factory().table(entries)
324 }
325
326 pub fn health_value(&self, cx: &mut Cx) -> Result<Value> {
328 let (sessions, connections, messages_sent, messages_received) = self
329 .runtime
330 .as_ref()
331 .map(|runtime| {
332 (
333 runtime.session_count(),
334 runtime.connection_count(),
335 runtime.messages_sent(),
336 runtime.messages_received(),
337 )
338 })
339 .unwrap_or((0, 0, 0, 0));
340 cx.factory().table(vec![
341 (
342 Symbol::new("status"),
343 cx.factory().symbol(self.status().as_symbol())?,
344 ),
345 (
346 Symbol::new("uptime"),
347 cx.factory().string(self.uptime_millis().to_string())?,
348 ),
349 (
350 Symbol::new("sessions"),
351 cx.factory().string(sessions.to_string())?,
352 ),
353 (
354 Symbol::new("connections"),
355 cx.factory().string(connections.to_string())?,
356 ),
357 (
358 Symbol::new("messages-sent"),
359 cx.factory().string(messages_sent.to_string())?,
360 ),
361 (
362 Symbol::new("messages-received"),
363 cx.factory().string(messages_received.to_string())?,
364 ),
365 ])
366 }
367
368 pub fn sessions_value(&self, cx: &mut Cx) -> Result<Value> {
370 let Some(runtime) = &self.runtime else {
371 return cx.factory().list(Vec::new());
372 };
373 let sessions = runtime
374 .sessions()?
375 .into_iter()
376 .map(|session| session.as_value(cx))
377 .collect::<Result<Vec<_>>>()?;
378 cx.factory().list(sessions)
379 }
380}
381
382impl Object for Server {
383 fn display(&self, _cx: &mut Cx) -> Result<String> {
384 Ok("#<server>".to_owned())
385 }
386
387 fn as_any(&self) -> &dyn std::any::Any {
388 self
389 }
390}
391
392impl sim_kernel::ObjectCompat for Server {
393 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
394 cx.factory().class_stub(
395 sim_kernel::ClassId(0),
396 Symbol::qualified("server", "Server"),
397 )
398 }
399 fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
400 self.as_table(cx)?.object().as_expr(cx)
401 }
402 fn as_table(&self, cx: &mut Cx) -> Result<Value> {
403 let mut entries = table_entries(self, cx)?;
404 entries.extend(live_state_entries(self, cx)?);
405 cx.factory().table(entries)
406 }
407}
408
409impl Clone for Server {
410 fn clone(&self) -> Self {
411 Self {
412 id: self.id,
413 name: self.name.clone(),
414 address: self.address.clone(),
415 default_codec: self.default_codec.clone(),
416 supported_codecs: self.supported_codecs.clone(),
417 thread: self.thread.clone(),
418 isolation: self.isolation.clone(),
419 status: AtomicU8::new(self.status().as_u8()),
420 site: self.site.clone(),
421 spec: self.spec.clone(),
422 router: self.router.clone(),
423 triggers: self.triggers.clone(),
424 runtime: self.runtime.clone(),
425 started_at: self.started_at,
426 }
427 }
428}
429
430fn table_entries(server: &Server, cx: &mut Cx) -> Result<Vec<(Symbol, Value)>> {
431 let name = match server.name() {
432 Some(name) => cx.factory().symbol(name.clone())?,
433 None => cx.factory().nil()?,
434 };
435 let spec_entries = server
436 .spec()
437 .iter()
438 .map(|(key, value)| {
439 cx.factory()
440 .expr(Expr::List(vec![Expr::Symbol(key.clone()), value.clone()]))
441 })
442 .collect::<Result<Vec<_>>>()?;
443 let spec = cx.factory().list(spec_entries)?;
444 let address = server.address.as_value(cx)?;
445 let default_codec = cx.factory().symbol(server.default_codec.clone())?;
446 let supported_codecs = symbol_list_value(cx, &server.supported_codecs)?;
447 let thread = cx.factory().expr(server.thread.as_expr())?;
448 let site_kind = cx.factory().string(server.site.site_kind().to_owned())?;
449 let site_address = server.site.address().as_value(cx)?;
450 let site_codecs = symbol_list_value(cx, server.site.codecs())?;
451 let isolation = server.isolation.as_value(cx)?;
452 let listening = cx.factory().bool(server.runtime.is_some())?;
453 let next_msg_id = cx
454 .factory()
455 .string(server.router.peek_next_msg_id().to_string())?;
456 Ok(vec![
457 (
458 Symbol::new("kind"),
459 cx.factory().symbol(Symbol::new("server"))?,
460 ),
461 (
462 Symbol::new("id"),
463 cx.factory().string(server.id.to_string())?,
464 ),
465 (Symbol::new("name"), name),
466 (Symbol::new("address"), address),
467 (Symbol::new("default-codec"), default_codec),
468 (Symbol::new("supported-codecs"), supported_codecs),
469 (Symbol::new("thread"), thread),
470 (Symbol::new("site-kind"), site_kind),
471 (Symbol::new("site-address"), site_address),
472 (Symbol::new("site-codecs"), site_codecs),
473 (Symbol::new("isolation"), isolation),
474 (Symbol::new("listening"), listening),
475 (Symbol::new("spec"), spec),
476 (Symbol::new("next-msg-id"), next_msg_id),
477 ])
478}
479
480fn live_state_entries(server: &Server, cx: &mut Cx) -> Result<Vec<(Symbol, Value)>> {
481 let trigger_values = server
482 .trigger_snapshots()?
483 .into_iter()
484 .map(|trigger| trigger.reflect_value(cx))
485 .collect::<Result<Vec<_>>>()?;
486 let triggers = cx.factory().list(trigger_values)?;
487 let (sessions, connections, messages_sent, messages_received) = server
488 .runtime
489 .as_ref()
490 .map(|runtime| {
491 (
492 runtime.session_count(),
493 runtime.connection_count(),
494 runtime.messages_sent(),
495 runtime.messages_received(),
496 )
497 })
498 .unwrap_or((0, 0, 0, 0));
499 Ok(vec![
500 (
501 Symbol::new("status"),
502 cx.factory().symbol(server.status().as_symbol())?,
503 ),
504 (
505 Symbol::new("uptime"),
506 cx.factory().string(server.uptime_millis().to_string())?,
507 ),
508 (
509 Symbol::new("sessions"),
510 cx.factory().string(sessions.to_string())?,
511 ),
512 (
513 Symbol::new("connections"),
514 cx.factory().string(connections.to_string())?,
515 ),
516 (
517 Symbol::new("messages-sent"),
518 cx.factory().string(messages_sent.to_string())?,
519 ),
520 (
521 Symbol::new("messages-received"),
522 cx.factory().string(messages_received.to_string())?,
523 ),
524 (Symbol::new("triggers"), triggers),
525 (Symbol::new("line-driver"), cx.factory().nil()?),
526 ])
527}