// laminar_core/reactor/mod.rs
//! # Reactor Module
//!
//! The core event loop for `LaminarDB`, implementing a single-threaded reactor pattern
//! optimized for streaming workloads.
//!
//! ## Design Goals
//!
//! - **Zero allocations** during event processing
//! - **CPU-pinned** execution for cache locality
//! - **Lock-free** communication with other threads
//! - **500K+ events/sec** per core throughput
//!
//! ## Architecture
//!
//! The reactor runs a tight event loop that:
//! 1. Polls input sources for events
//! 2. Routes events to operators
//! 3. Manages operator state
//! 4. Emits results to sinks
//!
//! Communication with Ring 1 (background tasks) happens via SPSC queues.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::alloc::HotPathGuard;
use crate::budget::TaskBudget;
use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output};
use crate::state::{InMemoryStore, StateStore};
use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
33
/// Trait for output sinks that consume reactor outputs.
///
/// Implementations receive whole batches of [`Output`] values produced by
/// `Reactor::poll` and deliver them downstream. `Send` is required so a sink
/// can be installed on the reactor thread from elsewhere.
pub trait Sink: Send {
    /// Write outputs to the sink.
    ///
    /// Takes the batch by value so implementations can buffer or forward it
    /// without cloning.
    ///
    /// # Errors
    ///
    /// Returns an error if the sink cannot accept the outputs.
    fn write(&mut self, outputs: Vec<Output>) -> Result<(), SinkError>;

    /// Flush any buffered data.
    ///
    /// Called by the reactor on shutdown paths before terminating.
    ///
    /// # Errors
    ///
    /// Returns an error if the flush operation fails.
    fn flush(&mut self) -> Result<(), SinkError>;
}
50
/// Errors that can occur in sinks.
///
/// The reactor logs these and keeps processing; a failing sink does not stop
/// the event loop.
#[derive(Debug, thiserror::Error)]
pub enum SinkError {
    /// Failed to write to sink
    #[error("Write failed: {0}")]
    WriteFailed(String),

    /// Failed to flush sink
    #[error("Flush failed: {0}")]
    FlushFailed(String),

