// laminar_core/reactor/mod.rs
1//! # Reactor Module
2//!
3//! The core event loop for `LaminarDB`, implementing a single-threaded reactor pattern
4//! optimized for streaming workloads.
5//!
6//! ## Design Goals
7//!
8//! - **Zero allocations** during event processing
9//! - **CPU-pinned** execution for cache locality
10//! - **Lock-free** communication with other threads
11//! - **500K+ events/sec** per core throughput
12//!
13//! ## Architecture
14//!
15//! The reactor runs a tight event loop that:
16//! 1. Polls input sources for events
17//! 2. Routes events to operators
18//! 3. Manages operator state
19//! 4. Emits results to sinks
20//!
21//! Communication with Ring 1 (background tasks) happens via SPSC queues.
22
23use std::collections::VecDeque;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27
28use crate::alloc::HotPathGuard;
29use crate::budget::TaskBudget;
30use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output};
31use crate::state::{InMemoryStore, StateStore};
32use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
33
/// Trait for output sinks that consume reactor outputs.
///
/// Implementations receive batches of [`Output`] values produced by the
/// reactor's poll loop and deliver them downstream. `Send` is required so a
/// sink can be installed on a reactor that runs on a dedicated thread.
pub trait Sink: Send {
    /// Write outputs to the sink.
    ///
    /// Called with whole batches; implementations may buffer internally
    /// until [`Sink::flush`] is invoked.
    ///
    /// # Errors
    ///
    /// Returns an error if the sink cannot accept the outputs.
    fn write(&mut self, outputs: Vec<Output>) -> Result<(), SinkError>;

    /// Flush any buffered data.
    ///
    /// # Errors
    ///
    /// Returns an error if the flush operation fails.
    fn flush(&mut self) -> Result<(), SinkError>;
}
50
/// Errors that can occur in sinks.
///
/// Wrapped messages are human-readable descriptions supplied by the
/// concrete sink implementation.
#[derive(Debug, thiserror::Error)]
pub enum SinkError {
    /// Failed to write to sink
    #[error("Write failed: {0}")]
    WriteFailed(String),

    /// Failed to flush sink
    #[error("Flush failed: {0}")]
    FlushFailed(String),

