Skip to main content

laminar_core/reactor/
mod.rs

1//! # Reactor Module
2//!
3//! The core event loop for `LaminarDB`, implementing a single-threaded reactor pattern
4//! optimized for streaming workloads.
5//!
6//! ## Design Goals
7//!
8//! - **Zero allocations** during event processing
9//! - **CPU-pinned** execution for cache locality
10//! - **Lock-free** communication with other threads
11//! - **500K+ events/sec** per core throughput
12//!
13//! ## Architecture
14//!
15//! The reactor runs a tight event loop that:
16//! 1. Polls input sources for events
17//! 2. Routes events to operators
18//! 3. Manages operator state
19//! 4. Emits results to sinks
20//!
21//! Communication with Ring 1 (background tasks) happens via SPSC queues.
22
23use std::collections::VecDeque;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27
28use crate::alloc::HotPathGuard;
29use crate::budget::TaskBudget;
30use crate::operator::{
31    CheckpointCompleteData, Event, Operator, OperatorContext, OperatorState, Output, SideOutputData,
32};
33use crate::state::{AHashMapStore, StateStore};
34use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
35
/// Trait for output sinks that consume reactor outputs.
///
/// Implementors receive batches of [`Output`] values from the reactor's
/// event loop and are expected to drain them in [`Sink::write`].
pub trait Sink: Send {
    /// Write outputs to the sink.
    ///
    /// The sink should drain items from the provided vector, leaving it empty
    /// but preserving capacity so the caller can reuse the allocation.
    ///
    /// # Errors
    ///
    /// Returns an error if the sink cannot accept the outputs.
    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError>;

    /// Flush any buffered data.
    ///
    /// The reactor calls this before shutting down so buffered outputs are
    /// not lost.
    ///
    /// # Errors
    ///
    /// Returns an error if the flush operation fails.
    fn flush(&mut self) -> Result<(), SinkError>;
}
55
/// Errors that can occur in sinks.
#[derive(Debug, thiserror::Error)]
pub enum SinkError {
    /// Failed to write to sink (message carries sink-specific detail)
    #[error("Write failed: {0}")]
    WriteFailed(String),

    /// Failed to flush sink (message carries sink-specific detail)
    #[error("Flush failed: {0}")]
    FlushFailed(String),

    /// Sink is closed and can accept no further outputs
    #[error("Sink is closed")]
    Closed,
}
71
/// Configuration for the reactor.
///
/// See [`ReactorConfig::default`] for the default values.
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    /// Maximum events to process per poll (bounds work per iteration)
    pub batch_size: usize,
    /// CPU core to pin the reactor thread to (None = no pinning)
    pub cpu_affinity: Option<usize>,
    /// Maximum time to spend in one iteration
    pub max_iteration_time: Duration,
    /// Size of the event buffer; `submit`/`submit_batch` fail once full
    pub event_buffer_size: usize,
    /// Maximum out-of-orderness for watermark generation (milliseconds)
    pub max_out_of_orderness: i64,
}
86
impl Default for ReactorConfig {
    /// Defaults tuned for streaming workloads: moderate batches, a 10 ms
    /// iteration cap, a 64K event queue, and no CPU pinning.
    fn default() -> Self {
        Self {
            batch_size: 1024,
            cpu_affinity: None,
            max_iteration_time: Duration::from_millis(10),
            event_buffer_size: 65536,
            max_out_of_orderness: 1000, // 1 second
        }
    }
}
98
/// The main reactor for event processing.
///
/// Owns the operator chain, timer service, state store, and watermark
/// generator, and drives them from a single thread.
pub struct Reactor {
    config: ReactorConfig,
    /// Operator chain; events flow through operators in insertion order
    operators: Vec<Box<dyn Operator>>,
    timer_service: TimerService,
    /// Pending events awaiting processing (bounded by `config.event_buffer_size`)
    event_queue: VecDeque<Event>,
    /// Outputs accumulated by `process_events`, drained by poll/run/shutdown
    output_buffer: Vec<Output>,
    state_store: Box<dyn StateStore>,
    watermark_generator: Box<dyn WatermarkGenerator>,
    /// Highest event timestamp observed so far (event-time clock)
    current_event_time: i64,
    /// Basis for `get_processing_time` (micros since reactor creation)
    start_time: Instant,
    /// Total events processed since creation
    events_processed: u64,
    /// Pre-allocated buffers for operator chain processing
    /// We use two buffers and swap between them to avoid allocations
    operator_buffer_1: Vec<Output>,
    operator_buffer_2: Vec<Output>,
    /// Optional sink for outputs
    sink: Option<Box<dyn Sink>>,
    /// Shutdown flag for graceful termination
    shutdown: Arc<AtomicBool>,
}
120
121impl Reactor {
122    /// Creates a new reactor with the given configuration
123    ///
124    /// # Errors
125    ///
126    /// Currently does not return any errors, but may in the future if initialization fails
127    pub fn new(config: ReactorConfig) -> Result<Self, ReactorError> {
128        let event_queue = VecDeque::with_capacity(config.event_buffer_size);
129        let watermark_generator = Box::new(BoundedOutOfOrdernessGenerator::new(
130            config.max_out_of_orderness,
131        ));
132
133        Ok(Self {
134            config,
135            operators: Vec::new(),
136            timer_service: TimerService::new(),
137            event_queue,
138            output_buffer: Vec::with_capacity(1024),
139            state_store: Box::new(AHashMapStore::new()),
140            watermark_generator,
141            current_event_time: 0,
142            start_time: Instant::now(),
143            events_processed: 0,
144            operator_buffer_1: Vec::with_capacity(256),
145            operator_buffer_2: Vec::with_capacity(256),
146            sink: None,
147            shutdown: Arc::new(AtomicBool::new(false)),
148        })
149    }
150
    /// Register an operator in the processing chain.
    ///
    /// Operators run in registration order: each event flows through them
    /// sequentially during `process_events`.
    pub fn add_operator(&mut self, operator: Box<dyn Operator>) {
        self.operators.push(operator);
    }
155
    /// Set the output sink for the reactor.
    ///
    /// Replaces any previously configured sink; only one sink is active at a time.
    pub fn set_sink(&mut self, sink: Box<dyn Sink>) {
        self.sink = Some(sink);
    }
160
    /// Get a handle to the shutdown flag.
    ///
    /// Another thread can store `true` into this flag to make `run` exit.
    #[must_use]
    pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.shutdown)
    }