    /// Sink is closed
    #[error("Sink is closed")]
    Closed,
}
66
/// Configuration for the reactor
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    /// Maximum events to process per poll
    pub batch_size: usize,
    /// CPU core to pin the reactor thread to (None = no pinning)
    pub cpu_affinity: Option<usize>,
    /// Maximum time to spend in one iteration
    ///
    /// Checked after each processed event, so a single slow event can
    /// overshoot this limit.
    pub max_iteration_time: Duration,
    /// Size of the event buffer
    ///
    /// `submit`/`submit_batch` return `QueueFull` once the queue holds this
    /// many events.
    pub event_buffer_size: usize,
    /// Maximum out-of-orderness for watermark generation (milliseconds)
    ///
    /// Passed to `BoundedOutOfOrdernessGenerator` at construction.
    pub max_out_of_orderness: i64,
}
81
82impl Default for ReactorConfig {
83    fn default() -> Self {
84        Self {
85            batch_size: 1024,
86            cpu_affinity: None,
87            max_iteration_time: Duration::from_millis(10),
88            event_buffer_size: 65536,
89            max_out_of_orderness: 1000, // 1 second
90        }
91    }
92}
93
/// The main reactor for event processing
///
/// Owned entirely by one thread; the only cross-thread handle it exposes is
/// the `shutdown` flag (see `shutdown_handle`).
pub struct Reactor {
    /// Runtime configuration (batch size, buffer capacity, affinity, ...)
    config: ReactorConfig,
    /// Operator chain; events flow through these in registration order
    operators: Vec<Box<dyn Operator>>,
    /// Event-time timer registry, polled at the top of every `poll()`
    timer_service: TimerService,
    /// FIFO of submitted events awaiting processing (capped at `event_buffer_size`)
    event_queue: VecDeque<Event>,
    /// Accumulates outputs until `poll()` hands them to the caller
    output_buffer: Vec<Output>,
    /// Keyed state exposed to operators through `OperatorContext`
    state_store: Box<dyn StateStore>,
    /// Produces watermarks from observed event timestamps
    watermark_generator: Box<dyn WatermarkGenerator>,
    /// High-water mark of event timestamps seen so far
    current_event_time: i64,
    /// Construction instant; basis for `get_processing_time`
    start_time: Instant,
    /// Total events processed since construction
    events_processed: u64,
    /// Pre-allocated buffers for operator chain processing
    /// We use two buffers and swap between them to avoid allocations
    operator_buffer_1: Vec<Output>,
    operator_buffer_2: Vec<Output>,
    /// Optional sink for outputs
    sink: Option<Box<dyn Sink>>,
    /// Shutdown flag for graceful termination
    shutdown: Arc<AtomicBool>,
}
115
impl Reactor {
    /// Creates a new reactor with the given configuration
    ///
    /// All buffers are pre-allocated here so the hot path (`poll`) can run
    /// without allocating.
    ///
    /// # Errors
    ///
    /// Currently does not return any errors, but may in the future if initialization fails
    pub fn new(config: ReactorConfig) -> Result<Self, ReactorError> {
        let event_queue = VecDeque::with_capacity(config.event_buffer_size);
        let watermark_generator = Box::new(BoundedOutOfOrdernessGenerator::new(
            config.max_out_of_orderness,
        ));

        Ok(Self {
            config,
            operators: Vec::new(),
            timer_service: TimerService::new(),
            event_queue,
            output_buffer: Vec::with_capacity(1024),
            state_store: Box::new(InMemoryStore::new()),
            watermark_generator,
            current_event_time: 0,
            start_time: Instant::now(),
            events_processed: 0,
            operator_buffer_1: Vec::with_capacity(256),
            operator_buffer_2: Vec::with_capacity(256),
            sink: None,
            shutdown: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Register an operator in the processing chain
    ///
    /// Operators run in registration order: each one consumes the previous
    /// operator's outputs.
    pub fn add_operator(&mut self, operator: Box<dyn Operator>) {
        self.operators.push(operator);
    }

    /// Set the output sink for the reactor
    ///
    /// Replaces any previously configured sink.
    pub fn set_sink(&mut self, sink: Box<dyn Sink>) {
        self.sink = Some(sink);
    }

    /// Get a handle to the shutdown flag
    ///
    /// Another thread can store `true` through this handle to stop `run()`.
    #[must_use]
    pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.shutdown)
    }

    /// Submit an event for processing
    ///
    /// # Errors
    ///
    /// Returns `ReactorError::QueueFull` if the event queue is at capacity
    pub fn submit(&mut self, event: Event) -> Result<(), ReactorError> {
        if self.event_queue.len() >= self.config.event_buffer_size {
            return Err(ReactorError::QueueFull {
                capacity: self.config.event_buffer_size,
            });
        }

        self.event_queue.push_back(event);
        Ok(())
    }

    /// Submit multiple events for processing
    ///
    /// All-or-nothing: either every event is enqueued or none is.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError::QueueFull` if there's insufficient capacity for all events
    pub fn submit_batch(&mut self, events: Vec<Event>) -> Result<(), ReactorError> {
        let available = self.config.event_buffer_size - self.event_queue.len();
        if events.len() > available {
            return Err(ReactorError::QueueFull {
                capacity: self.config.event_buffer_size,
            });
        }

        self.event_queue.extend(events);
        Ok(())
    }

    /// Run one iteration of the event loop
    /// Returns outputs ready for downstream
    ///
    /// Processes at most `batch_size` events or until `max_iteration_time`
    /// elapses, whichever comes first.
    pub fn poll(&mut self) -> Vec<Output> {
        // Hot path guard - will panic on allocation when allocation-tracking is enabled
        let _guard = HotPathGuard::enter("Reactor::poll");

        // Task budget tracking - records metrics on drop
        let _iteration_budget = TaskBudget::ring0_iteration();

        let poll_start = Instant::now();
        let processing_time = self.get_processing_time();

        // 1. Fire expired timers
        let fired_timers = self.timer_service.poll_timers(self.current_event_time);
        for mut timer in fired_timers {
            if let Some(idx) = timer.operator_index {
                // Route to specific operator
                if let Some(operator) = self.operators.get_mut(idx) {
                    let timer_key = timer.key.take().unwrap_or_default();
                    let timer_for_operator = crate::operator::Timer {
                        key: timer_key,
                        timestamp: timer.timestamp,
                    };

                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                    self.output_buffer.extend(outputs);
                }
            } else {
                // Legacy: Broadcast to all operators (warning: creates key contention)
                for (idx, operator) in self.operators.iter_mut().enumerate() {
                    // Move key out of timer (only first operator gets it!)
                    // NOTE(review): after the first iteration `take()` yields None,
                    // so subsequent operators receive a default key — confirm this
                    // is acceptable for the legacy broadcast path.
                    let timer_key = timer.key.take().unwrap_or_default();
                    let timer_for_operator = crate::operator::Timer {
                        key: timer_key,
                        timestamp: timer.timestamp,
                    };

                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                    self.output_buffer.extend(outputs);
                }
            }
        }

        // 2. Process events
        let mut events_in_batch = 0;
        while let Some(event) = self.event_queue.pop_front() {
            // Update current event time
            if event.timestamp > self.current_event_time {
                self.current_event_time = event.timestamp;
            }

            // Generate watermark if needed
            if let Some(watermark) = self.watermark_generator.on_event(event.timestamp) {
                self.output_buffer
                    .push(Output::Watermark(watermark.timestamp()));
            }

            // Process through operator chain using pre-allocated buffers
            // Start with the event in buffer 1
            self.operator_buffer_1.clear();
            self.operator_buffer_1.push(Output::Event(event));

            let mut current_buffer_is_1 = true;

            for (idx, operator) in self.operators.iter_mut().enumerate() {
                // Determine which buffer to read from and which to write to.
                // Borrowing both fields at once is fine: they are disjoint.
                let (current_buffer, next_buffer) = if current_buffer_is_1 {
                    (&mut self.operator_buffer_1, &mut self.operator_buffer_2)
                } else {
                    (&mut self.operator_buffer_2, &mut self.operator_buffer_1)
                };

                next_buffer.clear();

                for output in current_buffer.drain(..) {
                    if let Output::Event(event) = output {
                        let mut ctx = OperatorContext {
                            event_time: self.current_event_time,
                            processing_time,
                            timers: &mut self.timer_service,
                            state: self.state_store.as_mut(),
                            watermark_generator: self.watermark_generator.as_mut(),
                            operator_index: idx,
                        };

                        let operator_outputs = operator.process(&event, &mut ctx);
                        next_buffer.extend(operator_outputs);
                    } else {
                        // Pass through watermarks and late events
                        next_buffer.push(output);
                    }
                }

                // Swap buffers for next iteration
                current_buffer_is_1 = !current_buffer_is_1;
            }

            // Extend output buffer with final results
            let final_buffer = if current_buffer_is_1 {
                &mut self.operator_buffer_1
            } else {
                &mut self.operator_buffer_2
            };
            self.output_buffer.append(final_buffer);
            self.events_processed += 1;
            events_in_batch += 1;

            // Check batch size limit
            if events_in_batch >= self.config.batch_size {
                break;
            }

            // Check time limit
            if poll_start.elapsed() >= self.config.max_iteration_time {
                break;
            }
        }

        // 3. Return outputs
        // take() leaves an empty Vec behind; note the retained capacity is
        // given away with the returned buffer.
        std::mem::take(&mut self.output_buffer)
    }

    /// Advances the watermark to the given timestamp.
    ///
    /// Called when an external watermark message arrives (e.g., from TPC coordination).
    /// Updates the reactor's event time tracking and watermark generator state.
    /// Any resulting watermark output will be included in the next `poll()` result.
    pub fn advance_watermark(&mut self, timestamp: i64) {
        // Update current event time if this watermark is newer
        if timestamp > self.current_event_time {
            self.current_event_time = timestamp;
        }

        // Feed the timestamp to the watermark generator so it can advance
        if let Some(watermark) = self.watermark_generator.on_event(timestamp) {
            self.output_buffer
                .push(Output::Watermark(watermark.timestamp()));
        }
    }

    /// Triggers a checkpoint by snapshotting all operator states.
    ///
    /// Called when a `CheckpointRequest` arrives from the control plane.
    /// Collects the serialized state from each operator and returns it
    /// for persistence by Ring 1.
    pub fn trigger_checkpoint(&mut self) -> Vec<OperatorState> {
        self.operators.iter().map(|op| op.checkpoint()).collect()
    }

    /// Get current processing time in microseconds since reactor start
    #[allow(clippy::cast_possible_truncation)] // Guarded by the explicit i64::MAX check below
    fn get_processing_time(&self) -> i64 {
        // Saturating conversion - after ~292 years this will saturate at i64::MAX
        let micros = self.start_time.elapsed().as_micros();
        if micros > i64::MAX as u128 {
            i64::MAX
        } else {
            micros as i64
        }
    }

    /// Get the number of events processed
    #[must_use]
    pub fn events_processed(&self) -> u64 {
        self.events_processed
    }

    /// Get the number of events in the queue
    #[must_use]
    pub fn queue_size(&self) -> usize {
        self.event_queue.len()
    }

    /// Set CPU affinity if configured
    ///
    /// No-op when `config.cpu_affinity` is `None` or on platforms other than
    /// Linux and Windows (a warning is logged instead).
    ///
    /// # Errors
    ///
    /// Returns `ReactorError` if CPU affinity cannot be set (platform-specific)
    #[allow(unused_variables)]
    pub fn set_cpu_affinity(&self) -> Result<(), ReactorError> {
        if let Some(cpu_id) = self.config.cpu_affinity {
            #[cfg(target_os = "linux")]
            {
                use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
                use std::mem;

                // SAFETY: We're calling libc functions with valid parameters.
                // The cpu_set_t is properly initialized with CPU_ZERO.
                // The process ID 0 refers to the current thread.
                #[allow(unsafe_code)]
                unsafe {
                    let mut set: cpu_set_t = mem::zeroed();
                    CPU_ZERO(&mut set);
                    CPU_SET(cpu_id, &mut set);

                    let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
                    if result != 0 {
                        return Err(ReactorError::InitializationFailed(format!(
                            "Failed to set CPU affinity to core {}: {}",
                            cpu_id,
                            std::io::Error::last_os_error()
                        )));
                    }
                }
            }

            #[cfg(target_os = "windows")]
            {
                use winapi::shared::basetsd::DWORD_PTR;
                use winapi::um::processthreadsapi::GetCurrentThread;
                use winapi::um::winbase::SetThreadAffinityMask;

                // SAFETY: We're calling Windows API functions with valid parameters.
                // GetCurrentThread returns a pseudo-handle that doesn't need to be closed.
                // The mask is a valid CPU mask for the specified core.
                #[allow(unsafe_code)]
                unsafe {
                    let mask: DWORD_PTR = 1 << cpu_id;
                    let result = SetThreadAffinityMask(GetCurrentThread(), mask);
                    if result == 0 {
                        return Err(ReactorError::InitializationFailed(format!(
                            "Failed to set CPU affinity to core {}: {}",
                            cpu_id,
                            std::io::Error::last_os_error()
                        )));
                    }
                }
            }

            #[cfg(not(any(target_os = "linux", target_os = "windows")))]
            {
                tracing::warn!("CPU affinity is not implemented for this platform");
            }
        }
        Ok(())
    }

    /// Runs the event loop continuously until shutdown
    ///
    /// Sink write failures are logged and processing continues; the sink is
    /// flushed once on the way out.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError` if CPU affinity cannot be set or if shutdown fails
    pub fn run(&mut self) -> Result<(), ReactorError> {
        self.set_cpu_affinity()?;

        while !self.shutdown.load(Ordering::Relaxed) {
            // Process events
            let outputs = self.poll();

            // Send outputs to sink if configured
            if !outputs.is_empty() {
                if let Some(sink) = &mut self.sink {
                    if let Err(e) = sink.write(outputs) {
                        tracing::error!("Failed to write to sink: {e}");
                        // Continue processing even if sink fails
                    }
                }
            }

            // If no events to process, emit a CPU spin hint (PAUSE on x86,
            // YIELD on ARM) instead of the heavier yield_now() syscall.
            // In a thread-per-core design the reactor owns its CPU, so a
            // lightweight spin hint is preferred over a kernel-mediated yield.
            if self.event_queue.is_empty() {
                std::hint::spin_loop();
            }

            // Periodically check for shutdown signal
            // (secondary check; the loop condition above also reads the flag
            // on every iteration)
            if self.events_processed.is_multiple_of(1000) && self.shutdown.load(Ordering::Relaxed) {
                break;
            }
        }

        // Flush sink before shutdown
        if let Some(sink) = &mut self.sink {
            if let Err(e) = sink.flush() {
                tracing::error!("Failed to flush sink during shutdown: {e}");
            }
        }

        Ok(())
    }

    /// Stops the reactor gracefully
    ///
    /// Sets the shutdown flag, drains the remaining event queue through
    /// `poll()`, forwards final outputs to the sink, and flushes it.
    ///
    /// # Errors
    ///
    /// Currently does not return any errors, but may in the future if shutdown fails
    pub fn shutdown(&mut self) -> Result<(), ReactorError> {
        // Signal shutdown
        self.shutdown.store(true, Ordering::Relaxed);

        // Process remaining events
        while !self.event_queue.is_empty() {
            let outputs = self.poll();

            // Send final outputs to sink
            if !outputs.is_empty() {
                if let Some(sink) = &mut self.sink {
                    if let Err(e) = sink.write(outputs) {
                        tracing::error!("Failed to write final outputs during shutdown: {e}");
                    }
                }
            }
        }

        // Final flush
        if let Some(sink) = &mut self.sink {
            if let Err(e) = sink.flush() {
                tracing::error!("Failed to flush sink during shutdown: {e}");
            }
        }

        Ok(())
    }
}
529
/// Errors that can occur in the reactor
#[derive(Debug, thiserror::Error)]
pub enum ReactorError {
    /// Failed to initialize the reactor
    ///
    /// Also returned when platform CPU-affinity calls fail.
    #[error("Initialization failed: {0}")]
    InitializationFailed(String),

