sim_lib_server/
runtime.rs1use std::thread::JoinHandle;
2use std::{
3 collections::BTreeMap,
4 sync::{
5 Arc, Mutex,
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 },
8};
9
10use sim_kernel::{Cx, Error, Result};
11
12use crate::{ConnectionTransport, IsolationPolicy, ServerTransport, Session, ThreadMode};
13
14pub struct ServerRuntime {
17 transport: Arc<dyn ServerTransport>,
18 thread_mode: ThreadMode,
19 sessions: Mutex<BTreeMap<u64, Session>>,
20 cx: Mutex<Cx>,
21 accept_thread: Mutex<Option<JoinHandle<()>>>,
22 worker_threads: Mutex<Vec<JoinHandle<()>>>,
23 next_session_id: AtomicU64,
24 connections: AtomicU64,
25 messages_sent: AtomicU64,
26 messages_received: AtomicU64,
27 max_inflight: usize,
28 session_isolation: IsolationPolicy,
29 stopping: AtomicBool,
30}
31
32impl ServerRuntime {
33 pub fn new(
35 transport: Arc<dyn ServerTransport>,
36 cx: Cx,
37 thread_mode: ThreadMode,
38 max_inflight: usize,
39 ) -> Self {
40 Self::new_with_isolation(
41 transport,
42 cx,
43 thread_mode,
44 max_inflight,
45 IsolationPolicy::default(),
46 )
47 }
48
49 pub fn new_with_isolation(
51 transport: Arc<dyn ServerTransport>,
52 cx: Cx,
53 thread_mode: ThreadMode,
54 max_inflight: usize,
55 session_isolation: IsolationPolicy,
56 ) -> Self {
57 Self {
58 transport,
59 thread_mode,
60 sessions: Mutex::new(BTreeMap::new()),
61 cx: Mutex::new(cx),
62 accept_thread: Mutex::new(None),
63 worker_threads: Mutex::new(Vec::new()),
64 next_session_id: AtomicU64::new(1),
65 connections: AtomicU64::new(0),
66 messages_sent: AtomicU64::new(0),
67 messages_received: AtomicU64::new(0),
68 max_inflight,
69 session_isolation,
70 stopping: AtomicBool::new(false),
71 }
72 }
73
74 pub fn transport(&self) -> &Arc<dyn ServerTransport> {
76 &self.transport
77 }
78
79 pub fn thread_mode(&self) -> &ThreadMode {
81 &self.thread_mode
82 }
83
84 pub fn session_count(&self) -> usize {
86 self.sessions
87 .lock()
88 .map(|sessions| sessions.len())
89 .unwrap_or(0)
90 }
91
92 pub fn sessions(&self) -> Result<Vec<Session>> {
94 let sessions = self
95 .sessions
96 .lock()
97 .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?;
98 Ok(sessions.values().cloned().collect())
99 }
100
101 pub fn begin_stop(&self) {
103 self.stopping.store(true, Ordering::SeqCst);
104 }
105
106 pub fn is_stopping(&self) -> bool {
108 self.stopping.load(Ordering::SeqCst)
109 }
110
111 pub fn open_session(
113 &self,
114 negotiated_codec: sim_kernel::Symbol,
115 isolation: crate::IsolationPolicy,
116 ) -> Result<u64> {
117 let session_id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
118 self.connections.fetch_add(1, Ordering::Relaxed);
119 self.sessions
120 .lock()
121 .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
122 .insert(
123 session_id,
124 Session {
125 id: session_id,
126 negotiated_codec,
127 isolation,
128 closed: false,
129 },
130 );
131 Ok(session_id)
132 }
133
134 pub fn update_session_codec(
136 &self,
137 session_id: u64,
138 negotiated_codec: sim_kernel::Symbol,
139 ) -> Result<()> {
140 let mut sessions = self
141 .sessions
142 .lock()
143 .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?;
144 let Some(session) = sessions.get_mut(&session_id) else {
145 return Ok(());
146 };
147 session.negotiated_codec = negotiated_codec;
148 Ok(())
149 }
150
151 pub fn close_session(&self, session_id: u64) -> Result<()> {
153 self.sessions
154 .lock()
155 .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
156 .remove(&session_id);
157 Ok(())
158 }
159
160 pub fn clear_sessions(&self) -> Result<()> {
162 self.sessions
163 .lock()
164 .map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
165 .clear();
166 Ok(())
167 }
168
169 pub fn connection_count(&self) -> u64 {
171 self.connections.load(Ordering::Relaxed)
172 }
173
174 pub fn messages_sent(&self) -> u64 {
176 self.messages_sent.load(Ordering::Relaxed)
177 }
178
179 pub fn messages_received(&self) -> u64 {
181 self.messages_received.load(Ordering::Relaxed)
182 }
183
184 pub fn max_inflight(&self) -> usize {
186 self.max_inflight
187 }
188
189 pub fn session_isolation(&self) -> &IsolationPolicy {
191 &self.session_isolation
192 }
193
194 pub fn note_message_sent(&self) {
196 self.messages_sent.fetch_add(1, Ordering::Relaxed);
197 }
198
199 pub fn note_message_received(&self) {
201 self.messages_received.fetch_add(1, Ordering::Relaxed);
202 }
203
204 pub fn with_cx<T>(&self, f: impl FnOnce(&mut Cx) -> Result<T>) -> Result<T> {
206 let mut cx = self
207 .cx
208 .lock()
209 .map_err(|_| Error::HostError("server runtime cx mutex poisoned".to_owned()))?;
210 f(&mut cx)
211 }
212
213 pub fn set_accept_thread(&self, handle: JoinHandle<()>) -> Result<()> {
215 let mut slot = self.accept_thread.lock().map_err(|_| {
216 Error::HostError("server runtime accept-thread mutex poisoned".to_owned())
217 })?;
218 *slot = Some(handle);
219 Ok(())
220 }
221
222 pub fn join_accept_thread(&self) -> Result<()> {
224 let handle = self
225 .accept_thread
226 .lock()
227 .map_err(|_| {
228 Error::HostError("server runtime accept-thread mutex poisoned".to_owned())
229 })?
230 .take();
231 if let Some(handle) = handle {
232 let _ = handle.join();
233 }
234 Ok(())
235 }
236
237 pub fn register_worker_thread(&self, handle: JoinHandle<()>) -> Result<()> {
239 self.worker_threads
240 .lock()
241 .map_err(|_| {
242 Error::HostError("server runtime worker-thread mutex poisoned".to_owned())
243 })?
244 .push(handle);
245 Ok(())
246 }
247
248 pub fn join_worker_threads(&self) -> Result<()> {
250 let handles = std::mem::take(&mut *self.worker_threads.lock().map_err(|_| {
251 Error::HostError("server runtime worker-thread mutex poisoned".to_owned())
252 })?);
253 for handle in handles {
254 let _ = handle.join();
255 }
256 Ok(())
257 }
258
259 pub fn accept_timeout(
261 &self,
262 timeout: std::time::Duration,
263 ) -> Result<Option<Box<dyn ConnectionTransport>>> {
264 self.with_cx(|cx| self.transport.accept_timeout(cx, timeout))
265 }
266}