// crabka_client_streams/processor/erased.rs

//! Type-erased records + the per-dispatch context the graph driver hands to
//! each node. Records flow erased (`Box<dyn Any + Send>`) between nodes; only
//! source/sink boundaries (de)serialize.

5use std::any::Any;
6use std::collections::VecDeque;
7
8use bytes::Bytes;
9
10use super::record::RecordContext;
11
12/// An error raised while a node processes a record (e.g. an internal downcast
13/// mismatch — unreachable in practice because `build()` validates wiring).
14#[derive(Debug, thiserror::Error)]
15pub enum ProcessorError {
16    #[error("type mismatch in node `{node}`: expected {expected}, found a different type")]
17    Downcast {
18        node: String,
19        expected: &'static str,
20    },
21    #[error("serialization error in sink `{node}`: {message}")]
22    Serde { node: String, message: String },
23}
24
/// A record with erased key/value, as it flows between nodes.
pub(crate) struct ErasedRecord {
    /// Type-erased key; `None` when the record carries no key.
    pub key: Option<Box<dyn Any + Send>>,
    /// Type-erased value; recover the concrete type with `downcast`.
    pub value: Box<dyn Any + Send>,
    /// Record timestamp. NOTE(review): unit/epoch is not established in this
    /// file — confirm against the producer of these records.
    pub timestamp: i64,
}

impl ErasedRecord {
    /// Assembles a record from an already-erased key and value plus its
    /// timestamp. The arguments are stored as-is; nothing is validated.
    pub fn new(
        key: Option<Box<dyn Any + Send>>,
        value: Box<dyn Any + Send>,
        timestamp: i64,
    ) -> Self {
        Self {
            key,
            value,
            timestamp,
        }
    }
}

46/// A record emitted by a sink node, collected by the driver (test driver: into
47/// an output queue; runtime: into the producer).
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub(crate) struct OutputRecord {
50    pub topic: String,
51    pub key: Option<Bytes>,
52    pub value: Option<Bytes>,
53    pub timestamp: i64,
54}
55
56/// What the driver lends to a node for the duration of one `process` call:
57/// the forward buffer (for source/processor children), this node's child
58/// indices, the sink output collector, the source-record context, and the
59/// per-task store registry (for `get_state_store`).
60pub(crate) struct Dispatch<'a> {
61    pub buffer: &'a mut VecDeque<(usize, ErasedRecord)>,
62    pub children: &'a [usize],
63    pub output: &'a mut Vec<OutputRecord>,
64    pub record_ctx: &'a RecordContext,
65    pub stores: &'a mut crate::store::registry::StoreRegistry,
66    /// The app-wide, fully-replicated global stores (shared across tasks). Read
67    /// by stream-globaltable join processors via `ProcessorContext::global_get`.
68    pub globals: &'a crate::runtime::global::GlobalStateManager,
69    /// The graph node this dispatch is positioned at (so `schedule` tags the
70    /// owning node, and a punctuator forwards to this node's children).
71    pub node_idx: usize,
72    /// Sink for `ProcessorContext::schedule`: newly-registered punctuation schedules.
73    pub schedules: &'a mut Vec<crate::processor::punctuation::ScheduleEntry>,
74    /// Current stream-time / wall-clock — the BASE `schedule` stamps a new entry's
75    /// first-fire time from (`base + interval`).
76    pub sched_stream_time: i64,
77    pub sched_wall_clock: i64,
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::check;

    /// Erasing a key and value into an `ErasedRecord` and downcasting them
    /// back to their concrete types yields the original values, and the
    /// timestamp is stored unchanged.
    #[test]
    fn erase_then_downcast_roundtrips_value_and_key() {
        let er = ErasedRecord::new(Some(Box::new(7i32)), Box::new("v".to_string()), 1);
        // Read the `Copy` field first, before `key`/`value` are moved out.
        check!(er.timestamp == 1);
        let key = er.key.unwrap().downcast::<i32>().unwrap();
        let val = er.value.downcast::<String>().unwrap();
        check!(*key == 7);
        check!(*val == "v");
    }
}
93}