Skip to main content

sim_lib_server/frame/
model.rs

1use std::time::Duration;
2
3use sim_codec::DecodeLimits;
4use sim_kernel::{
5    CapabilityName, ClassRef, Consistency, Cx, EncodeOptions, Expr, Object, ReadPolicy, Result,
6    Symbol, Value,
7};
8
9use crate::codecio::{decode_frame_payload, encode_frame_payload};
10use crate::helpers::format_duration;
11
/// Frame format version that [`ServerFrame::new`] stamps onto every frame it builds.
const SERVER_FRAME_VERSION: u16 = 1;
13
14/// Discriminates the role a [`ServerFrame`] plays in the wire protocol.
15#[derive(Clone, Debug, PartialEq, Eq)]
16pub enum FrameKind {
17    /// An eval/agent request.
18    Request,
19    /// A successful response to a request.
20    Response,
21    /// An error response to a request.
22    Error,
23    /// A one-way notification with no reply expected.
24    Notify,
25    /// The opening frame of a streamed response.
26    StreamStart,
27    /// One chunk of a streamed response.
28    StreamChunk,
29    /// The closing frame of a streamed response.
30    StreamEnd,
31    /// A codec negotiation offer.
32    Negotiate {
33        /// Codecs the sender is willing to use.
34        codecs: Vec<Symbol>,
35    },
36    /// A liveness probe.
37    Ping,
38    /// A reply to a [`FrameKind::Ping`].
39    Pong,
40    /// A lifecycle control message.
41    Lifecycle {
42        /// Lifecycle command being issued.
43        command: LifecycleCommand,
44    },
45    /// A scheduled or event-driven trigger.
46    Trigger {
47        /// Symbol naming the trigger source.
48        source: Symbol,
49        /// Trigger time in milliseconds.
50        when_ms: u64,
51    },
52    /// A role assignment for multi-hop routing.
53    Role {
54        /// Symbol naming the assigned role.
55        role: Symbol,
56        /// Hop count at which the role applies.
57        hop: u32,
58    },
59}
60
61impl FrameKind {
62    /// Returns the symbol naming this frame kind (e.g. `request`, `stream-end`).
63    pub fn as_symbol(&self) -> Symbol {
64        Symbol::new(match self {
65            Self::Request => "request",
66            Self::Response => "response",
67            Self::Error => "error",
68            Self::Notify => "notify",
69            Self::StreamStart => "stream-start",
70            Self::StreamChunk => "stream-chunk",
71            Self::StreamEnd => "stream-end",
72            Self::Negotiate { .. } => "negotiate",
73            Self::Ping => "ping",
74            Self::Pong => "pong",
75            Self::Lifecycle { .. } => "lifecycle",
76            Self::Trigger { .. } => "trigger",
77            Self::Role { .. } => "role",
78        })
79    }
80}
81
/// The command a [`FrameKind::Lifecycle`] frame asks the server to carry out.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LifecycleCommand {
    /// Bring the server up.
    Start,
    /// Shut the server down.
    Stop,
    /// Pause serving while leaving the server in place (no teardown).
    Suspend,
    /// Continue serving after a suspend.
    Resume,
    /// Ask for a health report.
    Health,
}
96
97/// Out-of-band metadata attached to a [`ServerFrame`] alongside its payload.
98#[derive(Clone, Debug, Default, PartialEq, Eq)]
99pub struct FrameEnvelope {
100    /// Optional deadline for handling the frame.
101    pub deadline: Option<Duration>,
102    /// Consistency policy requested for the call.
103    pub consistency: Consistency,
104    /// Whether an execution trace is requested.
105    pub trace: bool,
106    /// Capabilities the call requires.
107    pub required_capabilities: Vec<CapabilityName>,
108    /// Codec the sender prefers for the reply, if any.
109    pub reply_codec_hint: Option<Symbol>,
110    /// Routing role assigned to the frame, if any.
111    pub role: Option<Symbol>,
112    /// Hop count for multi-hop routing.
113    pub hop: u32,
114    /// Symbol naming the trigger source, if the frame was triggered.
115    pub trigger_source: Option<Symbol>,
116}
117
118impl FrameEnvelope {
119    fn as_value(&self, cx: &mut Cx) -> Result<Value> {
120        let deadline = match self.deadline {
121            Some(deadline) => cx.factory().string(format_duration(deadline))?,
122            None => cx.factory().nil()?,
123        };
124        let required_capabilities = cx.factory().list(
125            self.required_capabilities
126                .iter()
127                .map(|capability| cx.factory().string(capability.as_str().to_owned()))
128                .collect::<Result<Vec<_>>>()?,
129        )?;
130        let reply_codec_hint = match &self.reply_codec_hint {
131            Some(codec) => cx.factory().symbol(codec.clone())?,
132            None => cx.factory().nil()?,
133        };
134        let role = match &self.role {
135            Some(role) => cx.factory().symbol(role.clone())?,
136            None => cx.factory().nil()?,
137        };
138        let trigger_source = match &self.trigger_source {
139            Some(source) => cx.factory().symbol(source.clone())?,
140            None => cx.factory().nil()?,
141        };
142        cx.factory().table(vec![
143            (Symbol::new("deadline"), deadline),
144            (
145                Symbol::new("consistency"),
146                cx.factory().symbol(self.consistency.as_symbol())?,
147            ),
148            (Symbol::new("trace"), cx.factory().bool(self.trace)?),
149            (Symbol::new("requires"), required_capabilities),
150            (Symbol::new("reply-codec-hint"), reply_codec_hint),
151            (Symbol::new("role"), role),
152            (
153                Symbol::new("hop"),
154                cx.factory().string(self.hop.to_string())?,
155            ),
156            (Symbol::new("trigger-source"), trigger_source),
157        ])
158    }
159}
160
161#[sim_citizen_derive::non_citizen(
162    reason = "server frame runtime shell; class-backed descriptor is server/Frame",
163    kind = "marker"
164)]
165/// A wire frame carrying a codec-encoded payload plus protocol metadata.
166#[derive(Clone, Debug, PartialEq, Eq)]
167pub struct ServerFrame {
168    /// Server frame format version.
169    pub version: u16,
170    /// Codec used to encode the payload.
171    pub codec: Symbol,
172    /// Identifier of this message, if assigned.
173    pub msg_id: Option<u64>,
174    /// Message id this frame correlates to, if it is a reply.
175    pub correlate: Option<u64>,
176    /// Role this frame plays in the protocol.
177    pub kind: FrameKind,
178    /// Out-of-band envelope metadata.
179    pub envelope: FrameEnvelope,
180    /// Codec-encoded payload bytes.
181    pub payload: Vec<u8>,
182}
183
184impl ServerFrame {
185    /// Builds a frame at the current version with no message or correlation id.
186    pub fn new(codec: Symbol, kind: FrameKind, envelope: FrameEnvelope, payload: Vec<u8>) -> Self {
187        Self {
188            version: SERVER_FRAME_VERSION,
189            codec,
190            msg_id: None,
191            correlate: None,
192            kind,
193            envelope,
194            payload,
195        }
196    }
197
198    /// Builds a frame by encoding `expr` under `codec` into the payload.
199    ///
200    /// Seeds the envelope with the given consistency, required capabilities,
201    /// and trace flag, leaving the remaining envelope fields at their defaults.
202    pub fn from_expr(
203        cx: &mut Cx,
204        codec: Symbol,
205        kind: FrameKind,
206        expr: &Expr,
207        consistency: Consistency,
208        required_capabilities: Vec<CapabilityName>,
209        trace: bool,
210    ) -> Result<Self> {
211        let payload = encode_frame_payload(cx, &codec, expr, EncodeOptions::default())?;
212        Ok(Self::new(
213            codec,
214            kind,
215            FrameEnvelope {
216                consistency,
217                required_capabilities,
218                trace,
219                ..FrameEnvelope::default()
220            },
221            payload,
222        ))
223    }
224
225    /// Decodes the payload back into an expression using the frame's codec.
226    pub fn decode_expr(&self, cx: &mut Cx, read_policy: ReadPolicy) -> Result<Expr> {
227        decode_frame_payload(
228            cx,
229            &self.codec,
230            &self.payload,
231            read_policy,
232            DecodeLimits::default(),
233        )
234    }
235}
236
237impl Object for ServerFrame {
238    fn display(&self, _cx: &mut Cx) -> Result<String> {
239        Ok(format!("#<server-frame {}>", self.kind.as_symbol()))
240    }
241
242    fn as_any(&self) -> &dyn std::any::Any {
243        self
244    }
245}
246
247impl sim_kernel::ObjectCompat for ServerFrame {
248    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
249        cx.factory().class_stub(
250            sim_kernel::ClassId(0),
251            Symbol::qualified("server", "ServerFrame"),
252        )
253    }
254    fn as_expr(&self, cx: &mut Cx) -> Result<Expr> {
255        self.as_table(cx)?.object().as_expr(cx)
256    }
257    fn as_table(&self, cx: &mut Cx) -> Result<Value> {
258        let msg_id = match self.msg_id {
259            Some(msg_id) => cx.factory().string(msg_id.to_string())?,
260            None => cx.factory().nil()?,
261        };
262        let correlate = match self.correlate {
263            Some(correlate) => cx.factory().string(correlate.to_string())?,
264            None => cx.factory().nil()?,
265        };
266        let envelope = self.envelope.as_value(cx)?;
267        cx.factory().table(vec![
268            (
269                Symbol::new("version"),
270                cx.factory().string(self.version.to_string())?,
271            ),
272            (
273                Symbol::new("codec"),
274                cx.factory().symbol(self.codec.clone())?,
275            ),
276            (Symbol::new("msg-id"), msg_id),
277            (Symbol::new("correlate"), correlate),
278            (
279                Symbol::new("kind"),
280                cx.factory().symbol(self.kind.as_symbol())?,
281            ),
282            (Symbol::new("envelope"), envelope),
283            (
284                Symbol::new("payload"),
285                cx.factory().bytes(self.payload.clone())?,
286            ),
287        ])
288    }
289}