Skip to main content

sim_lib_stream_fabric/
control.rs

1use sim_kernel::{CapabilityName, Error, Expr, Result, Symbol};
2use sim_lib_server::{
3    FrameEnvelope, FrameKind, ServerFrame, stream_chunk_frame_from_expr, stream_frame_to_expr,
4};
5use sim_lib_stream_core::{
6    DataPacket, StreamEnvelope, StreamFaultKind, StreamFaultSpec, StreamMetadata, StreamPacket,
7    stream_cancel_capability, stream_open_capability, stream_push_capability,
8    stream_read_capability, stream_remote_network_capability, stream_stats_capability,
9};
10
11/// Control-plane operation exchanged over remote stream-fabric frames.
12///
13/// Each variant names one action a peer can take on a placed stream -- opening
14/// it, pulling the next chunk, pushing an envelope, ending it, or reporting a
15/// fault -- so the location-transparent eval surface can drive a remote stream
16/// without exposing transport-specific APIs. A [`StreamControl`] round-trips
17/// through the shared `Expr` graph (see [`StreamControl::to_expr`] and the
18/// `TryFrom<Expr>` impl) and is carried by [`stream_control_frame_from_control`].
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub enum StreamControl {
21    /// Open a stream with the given identity and [`StreamMetadata`].
22    Open {
23        /// Identifier of the stream being opened.
24        stream_id: Symbol,
25        /// Declared metadata (media, direction, clock domain, buffer policy).
26        metadata: StreamMetadata,
27    },
28    /// Request the next chunk(s), bounded by an optional element limit.
29    Next {
30        /// Identifier of the stream being pulled.
31        stream_id: Symbol,
32        /// Maximum number of elements to return, or `None` for unbounded.
33        limit: Option<u64>,
34    },
35    /// Push one stream envelope toward the peer.
36    Push {
37        /// Identifier of the stream being pushed to.
38        stream_id: Symbol,
39        /// Boxed envelope carrying the packet, ticks, and transport profile.
40        envelope: Box<StreamEnvelope>,
41    },
42    /// Close a stream cleanly once production is finished.
43    Close {
44        /// Identifier of the stream being closed.
45        stream_id: Symbol,
46    },
47    /// Cancel a stream early, abandoning any remaining production.
48    Cancel {
49        /// Identifier of the stream being cancelled.
50        stream_id: Symbol,
51    },
52    /// Request runtime statistics for a stream.
53    Stats {
54        /// Identifier of the stream being queried.
55        stream_id: Symbol,
56    },
57    /// Request the metadata table for a stream.
58    Metadata {
59        /// Identifier of the stream being queried.
60        stream_id: Symbol,
61    },
62    /// Report a fault against a stream.
63    Fault {
64        /// Identifier of the faulting stream.
65        stream_id: Symbol,
66        /// Fault kind and occurrence count.
67        fault: StreamFaultSpec,
68    },
69}
70
71impl StreamControl {
72    /// Returns the operation symbol naming this control variant.
73    pub fn operation(&self) -> Symbol {
74        match self {
75            Self::Open { .. } => stream_control_open_symbol(),
76            Self::Next { .. } => stream_control_next_symbol(),
77            Self::Push { .. } => stream_control_push_symbol(),
78            Self::Close { .. } => stream_control_close_symbol(),
79            Self::Cancel { .. } => stream_control_cancel_symbol(),
80            Self::Stats { .. } => stream_control_stats_symbol(),
81            Self::Metadata { .. } => stream_control_metadata_symbol(),
82            Self::Fault { .. } => stream_control_fault_symbol(),
83        }
84    }
85
86    /// Returns the capability a peer must hold to perform this control action.
87    pub fn required_capability(&self) -> CapabilityName {
88        match self {
89            Self::Open { .. } => stream_open_capability(),
90            Self::Next { .. } => stream_read_capability(),
91            Self::Push { .. } => stream_push_capability(),
92            Self::Close { .. } | Self::Cancel { .. } => stream_cancel_capability(),
93            Self::Stats { .. } | Self::Metadata { .. } | Self::Fault { .. } => {
94                stream_stats_capability()
95            }
96        }
97    }
98
99    /// Returns the stream identifier targeted by this control action.
100    pub fn stream_id(&self) -> &Symbol {
101        match self {
102            Self::Open { stream_id, .. }
103            | Self::Next { stream_id, .. }
104            | Self::Push { stream_id, .. }
105            | Self::Close { stream_id }
106            | Self::Cancel { stream_id }
107            | Self::Stats { stream_id }
108            | Self::Metadata { stream_id }
109            | Self::Fault { stream_id, .. } => stream_id,
110        }
111    }
112
113    /// Encodes this control action as a stream data packet `Expr`.
114    ///
115    /// The inverse is the `TryFrom<Expr>` impl, which reparses the packet back
116    /// into a [`StreamControl`].
117    pub fn to_expr(&self) -> Expr {
118        StreamPacket::data(self.operation(), self.payload_expr()).to_expr()
119    }
120
121    fn payload_expr(&self) -> Expr {
122        let mut entries = vec![
123            key_expr("control", Expr::Symbol(stream_control_tag_symbol())),
124            key_expr("stream-id", Expr::Symbol(self.stream_id().clone())),
125        ];
126        match self {
127            Self::Open { metadata, .. } => {
128                entries.push(key_expr("metadata", metadata.table_expr()));
129            }
130            Self::Next { limit, .. } => {
131                entries.push(key_expr(
132                    "limit",
133                    limit
134                        .map(|limit| Expr::String(limit.to_string()))
135                        .unwrap_or(Expr::Nil),
136                ));
137            }
138            Self::Push { envelope, .. } => {
139                entries.push(key_expr("envelope", envelope.to_expr()));
140            }
141            Self::Close { .. }
142            | Self::Cancel { .. }
143            | Self::Stats { .. }
144            | Self::Metadata { .. } => {}
145            Self::Fault { fault, .. } => {
146                entries.push(key_expr("fault", Expr::Symbol(fault.kind.symbol())));
147                entries.push(key_expr("count", Expr::String(fault.count.to_string())));
148            }
149        }
150        Expr::Map(entries)
151    }
152}
153
154impl TryFrom<Expr> for StreamControl {
155    type Error = Error;
156
157    fn try_from(expr: Expr) -> Result<Self> {
158        let StreamPacket::Data(DataPacket { kind, payload }) = StreamPacket::try_from(expr)? else {
159            return Err(Error::TypeMismatch {
160                expected: "stream fabric control data packet",
161                found: "stream packet",
162            });
163        };
164        let entries = map_entries(&payload)?;
165        let tag = symbol_field(entries, "control")?;
166        if *tag != stream_control_tag_symbol() {
167            return Err(Error::Eval(format!(
168                "unknown stream fabric control tag {}",
169                tag.as_qualified_str()
170            )));
171        }
172        let stream_id = symbol_field(entries, "stream-id")?.clone();
173        match kind.as_qualified_str().as_str() {
174            "stream/fabric/open" => {
175                ensure_fields(entries, &["control", "stream-id", "metadata"])?;
176                Ok(Self::Open {
177                    stream_id,
178                    metadata: StreamMetadata::from_table_expr(field(entries, "metadata")?)?,
179                })
180            }
181            "stream/fabric/next" => {
182                ensure_fields(entries, &["control", "stream-id", "limit"])?;
183                Ok(Self::Next {
184                    stream_id,
185                    limit: optional_u64(field(entries, "limit")?)?,
186                })
187            }
188            "stream/fabric/push" => {
189                ensure_fields(entries, &["control", "stream-id", "envelope"])?;
190                Ok(Self::Push {
191                    stream_id,
192                    envelope: Box::new(StreamEnvelope::try_from(
193                        field(entries, "envelope")?.clone(),
194                    )?),
195                })
196            }
197            "stream/fabric/close" => {
198                ensure_fields(entries, &["control", "stream-id"])?;
199                Ok(Self::Close { stream_id })
200            }
201            "stream/fabric/cancel" => {
202                ensure_fields(entries, &["control", "stream-id"])?;
203                Ok(Self::Cancel { stream_id })
204            }
205            "stream/fabric/stats" => {
206                ensure_fields(entries, &["control", "stream-id"])?;
207                Ok(Self::Stats { stream_id })
208            }
209            "stream/fabric/metadata" => {
210                ensure_fields(entries, &["control", "stream-id"])?;
211                Ok(Self::Metadata { stream_id })
212            }
213            "stream/fabric/fault" => {
214                ensure_fields(entries, &["control", "stream-id", "fault", "count"])?;
215                Ok(Self::Fault {
216                    stream_id,
217                    fault: StreamFaultSpec::new(
218                        StreamFaultKind::from_symbol(symbol_field(entries, "fault")?)?,
219                        parse_u64(field(entries, "count")?)? as usize,
220                    ),
221                })
222            }
223            other => Err(Error::Eval(format!(
224                "unknown stream fabric control operation {other}"
225            ))),
226        }
227    }
228}
229
230/// Encodes a [`StreamControl`] into a server stream-chunk frame.
231///
232/// Requires the remote-network capability and the action's own
233/// [`StreamControl::required_capability`] before encoding the control packet
234/// with `codec` into a frame.
235pub fn stream_control_frame_from_control(
236    cx: &mut sim_kernel::Cx,
237    codec: Symbol,
238    control: &StreamControl,
239    envelope: FrameEnvelope,
240) -> Result<ServerFrame> {
241    cx.require(&stream_remote_network_capability())?;
242    cx.require(&control.required_capability())?;
243    stream_chunk_frame_from_expr(cx, codec, &control.to_expr(), envelope)
244}
245
246/// Decodes a server stream-chunk frame back into a [`StreamControl`].
247///
248/// Returns an error when the frame is not a stream-chunk frame or does not
249/// decode to a control payload.
250pub fn stream_control_from_frame(
251    cx: &mut sim_kernel::Cx,
252    frame: &ServerFrame,
253) -> Result<StreamControl> {
254    if frame.kind != FrameKind::StreamChunk {
255        return Err(Error::Eval(format!(
256            "stream fabric control expected stream chunk frame, got {}",
257            frame.kind.as_symbol()
258        )));
259    }
260    let expr = stream_frame_to_expr(cx, frame)?.ok_or_else(|| {
261        Error::Eval("stream fabric control frame did not decode to a payload".to_owned())
262    })?;
263    StreamControl::try_from(expr)
264}
265
266/// Returns every stream-fabric control operation symbol, one per variant.
267pub fn stream_control_operation_symbols() -> [Symbol; 8] {
268    [
269        stream_control_open_symbol(),
270        stream_control_next_symbol(),
271        stream_control_push_symbol(),
272        stream_control_close_symbol(),
273        stream_control_cancel_symbol(),
274        stream_control_stats_symbol(),
275        stream_control_metadata_symbol(),
276        stream_control_fault_symbol(),
277    ]
278}
279
280/// Returns the capability required to perform `control`.
281///
282/// Free-function form of [`StreamControl::required_capability`].
283pub fn stream_control_required_capability(control: &StreamControl) -> CapabilityName {
284    control.required_capability()
285}
286
287/// Returns the operation symbol for the open control action.
288pub fn stream_control_open_symbol() -> Symbol {
289    Symbol::qualified("stream/fabric", "open")
290}
291
292/// Returns the operation symbol for the next control action.
293pub fn stream_control_next_symbol() -> Symbol {
294    Symbol::qualified("stream/fabric", "next")
295}
296
297/// Returns the operation symbol for the push control action.
298pub fn stream_control_push_symbol() -> Symbol {
299    Symbol::qualified("stream/fabric", "push")
300}
301
302/// Returns the operation symbol for the close control action.
303pub fn stream_control_close_symbol() -> Symbol {
304    Symbol::qualified("stream/fabric", "close")
305}
306
307/// Returns the operation symbol for the cancel control action.
308pub fn stream_control_cancel_symbol() -> Symbol {
309    Symbol::qualified("stream/fabric", "cancel")
310}
311
312/// Returns the operation symbol for the stats control action.
313pub fn stream_control_stats_symbol() -> Symbol {
314    Symbol::qualified("stream/fabric", "stats")
315}
316
317/// Returns the operation symbol for the metadata control action.
318pub fn stream_control_metadata_symbol() -> Symbol {
319    Symbol::qualified("stream/fabric", "metadata")
320}
321
322/// Returns the operation symbol for the fault control action.
323pub fn stream_control_fault_symbol() -> Symbol {
324    Symbol::qualified("stream/fabric", "fault")
325}
326
327fn stream_control_tag_symbol() -> Symbol {
328    Symbol::qualified("stream/fabric-control", "v1")
329}
330
331fn key_expr(name: &str, value: Expr) -> (Expr, Expr) {
332    (Expr::Symbol(Symbol::new(name)), value)
333}
334
335fn map_entries(expr: &Expr) -> Result<&[(Expr, Expr)]> {
336    match expr {
337        Expr::Map(entries) => Ok(entries),
338        other => Err(Error::TypeMismatch {
339            expected: "stream fabric control map",
340            found: expr_kind(other),
341        }),
342    }
343}
344
345fn optional_u64(expr: &Expr) -> Result<Option<u64>> {
346    match expr {
347        Expr::Nil => Ok(None),
348        Expr::String(value) => value
349            .parse::<u64>()
350            .map(Some)
351            .map_err(|err| Error::Eval(format!("invalid stream fabric control limit: {err}"))),
352        other => Err(Error::TypeMismatch {
353            expected: "optional u64 string",
354            found: expr_kind(other),
355        }),
356    }
357}
358
359fn parse_u64(expr: &Expr) -> Result<u64> {
360    match expr {
361        Expr::String(value) => value
362            .parse::<u64>()
363            .map_err(|err| Error::Eval(format!("invalid stream fabric control count: {err}"))),
364        other => Err(Error::TypeMismatch {
365            expected: "u64 string",
366            found: expr_kind(other),
367        }),
368    }
369}
370
371fn symbol_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Symbol> {
372    match field(entries, name)? {
373        Expr::Symbol(symbol) => Ok(symbol),
374        other => Err(Error::TypeMismatch {
375            expected: "symbol field",
376            found: expr_kind(other),
377        }),
378    }
379}
380
381fn field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Expr> {
382    entries
383        .iter()
384        .find_map(|(key, value)| match key {
385            Expr::Symbol(symbol) if symbol.namespace.is_none() && symbol.name.as_ref() == name => {
386                Some(value)
387            }
388            _ => None,
389        })
390        .ok_or_else(|| Error::Eval(format!("stream fabric control missing {name} field")))
391}
392
393fn ensure_fields(entries: &[(Expr, Expr)], allowed: &[&str]) -> Result<()> {
394    for (key, _) in entries {
395        let Expr::Symbol(symbol) = key else {
396            return Err(Error::TypeMismatch {
397                expected: "symbol stream fabric control field",
398                found: expr_kind(key),
399            });
400        };
401        if symbol.namespace.is_none() && allowed.contains(&symbol.name.as_ref()) {
402            continue;
403        }
404        return Err(Error::Eval(format!(
405            "unknown stream fabric control field {}",
406            symbol.as_qualified_str()
407        )));
408    }
409    Ok(())
410}
411
412fn expr_kind(expr: &Expr) -> &'static str {
413    match expr {
414        Expr::Nil => "nil",
415        Expr::Bool(_) => "bool",
416        Expr::Number(_) => "number",
417        Expr::Symbol(_) => "symbol",
418        Expr::Local(_) => "local",
419        Expr::String(_) => "string",
420        Expr::Bytes(_) => "bytes",
421        Expr::List(_) => "list",
422        Expr::Vector(_) => "vector",
423        Expr::Map(_) => "map",
424        Expr::Set(_) => "set",
425        Expr::Call { .. } => "call",
426        Expr::Infix { .. } => "infix",
427        Expr::Prefix { .. } => "prefix",
428        Expr::Postfix { .. } => "postfix",
429        Expr::Block(_) => "block",
430        Expr::Quote { .. } => "quote",
431        Expr::Annotated { .. } => "annotated",
432        Expr::Extension { .. } => "extension",
433    }
434}