166
167    /// Submit an event for processing
168    ///
169    /// # Errors
170    ///
171    /// Returns `ReactorError::QueueFull` if the event queue is at capacity
172    pub fn submit(&mut self, event: Event) -> Result<(), ReactorError> {
173        if self.event_queue.len() >= self.config.event_buffer_size {
174            return Err(ReactorError::QueueFull {
175                capacity: self.config.event_buffer_size,
176            });
177        }
178
179        self.event_queue.push_back(event);
180        Ok(())
181    }
182
183    /// Submit multiple events for processing
184    ///
185    /// # Errors
186    ///
187    /// Returns `ReactorError::QueueFull` if there's insufficient capacity for all events
188    pub fn submit_batch(&mut self, events: Vec<Event>) -> Result<(), ReactorError> {
189        let available = self.config.event_buffer_size - self.event_queue.len();
190        if events.len() > available {
191            return Err(ReactorError::QueueFull {
192                capacity: self.config.event_buffer_size,
193            });
194        }
195
196        self.event_queue.extend(events);
197        Ok(())
198    }
199
    /// Processes one iteration of the event loop, filling `self.output_buffer`.
    ///
    /// Separated from [`poll`] so that [`run`] and [`shutdown`] can access
    /// `self.output_buffer` directly without allocating.
    ///
    /// Work per call is bounded by `config.batch_size` and
    /// `config.max_iteration_time`; both limits are checked after each event,
    /// so at least one event is processed when the queue is non-empty.
    fn process_events(&mut self) {
        // Hot path guard - will panic on allocation when allocation-tracking is enabled
        let _guard = HotPathGuard::enter("Reactor::poll");

        // Task budget tracking - records metrics on drop
        let _iteration_budget = TaskBudget::ring0_iteration();

        let poll_start = Instant::now();
        // Snapshot processing time once so every operator in this iteration
        // observes the same clock value.
        let processing_time = self.get_processing_time();

        // 1. Fire expired timers
        let fired_timers = self.timer_service.poll_timers(self.current_event_time);
        for mut timer in fired_timers {
            if let Some(idx) = timer.operator_index {
                // Route to specific operator
                if let Some(operator) = self.operators.get_mut(idx) {
                    let timer_key = timer.key.take().unwrap_or_default();
                    let timer_for_operator = crate::operator::Timer {
                        key: timer_key,
                        timestamp: timer.timestamp,
                    };

                    // Context is rebuilt per call because it mutably borrows
                    // several reactor fields at once.
                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                    self.output_buffer.extend(outputs);
                }
            } else {
                // Legacy: Broadcast to all operators (warning: creates key contention)
                for (idx, operator) in self.operators.iter_mut().enumerate() {
                    // Move key out of timer (only first operator gets it!)
                    // NOTE(review): later operators receive a default key here —
                    // acknowledged limitation of the legacy broadcast path.
                    let timer_key = timer.key.take().unwrap_or_default();
                    let timer_for_operator = crate::operator::Timer {
                        key: timer_key,
                        timestamp: timer.timestamp,
                    };

                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                    self.output_buffer.extend(outputs);
                }
            }
        }

        // 2. Process events
        let mut events_in_batch = 0;
        while let Some(event) = self.event_queue.pop_front() {
            // Update current event time (event-time clock only moves forward)
            if event.timestamp > self.current_event_time {
                self.current_event_time = event.timestamp;
            }

            // Generate watermark if needed
            if let Some(watermark) = self.watermark_generator.on_event(event.timestamp) {
                self.output_buffer
                    .push(Output::Watermark(watermark.timestamp()));
            }

            // Process through operator chain using pre-allocated buffers
            // Start with the event in buffer 1
            self.operator_buffer_1.clear();
            self.operator_buffer_1.push(Output::Event(event));

            let mut current_buffer_is_1 = true;

            // Ping-pong between the two buffers: each operator drains one and
            // fills the other, so no per-event allocation is needed.
            for (idx, operator) in self.operators.iter_mut().enumerate() {
                // Determine which buffer to read from and which to write to
                let (current_buffer, next_buffer) = if current_buffer_is_1 {
                    (&mut self.operator_buffer_1, &mut self.operator_buffer_2)
                } else {
                    (&mut self.operator_buffer_2, &mut self.operator_buffer_1)
                };

                next_buffer.clear();

                for output in current_buffer.drain(..) {
                    if let Output::Event(event) = output {
                        let mut ctx = OperatorContext {
                            event_time: self.current_event_time,
                            processing_time,
                            timers: &mut self.timer_service,
                            state: self.state_store.as_mut(),
                            watermark_generator: self.watermark_generator.as_mut(),
                            operator_index: idx,
                        };

                        let operator_outputs = operator.process(&event, &mut ctx);
                        next_buffer.extend(operator_outputs);
                    } else {
                        // Pass through watermarks and late events
                        next_buffer.push(output);
                    }
                }

                // Swap buffers for next iteration
                current_buffer_is_1 = !current_buffer_is_1;
            }

            // Extend output buffer with final results
            let final_buffer = if current_buffer_is_1 {
                &mut self.operator_buffer_1
            } else {
                &mut self.operator_buffer_2
            };
            self.output_buffer.append(final_buffer);
            self.events_processed += 1;
            events_in_batch += 1;

            // Check batch size limit
            if events_in_batch >= self.config.batch_size {
                break;
            }

            // Check time limit
            if poll_start.elapsed() >= self.config.max_iteration_time {
                break;
            }
        }
    }
