use std::thread::JoinHandle;
use std::{
collections::BTreeMap,
sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicU64, Ordering},
},
};
use sim_kernel::{Cx, Error, Result};
use crate::{ConnectionTransport, IsolationPolicy, ServerTransport, Session, ThreadMode};
pub struct ServerRuntime {
transport: Arc<dyn ServerTransport>,
thread_mode: ThreadMode,
sessions: Mutex<BTreeMap<u64, Session>>,
cx: Mutex<Cx>,
accept_thread: Mutex<Option<JoinHandle<()>>>,
worker_threads: Mutex<Vec<JoinHandle<()>>>,
next_session_id: AtomicU64,
connections: AtomicU64,
messages_sent: AtomicU64,
messages_received: AtomicU64,
max_inflight: usize,
session_isolation: IsolationPolicy,
stopping: AtomicBool,
}
impl ServerRuntime {
pub fn new(
transport: Arc<dyn ServerTransport>,
cx: Cx,
thread_mode: ThreadMode,
max_inflight: usize,
) -> Self {
Self::new_with_isolation(
transport,
cx,
thread_mode,
max_inflight,
IsolationPolicy::default(),
)
}
pub fn new_with_isolation(
transport: Arc<dyn ServerTransport>,
cx: Cx,
thread_mode: ThreadMode,
max_inflight: usize,
session_isolation: IsolationPolicy,
) -> Self {
Self {
transport,
thread_mode,
sessions: Mutex::new(BTreeMap::new()),
cx: Mutex::new(cx),
accept_thread: Mutex::new(None),
worker_threads: Mutex::new(Vec::new()),
next_session_id: AtomicU64::new(1),
connections: AtomicU64::new(0),
messages_sent: AtomicU64::new(0),
messages_received: AtomicU64::new(0),
max_inflight,
session_isolation,
stopping: AtomicBool::new(false),
}
}
pub fn transport(&self) -> &Arc<dyn ServerTransport> {
&self.transport
}
pub fn thread_mode(&self) -> &ThreadMode {
&self.thread_mode
}
pub fn session_count(&self) -> usize {
self.sessions
.lock()
.map(|sessions| sessions.len())
.unwrap_or(0)
}
pub fn sessions(&self) -> Result<Vec<Session>> {
let sessions = self
.sessions
.lock()
.map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?;
Ok(sessions.values().cloned().collect())
}
pub fn begin_stop(&self) {
self.stopping.store(true, Ordering::SeqCst);
}
pub fn is_stopping(&self) -> bool {
self.stopping.load(Ordering::SeqCst)
}
pub fn open_session(
&self,
negotiated_codec: sim_kernel::Symbol,
isolation: crate::IsolationPolicy,
) -> Result<u64> {
let session_id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
self.connections.fetch_add(1, Ordering::Relaxed);
self.sessions
.lock()
.map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
.insert(
session_id,
Session {
id: session_id,
negotiated_codec,
isolation,
closed: false,
},
);
Ok(session_id)
}
pub fn update_session_codec(
&self,
session_id: u64,
negotiated_codec: sim_kernel::Symbol,
) -> Result<()> {
let mut sessions = self
.sessions
.lock()
.map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?;
let Some(session) = sessions.get_mut(&session_id) else {
return Ok(());
};
session.negotiated_codec = negotiated_codec;
Ok(())
}
pub fn close_session(&self, session_id: u64) -> Result<()> {
self.sessions
.lock()
.map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
.remove(&session_id);
Ok(())
}
pub fn clear_sessions(&self) -> Result<()> {
self.sessions
.lock()
.map_err(|_| Error::HostError("server runtime session mutex poisoned".to_owned()))?
.clear();
Ok(())
}
pub fn connection_count(&self) -> u64 {
self.connections.load(Ordering::Relaxed)
}
pub fn messages_sent(&self) -> u64 {
self.messages_sent.load(Ordering::Relaxed)
}
pub fn messages_received(&self) -> u64 {
self.messages_received.load(Ordering::Relaxed)
}
pub fn max_inflight(&self) -> usize {
self.max_inflight
}
pub fn session_isolation(&self) -> &IsolationPolicy {
&self.session_isolation
}
pub fn note_message_sent(&self) {
self.messages_sent.fetch_add(1, Ordering::Relaxed);
}
pub fn note_message_received(&self) {
self.messages_received.fetch_add(1, Ordering::Relaxed);
}
pub fn with_cx<T>(&self, f: impl FnOnce(&mut Cx) -> Result<T>) -> Result<T> {
let mut cx = self
.cx
.lock()
.map_err(|_| Error::HostError("server runtime cx mutex poisoned".to_owned()))?;
f(&mut cx)
}
pub fn set_accept_thread(&self, handle: JoinHandle<()>) -> Result<()> {
let mut slot = self.accept_thread.lock().map_err(|_| {
Error::HostError("server runtime accept-thread mutex poisoned".to_owned())
})?;
*slot = Some(handle);
Ok(())
}
pub fn join_accept_thread(&self) -> Result<()> {
let handle = self
.accept_thread
.lock()
.map_err(|_| {
Error::HostError("server runtime accept-thread mutex poisoned".to_owned())
})?
.take();
if let Some(handle) = handle {
let _ = handle.join();
}
Ok(())
}
pub fn register_worker_thread(&self, handle: JoinHandle<()>) -> Result<()> {
self.worker_threads
.lock()
.map_err(|_| {
Error::HostError("server runtime worker-thread mutex poisoned".to_owned())
})?
.push(handle);
Ok(())
}
pub fn join_worker_threads(&self) -> Result<()> {
let handles = std::mem::take(&mut *self.worker_threads.lock().map_err(|_| {
Error::HostError("server runtime worker-thread mutex poisoned".to_owned())
})?);
for handle in handles {
let _ = handle.join();
}
Ok(())
}
pub fn accept_timeout(
&self,
timeout: std::time::Duration,
) -> Result<Option<Box<dyn ConnectionTransport>>> {
self.with_cx(|cx| self.transport.accept_timeout(cx, timeout))
}
}