Skip to main content

sim_lib_server/
connection.rs

1use std::{
2    sync::{Arc, Mutex},
3    time::Duration,
4};
5
6use sim_citizen_derive::non_citizen;
7use sim_kernel::{
8    CapabilityName, ClassRef, Consistency, Cx, Error, EvalFabric, EvalMode, EvalReply, EvalRequest,
9    Expr, Object, Result, Symbol, Value, eval_remote_capability,
10};
11
12use crate::{
13    EvalSite, FrameKind, FrameRouter, IsolationPolicy, ServerAddress, ServerFrame,
14    eval_reply_from_frame, server_frame_from_request, symbol_list_value,
15};
16
17/// State of a single server connection session.
18#[derive(Clone, Debug, PartialEq, Eq)]
19pub struct Session {
20    /// Session id, unique within the owning runtime.
21    pub id: u64,
22    /// Codec negotiated for this session's frames.
23    pub negotiated_codec: Symbol,
24    /// Isolation policy applied to evaluation in this session.
25    pub isolation: IsolationPolicy,
26    /// Whether the session has been closed.
27    pub closed: bool,
28}
29
30impl Session {
31    pub(crate) fn as_value(&self, cx: &mut Cx) -> Result<Value> {
32        let isolation = self.isolation.as_value(cx)?;
33        cx.factory().table(vec![
34            (Symbol::new("id"), cx.factory().string(self.id.to_string())?),
35            (
36                Symbol::new("negotiated-codec"),
37                cx.factory().symbol(self.negotiated_codec.clone())?,
38            ),
39            (Symbol::new("isolation"), isolation),
40            (Symbol::new("closed"), cx.factory().bool(self.closed)?),
41        ])
42    }
43}
44
45#[derive(Clone)]
46#[non_citizen(
47    reason = "live server connection handle; reconstruct via server/Address descriptor and connect ops",
48    kind = "handle"
49)]
50/// Live client handle to a server endpoint, evaluating expressions over a transport [`EvalSite`].
51pub struct Connection {
52    address: ServerAddress,
53    default_codec: Symbol,
54    supported_codecs: Vec<Symbol>,
55    site: Arc<dyn EvalSite>,
56    session: Arc<Mutex<Session>>,
57    router: Arc<FrameRouter>,
58    role: Option<Symbol>,
59}
60
61impl Connection {
62    /// Opens a connection with default role and isolation policy.
63    pub fn new(
64        address: ServerAddress,
65        default_codec: Symbol,
66        supported_codecs: Vec<Symbol>,
67        site: Arc<dyn EvalSite>,
68    ) -> Result<Self> {
69        Self::with_session(
70            address,
71            default_codec,
72            supported_codecs,
73            site,
74            None,
75            IsolationPolicy::default(),
76        )
77    }
78
79    /// Opens a connection with an explicit `role` and `isolation` policy, verifying the
80    /// address transport is available before constructing the initial session.
81    pub fn with_session(
82        address: ServerAddress,
83        default_codec: Symbol,
84        supported_codecs: Vec<Symbol>,
85        site: Arc<dyn EvalSite>,
86        role: Option<Symbol>,
87        isolation: IsolationPolicy,
88    ) -> Result<Self> {
89        address.ensure_transport_available()?;
90        Ok(Self {
91            address,
92            default_codec: default_codec.clone(),
93            supported_codecs,
94            site,
95            session: Arc::new(Mutex::new(Session {
96                id: 0,
97                negotiated_codec: default_codec,
98                isolation,
99                closed: false,
100            })),
101            router: Arc::new(FrameRouter::default()),
102            role,
103        })
104    }
105
106    /// Returns the remote address this connection targets.
107    pub fn address(&self) -> &ServerAddress {
108        &self.address
109    }
110
111    /// Returns the codec used by default for outbound frames.
112    pub fn default_codec(&self) -> &Symbol {
113        &self.default_codec
114    }
115
116    /// Returns the codecs this connection is willing to negotiate.
117    pub fn supported_codecs(&self) -> &[Symbol] {
118        &self.supported_codecs
119    }
120
121    /// Returns the underlying eval site that carries frames.
122    pub fn site(&self) -> &Arc<dyn EvalSite> {
123        &self.site
124    }
125
126    /// Returns a snapshot of the current session, or a closed placeholder if the lock is poisoned.
127    pub fn session(&self) -> Session {
128        self.session
129            .lock()
130            .map(|session| session.clone())
131            .unwrap_or(Session {
132                id: 0,
133                negotiated_codec: self.default_codec.clone(),
134                isolation: IsolationPolicy::default(),
135                closed: true,
136            })
137    }
138
139    /// Returns the role symbol attached to outbound frames, if any.
140    pub fn role(&self) -> Option<&Symbol> {
141        self.role.as_ref()
142    }
143
144    /// Evaluates `expr` remotely and returns its result value, using the connection's
145    /// default consistency.
146    pub fn request(
147        &self,
148        cx: &mut Cx,
149        expr: Expr,
150        timeout: Option<Duration>,
151        required_capabilities: Vec<CapabilityName>,
152    ) -> Result<Value> {
153        self.request_with_consistency(
154            cx,
155            expr,
156            timeout,
157            required_capabilities,
158            self.default_consistency(),
159        )
160    }
161
162    pub(crate) fn request_with_consistency(
163        &self,
164        cx: &mut Cx,
165        expr: Expr,
166        timeout: Option<Duration>,
167        required_capabilities: Vec<CapabilityName>,
168        consistency: Consistency,
169    ) -> Result<Value> {
170        self.enforce_consistency(consistency)?;
171        let request = EvalRequest {
172            expr,
173            result_shape: None,
174            required_capabilities,
175            deadline: timeout,
176            consistency,
177            mode: EvalMode::Eval,
178            answer_limit: None,
179            stream_buffer: None,
180            stream: false,
181            trace: false,
182        };
183        let reply = self.realize(cx, request)?;
184        Ok(reply.value)
185    }
186
187    /// Sends `expr` as a request frame without awaiting a synchronous result, returning the
188    /// assigned message id for later correlation. Uses the connection's default consistency.
189    pub fn send(
190        &self,
191        cx: &mut Cx,
192        expr: Expr,
193        codec: Symbol,
194        deadline: Option<Duration>,
195        required_capabilities: Vec<CapabilityName>,
196        reply_codec_hint: Option<Symbol>,
197    ) -> Result<u64> {
198        self.send_with_consistency(
199            cx,
200            expr,
201            codec,
202            deadline,
203            required_capabilities,
204            reply_codec_hint,
205            self.default_consistency(),
206        )
207    }
208
209    #[allow(clippy::too_many_arguments)]
210    pub(crate) fn send_with_consistency(
211        &self,
212        cx: &mut Cx,
213        expr: Expr,
214        codec: Symbol,
215        deadline: Option<Duration>,
216        required_capabilities: Vec<CapabilityName>,
217        reply_codec_hint: Option<Symbol>,
218        consistency: Consistency,
219    ) -> Result<u64> {
220        self.enforce_consistency(consistency)?;
221        self.enforce_remote_capability(cx)?;
222        let msg_id = self.router.fresh_msg_id();
223        let mut frame = server_frame_from_request(
224            cx,
225            &codec,
226            EvalRequest {
227                expr,
228                result_shape: None,
229                required_capabilities,
230                deadline,
231                consistency,
232                mode: EvalMode::Eval,
233                answer_limit: None,
234                stream_buffer: None,
235                stream: false,
236                trace: false,
237            },
238        )?;
239        frame.kind = FrameKind::Request;
240        frame.envelope.reply_codec_hint = reply_codec_hint;
241        frame.envelope.role = self.role.clone();
242        frame.msg_id = Some(msg_id);
243
244        let mut reply = self.site.answer(cx, frame)?;
245        if reply.correlate.is_none() {
246            reply.correlate = Some(msg_id);
247        }
248        self.router.push_inbound(reply)?;
249        Ok(msg_id)
250    }
251
252    /// Sends `expr` as a one-way notify frame, expecting no correlated reply.
253    pub fn notify(
254        &self,
255        cx: &mut Cx,
256        expr: Expr,
257        codec: Symbol,
258        deadline: Option<Duration>,
259        required_capabilities: Vec<CapabilityName>,
260    ) -> Result<()> {
261        self.enforce_remote_capability(cx)?;
262        let mut frame = ServerFrame::from_expr(
263            cx,
264            codec,
265            FrameKind::Notify,
266            &expr,
267            self.default_consistency(),
268            required_capabilities,
269            false,
270        )?;
271        frame.envelope.deadline = deadline;
272        frame.envelope.role = self.role.clone();
273        let _ = self.site.answer(cx, frame)?;
274        Ok(())
275    }
276
277    /// Pops the next buffered inbound frame, if one is queued.
278    ///
279    /// Returns [`Error::PoisonedLock`] when the router's queue mutex is
280    /// poisoned, rather than panicking.
281    pub fn receive(&self, _timeout: Option<Duration>) -> Result<Option<ServerFrame>> {
282        self.router.pop_inbound()
283    }
284
285    /// Closes the connection and marks its session closed.
286    pub fn close(&self, cx: &mut Cx) -> Result<()> {
287        self.site.close_connection(cx)?;
288        if let Ok(mut session) = self.session.lock() {
289            session.closed = true;
290        }
291        Ok(())
292    }
293}
294
295impl Object for Connection {
296    fn display(&self, _cx: &mut Cx) -> Result<String> {
297        Ok("#<server-connection>".to_owned())
298    }
299
300    fn as_any(&self) -> &dyn std::any::Any {
301        self
302    }
303}
304
305impl sim_kernel::ObjectCompat for Connection {
306    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
307        cx.factory().class_stub(
308            sim_kernel::ClassId(0),
309            Symbol::qualified("server", "Connection"),
310        )
311    }
312    fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
313        self.as_table(cx)?.object().as_expr(cx)
314    }
315    fn as_table(&self, cx: &mut Cx) -> Result<Value> {
316        let address = self.address.as_value(cx)?;
317        let default_codec = cx.factory().symbol(self.default_codec.clone())?;
318        let supported_codecs = symbol_list_value(cx, &self.supported_codecs)?;
319        let site_kind = cx.factory().string(self.site.site_kind().to_owned())?;
320        let site_address = self.site.address().as_value(cx)?;
321        let site_codecs = symbol_list_value(cx, self.site.codecs())?;
322        let session = self
323            .session
324            .lock()
325            .map_err(|_| Error::HostError("connection session mutex poisoned".to_owned()))?
326            .as_value(cx)?;
327        let next_msg_id = cx
328            .factory()
329            .string(self.router.peek_next_msg_id().to_string())?;
330        let role = match &self.role {
331            Some(role) => cx.factory().symbol(role.clone())?,
332            None => cx.factory().nil()?,
333        };
334        cx.factory().table(vec![
335            (
336                Symbol::new("kind"),
337                cx.factory().symbol(Symbol::new("connection"))?,
338            ),
339            (Symbol::new("address"), address),
340            (Symbol::new("default-codec"), default_codec),
341            (Symbol::new("supported-codecs"), supported_codecs),
342            (Symbol::new("site-kind"), site_kind),
343            (Symbol::new("site-address"), site_address),
344            (Symbol::new("site-codecs"), site_codecs),
345            (Symbol::new("session"), session),
346            (Symbol::new("role"), role),
347            (Symbol::new("next-msg-id"), next_msg_id),
348        ])
349    }
350    fn as_eval_fabric(&self) -> Option<&dyn EvalFabric> {
351        Some(self)
352    }
353}
354
355impl EvalFabric for Connection {
356    fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
357        self.enforce_remote_capability(cx)?;
358        let mut frame = server_frame_from_request(cx, self.default_codec(), request)?;
359        frame.msg_id = Some(self.router.fresh_msg_id());
360        frame.envelope.role = self.role.clone();
361        let reply = self
362            .site
363            .answer_with_timeout(cx, frame.clone(), frame.envelope.deadline)?;
364        eval_reply_from_frame(cx, &reply)
365    }
366}
367
368impl Connection {
369    fn requires_remote_capability(&self) -> bool {
370        self.address.is_remote_like() || self.site.address().is_remote_like()
371    }
372
373    pub(crate) fn default_consistency(&self) -> Consistency {
374        if self.requires_remote_capability() {
375            Consistency::RemoteOnly
376        } else {
377            Consistency::LocalFirst
378        }
379    }
380
381    fn enforce_remote_capability(&self, cx: &mut Cx) -> Result<()> {
382        if self.requires_remote_capability() {
383            cx.require(&eval_remote_capability())?;
384        }
385        Ok(())
386    }
387
388    pub(crate) fn enforce_consistency(&self, consistency: Consistency) -> Result<()> {
389        match consistency {
390            Consistency::LocalOnly if self.requires_remote_capability() => {
391                Err(Error::Eval(format!(
392                    "consistency local-only refuses remote address kind {}",
393                    self.address.kind_symbol()
394                )))
395            }
396            Consistency::RemoteOnly if !self.requires_remote_capability() => {
397                Err(Error::Eval(format!(
398                    "consistency remote-only refuses local address kind {}",
399                    self.address.kind_symbol()
400                )))
401            }
402            _ => Ok(()),
403        }
404    }
405}