338
339    /// Run one iteration of the event loop.
340    ///
341    /// Returns outputs ready for downstream. Allocates a new `Vec` per call.
342    /// Prefer [`poll_into`](Self::poll_into) or the internal `run` loop for zero-allocation
343    /// processing.
344    pub fn poll(&mut self) -> Vec<Output> {
345        self.process_events();
346        self.output_buffer.drain(..).collect()
347    }
348
349    /// Run one iteration, appending outputs to a caller-provided buffer.
350    ///
351    /// This avoids the per-poll allocation of [`poll`](Self::poll) — the caller can
352    /// reuse the same `Vec` across iterations, retaining its capacity.
353    pub fn poll_into(&mut self, output: &mut Vec<Output>) {
354        self.process_events();
355        output.append(&mut self.output_buffer);
356    }
357
358    /// Advances the watermark to the given timestamp.
359    ///
360    /// Called when an external watermark message arrives (e.g., from TPC coordination).
361    /// Updates the reactor's event time tracking and watermark generator state.
362    /// Any resulting watermark output will be included in the next `poll()` result.
363    pub fn advance_watermark(&mut self, timestamp: i64) {
364        // Update current event time if this watermark is newer
365        if timestamp > self.current_event_time {
366            self.current_event_time = timestamp;
367        }
368
369        // Feed the timestamp to the watermark generator so it can advance
370        if let Some(watermark) = self.watermark_generator.on_event(timestamp) {
371            self.output_buffer
372                .push(Output::Watermark(watermark.timestamp()));
373        }
374    }
375
376    /// Triggers a checkpoint by snapshotting all operator states.
377    ///
378    /// Called when a `CheckpointRequest` arrives from the control plane.
379    /// Collects the serialized state from each operator and returns it
380    /// for persistence by Ring 1.
381    pub fn trigger_checkpoint(&mut self) -> Vec<OperatorState> {
382        self.operators.iter().map(|op| op.checkpoint()).collect()
383    }
384
385    /// Get current processing time in microseconds since reactor start
386    #[allow(clippy::cast_possible_truncation)] // Saturating conversion handles overflow on next line
387    fn get_processing_time(&self) -> i64 {
388        // Saturating conversion - after ~292 years this will saturate at i64::MAX
389        let micros = self.start_time.elapsed().as_micros();
390        if micros > i64::MAX as u128 {
391            i64::MAX
392        } else {
393            micros as i64
394        }
395    }
396
    /// Get the total number of events processed since the reactor was created.
    #[must_use]
    pub fn events_processed(&self) -> u64 {
        self.events_processed
    }
