sim-lib-stream-combinators 0.1.0-rc.1

Lazy in-memory stream combinators for STREAM 6.
Documentation
use sim_kernel::{ContentId, Expr, Ref, Symbol, Tick};
use sim_lib_stream_core::{
    BufferOverflowPolicy, BufferPolicy, StreamDirection, StreamItem, StreamMedia, StreamMetadata,
    StreamPacket,
};

use crate::{
    Stream, event_join_data_kind, expr_path, fan, filter_data_field_eq, join_data_on_field,
    model_event_data_kind, project_data_field, rank_data_by_i64_field, rank_frontier_data_kind,
    record_bang, redact_data_field, take,
};

#[test]
fn event_algebra_filters_ranks_and_replays_model_branch() {
    let source = Stream::pull(
        data_metadata(),
        vec![
            model_event("a", "delta", "draft-a", 20, 1),
            model_event("b", "tool-call", "tool-b", 0, 2),
            model_event("b", "delta", "draft-b", 90, 3),
            rank_frontier("a", "coord-a", 20, 4),
            rank_frontier("b", "coord-b", 90, 5),
        ],
    );
    let fanout = fan(source);
    let deltas = filter_data_field_eq(
        fanout.left.filter_data_kind(model_event_data_kind()),
        path(&["event"]),
        Expr::String("delta".to_owned()),
    );
    let frontiers = fanout.right.filter_data_kind(rank_frontier_data_kind());

    let joined = join_data_on_field(
        deltas,
        frontiers,
        path(&["span-id"]),
        path(&["span-id"]),
        event_join_data_kind(),
    )
    .unwrap();
    let ranked = rank_data_by_i64_field(joined, path(&["right", "score"]), true).unwrap();
    let chosen = take(ranked, 1);
    let recording = record_bang(&chosen).unwrap();

    assert_eq!(recording.len(), 1);
    let payload = data_payload(&recording.items()[0]).unwrap();
    assert_eq!(
        expr_path(payload, &path(&["left", "text"])),
        Some(&Expr::String("draft-b".to_owned()))
    );
    assert_eq!(
        expr_path(payload, &path(&["right", "coordinate"])),
        Some(&Expr::String("coord-b".to_owned()))
    );

    let projected = project_data_field(recording.replay(), path(&["left", "text"]));
    let projected_items = projected.take_packets(8).unwrap();
    assert_eq!(projected_items.len(), 1);
    assert_eq!(
        data_payload(&projected_items[0]),
        Some(&Expr::String("draft-b".to_owned()))
    );

    let redacted = redact_data_field(
        recording.replay(),
        path(&["left", "text"]),
        Expr::String("REDACTED".to_owned()),
    );
    let redacted_items = redacted.take_packets(8).unwrap();
    assert_eq!(
        expr_path(
            data_payload(&redacted_items[0]).unwrap(),
            &path(&["left", "text"])
        ),
        Some(&Expr::String("REDACTED".to_owned()))
    );
    assert_eq!(
        recording.replay().take_packets(8).unwrap(),
        recording.replay().take_packets(8).unwrap()
    );
}

fn data_metadata() -> StreamMetadata {
    StreamMetadata::new(
        Symbol::qualified("stream", "event-algebra-test"),
        StreamMedia::Data,
        StreamDirection::Source,
        clock_symbol(),
        BufferPolicy::bounded_with_overflow(8, BufferOverflowPolicy::DropNewest).unwrap(),
    )
}

fn model_event(span: &str, event: &str, text: &str, score: i64, tick_index: u8) -> StreamItem {
    StreamItem::with_ticks(
        StreamPacket::model_event(Expr::Map(vec![
            field("span-id", span),
            field("event", event),
            field("text", text),
            score_field(score),
        ])),
        vec![tick(tick_index)],
    )
    .unwrap()
}

fn rank_frontier(span: &str, coordinate: &str, score: i64, tick_index: u8) -> StreamItem {
    StreamItem::with_ticks(
        StreamPacket::rank_frontier(Expr::Map(vec![
            field("span-id", span),
            field("coordinate", coordinate),
            score_field(score),
        ])),
        vec![tick(tick_index)],
    )
    .unwrap()
}

fn data_payload(item: &StreamItem) -> Option<&Expr> {
    match item.packet() {
        StreamPacket::Data(packet) => Some(&packet.payload),
        _ => None,
    }
}

fn field(name: &str, value: &str) -> (Expr, Expr) {
    (
        Expr::Symbol(Symbol::new(name)),
        Expr::String(value.to_owned()),
    )
}

fn score_field(score: i64) -> (Expr, Expr) {
    (
        Expr::Symbol(Symbol::new("score")),
        Expr::String(score.to_string()),
    )
}

fn path(segments: &[&str]) -> Vec<Symbol> {
    segments
        .iter()
        .map(|segment| Symbol::new(*segment))
        .collect()
}

fn clock_symbol() -> Symbol {
    Symbol::qualified("clock", "event-algebra-test")
}

fn tick(index: u8) -> Tick {
    Tick::new(
        clock_symbol(),
        Ref::Content(ContentId::from_bytes(
            Symbol::qualified("core", "sha256"),
            [index; 32],
        )),
    )
}