    /// Event processing error
    #[error("Event processing failed: {0}")]
    EventProcessingFailed(String),

    /// Shutdown error
    #[error("Shutdown failed: {0}")]
    ShutdownFailed(String),

    /// Event queue is full
    ///
    /// Returned by `submit`/`submit_batch` when the bounded queue cannot
    /// accept more events.
    #[error("Event queue full (capacity: {capacity})")]
    QueueFull {
        /// The configured capacity of the event queue
        capacity: usize,
    },
}
552
/// A simple sink that writes outputs to stdout (for testing).
///
/// Unit struct: carries no state of its own.
pub struct StdoutSink;
555
556impl Sink for StdoutSink {
557    fn write(&mut self, outputs: Vec<Output>) -> Result<(), SinkError> {
558        for output in outputs {
559            match output {
560                Output::Event(event) => {
561                    println!(
562                        "Event: timestamp={}, data={:?}",
563                        event.timestamp, event.data
564                    );
565                }
566                Output::Watermark(timestamp) => {
567                    println!("Watermark: {timestamp}");
568                }
569                Output::LateEvent(event) => {
570                    println!(
571                        "Late Event (dropped): timestamp={}, data={:?}",
572                        event.timestamp, event.data
573                    );
574                }
575                Output::SideOutput { name, event } => {
576                    println!(
577                        "Side Output [{}]: timestamp={}, data={:?}",
578                        name, event.timestamp, event.data
579                    );
580                }
581                Output::Changelog(record) => {
582                    println!(
583                        "Changelog: op={:?}, weight={}, emit_ts={}, event_ts={}, data={:?}",
584                        record.operation,
585                        record.weight,
586                        record.emit_timestamp,
587                        record.event.timestamp,
588                        record.event.data
589                    );
590                }
591                Output::CheckpointComplete {
592                    checkpoint_id,
593                    operator_states,
594                } => {
595                    println!(
596                        "Checkpoint: id={checkpoint_id}, operators={}",
597                        operator_states.len()
598                    );
599                }
600            }
601        }
602        Ok(())
603    }
604
605    fn flush(&mut self) -> Result<(), SinkError> {
606        Ok(())
607    }
608}
609
/// A buffering sink that collects outputs (for testing).
///
/// Outputs accumulate in memory until retrieved via `take_buffer`.
#[derive(Default)]
pub struct BufferingSink {
    // Accumulated outputs, in arrival order.
    buffer: Vec<Output>,
}
615
616impl BufferingSink {
617    /// Create a new buffering sink.
618    #[must_use]
619    pub fn new() -> Self {
620        Self::default()
621    }
622
623    /// Get the buffered outputs.
624    #[must_use]
625    pub fn take_buffer(&mut self) -> Vec<Output> {
626        std::mem::take(&mut self.buffer)
627    }
628}
629
630impl Sink for BufferingSink {
631    fn write(&mut self, mut outputs: Vec<Output>) -> Result<(), SinkError> {
632        self.buffer.append(&mut outputs);
633        Ok(())
634    }
635
636    fn flush(&mut self) -> Result<(), SinkError> {
637        Ok(())
638    }
639}
640
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::OutputVec;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    // Mock operator for testing
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        // Echo the incoming event unchanged.
        fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        // Timers produce no output for the passthrough mock.
        fn on_timer(
            &mut self,
            _timer: crate::operator::Timer,
            _ctx: &mut OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> crate::operator::OperatorState {
            crate::operator::OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(
            &mut self,
            _state: crate::operator::OperatorState,
        ) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    #[test]
    fn test_default_config() {
        let config = ReactorConfig::default();
        assert_eq!(config.batch_size, 1024);
        assert_eq!(config.event_buffer_size, 65536);
    }

    #[test]
    fn test_reactor_creation() {
        let config = ReactorConfig::default();
        let reactor = Reactor::new(config);
        assert!(reactor.is_ok());
    }

    #[test]
    fn test_reactor_add_operator() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let operator = Box::new(PassthroughOperator);
        reactor.add_operator(operator);

        assert_eq!(reactor.operators.len(), 1);
    }

    #[test]
    fn test_reactor_submit_event() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        assert!(reactor.submit(event).is_ok());
        assert_eq!(reactor.queue_size(), 1);
    }

