laminar_core/operator/mod.rs
//! # Operator Module
//!
//! Streaming operators for transforming and processing events.
//!
//! ## Operator Types
//!
//! - **Stateless**: map, filter, flatmap
//! - **Stateful**: window, aggregate, join
//!
//! All operators implement the [`Operator`] trait and can be composed into
//! directed acyclic graphs (DAGs) for complex stream processing.
12
13use std::sync::Arc;
14
15use arrow_array::RecordBatch;
16use smallvec::SmallVec;
17
18/// Timer key type optimized for window IDs (16 bytes).
19/// Re-exported from time module for convenience.
20pub type TimerKey = SmallVec<[u8; 16]>;
21
22/// An event flowing through the system
23#[derive(Debug, Clone)]
24pub struct Event {
25 /// Timestamp of the event
26 pub timestamp: i64,
27 /// Event payload as Arrow `RecordBatch` wrapped in `Arc` for zero-copy multicast.
28 ///
29 /// Cloning an `Event` increments the `Arc` reference count (~2ns, O(1))
30 /// instead of copying all column `Arc` pointers (O(columns)).
31 pub data: Arc<RecordBatch>,
32}
33
34impl Event {
35 /// Create a new event, wrapping the batch in `Arc` for zero-copy sharing.
36 #[must_use]
37 pub fn new(timestamp: i64, data: RecordBatch) -> Self {
38 Self {
39 timestamp,
40 data: Arc::new(data),
41 }
42 }
43}
44
45/// Output from an operator
46///
47/// Infrequent variants (`SideOutput`, `CheckpointComplete`) are boxed to keep
48/// the enum size small for the common hot-path variants (`Event`, `Watermark`).
49#[derive(Debug)]
50pub enum Output {
51 /// Regular event output
52 Event(Event),
53 /// Watermark update
54 Watermark(i64),
55 /// Late event that arrived after watermark (no side output configured)
56 LateEvent(Event),
57 /// Late event routed to a named side output (boxed — infrequent path).
58 SideOutput(Box<SideOutputData>),
59 /// Changelog record with Z-set weight.
60 ///
61 /// Used by `EmitStrategy::Changelog` to emit structured change records
62 /// for CDC pipelines and cascading materialized views.
63 Changelog(window::ChangelogRecord),
64 /// Checkpoint completion with snapshotted operator states (boxed — infrequent path).
65 ///
66 /// Emitted when a `CheckpointRequest` is processed by a core thread.
67 /// Carries the checkpoint ID and all operator states for persistence by Ring 1.
68 CheckpointComplete(Box<CheckpointCompleteData>),
69}
70
71/// Data for a late event routed to a named side output.
72///
73/// Boxed inside [`Output::SideOutput`] to reduce enum size on the hot path.
74#[derive(Debug)]
75pub struct SideOutputData {
76 /// The name of the side output to route to.
77 ///
78 /// Uses `Arc<str>` to avoid per-event String allocation — the name is
79 /// typically shared across all late events for a given operator.
80 pub name: Arc<str>,
81 /// The late event
82 pub event: Event,
83}
84
85/// Data for a checkpoint completion.
86///
87/// Boxed inside [`Output::CheckpointComplete`] to reduce enum size on the hot path.
88#[derive(Debug)]
89pub struct CheckpointCompleteData {
90 /// The checkpoint ID from the request
91 pub checkpoint_id: u64,
92 /// Snapshotted states from all operators on this core
93 pub operator_states: Vec<OperatorState>,
94}
95
96/// Collection type for operator outputs.
97///
98/// Uses `SmallVec` to avoid heap allocation for common cases (0-3 outputs).
99/// The size 4 is chosen based on typical operator patterns:
100/// - 0 outputs: filter that drops events
101/// - 1 output: most common case (map, regular processing)
102/// - 2 outputs: event + watermark
103/// - 3+ outputs: flatmap or window emission
104pub type OutputVec = SmallVec<[Output; 4]>;
105
106/// Context provided to operators during processing
107pub struct OperatorContext<'a> {
108 /// Current event time
109 pub event_time: i64,
110 /// Current processing time (system time in microseconds)
111 pub processing_time: i64,
112 /// Timer registration
113 pub timers: &'a mut crate::time::TimerService,
114 /// State store access
115 pub state: &'a mut dyn crate::state::StateStore,
116 /// Watermark generator
117 pub watermark_generator: &'a mut dyn crate::time::WatermarkGenerator,
118 /// Operator index in the chain
119 pub operator_index: usize,
120}
121
122/// Trait implemented by all streaming operators
123pub trait Operator: Send {
124 /// Process an incoming event
125 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec;
126
127 /// Handle timer expiration
128 fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec;
129
130 /// Checkpoint the operator's state
131 fn checkpoint(&self) -> OperatorState;
132
133 /// Restore from a checkpoint
134 ///
135 /// # Errors
136 ///
137 /// Returns `OperatorError::StateAccessFailed` if the state cannot be accessed
138 /// Returns `OperatorError::SerializationFailed` if the state cannot be deserialized
139 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError>;
140}
141
142/// A timer registration
143#[derive(Debug, Clone)]
144pub struct Timer {
145 /// Timer key (uses `SmallVec` to avoid heap allocation for keys up to 16 bytes)
146 pub key: TimerKey,
147 /// Expiration timestamp
148 pub timestamp: i64,
149}
150
/// An operator's state in serialized form, as used for checkpointing.
#[derive(Clone, Debug)]
pub struct OperatorState {
    /// ID of the operator the state belongs to.
    pub operator_id: String,
    /// The serialized state bytes.
    pub data: Vec<u8>,
}
159
160/// Errors that can occur in operators
161#[derive(Debug, thiserror::Error)]
162pub enum OperatorError {
163 /// State access error
164 #[error("State access failed: {0}")]
165 StateAccessFailed(String),
166
167 /// Serialization error
168 #[error("Serialization failed: {0}")]
169 SerializationFailed(String),
170
171 /// Processing error
172 #[error("Processing failed: {0}")]
173 ProcessingFailed(String),
174
175 /// Configuration error (e.g., missing required builder field)
176 #[error("Configuration error: {0}")]
177 ConfigError(String),
178}
179
180pub mod asof_join;
181pub mod changelog;
182pub mod cumulate_window;
183pub mod lag_lead;
184pub mod lookup_join;
185pub mod partitioned_topk;
186pub mod session_window;
187pub mod sliding_window;
188pub mod stream_join;
189pub mod table_cache;
190pub mod temporal_join;
191pub mod topk;
192pub mod watermark_sort;
193pub mod window;
194pub mod window_sort;
195
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// `Event::new` keeps the given timestamp and exposes the rows of the wrapped batch.
    #[test]
    fn test_event_creation() {
        // Single three-row Int64 column named "col1".
        let values = Int64Array::from(vec![1, 2, 3]);
        let column = Arc::new(values);
        let batch = RecordBatch::try_from_iter(vec![("col1", column as _)]).unwrap();

        let event = Event::new(12345, batch);

        assert_eq!(event.timestamp, 12345);
        assert_eq!(event.data.num_rows(), 3);
    }
}
212}