sim-lib-stream-core 0.1.0-rc.1

Core stream metadata, packets, envelopes, and buffer values.
Documentation
use sim_kernel::{Expr, Ref, Symbol, Tick};

use crate::{
    BufferOverflowPolicy, BufferPolicy, StreamDirection, StreamFaultKind, StreamFaultPlan,
    StreamFaultSpec, StreamInspectorSnapshot, StreamInspectorStatus, StreamItem, StreamMedia,
    StreamMetadata, StreamPacket, TransportProfile, stream_fault_symbols,
    stream_inspector_model_symbol, stream_inspector_route_local_symbol,
};

/// Snapshots a two-item pull stream and checks the reported identity, route, media, profile,
/// clock, status, queue depth, last sequence, and the `inspector` entry of the encoded
/// expression; then calls `take_packets(2)` and checks that a fresh snapshot reports `Ended`
/// with a queue depth of zero.
#[test]
fn stream_inspector_snapshot_reports_queue_status_and_diagnostics() {
    let source = crate::StreamValue::pull(diagnostic_metadata(), vec![item("one"), item("two")]);
    // Both snapshots use the same route and transport profile; only the symbol list differs.
    let inspect = |recent| {
        StreamInspectorSnapshot::from_stream_value(
            &source,
            stream_inspector_route_local_symbol(),
            &TransportProfile::memory_local(),
            recent,
        )
        .unwrap()
    };

    // First snapshot: both items are still queued.
    let live = inspect(vec![Symbol::qualified("stream/test", "recent")]);

    let expected_id = Symbol::qualified("stream", "diagnostics");
    assert_eq!(live.stream_id, expected_id);
    assert_eq!(live.route, stream_inspector_route_local_symbol());
    assert_eq!(live.media, StreamMedia::Diagnostic);
    let expected_profile = Symbol::qualified("stream/profile", "memory-local");
    assert_eq!(live.profile, expected_profile);
    assert_eq!(live.clock, Symbol::qualified("clock", "sample"));
    assert_eq!(live.status, StreamInspectorStatus::Live);
    assert_eq!(live.queue_depth, 2);
    assert_eq!(live.last_sequence, Some(1));

    // The encoded snapshot must carry the inspector model symbol under the `inspector` key.
    let encoded = live.to_expr();
    let expected_model = Expr::Symbol(stream_inspector_model_symbol());
    assert_eq!(table_value(&encoded, "inspector"), Some(&expected_model));

    // Second snapshot: taken after the packets have been removed from the stream.
    let _ = source.take_packets(2).unwrap();
    let drained = inspect(Vec::new());
    assert_eq!(drained.status, StreamInspectorStatus::Ended);
    assert_eq!(drained.queue_depth, 0);
}

/// Builds a fault plan with one spec for each of nine fault kinds, applies it to three items,
/// and checks that the resulting diagnostics equal `stream_fault_symbols()`, that three items
/// come back, and that the plan encodes as a list whose first entry names the `Drop` fault.
#[test]
fn stream_fault_plan_names_and_applies_required_faults() {
    let inputs = vec![item("one"), item("two"), item("three")];
    // Every spec is constructed with the same second argument, so map over the kinds.
    let specs = [
        StreamFaultKind::Drop,
        StreamFaultKind::Reorder,
        StreamFaultKind::Duplicate,
        StreamFaultKind::Delay,
        StreamFaultKind::Cancel,
        StreamFaultKind::Timeout,
        StreamFaultKind::Disconnect,
        StreamFaultKind::Reconnect,
        StreamFaultKind::UnsupportedProfile,
    ]
    .into_iter()
    .map(|kind| StreamFaultSpec::new(kind, 1))
    .collect::<Vec<_>>();
    let plan = StreamFaultPlan::new(specs);
    let outcome = plan.apply(&inputs);

    let expected_symbols = stream_fault_symbols();
    assert_eq!(expected_symbols.len(), 9);
    assert_eq!(outcome.diagnostics, expected_symbols.to_vec());
    assert_eq!(outcome.items.len(), 3);

    let encoded_faults = match plan.to_expr() {
        Expr::List(entries) => entries,
        _ => panic!("fault plan should encode as a list"),
    };
    let expected_first = Expr::Symbol(StreamFaultKind::Drop.symbol());
    assert_eq!(table_value(&encoded_faults[0], "fault"), Some(&expected_first));
}

/// Builds the stream metadata used by the inspector test: id `stream`/`diagnostics`,
/// diagnostic media, source direction, the `clock`/`sample` clock, and a bounded buffer policy
/// created with `2` and `BufferOverflowPolicy::DropOldest`.
///
/// # Panics
///
/// Panics if `BufferPolicy::bounded_with_overflow` returns an error.
fn diagnostic_metadata() -> StreamMetadata {
    let stream_id = Symbol::qualified("stream", "diagnostics");
    let clock = Symbol::qualified("clock", "sample");
    let buffer =
        BufferPolicy::bounded_with_overflow(2, BufferOverflowPolicy::DropOldest).unwrap();

    StreamMetadata::new(
        stream_id,
        StreamMedia::Diagnostic,
        StreamDirection::Source,
        clock,
        buffer,
    )
}

/// Builds a stream item whose packet is a diagnostic tagged `stream/test`/`packet` carrying
/// `message`, with a single tick on the `clock`/`sample` clock that references the symbol
/// `stream/test`/`<message>`.
///
/// # Panics
///
/// Panics if `StreamItem::with_ticks` returns an error.
fn item(message: &str) -> StreamItem {
    let diagnostic =
        crate::StreamDiagnostic::new(Symbol::qualified("stream/test", "packet"), message);
    let packet = StreamPacket::Diagnostic(diagnostic);

    let tick = Tick::new(
        Symbol::qualified("clock", "sample"),
        Ref::Symbol(Symbol::qualified("stream/test", message)),
    );

    StreamItem::with_ticks(packet, vec![tick]).unwrap()
}

/// Looks up `key` in a map expression.
///
/// Returns the value of the first entry whose key is an `Expr::Symbol` with a `name` equal to
/// `key`. Returns `None` when `expr` is not an `Expr::Map` or no such entry exists; entries
/// whose key is not a symbol are skipped.
fn table_value<'a>(expr: &'a Expr, key: &str) -> Option<&'a Expr> {
    let entries = match expr {
        Expr::Map(entries) => entries,
        _ => return None,
    };

    for (candidate, value) in entries.iter() {
        match candidate {
            Expr::Symbol(symbol) if symbol.name.as_ref() == key => return Some(value),
            _ => {}
        }
    }
    None
}