sim_lib_stream_combinators/
event_algebra.rs1use sim_kernel::{Error, Expr, Result, Symbol};
8use sim_lib_stream_core::{StreamItem, StreamPacket};
9
10use crate::{Stream, filter_data_shape, map_data_expr, record_bang};
11
12pub fn model_event_data_kind() -> Symbol {
14 Symbol::qualified("stream/data", "model-event")
15}
16
17pub fn rank_frontier_data_kind() -> Symbol {
19 Symbol::qualified("stream/data", "rank-frontier")
20}
21
22pub fn event_join_data_kind() -> Symbol {
24 Symbol::qualified("stream/data", "event-join")
25}
26
27pub fn expr_path<'a>(expr: &'a Expr, path: &[Symbol]) -> Option<&'a Expr> {
34 let mut cursor = expr;
35 for segment in path {
36 let Expr::Map(entries) = cursor else {
37 return None;
38 };
39 cursor = entries.iter().find_map(|(key, value)| match key {
40 Expr::Symbol(symbol) if symbol == segment => Some(value),
41 _ => None,
42 })?;
43 }
44 Some(cursor)
45}
46
47pub fn filter_data_field_eq(source: Stream, path: Vec<Symbol>, expected: Expr) -> Stream {
49 filter_data_shape(source, move |payload| {
50 Ok(expr_path(payload, &path) == Some(&expected))
51 })
52}
53
54pub fn project_data_field(source: Stream, path: Vec<Symbol>) -> Stream {
58 map_data_expr(source, move |payload| {
59 expr_path(&payload, &path)
60 .cloned()
61 .ok_or_else(|| Error::Eval("stream/project field path not found".to_owned()))
62 })
63}
64
65pub fn redact_data_field(source: Stream, path: Vec<Symbol>, redaction: Expr) -> Stream {
69 map_data_expr(source, move |payload| {
70 redact_expr_path(payload, &path, redaction.clone())
71 })
72}
73
74pub fn join_data_on_field(
81 left: Stream,
82 right: Stream,
83 left_path: Vec<Symbol>,
84 right_path: Vec<Symbol>,
85 output_kind: Symbol,
86) -> Result<Stream> {
87 let metadata = left.metadata().clone();
88 let left_items = record_bang(&left)?.items().to_vec();
89 let right_items = record_bang(&right)?.items().to_vec();
90 let mut joined = Vec::new();
91
92 for left_item in &left_items {
93 let Some(left_payload) = data_payload(left_item) else {
94 continue;
95 };
96 let Some(key) = expr_path(left_payload, &left_path).cloned() else {
97 continue;
98 };
99 for right_item in &right_items {
100 let Some(right_payload) = data_payload(right_item) else {
101 continue;
102 };
103 if expr_path(right_payload, &right_path) == Some(&key) {
104 joined.push(join_item(
105 &output_kind,
106 key.clone(),
107 left_item,
108 left_payload,
109 right_item,
110 right_payload,
111 )?);
112 }
113 }
114 }
115
116 Ok(Stream::pull(metadata, joined))
117}
118
119pub fn rank_data_by_i64_field(
125 source: Stream,
126 path: Vec<Symbol>,
127 descending: bool,
128) -> Result<Stream> {
129 let recording = record_bang(&source)?;
130 let metadata = recording.metadata().clone();
131 let mut ranked = recording
132 .items()
133 .iter()
134 .cloned()
135 .enumerate()
136 .map(|(index, item)| {
137 let score = data_payload(&item)
138 .and_then(|payload| expr_path(payload, &path))
139 .and_then(expr_i64)
140 .unwrap_or(0);
141 (index, score, item)
142 })
143 .collect::<Vec<_>>();
144
145 ranked.sort_by(
146 |(left_index, left_score, _), (right_index, right_score, _)| {
147 let order = if descending {
148 right_score.cmp(left_score)
149 } else {
150 left_score.cmp(right_score)
151 };
152 order.then_with(|| left_index.cmp(right_index))
153 },
154 );
155
156 Ok(Stream::pull(
157 metadata,
158 ranked.into_iter().map(|(_, _, item)| item).collect(),
159 ))
160}
161
162fn join_item(
163 kind: &Symbol,
164 key: Expr,
165 left_item: &StreamItem,
166 left_payload: &Expr,
167 right_item: &StreamItem,
168 right_payload: &Expr,
169) -> Result<StreamItem> {
170 let mut ticks = left_item.ticks().to_vec();
171 for tick in right_item.ticks() {
172 if !ticks.iter().any(|existing| existing.clock == tick.clock) {
173 ticks.push(tick.clone());
174 }
175 }
176 StreamItem::with_ticks(
177 StreamPacket::data(
178 kind.clone(),
179 Expr::Map(vec![
180 (field("key"), key),
181 (field("left"), left_payload.clone()),
182 (field("right"), right_payload.clone()),
183 ]),
184 ),
185 ticks,
186 )
187}
188
189fn data_payload(item: &StreamItem) -> Option<&Expr> {
190 match item.packet() {
191 StreamPacket::Data(packet) => Some(&packet.payload),
192 _ => None,
193 }
194}
195
196fn expr_i64(expr: &Expr) -> Option<i64> {
197 match expr {
198 Expr::Number(number) => number.canonical.parse().ok(),
199 Expr::String(text) => text.parse().ok(),
200 _ => None,
201 }
202}
203
204fn redact_expr_path(mut expr: Expr, path: &[Symbol], redaction: Expr) -> Result<Expr> {
205 if path.is_empty() {
206 return Ok(redaction);
207 }
208 redact_expr_path_inner(&mut expr, path, redaction)?;
209 Ok(expr)
210}
211
212fn redact_expr_path_inner(expr: &mut Expr, path: &[Symbol], redaction: Expr) -> Result<()> {
213 let Expr::Map(entries) = expr else {
214 return Err(Error::Eval(
215 "stream/redact field path does not traverse a map".to_owned(),
216 ));
217 };
218 let Some((_, value)) = entries
219 .iter_mut()
220 .find(|(key, _)| matches!(key, Expr::Symbol(symbol) if symbol == &path[0]))
221 else {
222 return Err(Error::Eval("stream/redact field path not found".to_owned()));
223 };
224 if path.len() == 1 {
225 *value = redaction;
226 Ok(())
227 } else {
228 redact_expr_path_inner(value, &path[1..], redaction)
229 }
230}
231
232fn field(name: &str) -> Expr {
233 Expr::Symbol(Symbol::new(name))
234}