Skip to main content

sim_lib_stream_combinators/
event_algebra.rs

//! Finite event algebra helpers for data streams.
//!
//! These helpers intentionally live in the stream-combinators crate. They build
//! programmable joins, lenses, and ordering on top of ordinary stream packets
//! without adding kernel protocol surface.
6
7use sim_kernel::{Error, Expr, Result, Symbol};
8use sim_lib_stream_core::{StreamItem, StreamPacket};
9
10use crate::{Stream, filter_data_shape, map_data_expr, record_bang};
11
12/// Returns the canonical data-packet kind for model events.
13pub fn model_event_data_kind() -> Symbol {
14    Symbol::qualified("stream/data", "model-event")
15}
16
17/// Returns the canonical data-packet kind for rank-frontier updates.
18pub fn rank_frontier_data_kind() -> Symbol {
19    Symbol::qualified("stream/data", "rank-frontier")
20}
21
22/// Returns the canonical data-packet kind produced by event joins.
23pub fn event_join_data_kind() -> Symbol {
24    Symbol::qualified("stream/data", "event-join")
25}
26
27/// Resolves a dotted `path` of symbol keys through nested map expressions.
28///
29/// Walks `expr` one segment at a time, descending into
30/// [`Expr::Map`](sim_kernel::Expr) entries keyed by each symbol. Returns the
31/// addressed sub-expression, or `None` if any segment is missing or the cursor
32/// is not a map.
33pub fn expr_path<'a>(expr: &'a Expr, path: &[Symbol]) -> Option<&'a Expr> {
34    let mut cursor = expr;
35    for segment in path {
36        let Expr::Map(entries) = cursor else {
37            return None;
38        };
39        cursor = entries.iter().find_map(|(key, value)| match key {
40            Expr::Symbol(symbol) if symbol == segment => Some(value),
41            _ => None,
42        })?;
43    }
44    Some(cursor)
45}
46
47/// Keeps data packets whose payload field at `path` equals `expected`.
48pub fn filter_data_field_eq(source: Stream, path: Vec<Symbol>, expected: Expr) -> Stream {
49    filter_data_shape(source, move |payload| {
50        Ok(expr_path(payload, &path) == Some(&expected))
51    })
52}
53
54/// Projects each data payload down to the sub-expression at `path`.
55///
56/// A packet whose payload lacks `path` fails the stream with an eval error.
57pub fn project_data_field(source: Stream, path: Vec<Symbol>) -> Stream {
58    map_data_expr(source, move |payload| {
59        expr_path(&payload, &path)
60            .cloned()
61            .ok_or_else(|| Error::Eval("stream/project field path not found".to_owned()))
62    })
63}
64
65/// Replaces each data payload's value at `path` with `redaction`.
66///
67/// A packet whose payload does not traverse to `path` fails with an eval error.
68pub fn redact_data_field(source: Stream, path: Vec<Symbol>, redaction: Expr) -> Stream {
69    map_data_expr(source, move |payload| {
70        redact_expr_path(payload, &path, redaction.clone())
71    })
72}
73
74/// Inner-joins two data streams on equal field values.
75///
76/// Both streams are recorded to completion, then every `left` payload whose
77/// value at `left_path` equals a `right` payload's value at `right_path` emits
78/// a packet of `output_kind` carrying `{key, left, right}` and the union of
79/// both items' ticks. Both sources must reach `done`.
80pub fn join_data_on_field(
81    left: Stream,
82    right: Stream,
83    left_path: Vec<Symbol>,
84    right_path: Vec<Symbol>,
85    output_kind: Symbol,
86) -> Result<Stream> {
87    let metadata = left.metadata().clone();
88    let left_items = record_bang(&left)?.items().to_vec();
89    let right_items = record_bang(&right)?.items().to_vec();
90    let mut joined = Vec::new();
91
92    for left_item in &left_items {
93        let Some(left_payload) = data_payload(left_item) else {
94            continue;
95        };
96        let Some(key) = expr_path(left_payload, &left_path).cloned() else {
97            continue;
98        };
99        for right_item in &right_items {
100            let Some(right_payload) = data_payload(right_item) else {
101                continue;
102            };
103            if expr_path(right_payload, &right_path) == Some(&key) {
104                joined.push(join_item(
105                    &output_kind,
106                    key.clone(),
107                    left_item,
108                    left_payload,
109                    right_item,
110                    right_payload,
111                )?);
112            }
113        }
114    }
115
116    Ok(Stream::pull(metadata, joined))
117}
118
119/// Reorders recorded data packets by an `i64` score read from `path`.
120///
121/// The source is recorded to completion, each payload's value at `path` is
122/// parsed as an `i64` (missing or unparseable scores default to `0`), and the
123/// packets are stably sorted ascending, or descending when `descending` is set.
124pub fn rank_data_by_i64_field(
125    source: Stream,
126    path: Vec<Symbol>,
127    descending: bool,
128) -> Result<Stream> {
129    let recording = record_bang(&source)?;
130    let metadata = recording.metadata().clone();
131    let mut ranked = recording
132        .items()
133        .iter()
134        .cloned()
135        .enumerate()
136        .map(|(index, item)| {
137            let score = data_payload(&item)
138                .and_then(|payload| expr_path(payload, &path))
139                .and_then(expr_i64)
140                .unwrap_or(0);
141            (index, score, item)
142        })
143        .collect::<Vec<_>>();
144
145    ranked.sort_by(
146        |(left_index, left_score, _), (right_index, right_score, _)| {
147            let order = if descending {
148                right_score.cmp(left_score)
149            } else {
150                left_score.cmp(right_score)
151            };
152            order.then_with(|| left_index.cmp(right_index))
153        },
154    );
155
156    Ok(Stream::pull(
157        metadata,
158        ranked.into_iter().map(|(_, _, item)| item).collect(),
159    ))
160}
161
162fn join_item(
163    kind: &Symbol,
164    key: Expr,
165    left_item: &StreamItem,
166    left_payload: &Expr,
167    right_item: &StreamItem,
168    right_payload: &Expr,
169) -> Result<StreamItem> {
170    let mut ticks = left_item.ticks().to_vec();
171    for tick in right_item.ticks() {
172        if !ticks.iter().any(|existing| existing.clock == tick.clock) {
173            ticks.push(tick.clone());
174        }
175    }
176    StreamItem::with_ticks(
177        StreamPacket::data(
178            kind.clone(),
179            Expr::Map(vec![
180                (field("key"), key),
181                (field("left"), left_payload.clone()),
182                (field("right"), right_payload.clone()),
183            ]),
184        ),
185        ticks,
186    )
187}
188
189fn data_payload(item: &StreamItem) -> Option<&Expr> {
190    match item.packet() {
191        StreamPacket::Data(packet) => Some(&packet.payload),
192        _ => None,
193    }
194}
195
196fn expr_i64(expr: &Expr) -> Option<i64> {
197    match expr {
198        Expr::Number(number) => number.canonical.parse().ok(),
199        Expr::String(text) => text.parse().ok(),
200        _ => None,
201    }
202}
203
204fn redact_expr_path(mut expr: Expr, path: &[Symbol], redaction: Expr) -> Result<Expr> {
205    if path.is_empty() {
206        return Ok(redaction);
207    }
208    redact_expr_path_inner(&mut expr, path, redaction)?;
209    Ok(expr)
210}
211
212fn redact_expr_path_inner(expr: &mut Expr, path: &[Symbol], redaction: Expr) -> Result<()> {
213    let Expr::Map(entries) = expr else {
214        return Err(Error::Eval(
215            "stream/redact field path does not traverse a map".to_owned(),
216        ));
217    };
218    let Some((_, value)) = entries
219        .iter_mut()
220        .find(|(key, _)| matches!(key, Expr::Symbol(symbol) if symbol == &path[0]))
221    else {
222        return Err(Error::Eval("stream/redact field path not found".to_owned()));
223    };
224    if path.len() == 1 {
225        *value = redaction;
226        Ok(())
227    } else {
228        redact_expr_path_inner(value, &path[1..], redaction)
229    }
230}
231
232fn field(name: &str) -> Expr {
233    Expr::Symbol(Symbol::new(name))
234}