1use sim_kernel::{CapabilityName, Error, Expr, Result, Symbol};
2use sim_lib_server::{
3 FrameEnvelope, FrameKind, ServerFrame, stream_chunk_frame_from_expr, stream_frame_to_expr,
4};
5use sim_lib_stream_core::{
6 DataPacket, StreamEnvelope, StreamFaultKind, StreamFaultSpec, StreamMetadata, StreamPacket,
7 stream_cancel_capability, stream_open_capability, stream_push_capability,
8 stream_read_capability, stream_remote_network_capability, stream_stats_capability,
9};
10
11#[derive(Clone, Debug, PartialEq, Eq)]
20pub enum StreamControl {
21 Open {
23 stream_id: Symbol,
25 metadata: StreamMetadata,
27 },
28 Next {
30 stream_id: Symbol,
32 limit: Option<u64>,
34 },
35 Push {
37 stream_id: Symbol,
39 envelope: Box<StreamEnvelope>,
41 },
42 Close {
44 stream_id: Symbol,
46 },
47 Cancel {
49 stream_id: Symbol,
51 },
52 Stats {
54 stream_id: Symbol,
56 },
57 Metadata {
59 stream_id: Symbol,
61 },
62 Fault {
64 stream_id: Symbol,
66 fault: StreamFaultSpec,
68 },
69}
70
71impl StreamControl {
72 pub fn operation(&self) -> Symbol {
74 match self {
75 Self::Open { .. } => stream_control_open_symbol(),
76 Self::Next { .. } => stream_control_next_symbol(),
77 Self::Push { .. } => stream_control_push_symbol(),
78 Self::Close { .. } => stream_control_close_symbol(),
79 Self::Cancel { .. } => stream_control_cancel_symbol(),
80 Self::Stats { .. } => stream_control_stats_symbol(),
81 Self::Metadata { .. } => stream_control_metadata_symbol(),
82 Self::Fault { .. } => stream_control_fault_symbol(),
83 }
84 }
85
86 pub fn required_capability(&self) -> CapabilityName {
88 match self {
89 Self::Open { .. } => stream_open_capability(),
90 Self::Next { .. } => stream_read_capability(),
91 Self::Push { .. } => stream_push_capability(),
92 Self::Close { .. } | Self::Cancel { .. } => stream_cancel_capability(),
93 Self::Stats { .. } | Self::Metadata { .. } | Self::Fault { .. } => {
94 stream_stats_capability()
95 }
96 }
97 }
98
99 pub fn stream_id(&self) -> &Symbol {
101 match self {
102 Self::Open { stream_id, .. }
103 | Self::Next { stream_id, .. }
104 | Self::Push { stream_id, .. }
105 | Self::Close { stream_id }
106 | Self::Cancel { stream_id }
107 | Self::Stats { stream_id }
108 | Self::Metadata { stream_id }
109 | Self::Fault { stream_id, .. } => stream_id,
110 }
111 }
112
113 pub fn to_expr(&self) -> Expr {
118 StreamPacket::data(self.operation(), self.payload_expr()).to_expr()
119 }
120
121 fn payload_expr(&self) -> Expr {
122 let mut entries = vec![
123 key_expr("control", Expr::Symbol(stream_control_tag_symbol())),
124 key_expr("stream-id", Expr::Symbol(self.stream_id().clone())),
125 ];
126 match self {
127 Self::Open { metadata, .. } => {
128 entries.push(key_expr("metadata", metadata.table_expr()));
129 }
130 Self::Next { limit, .. } => {
131 entries.push(key_expr(
132 "limit",
133 limit
134 .map(|limit| Expr::String(limit.to_string()))
135 .unwrap_or(Expr::Nil),
136 ));
137 }
138 Self::Push { envelope, .. } => {
139 entries.push(key_expr("envelope", envelope.to_expr()));
140 }
141 Self::Close { .. }
142 | Self::Cancel { .. }
143 | Self::Stats { .. }
144 | Self::Metadata { .. } => {}
145 Self::Fault { fault, .. } => {
146 entries.push(key_expr("fault", Expr::Symbol(fault.kind.symbol())));
147 entries.push(key_expr("count", Expr::String(fault.count.to_string())));
148 }
149 }
150 Expr::Map(entries)
151 }
152}
153
154impl TryFrom<Expr> for StreamControl {
155 type Error = Error;
156
157 fn try_from(expr: Expr) -> Result<Self> {
158 let StreamPacket::Data(DataPacket { kind, payload }) = StreamPacket::try_from(expr)? else {
159 return Err(Error::TypeMismatch {
160 expected: "stream fabric control data packet",
161 found: "stream packet",
162 });
163 };
164 let entries = map_entries(&payload)?;
165 let tag = symbol_field(entries, "control")?;
166 if *tag != stream_control_tag_symbol() {
167 return Err(Error::Eval(format!(
168 "unknown stream fabric control tag {}",
169 tag.as_qualified_str()
170 )));
171 }
172 let stream_id = symbol_field(entries, "stream-id")?.clone();
173 match kind.as_qualified_str().as_str() {
174 "stream/fabric/open" => {
175 ensure_fields(entries, &["control", "stream-id", "metadata"])?;
176 Ok(Self::Open {
177 stream_id,
178 metadata: StreamMetadata::from_table_expr(field(entries, "metadata")?)?,
179 })
180 }
181 "stream/fabric/next" => {
182 ensure_fields(entries, &["control", "stream-id", "limit"])?;
183 Ok(Self::Next {
184 stream_id,
185 limit: optional_u64(field(entries, "limit")?)?,
186 })
187 }
188 "stream/fabric/push" => {
189 ensure_fields(entries, &["control", "stream-id", "envelope"])?;
190 Ok(Self::Push {
191 stream_id,
192 envelope: Box::new(StreamEnvelope::try_from(
193 field(entries, "envelope")?.clone(),
194 )?),
195 })
196 }
197 "stream/fabric/close" => {
198 ensure_fields(entries, &["control", "stream-id"])?;
199 Ok(Self::Close { stream_id })
200 }
201 "stream/fabric/cancel" => {
202 ensure_fields(entries, &["control", "stream-id"])?;
203 Ok(Self::Cancel { stream_id })
204 }
205 "stream/fabric/stats" => {
206 ensure_fields(entries, &["control", "stream-id"])?;
207 Ok(Self::Stats { stream_id })
208 }
209 "stream/fabric/metadata" => {
210 ensure_fields(entries, &["control", "stream-id"])?;
211 Ok(Self::Metadata { stream_id })
212 }
213 "stream/fabric/fault" => {
214 ensure_fields(entries, &["control", "stream-id", "fault", "count"])?;
215 Ok(Self::Fault {
216 stream_id,
217 fault: StreamFaultSpec::new(
218 StreamFaultKind::from_symbol(symbol_field(entries, "fault")?)?,
219 parse_u64(field(entries, "count")?)? as usize,
220 ),
221 })
222 }
223 other => Err(Error::Eval(format!(
224 "unknown stream fabric control operation {other}"
225 ))),
226 }
227 }
228}
229
230pub fn stream_control_frame_from_control(
236 cx: &mut sim_kernel::Cx,
237 codec: Symbol,
238 control: &StreamControl,
239 envelope: FrameEnvelope,
240) -> Result<ServerFrame> {
241 cx.require(&stream_remote_network_capability())?;
242 cx.require(&control.required_capability())?;
243 stream_chunk_frame_from_expr(cx, codec, &control.to_expr(), envelope)
244}
245
246pub fn stream_control_from_frame(
251 cx: &mut sim_kernel::Cx,
252 frame: &ServerFrame,
253) -> Result<StreamControl> {
254 if frame.kind != FrameKind::StreamChunk {
255 return Err(Error::Eval(format!(
256 "stream fabric control expected stream chunk frame, got {}",
257 frame.kind.as_symbol()
258 )));
259 }
260 let expr = stream_frame_to_expr(cx, frame)?.ok_or_else(|| {
261 Error::Eval("stream fabric control frame did not decode to a payload".to_owned())
262 })?;
263 StreamControl::try_from(expr)
264}
265
266pub fn stream_control_operation_symbols() -> [Symbol; 8] {
268 [
269 stream_control_open_symbol(),
270 stream_control_next_symbol(),
271 stream_control_push_symbol(),
272 stream_control_close_symbol(),
273 stream_control_cancel_symbol(),
274 stream_control_stats_symbol(),
275 stream_control_metadata_symbol(),
276 stream_control_fault_symbol(),
277 ]
278}
279
280pub fn stream_control_required_capability(control: &StreamControl) -> CapabilityName {
284 control.required_capability()
285}
286
287pub fn stream_control_open_symbol() -> Symbol {
289 Symbol::qualified("stream/fabric", "open")
290}
291
292pub fn stream_control_next_symbol() -> Symbol {
294 Symbol::qualified("stream/fabric", "next")
295}
296
297pub fn stream_control_push_symbol() -> Symbol {
299 Symbol::qualified("stream/fabric", "push")
300}
301
302pub fn stream_control_close_symbol() -> Symbol {
304 Symbol::qualified("stream/fabric", "close")
305}
306
307pub fn stream_control_cancel_symbol() -> Symbol {
309 Symbol::qualified("stream/fabric", "cancel")
310}
311
312pub fn stream_control_stats_symbol() -> Symbol {
314 Symbol::qualified("stream/fabric", "stats")
315}
316
317pub fn stream_control_metadata_symbol() -> Symbol {
319 Symbol::qualified("stream/fabric", "metadata")
320}
321
322pub fn stream_control_fault_symbol() -> Symbol {
324 Symbol::qualified("stream/fabric", "fault")
325}
326
327fn stream_control_tag_symbol() -> Symbol {
328 Symbol::qualified("stream/fabric-control", "v1")
329}
330
331fn key_expr(name: &str, value: Expr) -> (Expr, Expr) {
332 (Expr::Symbol(Symbol::new(name)), value)
333}
334
335fn map_entries(expr: &Expr) -> Result<&[(Expr, Expr)]> {
336 match expr {
337 Expr::Map(entries) => Ok(entries),
338 other => Err(Error::TypeMismatch {
339 expected: "stream fabric control map",
340 found: expr_kind(other),
341 }),
342 }
343}
344
345fn optional_u64(expr: &Expr) -> Result<Option<u64>> {
346 match expr {
347 Expr::Nil => Ok(None),
348 Expr::String(value) => value
349 .parse::<u64>()
350 .map(Some)
351 .map_err(|err| Error::Eval(format!("invalid stream fabric control limit: {err}"))),
352 other => Err(Error::TypeMismatch {
353 expected: "optional u64 string",
354 found: expr_kind(other),
355 }),
356 }
357}
358
359fn parse_u64(expr: &Expr) -> Result<u64> {
360 match expr {
361 Expr::String(value) => value
362 .parse::<u64>()
363 .map_err(|err| Error::Eval(format!("invalid stream fabric control count: {err}"))),
364 other => Err(Error::TypeMismatch {
365 expected: "u64 string",
366 found: expr_kind(other),
367 }),
368 }
369}
370
371fn symbol_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Symbol> {
372 match field(entries, name)? {
373 Expr::Symbol(symbol) => Ok(symbol),
374 other => Err(Error::TypeMismatch {
375 expected: "symbol field",
376 found: expr_kind(other),
377 }),
378 }
379}
380
381fn field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a Expr> {
382 entries
383 .iter()
384 .find_map(|(key, value)| match key {
385 Expr::Symbol(symbol) if symbol.namespace.is_none() && symbol.name.as_ref() == name => {
386 Some(value)
387 }
388 _ => None,
389 })
390 .ok_or_else(|| Error::Eval(format!("stream fabric control missing {name} field")))
391}
392
393fn ensure_fields(entries: &[(Expr, Expr)], allowed: &[&str]) -> Result<()> {
394 for (key, _) in entries {
395 let Expr::Symbol(symbol) = key else {
396 return Err(Error::TypeMismatch {
397 expected: "symbol stream fabric control field",
398 found: expr_kind(key),
399 });
400 };
401 if symbol.namespace.is_none() && allowed.contains(&symbol.name.as_ref()) {
402 continue;
403 }
404 return Err(Error::Eval(format!(
405 "unknown stream fabric control field {}",
406 symbol.as_qualified_str()
407 )));
408 }
409 Ok(())
410}
411
412fn expr_kind(expr: &Expr) -> &'static str {
413 match expr {
414 Expr::Nil => "nil",
415 Expr::Bool(_) => "bool",
416 Expr::Number(_) => "number",
417 Expr::Symbol(_) => "symbol",
418 Expr::Local(_) => "local",
419 Expr::String(_) => "string",
420 Expr::Bytes(_) => "bytes",
421 Expr::List(_) => "list",
422 Expr::Vector(_) => "vector",
423 Expr::Map(_) => "map",
424 Expr::Set(_) => "set",
425 Expr::Call { .. } => "call",
426 Expr::Infix { .. } => "infix",
427 Expr::Prefix { .. } => "prefix",
428 Expr::Postfix { .. } => "postfix",
429 Expr::Block(_) => "block",
430 Expr::Quote { .. } => "quote",
431 Expr::Annotated { .. } => "annotated",
432 Expr::Extension { .. } => "extension",
433 }
434}