Skip to main content

laminar_core/operator/
mod.rs

1//! # Operator Module
2//!
3//! Streaming operators for transforming and processing events.
4//!
5//! ## Operator Types
6//!
7//! - **Stateless**: map, filter, flatmap
8//! - **Stateful**: window, aggregate, join
9//!
10//! All operators implement the `Operator` trait and can be composed into
11//! directed acyclic graphs (DAGs) for complex stream processing.
12
13use std::sync::Arc;
14
15use arrow_array::RecordBatch;
16use smallvec::SmallVec;
17
18/// Timer key type optimized for window IDs (16 bytes).
19/// Re-exported from time module for convenience.
20pub type TimerKey = SmallVec<[u8; 16]>;
21
22/// An event flowing through the system
23#[derive(Debug, Clone)]
24pub struct Event {
25    /// Timestamp of the event
26    pub timestamp: i64,
27    /// Event payload as Arrow `RecordBatch` wrapped in `Arc` for zero-copy multicast.
28    ///
29    /// Cloning an `Event` increments the `Arc` reference count (~2ns, O(1))
30    /// instead of copying all column `Arc` pointers (O(columns)).
31    pub data: Arc<RecordBatch>,
32}
33
34impl Event {
35    /// Create a new event, wrapping the batch in `Arc` for zero-copy sharing.
36    #[must_use]
37    pub fn new(timestamp: i64, data: RecordBatch) -> Self {
38        Self {
39            timestamp,
40            data: Arc::new(data),
41        }
42    }
43}
44
45/// Output from an operator
46#[derive(Debug)]
47pub enum Output {
48    /// Regular event output
49    Event(Event),
50    /// Watermark update
51    Watermark(i64),
52    /// Late event that arrived after watermark (no side output configured)
53    LateEvent(Event),
54    /// Late event routed to a named side output
55    SideOutput {
56        /// The name of the side output to route to
57        name: String,
58        /// The late event
59        event: Event,
60    },
61    /// Changelog record with Z-set weight (F011B).
62    ///
63    /// Used by `EmitStrategy::Changelog` to emit structured change records
64    /// for CDC pipelines and cascading materialized views.
65    Changelog(window::ChangelogRecord),
66    /// Checkpoint completion with snapshotted operator states.
67    ///
68    /// Emitted when a `CheckpointRequest` is processed by a core thread.
69    /// Carries the checkpoint ID and all operator states for persistence by Ring 1.
70    CheckpointComplete {
71        /// The checkpoint ID from the request
72        checkpoint_id: u64,
73        /// Snapshotted states from all operators on this core
74        operator_states: Vec<OperatorState>,
75    },
76}
77
78/// Collection type for operator outputs.
79///
80/// Uses `SmallVec` to avoid heap allocation for common cases (0-3 outputs).
81/// The size 4 is chosen based on typical operator patterns:
82/// - 0 outputs: filter that drops events
83/// - 1 output: most common case (map, regular processing)
84/// - 2 outputs: event + watermark
85/// - 3+ outputs: flatmap or window emission
86pub type OutputVec = SmallVec<[Output; 4]>;
87
88/// Context provided to operators during processing
89pub struct OperatorContext<'a> {
90    /// Current event time
91    pub event_time: i64,
92    /// Current processing time (system time in microseconds)
93    pub processing_time: i64,
94    /// Timer registration
95    pub timers: &'a mut crate::time::TimerService,
96    /// State store access
97    pub state: &'a mut dyn crate::state::StateStore,
98    /// Watermark generator
99    pub watermark_generator: &'a mut dyn crate::time::WatermarkGenerator,
100    /// Operator index in the chain
101    pub operator_index: usize,
102}
103
104/// Trait implemented by all streaming operators
105pub trait Operator: Send {
106    /// Process an incoming event
107    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec;
108
109    /// Handle timer expiration
110    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec;
111
112    /// Checkpoint the operator's state
113    fn checkpoint(&self) -> OperatorState;
114
115    /// Restore from a checkpoint
116    ///
117    /// # Errors
118    ///
119    /// Returns `OperatorError::StateAccessFailed` if the state cannot be accessed
120    /// Returns `OperatorError::SerializationFailed` if the state cannot be deserialized
121    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError>;
122}
123
124/// A timer registration
125#[derive(Debug, Clone)]
126pub struct Timer {
127    /// Timer key (uses `SmallVec` to avoid heap allocation for keys up to 16 bytes)
128    pub key: TimerKey,
129    /// Expiration timestamp
130    pub timestamp: i64,
131}
132
/// Checkpointed state of one operator, in serialized form.
#[derive(Debug, Clone)]
pub struct OperatorState {
    /// Identifier of the operator this state belongs to.
    pub operator_id: String,
    /// Serialized state bytes; this module does not interpret them.
    pub data: Vec<u8>,
}
141
142/// Errors that can occur in operators
143#[derive(Debug, thiserror::Error)]
144pub enum OperatorError {
145    /// State access error
146    #[error("State access failed: {0}")]
147    StateAccessFailed(String),
148
149    /// Serialization error
150    #[error("Serialization failed: {0}")]
151    SerializationFailed(String),
152
153    /// Processing error
154    #[error("Processing failed: {0}")]
155    ProcessingFailed(String),
156
157    /// Configuration error (e.g., missing required builder field)
158    #[error("Configuration error: {0}")]
159    ConfigError(String),
160}
161
162pub mod asof_join;
163pub mod changelog;
164pub mod lag_lead;
165pub mod lookup_join;
166pub mod partitioned_topk;
167pub mod session_window;
168pub mod sliding_window;
169pub mod stream_join;
170pub mod temporal_join;
171pub mod topk;
172pub mod watermark_sort;
173pub mod window;
174pub mod window_sort;
175
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// Builds a one-column batch `col1 = [1, 2, 3]`.
    fn sample_batch() -> RecordBatch {
        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        RecordBatch::try_from_iter(vec![("col1", array as _)])
            .expect("a single Int64 column always forms a valid batch")
    }

    #[test]
    fn test_event_creation() {
        let event = Event::new(12345, sample_batch());

        assert_eq!(event.timestamp, 12345);
        assert_eq!(event.data.num_rows(), 3);
    }

    #[test]
    fn test_event_clone_shares_payload() {
        // `Event` documents zero-copy cloning: a clone must share the same
        // `Arc<RecordBatch>` allocation rather than copying the batch.
        let event = Event::new(7, sample_batch());
        let copy = event.clone();

        assert_eq!(copy.timestamp, 7);
        assert!(Arc::ptr_eq(&event.data, &copy.data));
        assert_eq!(Arc::strong_count(&event.data), 2);
    }
}