Skip to main content

sim_lib_stream_core/
inspector.rs

1//! Stream inspector and fault-injection surface for stream-core.
2//!
3//! This module supplies the concrete observability behavior layered over the
4//! streaming fabric: a point-in-time [`StreamInspectorSnapshot`] of a live
5//! stream's health, the [`StreamInspectorStatus`] lifecycle classification
6//! derived from runtime [`StreamStats`], and a fault model
7//! ([`StreamFaultKind`], [`StreamFaultSpec`], [`StreamFaultPlan`],
8//! [`StreamFaultResult`]) that tooling uses to inject or simulate degraded
9//! delivery. Snapshots and plans render to the kernel [`Expr`] graph so the
10//! same data round-trips through any codec surface, and the symbol helpers
11//! expose the stable [`Symbol`] vocabulary tooling matches against.
12
13use sim_kernel::{Error, Expr, Result, Symbol};
14
15use crate::{
16    BufferPolicy, StreamItem, StreamMedia, StreamMetadata, StreamStats, StreamValue,
17    TransportProfile,
18};
19
/// Lifecycle state reported for a stream under inspection.
///
/// A [`StreamInspectorSnapshot`] carries one of these values to say whether
/// the stream is still delivering, has finished, or has entered a degraded or
/// terminal condition. Every variant has a stable wire label (see
/// [`StreamInspectorStatus::wire_label`]) and a qualified symbol (see
/// [`StreamInspectorStatus::symbol`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamInspectorStatus {
    /// The stream is open and actively delivering items.
    Live,
    /// The stream closed normally after delivering its items.
    Ended,
    /// A consumer or producer cancelled the stream.
    Cancelled,
    /// The bounded buffer dropped, rejected, or overflowed items.
    BufferOverflow,
    /// The transport is currently disconnected.
    Disconnected,
    /// The transport is trying to re-establish its connection.
    Reconnecting,
    /// The transport profile was refused as unsupported.
    RefusedProfile,
    /// The stream was forced into a fault condition for inspection.
    Faulted,
}
44
45impl StreamInspectorStatus {
46    /// Returns the stable lowercase wire label for this status.
47    pub fn wire_label(self) -> &'static str {
48        match self {
49            Self::Live => "live",
50            Self::Ended => "ended",
51            Self::Cancelled => "cancelled",
52            Self::BufferOverflow => "buffer-overflow",
53            Self::Disconnected => "disconnected",
54            Self::Reconnecting => "reconnecting",
55            Self::RefusedProfile => "refused-profile",
56            Self::Faulted => "faulted",
57        }
58    }
59
60    /// Returns this status as a `stream/inspector-status/<label>` symbol.
61    pub fn symbol(self) -> Symbol {
62        Symbol::qualified("stream/inspector-status", self.wire_label())
63    }
64
65    /// Classifies a stream's status from its runtime stats.
66    ///
67    /// Precedence is cancellation, then buffer loss (any dropped, overflowed,
68    /// or rejected items), then end-of-stream (`done` or closed), otherwise
69    /// [`StreamInspectorStatus::Live`].
70    pub fn from_stats(stats: &StreamStats, done: bool) -> Self {
71        if stats.cancelled {
72            Self::Cancelled
73        } else if stats.dropped_newest > 0
74            || stats.dropped_oldest > 0
75            || stats.overflow_errors > 0
76            || stats.rejected > 0
77        {
78            Self::BufferOverflow
79        } else if done || stats.closed {
80            Self::Ended
81        } else {
82            Self::Live
83        }
84    }
85}
86
87/// Point-in-time observation of a single stream's identity and health.
88///
89/// Captures the stream's metadata-derived identity (id, media, clock, buffer
90/// policy), its routing and transport profile, its current
91/// [`StreamInspectorStatus`], and live queue/loss counters. Render with
92/// [`StreamInspectorSnapshot::to_expr`] to hand the snapshot to a codec.
93#[derive(Clone, Debug, PartialEq, Eq)]
94pub struct StreamInspectorSnapshot {
95    /// Stable identifier of the observed stream.
96    pub stream_id: Symbol,
97    /// Route the stream is being observed on.
98    pub route: Symbol,
99    /// Media kind carried by the stream.
100    pub media: StreamMedia,
101    /// Name of the transport profile in effect.
102    pub profile: Symbol,
103    /// Clock domain the stream is timed against.
104    pub clock: Symbol,
105    /// Current lifecycle status of the stream.
106    pub status: StreamInspectorStatus,
107    /// Bounded buffer policy governing the stream's queue.
108    pub buffer: BufferPolicy,
109    /// Number of items currently queued in the buffer.
110    pub queue_depth: usize,
111    /// Total items dropped (newest plus oldest) since the stream opened.
112    pub dropped_count: u64,
113    /// Sequence number of the most recent observed item, if any.
114    pub last_sequence: Option<u64>,
115    /// Recent diagnostic symbols collected for the stream.
116    pub recent_diagnostics: Vec<Symbol>,
117}
118
119impl StreamInspectorSnapshot {
120    /// Builds a snapshot from stream metadata, stats, and observed counters.
121    #[allow(clippy::too_many_arguments)]
122    pub fn new(
123        metadata: &StreamMetadata,
124        route: Symbol,
125        profile: Symbol,
126        status: StreamInspectorStatus,
127        queue_depth: usize,
128        stats: &StreamStats,
129        last_sequence: Option<u64>,
130        recent_diagnostics: Vec<Symbol>,
131    ) -> Self {
132        Self {
133            stream_id: metadata.id().clone(),
134            route,
135            media: metadata.media(),
136            profile,
137            clock: metadata.clock().clone(),
138            status,
139            buffer: metadata.buffer().clone(),
140            queue_depth,
141            dropped_count: stats.dropped_newest.saturating_add(stats.dropped_oldest),
142            last_sequence,
143            recent_diagnostics,
144        }
145    }
146
147    /// Builds a snapshot by querying a live [`StreamValue`].
148    ///
149    /// Reads the stream's stats, queue depth, and completion flag to derive the
150    /// last observed sequence and [`StreamInspectorStatus`]. Returns an error if
151    /// any of those queries fail.
152    pub fn from_stream_value(
153        stream: &StreamValue,
154        route: Symbol,
155        profile: &TransportProfile,
156        recent_diagnostics: Vec<Symbol>,
157    ) -> Result<Self> {
158        let stats = stream.stats()?;
159        let queue_depth = stream.queue_depth()?;
160        let observed = stats
161            .accepted
162            .max(stats.yielded.saturating_add(queue_depth as u64));
163        let last_sequence = observed.checked_sub(1);
164        let status = StreamInspectorStatus::from_stats(&stats, stream.is_done()?);
165        Ok(Self::new(
166            stream.metadata(),
167            route,
168            profile.name().clone(),
169            status,
170            queue_depth,
171            &stats,
172            last_sequence,
173            recent_diagnostics,
174        ))
175    }
176
177    /// Renders the snapshot as a tagged [`Expr`] map for codec round-tripping.
178    pub fn to_expr(&self) -> Expr {
179        Expr::Map(vec![
180            (
181                Expr::Symbol(Symbol::new("inspector")),
182                Expr::Symbol(stream_inspector_model_symbol()),
183            ),
184            (
185                Expr::Symbol(Symbol::new("id")),
186                Expr::Symbol(self.stream_id.clone()),
187            ),
188            (
189                Expr::Symbol(Symbol::new("route")),
190                Expr::Symbol(self.route.clone()),
191            ),
192            (
193                Expr::Symbol(Symbol::new("media")),
194                Expr::Symbol(self.media.symbol()),
195            ),
196            (
197                Expr::Symbol(Symbol::new("profile")),
198                Expr::Symbol(self.profile.clone()),
199            ),
200            (
201                Expr::Symbol(Symbol::new("clock")),
202                Expr::Symbol(self.clock.clone()),
203            ),
204            (
205                Expr::Symbol(Symbol::new("status")),
206                Expr::Symbol(self.status.symbol()),
207            ),
208            (Expr::Symbol(Symbol::new("buffer")), self.buffer.to_expr()),
209            (
210                Expr::Symbol(Symbol::new("queue-depth")),
211                Expr::String(self.queue_depth.to_string()),
212            ),
213            (
214                Expr::Symbol(Symbol::new("dropped-count")),
215                Expr::String(self.dropped_count.to_string()),
216            ),
217            (
218                Expr::Symbol(Symbol::new("last-sequence")),
219                optional_u64_expr(self.last_sequence),
220            ),
221            (
222                Expr::Symbol(Symbol::new("recent-diagnostics")),
223                Expr::List(
224                    self.recent_diagnostics
225                        .iter()
226                        .cloned()
227                        .map(Expr::Symbol)
228                        .collect(),
229                ),
230            ),
231        ])
232    }
233}
234
/// The classes of fault a [`StreamFaultPlan`] can inject.
///
/// The item-level kinds ([`StreamFaultKind::Drop`],
/// [`StreamFaultKind::Reorder`], [`StreamFaultKind::Duplicate`],
/// [`StreamFaultKind::Delay`]) rewrite the item sequence when a plan is
/// applied. The transport-level kinds leave the items alone and only
/// contribute a diagnostic symbol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamFaultKind {
    /// Discard items from the front of the stream.
    Drop,
    /// Swap the first two items.
    Reorder,
    /// Emit extra copies of the leading item.
    Duplicate,
    /// Rotate leading items to the back, simulating late arrival.
    Delay,
    /// Model cancellation by a consumer or producer.
    Cancel,
    /// Model a delivery timeout.
    Timeout,
    /// Model a transport disconnect.
    Disconnect,
    /// Model a transport reconnect.
    Reconnect,
    /// Model refusal of an unsupported transport profile.
    UnsupportedProfile,
}
262
263impl StreamFaultKind {
264    /// Returns the stable lowercase wire label for this fault kind.
265    pub fn wire_label(self) -> &'static str {
266        match self {
267            Self::Drop => "drop",
268            Self::Reorder => "reorder",
269            Self::Duplicate => "duplicate",
270            Self::Delay => "delay",
271            Self::Cancel => "cancel",
272            Self::Timeout => "timeout",
273            Self::Disconnect => "disconnect",
274            Self::Reconnect => "reconnect",
275            Self::UnsupportedProfile => "unsupported-profile",
276        }
277    }
278
279    /// Returns this fault kind as a `stream/fault/<label>` symbol.
280    pub fn symbol(self) -> Symbol {
281        Symbol::qualified("stream/fault", self.wire_label())
282    }
283
284    /// Parses a fault kind from its bare or fully qualified symbol.
285    ///
286    /// Accepts both the short label (`drop`) and the qualified form
287    /// (`stream/fault/drop`). Returns an error for any unknown fault.
288    pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
289        match symbol.as_qualified_str().as_str() {
290            "drop" | "stream/fault/drop" => Ok(Self::Drop),
291            "reorder" | "stream/fault/reorder" => Ok(Self::Reorder),
292            "duplicate" | "stream/fault/duplicate" => Ok(Self::Duplicate),
293            "delay" | "stream/fault/delay" => Ok(Self::Delay),
294            "cancel" | "stream/fault/cancel" => Ok(Self::Cancel),
295            "timeout" | "stream/fault/timeout" => Ok(Self::Timeout),
296            "disconnect" | "stream/fault/disconnect" => Ok(Self::Disconnect),
297            "reconnect" | "stream/fault/reconnect" => Ok(Self::Reconnect),
298            "unsupported-profile" | "stream/fault/unsupported-profile" => {
299                Ok(Self::UnsupportedProfile)
300            }
301            other => Err(Error::Eval(format!("unknown stream fault {other}"))),
302        }
303    }
304}
305
306/// A single fault to apply, paired with a repetition count.
307#[derive(Clone, Debug, PartialEq, Eq)]
308pub struct StreamFaultSpec {
309    /// Kind of fault to inject.
310    pub kind: StreamFaultKind,
311    /// Number of items the fault affects (at least 1).
312    pub count: usize,
313}
314
315impl StreamFaultSpec {
316    /// Builds a fault spec, clamping `count` to a minimum of 1.
317    pub fn new(kind: StreamFaultKind, count: usize) -> Self {
318        Self {
319            kind,
320            count: count.max(1),
321        }
322    }
323}
324
325/// An ordered list of faults to apply to a stream's item sequence.
326#[derive(Clone, Debug, Default, PartialEq, Eq)]
327pub struct StreamFaultPlan {
328    faults: Vec<StreamFaultSpec>,
329}
330
331/// Outcome of applying a [`StreamFaultPlan`] to a sequence of items.
332#[derive(Clone, Debug, PartialEq, Eq)]
333pub struct StreamFaultResult {
334    /// Items after the plan's faults have been applied.
335    pub items: Vec<StreamItem>,
336    /// Diagnostic symbols recording each fault that was applied, in order.
337    pub diagnostics: Vec<Symbol>,
338}
339
340impl StreamFaultPlan {
341    /// Builds a plan from an ordered list of fault specs.
342    pub fn new(faults: Vec<StreamFaultSpec>) -> Self {
343        Self { faults }
344    }
345
346    /// Returns the plan's fault specs in application order.
347    pub fn faults(&self) -> &[StreamFaultSpec] {
348        &self.faults
349    }
350
351    /// Applies every fault in order to a copy of `items`.
352    ///
353    /// Item-level faults rewrite the sequence; transport-level faults are
354    /// recorded as diagnostics without altering items. Returns the rewritten
355    /// items together with one diagnostic symbol per applied fault.
356    pub fn apply(&self, items: &[StreamItem]) -> StreamFaultResult {
357        let mut items = items.to_vec();
358        let mut diagnostics = Vec::new();
359        for fault in &self.faults {
360            diagnostics.push(fault.kind.symbol());
361            match fault.kind {
362                StreamFaultKind::Drop => {
363                    let remove = fault.count.min(items.len());
364                    items.drain(0..remove);
365                }
366                StreamFaultKind::Reorder => {
367                    if items.len() > 1 {
368                        items.swap(0, 1);
369                    }
370                }
371                StreamFaultKind::Duplicate => {
372                    if let Some(item) = items.first().cloned() {
373                        for _ in 0..fault.count {
374                            items.insert(0, item.clone());
375                        }
376                    }
377                }
378                StreamFaultKind::Delay => {
379                    if !items.is_empty() {
380                        let rotate = fault.count.min(items.len());
381                        items.rotate_left(rotate);
382                    }
383                }
384                StreamFaultKind::Cancel
385                | StreamFaultKind::Timeout
386                | StreamFaultKind::Disconnect
387                | StreamFaultKind::Reconnect
388                | StreamFaultKind::UnsupportedProfile => {}
389            }
390        }
391        StreamFaultResult { items, diagnostics }
392    }
393
394    /// Renders the plan as an [`Expr`] list of fault/count maps.
395    pub fn to_expr(&self) -> Expr {
396        Expr::List(
397            self.faults
398                .iter()
399                .map(|fault| {
400                    Expr::Map(vec![
401                        (
402                            Expr::Symbol(Symbol::new("fault")),
403                            Expr::Symbol(fault.kind.symbol()),
404                        ),
405                        (
406                            Expr::Symbol(Symbol::new("count")),
407                            Expr::String(fault.count.to_string()),
408                        ),
409                    ])
410                })
411                .collect(),
412        )
413    }
414}
415
416/// Returns the versioned model tag stamped into inspector snapshots.
417pub fn stream_inspector_model_symbol() -> Symbol {
418    Symbol::qualified("stream/inspector", "v1")
419}
420
421/// Returns the route symbol denoting a locally observed stream.
422pub fn stream_inspector_route_local_symbol() -> Symbol {
423    Symbol::qualified("stream/route", "local")
424}
425
426/// Returns every [`StreamInspectorStatus`] symbol as a fixed-size array.
427pub fn stream_inspector_status_symbols() -> [Symbol; 8] {
428    [
429        StreamInspectorStatus::Live.symbol(),
430        StreamInspectorStatus::Ended.symbol(),
431        StreamInspectorStatus::Cancelled.symbol(),
432        StreamInspectorStatus::BufferOverflow.symbol(),
433        StreamInspectorStatus::Disconnected.symbol(),
434        StreamInspectorStatus::Reconnecting.symbol(),
435        StreamInspectorStatus::RefusedProfile.symbol(),
436        StreamInspectorStatus::Faulted.symbol(),
437    ]
438}
439
440/// Returns every [`StreamFaultKind`] symbol as a fixed-size array.
441pub fn stream_fault_symbols() -> [Symbol; 9] {
442    [
443        StreamFaultKind::Drop.symbol(),
444        StreamFaultKind::Reorder.symbol(),
445        StreamFaultKind::Duplicate.symbol(),
446        StreamFaultKind::Delay.symbol(),
447        StreamFaultKind::Cancel.symbol(),
448        StreamFaultKind::Timeout.symbol(),
449        StreamFaultKind::Disconnect.symbol(),
450        StreamFaultKind::Reconnect.symbol(),
451        StreamFaultKind::UnsupportedProfile.symbol(),
452    ]
453}
454
455/// Checks that a fault kind is in the supported set, erroring otherwise.
456pub fn ensure_fault_supported(kind: StreamFaultKind) -> Result<()> {
457    if stream_fault_symbols().contains(&kind.symbol()) {
458        Ok(())
459    } else {
460        Err(Error::Eval("unsupported stream fault".to_owned()))
461    }
462}
463
464fn optional_u64_expr(value: Option<u64>) -> Expr {
465    value
466        .map(|value| Expr::String(value.to_string()))
467        .unwrap_or(Expr::Nil)
468}