use std::any::Any;
use std::collections::VecDeque;
use bytes::Bytes;
use super::record::RecordContext;
/// Failures that a processing node can report.
///
/// Every variant carries the name of the node involved, which is
/// interpolated into the rendered error message.
#[derive(Debug, thiserror::Error)]
pub enum ProcessorError {
    /// A value did not have the concrete type the node expected.
    #[error("type mismatch in node `{node}`: expected {expected}, found a different type")]
    Downcast {
        /// Name of the node in which the mismatch was detected.
        node: String,
        /// Name of the type the node expected to find.
        expected: &'static str,
    },

    /// Serialization failed; the message refers to the node as a sink.
    #[error("serialization error in sink `{node}`: {message}")]
    Serde {
        /// Name of the sink node.
        node: String,
        /// Free-form description of what went wrong.
        message: String,
    },
}
/// A record whose key and value have had their concrete types erased.
///
/// Both payloads are held as boxed [`Any`] trait objects (with a `Send`
/// bound); a consumer recovers the concrete type with `Box::downcast`.
pub(crate) struct ErasedRecord {
    /// Type-erased key, or `None` when the record carries no key.
    pub key: Option<Box<dyn Any + Send>>,
    /// Type-erased value.
    pub value: Box<dyn Any + Send>,
    /// Timestamp attached to the record.
    /// NOTE(review): unit/epoch is not visible from this file — confirm.
    pub timestamp: i64,
}

impl ErasedRecord {
    /// Assembles a record from already-boxed parts.
    ///
    /// The arguments are stored unchanged; no validation or conversion
    /// takes place.
    pub fn new(
        key: Option<Box<dyn Any + Send>>,
        value: Box<dyn Any + Send>,
        timestamp: i64,
    ) -> Self {
        ErasedRecord {
            timestamp,
            value,
            key,
        }
    }
}
/// A record addressed to a topic, with its payloads held as raw `Bytes`.
///
/// Derives `Clone` and equality, so instances can be copied and compared
/// field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct OutputRecord {
    /// Topic the record is associated with.
    pub topic: String,
    /// Key bytes, if the record has a key.
    pub key: Option<Bytes>,
    /// Value bytes, if any.
    /// NOTE(review): presumably `None` marks a tombstone/delete — confirm.
    pub value: Option<Bytes>,
    /// Timestamp attached to the record.
    /// NOTE(review): unit/epoch is not visible from this file — confirm.
    pub timestamp: i64,
}
/// Borrowed handles grouped into a single value for the lifetime `'a`.
///
/// NOTE(review): the per-field roles described below are inferred from
/// names and types only; confirm against the code that builds and consumes
/// this struct.
pub(crate) struct Dispatch<'a> {
    /// Queue of `(usize, ErasedRecord)` pairs; presumably the index is the
    /// node that should receive the record — TODO confirm.
    pub buffer: &'a mut VecDeque<(usize, ErasedRecord)>,
    /// Indices, presumably of the current node's downstream children.
    pub children: &'a [usize],
    /// Collection that accumulates `OutputRecord`s.
    pub output: &'a mut Vec<OutputRecord>,
    /// Shared, read-only view of the record context.
    pub record_ctx: &'a RecordContext,
    /// Mutable access to the store registry.
    pub stores: &'a mut crate::store::registry::StoreRegistry,
    /// Shared, read-only access to the global state manager.
    pub globals: &'a crate::runtime::global::GlobalStateManager,
    /// Index, presumably of the node currently being run — TODO confirm.
    pub node_idx: usize,
    /// Mutable list of schedule entries.
    pub schedules: &'a mut Vec<crate::processor::punctuation::ScheduleEntry>,
    /// Presumably the stream-time reading used for scheduling — confirm.
    pub sched_stream_time: i64,
    /// Presumably the wall-clock reading used for scheduling — confirm.
    pub sched_wall_clock: i64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::check;

    /// Boxing a key and a value into an `ErasedRecord` and downcasting them
    /// again must give back the original concrete values.
    #[test]
    fn erase_then_downcast_roundtrips_value_and_key() {
        let boxed_key: Option<Box<dyn Any + Send>> = Some(Box::new(7i32));
        let boxed_value: Box<dyn Any + Send> = Box::new("v".to_string());
        let erased = ErasedRecord::new(boxed_key, boxed_value, 1);

        // Split the record so each payload can be consumed independently.
        let ErasedRecord { key, value, .. } = erased;
        let recovered_key = key.unwrap().downcast::<i32>().unwrap();
        let recovered_value = value.downcast::<String>().unwrap();

        check!(*recovered_key == 7);
        check!(*recovered_value == "v");
    }
}