1use std::{
2 sync::{Arc, Mutex},
3 time::Duration,
4};
5
6use sim_citizen_derive::non_citizen;
7use sim_kernel::{
8 CapabilityName, ClassRef, Consistency, Cx, Error, EvalFabric, EvalMode, EvalReply, EvalRequest,
9 Expr, Object, Result, Symbol, Value, eval_remote_capability,
10};
11
12use crate::{
13 EvalSite, FrameKind, FrameRouter, IsolationPolicy, ServerAddress, ServerFrame,
14 eval_reply_from_frame, server_frame_from_request, symbol_list_value,
15};
16
/// Session state tracked for a [`Connection`].
///
/// [`Connection::session`] returns clones of this value, so a `Session` held
/// by a caller is a point-in-time copy rather than a live view.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Session {
    /// Numeric session identifier; `Connection::with_session` starts it at `0`.
    pub id: u64,
    /// Codec symbol recorded for the session; initialized to the connection's
    /// default codec.
    pub negotiated_codec: Symbol,
    /// Isolation policy supplied when the connection was built.
    pub isolation: IsolationPolicy,
    /// `true` once `Connection::close` has marked the session closed.
    pub closed: bool,
}
29
30impl Session {
31 pub(crate) fn as_value(&self, cx: &mut Cx) -> Result<Value> {
32 let isolation = self.isolation.as_value(cx)?;
33 cx.factory().table(vec![
34 (Symbol::new("id"), cx.factory().string(self.id.to_string())?),
35 (
36 Symbol::new("negotiated-codec"),
37 cx.factory().symbol(self.negotiated_codec.clone())?,
38 ),
39 (Symbol::new("isolation"), isolation),
40 (Symbol::new("closed"), cx.factory().bool(self.closed)?),
41 ])
42 }
43}
44
/// Handle used to send evaluation frames to an [`EvalSite`].
///
/// The session state and the frame router are held behind `Arc`, so clones of
/// a `Connection` share both with the handle they were cloned from.
#[derive(Clone)]
#[non_citizen(
    reason = "live server connection handle; reconstruct via server/Address descriptor and connect ops",
    kind = "handle"
)]
pub struct Connection {
    /// Server address this connection was created for.
    address: ServerAddress,
    /// Codec used when `realize` encodes a request into a frame.
    default_codec: Symbol,
    /// Codec symbols supplied at construction; reported by
    /// `supported_codecs()` and in the table rendering.
    supported_codecs: Vec<Symbol>,
    /// Evaluation site that answers the frames sent through this connection.
    site: Arc<dyn EvalSite>,
    /// Session state shared by every clone of this handle.
    session: Arc<Mutex<Session>>,
    /// Allocates message ids and queues inbound reply frames.
    router: Arc<FrameRouter>,
    /// Optional role symbol copied onto the envelope of outgoing frames.
    role: Option<Symbol>,
}
60
61impl Connection {
62 pub fn new(
64 address: ServerAddress,
65 default_codec: Symbol,
66 supported_codecs: Vec<Symbol>,
67 site: Arc<dyn EvalSite>,
68 ) -> Result<Self> {
69 Self::with_session(
70 address,
71 default_codec,
72 supported_codecs,
73 site,
74 None,
75 IsolationPolicy::default(),
76 )
77 }
78
79 pub fn with_session(
82 address: ServerAddress,
83 default_codec: Symbol,
84 supported_codecs: Vec<Symbol>,
85 site: Arc<dyn EvalSite>,
86 role: Option<Symbol>,
87 isolation: IsolationPolicy,
88 ) -> Result<Self> {
89 address.ensure_transport_available()?;
90 Ok(Self {
91 address,
92 default_codec: default_codec.clone(),
93 supported_codecs,
94 site,
95 session: Arc::new(Mutex::new(Session {
96 id: 0,
97 negotiated_codec: default_codec,
98 isolation,
99 closed: false,
100 })),
101 router: Arc::new(FrameRouter::default()),
102 role,
103 })
104 }
105
    /// Returns the server address stored on this connection.
    pub fn address(&self) -> &ServerAddress {
        &self.address
    }
110
    /// Returns the codec symbol this connection uses by default, e.g. when
    /// `realize` encodes a request.
    pub fn default_codec(&self) -> &Symbol {
        &self.default_codec
    }
115
    /// Returns the codec symbols supplied when the connection was built.
    pub fn supported_codecs(&self) -> &[Symbol] {
        &self.supported_codecs
    }
120
    /// Returns the shared handle to the evaluation site behind this connection.
    pub fn site(&self) -> &Arc<dyn EvalSite> {
        &self.site
    }
125
126 pub fn session(&self) -> Session {
128 self.session
129 .lock()
130 .map(|session| session.clone())
131 .unwrap_or(Session {
132 id: 0,
133 negotiated_codec: self.default_codec.clone(),
134 isolation: IsolationPolicy::default(),
135 closed: true,
136 })
137 }
138
    /// Returns the role symbol attached to outgoing frames, or `None` when
    /// the connection has no role.
    pub fn role(&self) -> Option<&Symbol> {
        self.role.as_ref()
    }
143
144 pub fn request(
147 &self,
148 cx: &mut Cx,
149 expr: Expr,
150 timeout: Option<Duration>,
151 required_capabilities: Vec<CapabilityName>,
152 ) -> Result<Value> {
153 self.request_with_consistency(
154 cx,
155 expr,
156 timeout,
157 required_capabilities,
158 self.default_consistency(),
159 )
160 }
161
162 pub(crate) fn request_with_consistency(
163 &self,
164 cx: &mut Cx,
165 expr: Expr,
166 timeout: Option<Duration>,
167 required_capabilities: Vec<CapabilityName>,
168 consistency: Consistency,
169 ) -> Result<Value> {
170 self.enforce_consistency(consistency)?;
171 let request = EvalRequest {
172 expr,
173 result_shape: None,
174 required_capabilities,
175 deadline: timeout,
176 consistency,
177 mode: EvalMode::Eval,
178 answer_limit: None,
179 stream_buffer: None,
180 stream: false,
181 trace: false,
182 };
183 let reply = self.realize(cx, request)?;
184 Ok(reply.value)
185 }
186
187 pub fn send(
190 &self,
191 cx: &mut Cx,
192 expr: Expr,
193 codec: Symbol,
194 deadline: Option<Duration>,
195 required_capabilities: Vec<CapabilityName>,
196 reply_codec_hint: Option<Symbol>,
197 ) -> Result<u64> {
198 self.send_with_consistency(
199 cx,
200 expr,
201 codec,
202 deadline,
203 required_capabilities,
204 reply_codec_hint,
205 self.default_consistency(),
206 )
207 }
208
209 #[allow(clippy::too_many_arguments)]
210 pub(crate) fn send_with_consistency(
211 &self,
212 cx: &mut Cx,
213 expr: Expr,
214 codec: Symbol,
215 deadline: Option<Duration>,
216 required_capabilities: Vec<CapabilityName>,
217 reply_codec_hint: Option<Symbol>,
218 consistency: Consistency,
219 ) -> Result<u64> {
220 self.enforce_consistency(consistency)?;
221 self.enforce_remote_capability(cx)?;
222 let msg_id = self.router.fresh_msg_id();
223 let mut frame = server_frame_from_request(
224 cx,
225 &codec,
226 EvalRequest {
227 expr,
228 result_shape: None,
229 required_capabilities,
230 deadline,
231 consistency,
232 mode: EvalMode::Eval,
233 answer_limit: None,
234 stream_buffer: None,
235 stream: false,
236 trace: false,
237 },
238 )?;
239 frame.kind = FrameKind::Request;
240 frame.envelope.reply_codec_hint = reply_codec_hint;
241 frame.envelope.role = self.role.clone();
242 frame.msg_id = Some(msg_id);
243
244 let mut reply = self.site.answer(cx, frame)?;
245 if reply.correlate.is_none() {
246 reply.correlate = Some(msg_id);
247 }
248 self.router.push_inbound(reply)?;
249 Ok(msg_id)
250 }
251
252 pub fn notify(
254 &self,
255 cx: &mut Cx,
256 expr: Expr,
257 codec: Symbol,
258 deadline: Option<Duration>,
259 required_capabilities: Vec<CapabilityName>,
260 ) -> Result<()> {
261 self.enforce_remote_capability(cx)?;
262 let mut frame = ServerFrame::from_expr(
263 cx,
264 codec,
265 FrameKind::Notify,
266 &expr,
267 self.default_consistency(),
268 required_capabilities,
269 false,
270 )?;
271 frame.envelope.deadline = deadline;
272 frame.envelope.role = self.role.clone();
273 let _ = self.site.answer(cx, frame)?;
274 Ok(())
275 }
276
    /// Pops the next inbound frame from the router.
    ///
    /// `_timeout` is accepted but not used: the call delegates directly to
    /// `FrameRouter::pop_inbound` and returns its result.
    // NOTE(review): presumably `Ok(None)` means no frame is queued — confirm
    // against `FrameRouter::pop_inbound`.
    pub fn receive(&self, _timeout: Option<Duration>) -> Result<Option<ServerFrame>> {
        self.router.pop_inbound()
    }
284
285 pub fn close(&self, cx: &mut Cx) -> Result<()> {
287 self.site.close_connection(cx)?;
288 if let Ok(mut session) = self.session.lock() {
289 session.closed = true;
290 }
291 Ok(())
292 }
293}
294
295impl Object for Connection {
296 fn display(&self, _cx: &mut Cx) -> Result<String> {
297 Ok("#<server-connection>".to_owned())
298 }
299
300 fn as_any(&self) -> &dyn std::any::Any {
301 self
302 }
303}
304
305impl sim_kernel::ObjectCompat for Connection {
306 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
307 cx.factory().class_stub(
308 sim_kernel::ClassId(0),
309 Symbol::qualified("server", "Connection"),
310 )
311 }
312 fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
313 self.as_table(cx)?.object().as_expr(cx)
314 }
315 fn as_table(&self, cx: &mut Cx) -> Result<Value> {
316 let address = self.address.as_value(cx)?;
317 let default_codec = cx.factory().symbol(self.default_codec.clone())?;
318 let supported_codecs = symbol_list_value(cx, &self.supported_codecs)?;
319 let site_kind = cx.factory().string(self.site.site_kind().to_owned())?;
320 let site_address = self.site.address().as_value(cx)?;
321 let site_codecs = symbol_list_value(cx, self.site.codecs())?;
322 let session = self
323 .session
324 .lock()
325 .map_err(|_| Error::HostError("connection session mutex poisoned".to_owned()))?
326 .as_value(cx)?;
327 let next_msg_id = cx
328 .factory()
329 .string(self.router.peek_next_msg_id().to_string())?;
330 let role = match &self.role {
331 Some(role) => cx.factory().symbol(role.clone())?,
332 None => cx.factory().nil()?,
333 };
334 cx.factory().table(vec![
335 (
336 Symbol::new("kind"),
337 cx.factory().symbol(Symbol::new("connection"))?,
338 ),
339 (Symbol::new("address"), address),
340 (Symbol::new("default-codec"), default_codec),
341 (Symbol::new("supported-codecs"), supported_codecs),
342 (Symbol::new("site-kind"), site_kind),
343 (Symbol::new("site-address"), site_address),
344 (Symbol::new("site-codecs"), site_codecs),
345 (Symbol::new("session"), session),
346 (Symbol::new("role"), role),
347 (Symbol::new("next-msg-id"), next_msg_id),
348 ])
349 }
350 fn as_eval_fabric(&self) -> Option<&dyn EvalFabric> {
351 Some(self)
352 }
353}
354
355impl EvalFabric for Connection {
356 fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
357 self.enforce_remote_capability(cx)?;
358 let mut frame = server_frame_from_request(cx, self.default_codec(), request)?;
359 frame.msg_id = Some(self.router.fresh_msg_id());
360 frame.envelope.role = self.role.clone();
361 let reply = self
362 .site
363 .answer_with_timeout(cx, frame.clone(), frame.envelope.deadline)?;
364 eval_reply_from_frame(cx, &reply)
365 }
366}
367
368impl Connection {
369 fn requires_remote_capability(&self) -> bool {
370 self.address.is_remote_like() || self.site.address().is_remote_like()
371 }
372
373 pub(crate) fn default_consistency(&self) -> Consistency {
374 if self.requires_remote_capability() {
375 Consistency::RemoteOnly
376 } else {
377 Consistency::LocalFirst
378 }
379 }
380
381 fn enforce_remote_capability(&self, cx: &mut Cx) -> Result<()> {
382 if self.requires_remote_capability() {
383 cx.require(&eval_remote_capability())?;
384 }
385 Ok(())
386 }
387
388 pub(crate) fn enforce_consistency(&self, consistency: Consistency) -> Result<()> {
389 match consistency {
390 Consistency::LocalOnly if self.requires_remote_capability() => {
391 Err(Error::Eval(format!(
392 "consistency local-only refuses remote address kind {}",
393 self.address.kind_symbol()
394 )))
395 }
396 Consistency::RemoteOnly if !self.requires_remote_capability() => {
397 Err(Error::Eval(format!(
398 "consistency remote-only refuses local address kind {}",
399 self.address.kind_symbol()
400 )))
401 }
402 _ => Ok(()),
403 }
404 }
405}