402
    /// Get the number of events currently waiting in the queue.
    #[must_use]
    pub fn queue_size(&self) -> usize {
        self.event_queue.len()
    }
408
    /// Set CPU affinity if configured.
    ///
    /// No-op when `config.cpu_affinity` is `None`. Supported on Linux
    /// (`sched_setaffinity`) and Windows (`SetThreadAffinityMask`); other
    /// platforms only log a warning.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError` if CPU affinity cannot be set (platform-specific)
    #[allow(unused_variables)]
    pub fn set_cpu_affinity(&self) -> Result<(), ReactorError> {
        if let Some(cpu_id) = self.config.cpu_affinity {
            #[cfg(target_os = "linux")]
            {
                use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
                use std::mem;

                // SAFETY: We're calling libc functions with valid parameters.
                // The cpu_set_t is properly initialized with CPU_ZERO.
                // The process ID 0 refers to the current thread.
                #[allow(unsafe_code)]
                unsafe {
                    let mut set: cpu_set_t = mem::zeroed();
                    CPU_ZERO(&mut set);
                    CPU_SET(cpu_id, &mut set);

                    let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
                    if result != 0 {
                        return Err(ReactorError::InitializationFailed(format!(
                            "Failed to set CPU affinity to core {}: {}",
                            cpu_id,
                            std::io::Error::last_os_error()
                        )));
                    }
                }
            }

            #[cfg(target_os = "windows")]
            {
                use windows_sys::Win32::System::Threading::{
                    GetCurrentThread, SetThreadAffinityMask,
                };

                // SAFETY: We're calling Windows API functions with valid parameters.
                // GetCurrentThread returns a pseudo-handle that doesn't need to be closed.
                // The mask is a valid CPU mask for the specified core.
                #[allow(unsafe_code)]
                unsafe {
                    let mask: usize = 1 << cpu_id;
                    let result = SetThreadAffinityMask(GetCurrentThread(), mask);
                    if result == 0 {
                        return Err(ReactorError::InitializationFailed(format!(
                            "Failed to set CPU affinity to core {}: {}",
                            cpu_id,
                            std::io::Error::last_os_error()
                        )));
                    }
                }
            }

            #[cfg(not(any(target_os = "linux", target_os = "windows")))]
            {
                tracing::warn!("CPU affinity is not implemented for this platform");
            }
        }
        Ok(())
    }