    /// Sink is closed and can accept no further writes
    #[error("Sink is closed")]
    Closed,
}
66
/// Configuration for the reactor.
///
/// Obtain sane defaults via `ReactorConfig::default()` and override
/// individual fields with struct-update syntax.
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    /// Maximum events to process per poll (bounds latency per iteration)
    pub batch_size: usize,
    /// CPU core to pin the reactor thread to (None = no pinning)
    pub cpu_affinity: Option<usize>,
    /// Maximum time to spend in one iteration
    pub max_iteration_time: Duration,
    /// Size of the event buffer; `submit` fails with `QueueFull` beyond this
    pub event_buffer_size: usize,
    /// Maximum out-of-orderness for watermark generation (milliseconds)
    pub max_out_of_orderness: i64,
}
81
82impl Default for ReactorConfig {
83    fn default() -> Self {
84        Self {
85            batch_size: 1024,
86            cpu_affinity: None,
87            max_iteration_time: Duration::from_millis(10),
88            event_buffer_size: 65536,
89            max_out_of_orderness: 1000, // 1 second
90        }
91    }
92}
93
/// The main reactor for event processing.
///
/// Owns the operator chain, the event queue, the timer service, the state
/// store, and the watermark generator; all of them are driven from a single
/// thread via [`Reactor::poll`] or [`Reactor::run`].
pub struct Reactor {
    /// Immutable runtime configuration (batch size, limits, affinity).
    config: ReactorConfig,
    /// Operator chain; events flow through these in insertion order.
    operators: Vec<Box<dyn Operator>>,
    /// Event-time timers registered by operators.
    timer_service: TimerService,
    /// Pending input events, capped at `config.event_buffer_size`.
    event_queue: VecDeque<Event>,
    /// Outputs accumulated since the last `poll`; drained on return.
    output_buffer: Vec<Output>,
    /// Keyed state shared with operators through `OperatorContext`.
    state_store: Box<dyn StateStore>,
    /// Produces watermarks from observed event timestamps.
    watermark_generator: Box<dyn WatermarkGenerator>,
    /// Highest event timestamp seen so far (event-time clock).
    current_event_time: i64,
    /// Wall-clock origin for `get_processing_time`.
    start_time: Instant,
    /// Total events processed since creation.
    events_processed: u64,
    /// Pre-allocated buffers for operator chain processing
    /// We use two buffers and swap between them to avoid allocations
    operator_buffer_1: Vec<Output>,
    operator_buffer_2: Vec<Output>,
    /// Optional sink for outputs
    sink: Option<Box<dyn Sink>>,
    /// Shutdown flag for graceful termination
    shutdown: Arc<AtomicBool>,
}
115
impl Reactor {
    /// Creates a new reactor with the given configuration.
    ///
    /// The reactor starts empty: register operators with
    /// [`Self::add_operator`] and an optional sink with [`Self::set_sink`]
    /// before driving it via [`Self::poll`] or [`Self::run`].
    ///
    /// # Errors
    ///
    /// Currently does not return any errors, but may in the future if initialization fails
    pub fn new(config: ReactorConfig) -> Result<Self, ReactorError> {
        // Pre-size the queue so steady-state submission never reallocates.
        let event_queue = VecDeque::with_capacity(config.event_buffer_size);
        let watermark_generator = Box::new(BoundedOutOfOrdernessGenerator::new(
            config.max_out_of_orderness,
        ));

        Ok(Self {
            config,
            operators: Vec::new(),
            timer_service: TimerService::new(),
            event_queue,
            output_buffer: Vec::with_capacity(1024),
            state_store: Box::new(InMemoryStore::new()),
            watermark_generator,
            current_event_time: 0,
            start_time: Instant::now(),
            events_processed: 0,
            // Ping-pong buffers used by `poll` to chain operators without
            // allocating per event.
            operator_buffer_1: Vec::with_capacity(256),
            operator_buffer_2: Vec::with_capacity(256),
            sink: None,
            shutdown: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Register an operator in the processing chain.
    ///
    /// Operators run in registration order; each one's outputs feed the next.
    pub fn add_operator(&mut self, operator: Box<dyn Operator>) {
        self.operators.push(operator);
    }

    /// Set the output sink for the reactor (replaces any existing sink).
    pub fn set_sink(&mut self, sink: Box<dyn Sink>) {
        self.sink = Some(sink);
    }

    /// Get a handle to the shutdown flag.
    ///
    /// Another thread can store `true` through this handle to make
    /// [`Self::run`] exit after its current iteration.
    #[must_use]
    pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.shutdown)
    }

    /// Submit an event for processing.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError::QueueFull` if the event queue is at capacity
    pub fn submit(&mut self, event: Event) -> Result<(), ReactorError> {
        if self.event_queue.len() >= self.config.event_buffer_size {
            return Err(ReactorError::QueueFull {
                capacity: self.config.event_buffer_size,
            });
        }

        self.event_queue.push_back(event);
        Ok(())
    }

    /// Submit multiple events for processing.
    ///
    /// All-or-nothing: if the batch does not fit, no event is enqueued.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError::QueueFull` if there's insufficient capacity for all events
    pub fn submit_batch(&mut self, events: Vec<Event>) -> Result<(), ReactorError> {
        // Never underflows: `submit`/`submit_batch` keep len <= event_buffer_size.
        let available = self.config.event_buffer_size - self.event_queue.len();
        if events.len() > available {
            return Err(ReactorError::QueueFull {
                capacity: self.config.event_buffer_size,
            });
        }

        self.event_queue.extend(events);
        Ok(())
    }

    /// Run one iteration of the event loop.
    /// Returns outputs ready for downstream.
    ///
    /// An iteration (1) fires timers expired at or before the current event
    /// time, (2) drains queued events through the operator chain until
    /// `batch_size` events or `max_iteration_time` is reached, and
    /// (3) returns everything accumulated in the output buffer — including
    /// watermarks pushed by `advance_watermark` since the last poll.
    pub fn poll(&mut self) -> Vec<Output> {
        // Hot path guard - will panic on allocation when allocation-tracking is enabled
        let _guard = HotPathGuard::enter("Reactor::poll");

        // Task budget tracking - records metrics on drop
        let _iteration_budget = TaskBudget::ring0_iteration();

        let poll_start = Instant::now();
        let processing_time = self.get_processing_time();

        // 1. Fire expired timers (event-time based: uses current_event_time)
        let fired_timers = self.timer_service.poll_timers(self.current_event_time);
        for mut timer in fired_timers {
            if let Some(idx) = timer.operator_index {
                // Route to specific operator
                if let Some(operator) = self.operators.get_mut(idx) {
                    let timer_key = timer.key.take().unwrap_or_default();
                    let timer_for_operator = crate::operator::Timer {
                        key: timer_key,
                        timestamp: timer.timestamp,
                    };

                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                    self.output_buffer.extend(outputs);
                }
            } else {
                // Legacy: Broadcast to all operators (warning: creates key contention)
                for (idx, operator) in self.operators.iter_mut().enumerate() {
                    // Move key out of timer (only first operator gets it!)
                    // NOTE(review): subsequent operators receive the default
                    // key because `take()` empties the Option on first use.
                    let timer_key = timer.key.take().unwrap_or_default();
                    let timer_for_operator = crate::operator::Timer {
                        key: timer_key,
                        timestamp: timer.timestamp,
                    };

                    let mut ctx = OperatorContext {
                        event_time: self.current_event_time,
                        processing_time,
                        timers: &mut self.timer_service,
                        state: self.state_store.as_mut(),
                        watermark_generator: self.watermark_generator.as_mut(),
                        operator_index: idx,
                    };

                    let outputs = operator.on_timer(timer_for_operator, &mut ctx);
                    self.output_buffer.extend(outputs);
                }
            }
        }

        // 2. Process events
        let mut events_in_batch = 0;
        while let Some(event) = self.event_queue.pop_front() {
            // Update current event time (monotonically non-decreasing)
            if event.timestamp > self.current_event_time {
                self.current_event_time = event.timestamp;
            }

            // Generate watermark if needed; emitted before the event's outputs
            if let Some(watermark) = self.watermark_generator.on_event(event.timestamp) {
                self.output_buffer
                    .push(Output::Watermark(watermark.timestamp()));
            }

            // Process through operator chain using pre-allocated buffers
            // Start with the event in buffer 1
            self.operator_buffer_1.clear();
            self.operator_buffer_1.push(Output::Event(event));

            let mut current_buffer_is_1 = true;

            for (idx, operator) in self.operators.iter_mut().enumerate() {
                // Determine which buffer to read from and which to write to
                let (current_buffer, next_buffer) = if current_buffer_is_1 {
                    (&mut self.operator_buffer_1, &mut self.operator_buffer_2)
                } else {
                    (&mut self.operator_buffer_2, &mut self.operator_buffer_1)
                };

                next_buffer.clear();

                for output in current_buffer.drain(..) {
                    if let Output::Event(event) = output {
                        let mut ctx = OperatorContext {
                            event_time: self.current_event_time,
                            processing_time,
                            timers: &mut self.timer_service,
                            state: self.state_store.as_mut(),
                            watermark_generator: self.watermark_generator.as_mut(),
                            operator_index: idx,
                        };

                        let operator_outputs = operator.process(&event, &mut ctx);
                        next_buffer.extend(operator_outputs);
                    } else {
                        // Pass through watermarks and late events untouched
                        next_buffer.push(output);
                    }
                }

                // Swap buffers for next iteration
                current_buffer_is_1 = !current_buffer_is_1;
            }

            // Extend output buffer with final results. After N operators the
            // flag has flipped N times, so it again names the buffer holding
            // the freshest outputs (buffer 1 when no operators are registered).
            let final_buffer = if current_buffer_is_1 {
                &mut self.operator_buffer_1
            } else {
                &mut self.operator_buffer_2
            };
            self.output_buffer.append(final_buffer);
            self.events_processed += 1;
            events_in_batch += 1;

            // Check batch size limit
            if events_in_batch >= self.config.batch_size {
                break;
            }

            // Check time limit (checked after each event, so one slow event
            // can overshoot max_iteration_time)
            if poll_start.elapsed() >= self.config.max_iteration_time {
                break;
            }
        }

        // 3. Return outputs, leaving an empty buffer (capacity is not retained
        // by `mem::take`; the next poll starts from a fresh Vec)
        std::mem::take(&mut self.output_buffer)
    }

    /// Advances the watermark to the given timestamp.
    ///
    /// Called when an external watermark message arrives (e.g., from TPC coordination).
    /// Updates the reactor's event time tracking and watermark generator state.
    /// Any resulting watermark output will be included in the next `poll()` result.
    pub fn advance_watermark(&mut self, timestamp: i64) {
        // Update current event time if this watermark is newer
        if timestamp > self.current_event_time {
            self.current_event_time = timestamp;
        }

        // Feed the timestamp to the watermark generator so it can advance
        if let Some(watermark) = self.watermark_generator.on_event(timestamp) {
            self.output_buffer
                .push(Output::Watermark(watermark.timestamp()));
        }
    }

    /// Triggers a checkpoint by snapshotting all operator states.
    ///
    /// Called when a `CheckpointRequest` arrives from the control plane.
    /// Collects the serialized state from each operator and returns it
    /// for persistence by Ring 1.
    pub fn trigger_checkpoint(&mut self) -> Vec<OperatorState> {
        self.operators.iter().map(|op| op.checkpoint()).collect()
    }

    /// Get current processing time in microseconds since reactor start.
    #[allow(clippy::cast_possible_truncation)] // Saturating conversion handles overflow on next line
    fn get_processing_time(&self) -> i64 {
        // Saturating conversion - after ~292 years this will saturate at i64::MAX
        let micros = self.start_time.elapsed().as_micros();
        if micros > i64::MAX as u128 {
            i64::MAX
        } else {
            micros as i64
        }
    }

    /// Get the number of events processed since creation.
    #[must_use]
    pub fn events_processed(&self) -> u64 {
        self.events_processed
    }

    /// Get the number of events currently waiting in the queue.
    #[must_use]
    pub fn queue_size(&self) -> usize {
        self.event_queue.len()
    }

    /// Set CPU affinity if configured.
    ///
    /// Pins the *calling* thread to `config.cpu_affinity` (Linux via
    /// `sched_setaffinity`, Windows via `SetThreadAffinityMask`); a warning is
    /// printed on other platforms. No-op when `cpu_affinity` is `None`.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError` if CPU affinity cannot be set (platform-specific)
    #[allow(unused_variables)]
    pub fn set_cpu_affinity(&self) -> Result<(), ReactorError> {
        if let Some(cpu_id) = self.config.cpu_affinity {
            #[cfg(target_os = "linux")]
            {
                use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
                use std::mem;

                // SAFETY: We're calling libc functions with valid parameters.
                // The cpu_set_t is properly initialized with CPU_ZERO.
                // The process ID 0 refers to the current thread.
                #[allow(unsafe_code)]
                unsafe {
                    let mut set: cpu_set_t = mem::zeroed();
                    CPU_ZERO(&mut set);
                    CPU_SET(cpu_id, &mut set);

                    // NOTE(review): `&raw const` needs Rust >= 1.82 — confirm MSRV.
                    let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
                    if result != 0 {
                        return Err(ReactorError::InitializationFailed(format!(
                            "Failed to set CPU affinity to core {}: {}",
                            cpu_id,
                            std::io::Error::last_os_error()
                        )));
                    }
                }
            }

            #[cfg(target_os = "windows")]
            {
                use winapi::shared::basetsd::DWORD_PTR;
                use winapi::um::processthreadsapi::GetCurrentThread;
                use winapi::um::winbase::SetThreadAffinityMask;

                // SAFETY: We're calling Windows API functions with valid parameters.
                // GetCurrentThread returns a pseudo-handle that doesn't need to be closed.
                // The mask is a valid CPU mask for the specified core.
                #[allow(unsafe_code)]
                unsafe {
                    let mask: DWORD_PTR = 1 << cpu_id;
                    let result = SetThreadAffinityMask(GetCurrentThread(), mask);
                    if result == 0 {
                        return Err(ReactorError::InitializationFailed(format!(
                            "Failed to set CPU affinity to core {}: {}",
                            cpu_id,
                            std::io::Error::last_os_error()
                        )));
                    }
                }
            }

            #[cfg(not(any(target_os = "linux", target_os = "windows")))]
            {
                eprintln!("Warning: CPU affinity is not implemented for this platform");
            }
        }
        Ok(())
    }

    /// Runs the event loop continuously until shutdown.
    ///
    /// Sink write failures are logged to stderr and processing continues
    /// (best-effort delivery). The sink is flushed once on exit.
    ///
    /// # Errors
    ///
    /// Returns `ReactorError` if CPU affinity cannot be set or if shutdown fails
    pub fn run(&mut self) -> Result<(), ReactorError> {
        self.set_cpu_affinity()?;

        while !self.shutdown.load(Ordering::Relaxed) {
            // Process events
            let outputs = self.poll();

            // Send outputs to sink if configured
            if !outputs.is_empty() {
                if let Some(sink) = &mut self.sink {
                    if let Err(e) = sink.write(outputs) {
                        eprintln!("Failed to write to sink: {e}");
                        // Continue processing even if sink fails
                    }
                }
            }

            // If no events to process, yield to avoid busy-waiting
            if self.event_queue.is_empty() {
                std::thread::yield_now();
            }

            // Periodically check for shutdown signal
            // NOTE(review): redundant — the `while` condition above already
            // checks the flag every iteration. Also `u64::is_multiple_of` is
            // only stable since Rust 1.87; confirm against the crate's MSRV.
            if self.events_processed.is_multiple_of(1000) && self.shutdown.load(Ordering::Relaxed) {
                break;
            }
        }

        // Flush sink before shutdown
        if let Some(sink) = &mut self.sink {
            if let Err(e) = sink.flush() {
                eprintln!("Failed to flush sink during shutdown: {e}");
            }
        }

        Ok(())
    }

    /// Stops the reactor gracefully.
    ///
    /// Signals the shutdown flag, drains every queued event through the
    /// pipeline (writing outputs to the sink best-effort), then flushes.
    ///
    /// # Errors
    ///
    /// Currently does not return any errors, but may in the future if shutdown fails
    pub fn shutdown(&mut self) -> Result<(), ReactorError> {
        // Signal shutdown
        self.shutdown.store(true, Ordering::Relaxed);

        // Process remaining events; each poll() pops at least one queued
        // event, so this loop terminates.
        while !self.event_queue.is_empty() {
            let outputs = self.poll();

            // Send final outputs to sink
            if !outputs.is_empty() {
                if let Some(sink) = &mut self.sink {
                    if let Err(e) = sink.write(outputs) {
                        eprintln!("Failed to write final outputs during shutdown: {e}");
                    }
                }
            }
        }

        // Final flush
        if let Some(sink) = &mut self.sink {
            if let Err(e) = sink.flush() {
                eprintln!("Failed to flush sink during shutdown: {e}");
            }
        }

        Ok(())
    }
}
526
/// Errors that can occur in the reactor.
#[derive(Debug, thiserror::Error)]
pub enum ReactorError {
    /// Failed to initialize the reactor (e.g. CPU pinning failed)
    #[error("Initialization failed: {0}")]
    InitializationFailed(String),