    #[test]
    fn test_reactor_poll_processes_events() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Add a passthrough operator
        reactor.add_operator(Box::new(PassthroughOperator));

        // Submit an event
        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        reactor.submit(event.clone()).unwrap();

        // Poll should process the event
        let outputs = reactor.poll();
        assert!(!outputs.is_empty());
        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_queue_full() {
        let config = ReactorConfig {
            event_buffer_size: 2, // Very small buffer
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        // Fill the queue
        for i in 0..2 {
            let event = Event::new(i64::from(i), batch.clone());
            assert!(reactor.submit(event).is_ok());
        }

        // Next submit should fail
        let event = Event::new(100, batch);
        assert!(matches!(
            reactor.submit(event),
            Err(ReactorError::QueueFull { .. })
        ));
    }

    // Verifies the batch_size cap: each poll processes at most batch_size events.
    #[test]
    fn test_reactor_batch_processing() {
        let config = ReactorConfig {
            batch_size: 2, // Small batch size
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        // Submit 5 events
        for i in 0..5 {
            let event = Event::new(i64::from(i), batch.clone());
            reactor.submit(event).unwrap();
        }

        // First poll should process only batch_size events
        reactor.poll();
        assert_eq!(reactor.events_processed(), 2);
        assert_eq!(reactor.queue_size(), 3);

        // Second poll should process 2 more
        reactor.poll();
        assert_eq!(reactor.events_processed(), 4);
        assert_eq!(reactor.queue_size(), 1);

        // Third poll should process the last one
        reactor.poll();
        assert_eq!(reactor.events_processed(), 5);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_with_sink() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Add a buffering sink
        let sink = Box::new(BufferingSink::new());
        reactor.set_sink(sink);

        // Add passthrough operator
        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![42]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        let event = Event::new(1000, batch);

        // Submit an event
        reactor.submit(event).unwrap();

        // Process
        let outputs = reactor.poll();
        // Should get the event output (and possibly a watermark)
        assert!(!outputs.is_empty());

        // Verify we got the event
        assert!(outputs.iter().any(|o| matches!(o, Output::Event(_))));
    }

    // Shutdown must drain the queue before returning.
    #[test]
    fn test_reactor_shutdown() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Get shutdown handle
        let shutdown_handle = reactor.shutdown_handle();
        assert!(!shutdown_handle.load(Ordering::Relaxed));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col", array as _)]).unwrap();

        // Submit some events
        for i in 0..5 {
            reactor.submit(Event::new(i * 1000, batch.clone())).unwrap();
        }

        // Shutdown should process remaining events
        reactor.shutdown().unwrap();
        assert!(shutdown_handle.load(Ordering::Relaxed));
        assert_eq!(reactor.queue_size(), 0);
    }
}