472
473    /// Runs the event loop continuously until shutdown
474    ///
475    /// # Errors
476    ///
477    /// Returns `ReactorError` if CPU affinity cannot be set or if shutdown fails
478    pub fn run(&mut self) -> Result<(), ReactorError> {
479        self.set_cpu_affinity()?;
480
481        while !self.shutdown.load(Ordering::Relaxed) {
482            // Process events into self.output_buffer (zero-alloc)
483            self.process_events();
484
485            // Send outputs to sink if configured.
486            // Sink drains output_buffer, preserving capacity for reuse.
487            if !self.output_buffer.is_empty() {
488                if let Some(sink) = &mut self.sink {
489                    if let Err(e) = sink.write(&mut self.output_buffer) {
490                        tracing::error!("Failed to write to sink: {e}");
491                        // Continue processing even if sink fails
492                    }
493                }
494                self.output_buffer.clear();
495            }
496
497            // If no events to process, emit a CPU spin hint (PAUSE on x86,
498            // YIELD on ARM) instead of the heavier yield_now() syscall.
499            // In a thread-per-core design the reactor owns its CPU, so a
500            // lightweight spin hint is preferred over a kernel-mediated yield.
501            if self.event_queue.is_empty() {
502                std::hint::spin_loop();
503            }
504
505            // Periodically check for shutdown signal
506            if self.events_processed.is_multiple_of(1000) && self.shutdown.load(Ordering::Relaxed) {
507                break;
508            }
509        }
510
511        // Flush sink before shutdown
512        if let Some(sink) = &mut self.sink {
513            if let Err(e) = sink.flush() {
514                tracing::error!("Failed to flush sink during shutdown: {e}");
515            }
516        }
517
518        Ok(())
519    }
520
    /// Stops the reactor gracefully.
    ///
    /// Sets the shutdown flag (so a concurrent `run` loop will exit), drains
    /// every remaining queued event through the operator chain, forwards the
    /// resulting outputs to the sink, and finally flushes the sink. Sink
    /// errors during drain are logged rather than propagated so the drain
    /// always completes.
    ///
    /// # Errors
    ///
    /// Currently does not return any errors, but may in the future if shutdown fails
    pub fn shutdown(&mut self) -> Result<(), ReactorError> {
        // Signal shutdown
        self.shutdown.store(true, Ordering::Relaxed);

        // Process remaining events (zero-alloc drain).
        // process_events respects batch/time limits, so loop until the queue
        // is fully empty.
        while !self.event_queue.is_empty() {
            self.process_events();

            if !self.output_buffer.is_empty() {
                if let Some(sink) = &mut self.sink {
                    if let Err(e) = sink.write(&mut self.output_buffer) {
                        tracing::error!("Failed to write final outputs during shutdown: {e}");
                    }
                }
                self.output_buffer.clear();
            }
        }

        // Final flush
        if let Some(sink) = &mut self.sink {
            if let Err(e) = sink.flush() {
                tracing::error!("Failed to flush sink during shutdown: {e}");
            }
        }

        Ok(())
    }
553}
554
/// Errors that can occur in the reactor.
#[derive(Debug, thiserror::Error)]
pub enum ReactorError {
    /// Failed to initialize the reactor (e.g., CPU affinity could not be set)
    #[error("Initialization failed: {0}")]
    InitializationFailed(String),

    /// Event processing error
    #[error("Event processing failed: {0}")]
    EventProcessingFailed(String),

    /// Shutdown error
    #[error("Shutdown failed: {0}")]
    ShutdownFailed(String),