    /// Event processing error
    #[error("Event processing failed: {0}")]
    EventProcessingFailed(String),

    /// Shutdown error
    #[error("Shutdown failed: {0}")]
    ShutdownFailed(String),

    /// Event queue is full; returned by `submit`/`submit_batch`
    #[error("Event queue full (capacity: {capacity})")]
    QueueFull {
        /// The configured capacity of the event queue
        capacity: usize,
    },
}
549
/// A simple sink that writes outputs to stdout (for testing).
///
/// Stateless unit struct; one human-readable line is printed per output.
pub struct StdoutSink;
552
553impl Sink for StdoutSink {
554    fn write(&mut self, outputs: Vec<Output>) -> Result<(), SinkError> {
555        for output in outputs {
556            match output {
557                Output::Event(event) => {
558                    println!(
559                        "Event: timestamp={}, data={:?}",
560                        event.timestamp, event.data
561                    );
562                }
563                Output::Watermark(timestamp) => {
564                    println!("Watermark: {timestamp}");
565                }
566                Output::LateEvent(event) => {
567                    println!(
568                        "Late Event (dropped): timestamp={}, data={:?}",
569                        event.timestamp, event.data
570                    );
571                }
572                Output::SideOutput { name, event } => {
573                    println!(
574                        "Side Output [{}]: timestamp={}, data={:?}",
575                        name, event.timestamp, event.data
576                    );
577                }
578                Output::Changelog(record) => {
579                    println!(
580                        "Changelog: op={:?}, weight={}, emit_ts={}, event_ts={}, data={:?}",
581                        record.operation,
582                        record.weight,
583                        record.emit_timestamp,
584                        record.event.timestamp,
585                        record.event.data
586                    );
587                }
588                Output::CheckpointComplete {
589                    checkpoint_id,
590                    operator_states,
591                } => {
592                    println!(
593                        "Checkpoint: id={checkpoint_id}, operators={}",
594                        operator_states.len()
595                    );
596                }
597            }
598        }
599        Ok(())
600    }
601
602    fn flush(&mut self) -> Result<(), SinkError> {
603        Ok(())
604    }
605}
606
/// A buffering sink that collects outputs in memory (for testing).
///
/// Retrieve collected outputs with `take_buffer`.
#[derive(Default)]
pub struct BufferingSink {
    // All outputs written so far, in arrival order.
    buffer: Vec<Output>,
}
612
613impl BufferingSink {
614    /// Create a new buffering sink.
615    #[must_use]
616    pub fn new() -> Self {
617        Self::default()
618    }
619
620    /// Get the buffered outputs.
621    #[must_use]
622    pub fn take_buffer(&mut self) -> Vec<Output> {
623        std::mem::take(&mut self.buffer)
624    }
625}
626
627impl Sink for BufferingSink {
628    fn write(&mut self, mut outputs: Vec<Output>) -> Result<(), SinkError> {
629        self.buffer.append(&mut outputs);
630        Ok(())
631    }
632
633    fn flush(&mut self) -> Result<(), SinkError> {
634        Ok(())
635    }
636}
637
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::OutputVec;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    // Mock operator for testing: echoes every event unchanged and keeps no state.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: crate::operator::Timer,
            _ctx: &mut OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> crate::operator::OperatorState {
            crate::operator::OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(
            &mut self,
            _state: crate::operator::OperatorState,
        ) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    #[test]
    fn test_default_config() {
        let config = ReactorConfig::default();
        assert_eq!(config.batch_size, 1024);
        assert_eq!(config.event_buffer_size, 65536);
    }

    #[test]
    fn test_reactor_creation() {
        let config = ReactorConfig::default();
        let reactor = Reactor::new(config);
        assert!(reactor.is_ok());
    }

    #[test]
    fn test_reactor_add_operator() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let operator = Box::new(PassthroughOperator);
        reactor.add_operator(operator);

        assert_eq!(reactor.operators.len(), 1);
    }

