// crabka_client_streams/processor/erased.rs
use std::any::Any;
6use std::collections::VecDeque;
7
8use bytes::Bytes;
9
10use super::record::RecordContext;
11
12#[derive(Debug, thiserror::Error)]
15pub enum ProcessorError {
16 #[error("type mismatch in node `{node}`: expected {expected}, found a different type")]
17 Downcast {
18 node: String,
19 expected: &'static str,
20 },
21 #[error("serialization error in sink `{node}`: {message}")]
22 Serde { node: String, message: String },
23}
24
/// A record whose key and value are stored behind `Box<dyn Any + Send>`, so
/// their concrete types are not part of this struct's type.
///
/// A consumer recovers the concrete types by calling `downcast` on the boxes.
///
/// `Debug` is derived so the record can appear in logs and assertion output;
/// the erased payloads print as the opaque `Any { .. }` placeholder that the
/// standard library provides for `dyn Any + Send`.
#[derive(Debug)]
pub(crate) struct ErasedRecord {
    /// Optional type-erased key; `None` when the record has no key.
    pub key: Option<Box<dyn Any + Send>>,
    /// Type-erased value.
    pub value: Box<dyn Any + Send>,
    /// Timestamp carried with the record (unit and epoch are not established
    /// in this file — confirm against the producer).
    pub timestamp: i64,
}
31
32impl ErasedRecord {
33 pub fn new(
34 key: Option<Box<dyn Any + Send>>,
35 value: Box<dyn Any + Send>,
36 timestamp: i64,
37 ) -> Self {
38 Self {
39 key,
40 value,
41 timestamp,
42 }
43 }
44}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
49pub(crate) struct OutputRecord {
50 pub topic: String,
51 pub key: Option<Bytes>,
52 pub value: Option<Bytes>,
53 pub timestamp: i64,
54}
55
56pub(crate) struct Dispatch<'a> {
61 pub buffer: &'a mut VecDeque<(usize, ErasedRecord)>,
62 pub children: &'a [usize],
63 pub output: &'a mut Vec<OutputRecord>,
64 pub record_ctx: &'a RecordContext,
65 pub stores: &'a mut crate::store::registry::StoreRegistry,
66 pub globals: &'a crate::runtime::global::GlobalStateManager,
69 pub node_idx: usize,
72 pub schedules: &'a mut Vec<crate::processor::punctuation::ScheduleEntry>,
74 pub sched_stream_time: i64,
77 pub sched_wall_clock: i64,
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::check;

    /// Erasing a key and a value into an `ErasedRecord` and downcasting them
    /// again yields the original values.
    #[test]
    fn erase_then_downcast_roundtrips_value_and_key() {
        let erased_key: Option<Box<dyn Any + Send>> = Some(Box::new(7i32));
        let erased_value: Box<dyn Any + Send> = Box::new("v".to_string());

        let ErasedRecord { key, value, .. } = ErasedRecord::new(erased_key, erased_value, 1);

        let recovered_key = key.unwrap().downcast::<i32>().unwrap();
        let recovered_value = value.downcast::<String>().unwrap();

        check!(*recovered_key == 7);
        check!(*recovered_value == "v");
    }
}