    /// Event queue is full; returned by `submit`/`submit_batch`
    #[error("Event queue full (capacity: {capacity})")]
    QueueFull {
        /// The configured capacity of the event queue
        capacity: usize,
    },
}
577
/// A simple sink that writes outputs to stdout (for testing).
///
/// Zero-sized; each output variant is printed in a human-readable line format.
pub struct StdoutSink;
580
581impl Sink for StdoutSink {
582    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
583        for output in outputs.drain(..) {
584            match output {
585                Output::Event(event) => {
586                    println!(
587                        "Event: timestamp={}, data={:?}",
588                        event.timestamp, event.data
589                    );
590                }
591                Output::Watermark(timestamp) => {
592                    println!("Watermark: {timestamp}");
593                }
594                Output::LateEvent(event) => {
595                    println!(
596                        "Late Event (dropped): timestamp={}, data={:?}",
597                        event.timestamp, event.data
598                    );
599                }
600                Output::SideOutput(side_output) => {
601                    let SideOutputData { name, event } = *side_output;
602                    println!(
603                        "Side Output [{}]: timestamp={}, data={:?}",
604                        name, event.timestamp, event.data
605                    );
606                }
607                Output::Changelog(record) => {
608                    println!(
609                        "Changelog: op={:?}, weight={}, emit_ts={}, event_ts={}, data={:?}",
610                        record.operation,
611                        record.weight,
612                        record.emit_timestamp,
613                        record.event.timestamp,
614                        record.event.data
615                    );
616                }
617                Output::CheckpointComplete(data) => {
618                    let CheckpointCompleteData {
619                        checkpoint_id,
620                        operator_states,
621                    } = *data;
622                    println!(
623                        "Checkpoint: id={checkpoint_id}, operators={}",
624                        operator_states.len()
625                    );
626                }
627            }
628        }
629        Ok(())
630    }
631
632    fn flush(&mut self) -> Result<(), SinkError> {
633        Ok(())
634    }
635}
636
/// A buffering sink that collects outputs (for testing).
#[derive(Default)]
pub struct BufferingSink {
    // All written outputs, in arrival order, until taken via `take_buffer`.
    buffer: Vec<Output>,
}
642
643impl BufferingSink {
644    /// Create a new buffering sink.
645    #[must_use]
646    pub fn new() -> Self {
647        Self::default()
648    }
649
650    /// Get the buffered outputs.
651    #[must_use]
652    pub fn take_buffer(&mut self) -> Vec<Output> {
653        std::mem::take(&mut self.buffer)
654    }
655}
656
657impl Sink for BufferingSink {
658    fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
659        self.buffer.append(outputs);
660        Ok(())
661    }
662
663    fn flush(&mut self) -> Result<(), SinkError> {
664        Ok(())
665    }
666}
667
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::OutputVec;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    // Mock operator for testing: re-emits every input event unchanged and
    // keeps no state.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: crate::operator::Timer,
            _ctx: &mut OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> crate::operator::OperatorState {
            crate::operator::OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(
            &mut self,
            _state: crate::operator::OperatorState,
        ) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    #[test]
    fn test_default_config() {
        let config = ReactorConfig::default();
        assert_eq!(config.batch_size, 1024);
        assert_eq!(config.event_buffer_size, 65536);
    }

    #[test]
    fn test_reactor_creation() {
        let config = ReactorConfig::default();
        let reactor = Reactor::new(config);
        assert!(reactor.is_ok());
    }

    #[test]
    fn test_reactor_add_operator() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let operator = Box::new(PassthroughOperator);
        reactor.add_operator(operator);

        assert_eq!(reactor.operators.len(), 1);
    }

    #[test]
    fn test_reactor_submit_event() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        assert!(reactor.submit(event).is_ok());
        assert_eq!(reactor.queue_size(), 1);
    }

    #[test]
    fn test_reactor_poll_processes_events() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Add a passthrough operator
        reactor.add_operator(Box::new(PassthroughOperator));

        // Submit an event
        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        reactor.submit(event.clone()).unwrap();

        // Poll should process the event
        let outputs = reactor.poll();
        assert!(!outputs.is_empty());
        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_queue_full() {
        let config = ReactorConfig {
            event_buffer_size: 2, // Very small buffer
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        // Fill the queue
        for i in 0..2 {
            let event = Event::new(i64::from(i), batch.clone());
            assert!(reactor.submit(event).is_ok());
        }

        // Next submit should fail
        let event = Event::new(100, batch);
        assert!(matches!(
            reactor.submit(event),
            Err(ReactorError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_reactor_batch_processing() {
        let config = ReactorConfig {
            batch_size: 2, // Small batch size
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        // Submit 5 events
        for i in 0..5 {
            let event = Event::new(i64::from(i), batch.clone());
            reactor.submit(event).unwrap();
        }

        // First poll should process only batch_size events
        reactor.poll();
        assert_eq!(reactor.events_processed(), 2);
        assert_eq!(reactor.queue_size(), 3);

        // Second poll should process 2 more
        reactor.poll();
        assert_eq!(reactor.events_processed(), 4);
        assert_eq!(reactor.queue_size(), 1);

        // Third poll should process the last one
        reactor.poll();
        assert_eq!(reactor.events_processed(), 5);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_with_sink() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Add a buffering sink
        let sink = Box::new(BufferingSink::new());
        reactor.set_sink(sink);

        // Add passthrough operator
        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![42]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        let event = Event::new(1000, batch);

        // Submit an event
        reactor.submit(event).unwrap();

        // Process
        let outputs = reactor.poll();
        // Should get the event output (and possibly a watermark)
        assert!(!outputs.is_empty());

        // Verify we got the event
        assert!(outputs.iter().any(|o| matches!(o, Output::Event(_))));
    }

    #[test]
    fn test_reactor_shutdown() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Get shutdown handle
        let shutdown_handle = reactor.shutdown_handle();
        assert!(!shutdown_handle.load(Ordering::Relaxed));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col", array as _)]).unwrap();

        // Submit some events
        for i in 0..5 {
            reactor.submit(Event::new(i * 1000, batch.clone())).unwrap();
        }

        // Shutdown should process remaining events
        reactor.shutdown().unwrap();
        assert!(shutdown_handle.load(Ordering::Relaxed));
        assert_eq!(reactor.queue_size(), 0);
    }
}