    #[test]
    fn test_reactor_submit_event() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        // submit() only enqueues; nothing is processed until poll().
        assert!(reactor.submit(event).is_ok());
        assert_eq!(reactor.queue_size(), 1);
    }

    #[test]
    fn test_reactor_poll_processes_events() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Add a passthrough operator
        reactor.add_operator(Box::new(PassthroughOperator));

        // Submit an event
        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        reactor.submit(event.clone()).unwrap();

        // Poll should process the event and drain the queue
        let outputs = reactor.poll();
        assert!(!outputs.is_empty());
        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_queue_full() {
        let config = ReactorConfig {
            event_buffer_size: 2, // Very small buffer
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        // Fill the queue
        for i in 0..2 {
            let event = Event::new(i64::from(i), batch.clone());
            assert!(reactor.submit(event).is_ok());
        }

        // Next submit should fail with QueueFull
        let event = Event::new(100, batch);
        assert!(matches!(
            reactor.submit(event),
            Err(ReactorError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_reactor_batch_processing() {
        let config = ReactorConfig {
            batch_size: 2, // Small batch size
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        // Submit 5 events
        for i in 0..5 {
            let event = Event::new(i64::from(i), batch.clone());
            reactor.submit(event).unwrap();
        }

        // First poll should process only batch_size events
        reactor.poll();
        assert_eq!(reactor.events_processed(), 2);
        assert_eq!(reactor.queue_size(), 3);

        // Second poll should process 2 more
        reactor.poll();
        assert_eq!(reactor.events_processed(), 4);
        assert_eq!(reactor.queue_size(), 1);

        // Third poll should process the last one
        reactor.poll();
        assert_eq!(reactor.events_processed(), 5);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_with_sink() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Add a buffering sink
        let sink = Box::new(BufferingSink::new());
        reactor.set_sink(sink);

        // Add passthrough operator
        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![42]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        let event = Event::new(1000, batch);

        // Submit an event
        reactor.submit(event).unwrap();

        // Process
        let outputs = reactor.poll();
        // Should get the event output (and possibly a watermark)
        assert!(!outputs.is_empty());

        // Verify we got the event
        assert!(outputs.iter().any(|o| matches!(o, Output::Event(_))));
    }

    #[test]
    fn test_reactor_shutdown() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        // Get shutdown handle; flag starts cleared
        let shutdown_handle = reactor.shutdown_handle();
        assert!(!shutdown_handle.load(Ordering::Relaxed));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col", array as _)]).unwrap();

        // Submit some events
        for i in 0..5 {
            reactor.submit(Event::new(i * 1000, batch.clone())).unwrap();
        }

        // Shutdown should set the flag and drain remaining events
        reactor.shutdown().unwrap();
        assert!(shutdown_handle.load(Ordering::Relaxed));
        assert_eq!(reactor.queue_size(), 0);
    }
}