use std::sync::Arc;
use arrow_array::RecordBatch;
use smallvec::SmallVec;
/// Byte-string key used to identify a timer.
///
/// Backed by a `SmallVec` with an inline capacity of 16 bytes: keys of up to
/// 16 bytes are stored inline, longer keys spill to the heap.
///
/// NOTE(review): how keys are encoded is decided by the code that registers
/// timers, which is not visible in this file — confirm before relying on a
/// particular layout.
pub type TimerKey = SmallVec<[u8; 16]>;
/// A timestamped record batch.
///
/// The payload sits behind an [`Arc`], so cloning an `Event` clones the
/// pointer (a reference-count bump) rather than copying the batch itself.
#[derive(Debug, Clone)]
pub struct Event {
    /// Timestamp attached to the event.
    ///
    /// NOTE(review): unit and epoch are not fixed by this type — presumably
    /// event-time milliseconds; confirm against callers.
    pub timestamp: i64,
    /// Shared handle to the Arrow batch carried by this event.
    pub data: Arc<RecordBatch>,
}

impl Event {
    /// Creates an event from a timestamp and an owned batch.
    ///
    /// Ownership of `data` moves into a freshly allocated [`Arc`]; the batch
    /// contents are not inspected or modified.
    #[must_use]
    pub fn new(timestamp: i64, data: RecordBatch) -> Self {
        let data = Arc::new(data);
        Self { timestamp, data }
    }
}
/// State blob associated with an operator: an identifier, a version number,
/// and raw bytes.
#[derive(Debug, Clone)]
pub struct OperatorState {
    /// Identifier of the operator this state belongs to.
    pub operator_id: String,
    /// Version number of the state; [`OperatorState::v1`] sets this to `1`.
    pub version: u32,
    /// Raw state bytes. This type does not interpret them.
    pub data: Vec<u8>,
}

impl OperatorState {
    /// Builds a state tagged as version 1.
    ///
    /// `operator_id` and `data` are moved in unchanged; only `version` is
    /// filled in by this constructor.
    #[must_use]
    pub fn v1(operator_id: String, data: Vec<u8>) -> Self {
        const VERSION: u32 = 1;

        Self {
            version: VERSION,
            operator_id,
            data,
        }
    }
}
/// Errors reported by operators.
///
/// Every variant carries a free-form message that is interpolated into the
/// variant's display string (the `{0}` placeholder in its `#[error]`
/// attribute).
#[derive(Debug, thiserror::Error)]
pub enum OperatorError {
/// Accessing state failed; the payload describes the failure.
#[error("State access failed: {0}")]
StateAccessFailed(String),
/// Serialization failed. Also the variant produced when converting from
/// `arrow_schema::ArrowError` (see the `From` impl in this file).
#[error("Serialization failed: {0}")]
SerializationFailed(String),
/// Processing failed; the payload describes the failure.
#[error("Processing failed: {0}")]
ProcessingFailed(String),
/// Configuration was rejected; the payload describes the problem.
#[error("Configuration error: {0}")]
ConfigError(String),
}
impl From<arrow_schema::ArrowError> for OperatorError {
fn from(e: arrow_schema::ArrowError) -> Self {
Self::SerializationFailed(e.to_string())
}
}
// Public submodules; each `pub mod` declaration pulls its body from a separate
// source file next to this one.
pub mod sliding_window;
pub mod window;
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// Builds a single-column batch with three `Int64` rows for the event tests.
    fn sample_batch() -> RecordBatch {
        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap()
    }

    /// `Event::new` keeps the timestamp and wraps the batch unchanged.
    #[test]
    fn test_event_creation() {
        let event = Event::new(12345, sample_batch());

        assert_eq!(event.timestamp, 12345);
        assert_eq!(event.data.num_rows(), 3);
    }

    /// Cloning an event shares the underlying batch instead of copying it.
    #[test]
    fn test_event_clone_shares_batch() {
        let event = Event::new(7, sample_batch());
        let copy = event.clone();

        assert_eq!(copy.timestamp, 7);
        assert!(Arc::ptr_eq(&event.data, &copy.data));
    }

    /// `OperatorState::v1` moves its arguments in and stamps version 1.
    #[test]
    fn test_operator_state_v1() {
        let state = OperatorState::v1("op-1".to_string(), vec![0xAB, 0xCD]);

        assert_eq!(state.operator_id, "op-1");
        assert_eq!(state.version, 1);
        assert_eq!(state.data, vec![0xAB, 0xCD]);
    }

    /// Each error variant renders its prefix followed by the payload message.
    #[test]
    fn test_operator_error_display() {
        let cases = [
            (
                OperatorError::StateAccessFailed("a".to_string()),
                "State access failed: a",
            ),
            (
                OperatorError::SerializationFailed("b".to_string()),
                "Serialization failed: b",
            ),
            (
                OperatorError::ProcessingFailed("c".to_string()),
                "Processing failed: c",
            ),
            (
                OperatorError::ConfigError("d".to_string()),
                "Configuration error: d",
            ),
        ];

        for (error, expected) in cases {
            assert_eq!(error.to_string(), expected);
        }
    }

    /// Arrow errors convert into `SerializationFailed` and keep their message.
    #[test]
    fn test_arrow_error_conversion() {
        let arrow_err = arrow_schema::ArrowError::ComputeError("boom".to_string());
        let err = OperatorError::from(arrow_err);

        assert!(matches!(
            err,
            OperatorError::SerializationFailed(ref msg) if msg.contains("boom")
        ));
    }
}