use sim_kernel::{Error, Expr, Result, Symbol};
use sim_lib_stream_core::{StreamItem, StreamPacket};
use crate::{Stream, filter_data_shape, map_data_expr, record_bang};
pub fn model_event_data_kind() -> Symbol {
Symbol::qualified("stream/data", "model-event")
}
pub fn rank_frontier_data_kind() -> Symbol {
Symbol::qualified("stream/data", "rank-frontier")
}
pub fn event_join_data_kind() -> Symbol {
Symbol::qualified("stream/data", "event-join")
}
pub fn expr_path<'a>(expr: &'a Expr, path: &[Symbol]) -> Option<&'a Expr> {
let mut cursor = expr;
for segment in path {
let Expr::Map(entries) = cursor else {
return None;
};
cursor = entries.iter().find_map(|(key, value)| match key {
Expr::Symbol(symbol) if symbol == segment => Some(value),
_ => None,
})?;
}
Some(cursor)
}
pub fn filter_data_field_eq(source: Stream, path: Vec<Symbol>, expected: Expr) -> Stream {
filter_data_shape(source, move |payload| {
Ok(expr_path(payload, &path) == Some(&expected))
})
}
pub fn project_data_field(source: Stream, path: Vec<Symbol>) -> Stream {
map_data_expr(source, move |payload| {
expr_path(&payload, &path)
.cloned()
.ok_or_else(|| Error::Eval("stream/project field path not found".to_owned()))
})
}
pub fn redact_data_field(source: Stream, path: Vec<Symbol>, redaction: Expr) -> Stream {
map_data_expr(source, move |payload| {
redact_expr_path(payload, &path, redaction.clone())
})
}
pub fn join_data_on_field(
left: Stream,
right: Stream,
left_path: Vec<Symbol>,
right_path: Vec<Symbol>,
output_kind: Symbol,
) -> Result<Stream> {
let metadata = left.metadata().clone();
let left_items = record_bang(&left)?.items().to_vec();
let right_items = record_bang(&right)?.items().to_vec();
let mut joined = Vec::new();
for left_item in &left_items {
let Some(left_payload) = data_payload(left_item) else {
continue;
};
let Some(key) = expr_path(left_payload, &left_path).cloned() else {
continue;
};
for right_item in &right_items {
let Some(right_payload) = data_payload(right_item) else {
continue;
};
if expr_path(right_payload, &right_path) == Some(&key) {
joined.push(join_item(
&output_kind,
key.clone(),
left_item,
left_payload,
right_item,
right_payload,
)?);
}
}
}
Ok(Stream::pull(metadata, joined))
}
pub fn rank_data_by_i64_field(
source: Stream,
path: Vec<Symbol>,
descending: bool,
) -> Result<Stream> {
let recording = record_bang(&source)?;
let metadata = recording.metadata().clone();
let mut ranked = recording
.items()
.iter()
.cloned()
.enumerate()
.map(|(index, item)| {
let score = data_payload(&item)
.and_then(|payload| expr_path(payload, &path))
.and_then(expr_i64)
.unwrap_or(0);
(index, score, item)
})
.collect::<Vec<_>>();
ranked.sort_by(
|(left_index, left_score, _), (right_index, right_score, _)| {
let order = if descending {
right_score.cmp(left_score)
} else {
left_score.cmp(right_score)
};
order.then_with(|| left_index.cmp(right_index))
},
);
Ok(Stream::pull(
metadata,
ranked.into_iter().map(|(_, _, item)| item).collect(),
))
}
fn join_item(
kind: &Symbol,
key: Expr,
left_item: &StreamItem,
left_payload: &Expr,
right_item: &StreamItem,
right_payload: &Expr,
) -> Result<StreamItem> {
let mut ticks = left_item.ticks().to_vec();
for tick in right_item.ticks() {
if !ticks.iter().any(|existing| existing.clock == tick.clock) {
ticks.push(tick.clone());
}
}
StreamItem::with_ticks(
StreamPacket::data(
kind.clone(),
Expr::Map(vec![
(field("key"), key),
(field("left"), left_payload.clone()),
(field("right"), right_payload.clone()),
]),
),
ticks,
)
}
fn data_payload(item: &StreamItem) -> Option<&Expr> {
match item.packet() {
StreamPacket::Data(packet) => Some(&packet.payload),
_ => None,
}
}
fn expr_i64(expr: &Expr) -> Option<i64> {
match expr {
Expr::Number(number) => number.canonical.parse().ok(),
Expr::String(text) => text.parse().ok(),
_ => None,
}
}
fn redact_expr_path(mut expr: Expr, path: &[Symbol], redaction: Expr) -> Result<Expr> {
if path.is_empty() {
return Ok(redaction);
}
redact_expr_path_inner(&mut expr, path, redaction)?;
Ok(expr)
}
fn redact_expr_path_inner(expr: &mut Expr, path: &[Symbol], redaction: Expr) -> Result<()> {
let Expr::Map(entries) = expr else {
return Err(Error::Eval(
"stream/redact field path does not traverse a map".to_owned(),
));
};
let Some((_, value)) = entries
.iter_mut()
.find(|(key, _)| matches!(key, Expr::Symbol(symbol) if symbol == &path[0]))
else {
return Err(Error::Eval("stream/redact field path not found".to_owned()));
};
if path.len() == 1 {
*value = redaction;
Ok(())
} else {
redact_expr_path_inner(value, &path[1..], redaction)
}
}
fn field(name: &str) -> Expr {
Expr::Symbol(Symbol::new(name))
}