Skip to main content

sim_lib_server/
runtime.rs

use std::thread::JoinHandle;
use std::{
    collections::BTreeMap,
    sync::{
        Arc, Mutex, MutexGuard,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
};

use sim_kernel::{Cx, Error, Result};

use crate::{ConnectionTransport, IsolationPolicy, ServerTransport, Session, ThreadMode};
13
14/// Server-side runtime: owns the transport, tracks live sessions and worker threads, and
15/// records connection and message counters.
16pub struct ServerRuntime {
17    transport: Arc<dyn ServerTransport>,
18    thread_mode: ThreadMode,
19    sessions: Mutex<BTreeMap<u64, Session>>,
20    cx: Mutex<Cx>,
21    accept_thread: Mutex<Option<JoinHandle<()>>>,
22    worker_threads: Mutex<Vec<JoinHandle<()>>>,
23    next_session_id: AtomicU64,
24    connections: AtomicU64,
25    messages_sent: AtomicU64,
26    messages_received: AtomicU64,
27    max_inflight: usize,
28    session_isolation: IsolationPolicy,
29    stopping: AtomicBool,
30}
31
32impl ServerRuntime {
33    /// Creates a runtime with the default session isolation policy.
34    pub fn new(
35        transport: Arc<dyn ServerTransport>,
36        cx: Cx,
37        thread_mode: ThreadMode,
38        max_inflight: usize,
39    ) -> Self {
40        Self::new_with_isolation(
41            transport,
42            cx,
43            thread_mode,
44            max_inflight,
45            IsolationPolicy::default(),
46        )
47    }
48
49    /// Creates a runtime with an explicit session isolation policy applied to new sessions.
50    pub fn new_with_isolation(
51        transport: Arc<dyn ServerTransport>,
52        cx: Cx,
53        thread_mode: ThreadMode,
54        max_inflight: usize,
55        session_isolation: IsolationPolicy,
56    ) -> Self {
57        Self {
58            transport,
59            thread_mode,
60            sessions: Mutex::new(BTreeMap::new()),
61            cx: Mutex::new(cx),
62            accept_thread: Mutex::new(None),
63            worker_threads: Mutex::new(Vec::new()),
64            next_session_id: AtomicU64::new(1),
65            connections: AtomicU64::new(0),
66            messages_sent: AtomicU64::new(0),
67            messages_received: AtomicU64::new(0),
68            max_inflight,
69            session_isolation,
70            stopping: AtomicBool::new(false),
71        }
72    }
73
74    /// Returns the transport this runtime accepts connections on.
75    pub fn transport(&self) -> &Arc<dyn ServerTransport> {
76        &self.transport
77    }
78
79    /// Returns the threading mode used to service connections.
80    pub fn thread_mode(&self) -> &ThreadMode {
81        &self.thread_mode
82    }
83
84    /// Returns the number of currently open sessions.
85    pub fn session_count(&self) -> usize {
86        self.sessions
87            .lock()
88            .map(|sessions| sessions.len())
89            .unwrap_or(0)
90    }
91
92    /// Returns a snapshot of all open sessions.
93    pub fn sessions(&self) -> Result<Vec<Session>> {
94        let sessions = self
95            .sessions
96            .lock()
97            .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?;
98        Ok(sessions.values().cloned().collect())
99    }
100
101    /// Signals that the runtime should stop accepting and servicing connections.
102    pub fn begin_stop(&self) {
103        self.stopping.store(true, Ordering::SeqCst);
104    }
105
106    /// Returns whether a stop has been requested.
107    pub fn is_stopping(&self) -> bool {
108        self.stopping.load(Ordering::SeqCst)
109    }
110
111    /// Opens a new session with the given codec and isolation policy, returning its id.
112    pub fn open_session(
113        &self,
114        negotiated_codec: sim_kernel::Symbol,
115        isolation: crate::IsolationPolicy,
116    ) -> Result<u64> {
117        let session_id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
118        self.connections.fetch_add(1, Ordering::Relaxed);
119        self.sessions
120            .lock()
121            .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
122            .insert(
123                session_id,
124                Session {
125                    id: session_id,
126                    negotiated_codec,
127                    isolation,
128                    closed: false,
129                },
130            );
131        Ok(session_id)
132    }
133
134    /// Updates the negotiated codec for an existing session; a no-op if the session is gone.
135    pub fn update_session_codec(
136        &self,
137        session_id: u64,
138        negotiated_codec: sim_kernel::Symbol,
139    ) -> Result<()> {
140        let mut sessions = self
141            .sessions
142            .lock()
143            .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?;
144        let Some(session) = sessions.get_mut(&session_id) else {
145            return Ok(());
146        };
147        session.negotiated_codec = negotiated_codec;
148        Ok(())
149    }
150
151    /// Removes the session with `session_id` from the runtime.
152    pub fn close_session(&self, session_id: u64) -> Result<()> {
153        self.sessions
154            .lock()
155            .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
156            .remove(&session_id);
157        Ok(())
158    }
159
160    /// Removes all sessions from the runtime.
161    pub fn clear_sessions(&self) -> Result<()> {
162        self.sessions
163            .lock()
164            .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
165            .clear();
166        Ok(())
167    }
168
169    /// Returns the total number of connections opened over the runtime's lifetime.
170    pub fn connection_count(&self) -> u64 {
171        self.connections.load(Ordering::Relaxed)
172    }
173
174    /// Returns the total number of messages sent.
175    pub fn messages_sent(&self) -> u64 {
176        self.messages_sent.load(Ordering::Relaxed)
177    }
178
179    /// Returns the total number of messages received.
180    pub fn messages_received(&self) -> u64 {
181        self.messages_received.load(Ordering::Relaxed)
182    }
183
184    /// Returns the maximum number of in-flight requests permitted.
185    pub fn max_inflight(&self) -> usize {
186        self.max_inflight
187    }
188
189    /// Returns the isolation policy applied to new sessions.
190    pub fn session_isolation(&self) -> &IsolationPolicy {
191        &self.session_isolation
192    }
193
194    /// Increments the sent-message counter.
195    pub fn note_message_sent(&self) {
196        self.messages_sent.fetch_add(1, Ordering::Relaxed);
197    }
198
199    /// Increments the received-message counter.
200    pub fn note_message_received(&self) {
201        self.messages_received.fetch_add(1, Ordering::Relaxed);
202    }
203
204    /// Runs `f` with exclusive access to the runtime's shared evaluation context.
205    pub fn with_cx<T>(&self, f: impl FnOnce(&mut Cx) -> Result<T>) -> Result<T> {
206        let mut cx = self
207            .cx
208            .lock()
209            .map_err(|_| Error::HostError("server runtime cx mutex poisoned".to_owned()))?;
210        f(&mut cx)
211    }
212
213    /// Stores the join handle for the connection-accept thread.
214    pub fn set_accept_thread(&self, handle: JoinHandle<()>) -> Result<()> {
215        let mut slot = self.accept_thread.lock().map_err(|_| {
216            Error::HostError("server runtime accept-thread mutex poisoned".to_owned())
217        })?;
218        *slot = Some(handle);
219        Ok(())
220    }
221
222    /// Joins the accept thread if one is registered.
223    pub fn join_accept_thread(&self) -> Result<()> {
224        let handle = self
225            .accept_thread
226            .lock()
227            .map_err(|_| {
228                Error::HostError("server runtime accept-thread mutex poisoned".to_owned())
229            })?
230            .take();
231        if let Some(handle) = handle {
232            let _ = handle.join();
233        }
234        Ok(())
235    }
236
237    /// Records a worker thread's join handle for later cleanup.
238    pub fn register_worker_thread(&self, handle: JoinHandle<()>) -> Result<()> {
239        self.worker_threads
240            .lock()
241            .map_err(|_| {
242                Error::HostError("server runtime worker-thread mutex poisoned".to_owned())
243            })?
244            .push(handle);
245        Ok(())
246    }
247
248    /// Joins and drains all registered worker threads.
249    pub fn join_worker_threads(&self) -> Result<()> {
250        let handles = std::mem::take(&mut *self.worker_threads.lock().map_err(|_| {
251            Error::HostError("server runtime worker-thread mutex poisoned".to_owned())
252        })?);
253        for handle in handles {
254            let _ = handle.join();
255        }
256        Ok(())
257    }
258
259    /// Waits up to `timeout` for an incoming connection, returning its transport if one arrives.
260    pub fn accept_timeout(
261        &self,
262        timeout: std::time::Duration,
263    ) -> Result<Option<Box<dyn ConnectionTransport>>> {
264        self.with_cx(|cx| self.transport.accept_timeout(cx, timeout))
265    }
266}