1use std::collections::VecDeque;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27
28use crate::alloc::HotPathGuard;
29use crate::budget::TaskBudget;
30use crate::operator::{
31 CheckpointCompleteData, Event, Operator, OperatorContext, OperatorState, Output, SideOutputData,
32};
33use crate::state::{AHashMapStore, StateStore};
34use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
35
36pub trait Sink: Send {
38 fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError>;
47
48 fn flush(&mut self) -> Result<(), SinkError>;
54}
55
56#[derive(Debug, thiserror::Error)]
58pub enum SinkError {
59 #[error("Write failed: {0}")]
61 WriteFailed(String),
62
63 #[error("Flush failed: {0}")]
65 FlushFailed(String),
66
67 #[error("Sink is closed")]
69 Closed,
70}
71
/// Tuning knobs for a [`Reactor`].
#[derive(Clone, Debug)]
pub struct ReactorConfig {
    /// Maximum number of events handled by one processing iteration.
    pub batch_size: usize,
    /// Core that `Reactor::set_cpu_affinity` (called by `Reactor::run`) pins
    /// the calling thread to; `None` leaves the affinity unchanged.
    pub cpu_affinity: Option<usize>,
    /// Elapsed-time budget for one processing iteration. It is checked after
    /// each event, so an iteration may overrun by one event's processing time.
    pub max_iteration_time: Duration,
    /// Capacity of the input queue; submissions beyond it are rejected with
    /// `ReactorError::QueueFull`.
    pub event_buffer_size: usize,
    /// Lateness bound handed to the bounded-out-of-orderness watermark
    /// generator, in event-timestamp units (presumably milliseconds — confirm).
    pub max_out_of_orderness: i64,
}
86
87impl Default for ReactorConfig {
88 fn default() -> Self {
89 Self {
90 batch_size: 1024,
91 cpu_affinity: None,
92 max_iteration_time: Duration::from_millis(10),
93 event_buffer_size: 65536,
94 max_out_of_orderness: 1000, }
96 }
97}
98
99pub struct Reactor {
101 config: ReactorConfig,
102 operators: Vec<Box<dyn Operator>>,
103 timer_service: TimerService,
104 event_queue: VecDeque<Event>,
105 output_buffer: Vec<Output>,
106 state_store: Box<dyn StateStore>,
107 watermark_generator: Box<dyn WatermarkGenerator>,
108 current_event_time: i64,
109 start_time: Instant,
110 events_processed: u64,
111 operator_buffer_1: Vec<Output>,
114 operator_buffer_2: Vec<Output>,
115 sink: Option<Box<dyn Sink>>,
117 shutdown: Arc<AtomicBool>,
119}
120
121impl Reactor {
122 pub fn new(config: ReactorConfig) -> Result<Self, ReactorError> {
128 let event_queue = VecDeque::with_capacity(config.event_buffer_size);
129 let watermark_generator = Box::new(BoundedOutOfOrdernessGenerator::new(
130 config.max_out_of_orderness,
131 ));
132
133 Ok(Self {
134 config,
135 operators: Vec::new(),
136 timer_service: TimerService::new(),
137 event_queue,
138 output_buffer: Vec::with_capacity(1024),
139 state_store: Box::new(AHashMapStore::new()),
140 watermark_generator,
141 current_event_time: 0,
142 start_time: Instant::now(),
143 events_processed: 0,
144 operator_buffer_1: Vec::with_capacity(256),
145 operator_buffer_2: Vec::with_capacity(256),
146 sink: None,
147 shutdown: Arc::new(AtomicBool::new(false)),
148 })
149 }
150
151 pub fn add_operator(&mut self, operator: Box<dyn Operator>) {
153 self.operators.push(operator);
154 }
155
156 pub fn set_sink(&mut self, sink: Box<dyn Sink>) {
158 self.sink = Some(sink);
159 }
160
161 #[must_use]
163 pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
164 Arc::clone(&self.shutdown)
165 }
166
167 pub fn submit(&mut self, event: Event) -> Result<(), ReactorError> {
173 if self.event_queue.len() >= self.config.event_buffer_size {
174 return Err(ReactorError::QueueFull {
175 capacity: self.config.event_buffer_size,
176 });
177 }
178
179 self.event_queue.push_back(event);
180 Ok(())
181 }
182
183 pub fn submit_batch(&mut self, events: Vec<Event>) -> Result<(), ReactorError> {
189 let available = self.config.event_buffer_size - self.event_queue.len();
190 if events.len() > available {
191 return Err(ReactorError::QueueFull {
192 capacity: self.config.event_buffer_size,
193 });
194 }
195
196 self.event_queue.extend(events);
197 Ok(())
198 }
199
200 fn process_events(&mut self) {
205 let _guard = HotPathGuard::enter("Reactor::poll");
207
208 let _iteration_budget = TaskBudget::ring0_iteration();
210
211 let poll_start = Instant::now();
212 let processing_time = self.get_processing_time();
213
214 let fired_timers = self.timer_service.poll_timers(self.current_event_time);
216 for mut timer in fired_timers {
217 if let Some(idx) = timer.operator_index {
218 if let Some(operator) = self.operators.get_mut(idx) {
220 let timer_key = timer.key.take().unwrap_or_default();
221 let timer_for_operator = crate::operator::Timer {
222 key: timer_key,
223 timestamp: timer.timestamp,
224 };
225
226 let mut ctx = OperatorContext {
227 event_time: self.current_event_time,
228 processing_time,
229 timers: &mut self.timer_service,
230 state: self.state_store.as_mut(),
231 watermark_generator: self.watermark_generator.as_mut(),
232 operator_index: idx,
233 };
234
235 let outputs = operator.on_timer(timer_for_operator, &mut ctx);
236 self.output_buffer.extend(outputs);
237 }
238 } else {
239 for (idx, operator) in self.operators.iter_mut().enumerate() {
241 let timer_key = timer.key.take().unwrap_or_default();
243 let timer_for_operator = crate::operator::Timer {
244 key: timer_key,
245 timestamp: timer.timestamp,
246 };
247
248 let mut ctx = OperatorContext {
249 event_time: self.current_event_time,
250 processing_time,
251 timers: &mut self.timer_service,
252 state: self.state_store.as_mut(),
253 watermark_generator: self.watermark_generator.as_mut(),
254 operator_index: idx,
255 };
256
257 let outputs = operator.on_timer(timer_for_operator, &mut ctx);
258 self.output_buffer.extend(outputs);
259 }
260 }
261 }
262
263 let mut events_in_batch = 0;
265 while let Some(event) = self.event_queue.pop_front() {
266 if event.timestamp > self.current_event_time {
268 self.current_event_time = event.timestamp;
269 }
270
271 if let Some(watermark) = self.watermark_generator.on_event(event.timestamp) {
273 self.output_buffer
274 .push(Output::Watermark(watermark.timestamp()));
275 }
276
277 self.operator_buffer_1.clear();
280 self.operator_buffer_1.push(Output::Event(event));
281
282 let mut current_buffer_is_1 = true;
283
284 for (idx, operator) in self.operators.iter_mut().enumerate() {
285 let (current_buffer, next_buffer) = if current_buffer_is_1 {
287 (&mut self.operator_buffer_1, &mut self.operator_buffer_2)
288 } else {
289 (&mut self.operator_buffer_2, &mut self.operator_buffer_1)
290 };
291
292 next_buffer.clear();
293
294 for output in current_buffer.drain(..) {
295 if let Output::Event(event) = output {
296 let mut ctx = OperatorContext {
297 event_time: self.current_event_time,
298 processing_time,
299 timers: &mut self.timer_service,
300 state: self.state_store.as_mut(),
301 watermark_generator: self.watermark_generator.as_mut(),
302 operator_index: idx,
303 };
304
305 let operator_outputs = operator.process(&event, &mut ctx);
306 next_buffer.extend(operator_outputs);
307 } else {
308 next_buffer.push(output);
310 }
311 }
312
313 current_buffer_is_1 = !current_buffer_is_1;
315 }
316
317 let final_buffer = if current_buffer_is_1 {
319 &mut self.operator_buffer_1
320 } else {
321 &mut self.operator_buffer_2
322 };
323 self.output_buffer.append(final_buffer);
324 self.events_processed += 1;
325 events_in_batch += 1;
326
327 if events_in_batch >= self.config.batch_size {
329 break;
330 }
331
332 if poll_start.elapsed() >= self.config.max_iteration_time {
334 break;
335 }
336 }
337 }
338
339 pub fn poll(&mut self) -> Vec<Output> {
345 self.process_events();
346 self.output_buffer.drain(..).collect()
347 }
348
349 pub fn poll_into(&mut self, output: &mut Vec<Output>) {
354 self.process_events();
355 output.append(&mut self.output_buffer);
356 }
357
358 pub fn advance_watermark(&mut self, timestamp: i64) {
364 if timestamp > self.current_event_time {
366 self.current_event_time = timestamp;
367 }
368
369 if let Some(watermark) = self.watermark_generator.on_event(timestamp) {
371 self.output_buffer
372 .push(Output::Watermark(watermark.timestamp()));
373 }
374 }
375
376 pub fn trigger_checkpoint(&mut self) -> Vec<OperatorState> {
382 self.operators.iter().map(|op| op.checkpoint()).collect()
383 }
384
385 #[allow(clippy::cast_possible_truncation)] fn get_processing_time(&self) -> i64 {
388 let micros = self.start_time.elapsed().as_micros();
390 if micros > i64::MAX as u128 {
391 i64::MAX
392 } else {
393 micros as i64
394 }
395 }
396
397 #[must_use]
399 pub fn events_processed(&self) -> u64 {
400 self.events_processed
401 }
402
403 #[must_use]
405 pub fn queue_size(&self) -> usize {
406 self.event_queue.len()
407 }
408
409 #[allow(unused_variables)]
415 pub fn set_cpu_affinity(&self) -> Result<(), ReactorError> {
416 if let Some(cpu_id) = self.config.cpu_affinity {
417 #[cfg(target_os = "linux")]
418 {
419 use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
420 use std::mem;
421
422 #[allow(unsafe_code)]
426 unsafe {
427 let mut set: cpu_set_t = mem::zeroed();
428 CPU_ZERO(&mut set);
429 CPU_SET(cpu_id, &mut set);
430
431 let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
432 if result != 0 {
433 return Err(ReactorError::InitializationFailed(format!(
434 "Failed to set CPU affinity to core {}: {}",
435 cpu_id,
436 std::io::Error::last_os_error()
437 )));
438 }
439 }
440 }
441
442 #[cfg(target_os = "windows")]
443 {
444 use windows_sys::Win32::System::Threading::{
445 GetCurrentThread, SetThreadAffinityMask,
446 };
447
448 #[allow(unsafe_code)]
452 unsafe {
453 let mask: usize = 1 << cpu_id;
454 let result = SetThreadAffinityMask(GetCurrentThread(), mask);
455 if result == 0 {
456 return Err(ReactorError::InitializationFailed(format!(
457 "Failed to set CPU affinity to core {}: {}",
458 cpu_id,
459 std::io::Error::last_os_error()
460 )));
461 }
462 }
463 }
464
465 #[cfg(not(any(target_os = "linux", target_os = "windows")))]
466 {
467 tracing::warn!("CPU affinity is not implemented for this platform");
468 }
469 }
470 Ok(())
471 }
472
473 pub fn run(&mut self) -> Result<(), ReactorError> {
479 self.set_cpu_affinity()?;
480
481 while !self.shutdown.load(Ordering::Relaxed) {
482 self.process_events();
484
485 if !self.output_buffer.is_empty() {
488 if let Some(sink) = &mut self.sink {
489 if let Err(e) = sink.write(&mut self.output_buffer) {
490 tracing::error!("Failed to write to sink: {e}");
491 }
493 }
494 self.output_buffer.clear();
495 }
496
497 if self.event_queue.is_empty() {
502 std::hint::spin_loop();
503 }
504
505 if self.events_processed.is_multiple_of(1000) && self.shutdown.load(Ordering::Relaxed) {
507 break;
508 }
509 }
510
511 if let Some(sink) = &mut self.sink {
513 if let Err(e) = sink.flush() {
514 tracing::error!("Failed to flush sink during shutdown: {e}");
515 }
516 }
517
518 Ok(())
519 }
520
521 pub fn shutdown(&mut self) -> Result<(), ReactorError> {
527 self.shutdown.store(true, Ordering::Relaxed);
529
530 while !self.event_queue.is_empty() {
532 self.process_events();
533
534 if !self.output_buffer.is_empty() {
535 if let Some(sink) = &mut self.sink {
536 if let Err(e) = sink.write(&mut self.output_buffer) {
537 tracing::error!("Failed to write final outputs during shutdown: {e}");
538 }
539 }
540 self.output_buffer.clear();
541 }
542 }
543
544 if let Some(sink) = &mut self.sink {
546 if let Err(e) = sink.flush() {
547 tracing::error!("Failed to flush sink during shutdown: {e}");
548 }
549 }
550
551 Ok(())
552 }
553}
554
555#[derive(Debug, thiserror::Error)]
557pub enum ReactorError {
558 #[error("Initialization failed: {0}")]
560 InitializationFailed(String),
561
562 #[error("Event processing failed: {0}")]
564 EventProcessingFailed(String),
565
566 #[error("Shutdown failed: {0}")]
568 ShutdownFailed(String),
569
570 #[error("Event queue full (capacity: {capacity})")]
572 QueueFull {
573 capacity: usize,
575 },
576}
577
578pub struct StdoutSink;
580
581impl Sink for StdoutSink {
582 fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
583 for output in outputs.drain(..) {
584 match output {
585 Output::Event(event) => {
586 println!(
587 "Event: timestamp={}, data={:?}",
588 event.timestamp, event.data
589 );
590 }
591 Output::Watermark(timestamp) => {
592 println!("Watermark: {timestamp}");
593 }
594 Output::LateEvent(event) => {
595 println!(
596 "Late Event (dropped): timestamp={}, data={:?}",
597 event.timestamp, event.data
598 );
599 }
600 Output::SideOutput(side_output) => {
601 let SideOutputData { name, event } = *side_output;
602 println!(
603 "Side Output [{}]: timestamp={}, data={:?}",
604 name, event.timestamp, event.data
605 );
606 }
607 Output::Changelog(record) => {
608 println!(
609 "Changelog: op={:?}, weight={}, emit_ts={}, event_ts={}, data={:?}",
610 record.operation,
611 record.weight,
612 record.emit_timestamp,
613 record.event.timestamp,
614 record.event.data
615 );
616 }
617 Output::CheckpointComplete(data) => {
618 let CheckpointCompleteData {
619 checkpoint_id,
620 operator_states,
621 } = *data;
622 println!(
623 "Checkpoint: id={checkpoint_id}, operators={}",
624 operator_states.len()
625 );
626 }
627 }
628 }
629 Ok(())
630 }
631
632 fn flush(&mut self) -> Result<(), SinkError> {
633 Ok(())
634 }
635}
636
637#[derive(Default)]
639pub struct BufferingSink {
640 buffer: Vec<Output>,
641}
642
643impl BufferingSink {
644 #[must_use]
646 pub fn new() -> Self {
647 Self::default()
648 }
649
650 #[must_use]
652 pub fn take_buffer(&mut self) -> Vec<Output> {
653 std::mem::take(&mut self.buffer)
654 }
655}
656
657impl Sink for BufferingSink {
658 fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
659 self.buffer.append(outputs);
660 Ok(())
661 }
662
663 fn flush(&mut self) -> Result<(), SinkError> {
664 Ok(())
665 }
666}
667
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::OutputVec;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;
    use std::sync::Mutex;

    /// Operator that re-emits every input event unchanged and ignores timers.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: crate::operator::Timer,
            _ctx: &mut OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> crate::operator::OperatorState {
            crate::operator::OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(
            &mut self,
            _state: crate::operator::OperatorState,
        ) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    /// Sink that records every output it receives and raises the reactor's
    /// shutdown flag after each write, so `Reactor::run` terminates.
    struct StopAfterWriteSink {
        received: Arc<Mutex<Vec<Output>>>,
        shutdown: Arc<AtomicBool>,
    }

    impl Sink for StopAfterWriteSink {
        fn write(&mut self, outputs: &mut Vec<Output>) -> Result<(), SinkError> {
            self.received.lock().unwrap().append(outputs);
            self.shutdown.store(true, Ordering::Relaxed);
            Ok(())
        }

        fn flush(&mut self) -> Result<(), SinkError> {
            Ok(())
        }
    }

    #[test]
    fn test_default_config() {
        let config = ReactorConfig::default();
        assert_eq!(config.batch_size, 1024);
        assert_eq!(config.event_buffer_size, 65536);
    }

    #[test]
    fn test_reactor_creation() {
        let config = ReactorConfig::default();
        let reactor = Reactor::new(config);
        assert!(reactor.is_ok());
    }

    #[test]
    fn test_reactor_add_operator() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let operator = Box::new(PassthroughOperator);
        reactor.add_operator(operator);

        assert_eq!(reactor.operators.len(), 1);
    }

    #[test]
    fn test_reactor_submit_event() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        assert!(reactor.submit(event).is_ok());
        assert_eq!(reactor.queue_size(), 1);
    }

    #[test]
    fn test_reactor_poll_processes_events() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();
        let event = Event::new(12345, batch);

        reactor.submit(event.clone()).unwrap();

        let outputs = reactor.poll();
        assert!(!outputs.is_empty());
        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_queue_full() {
        let config = ReactorConfig {
            event_buffer_size: 2,
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        for i in 0..2 {
            let event = Event::new(i64::from(i), batch.clone());
            assert!(reactor.submit(event).is_ok());
        }

        let event = Event::new(100, batch);
        assert!(matches!(
            reactor.submit(event),
            Err(ReactorError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_reactor_batch_processing() {
        let config = ReactorConfig {
            batch_size: 2,
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();

        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col1", array as _)]).unwrap();

        for i in 0..5 {
            let event = Event::new(i64::from(i), batch.clone());
            reactor.submit(event).unwrap();
        }

        reactor.poll();
        assert_eq!(reactor.events_processed(), 2);
        assert_eq!(reactor.queue_size(), 3);

        reactor.poll();
        assert_eq!(reactor.events_processed(), 4);
        assert_eq!(reactor.queue_size(), 1);

        reactor.poll();
        assert_eq!(reactor.events_processed(), 5);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_with_sink() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let sink = Box::new(BufferingSink::new());
        reactor.set_sink(sink);

        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![42]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        let event = Event::new(1000, batch);

        reactor.submit(event).unwrap();

        // `poll` returns outputs directly, even when a sink is installed.
        let outputs = reactor.poll();
        assert!(!outputs.is_empty());

        assert!(outputs.iter().any(|o| matches!(o, Output::Event(_))));
    }

    #[test]
    fn test_reactor_run_writes_to_sink() {
        let mut reactor = Reactor::new(ReactorConfig::default()).unwrap();

        let received = Arc::new(Mutex::new(Vec::new()));
        let shutdown = reactor.shutdown_handle();
        reactor.set_sink(Box::new(StopAfterWriteSink {
            received: Arc::clone(&received),
            shutdown,
        }));
        reactor.add_operator(Box::new(PassthroughOperator));

        let array = Arc::new(Int64Array::from(vec![7]));
        let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
        reactor.submit(Event::new(1000, batch)).unwrap();

        // The sink raises the shutdown flag on its first write, ending `run`.
        reactor.run().unwrap();

        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
        let received = received.lock().unwrap();
        assert!(received.iter().any(|o| matches!(o, Output::Event(_))));
    }

    #[test]
    fn test_buffering_sink_collects_outputs() {
        let mut sink = BufferingSink::new();

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col", array as _)]).unwrap();
        let mut outputs = vec![
            Output::Event(Event::new(1, batch.clone())),
            Output::Event(Event::new(2, batch)),
        ];

        sink.write(&mut outputs).unwrap();
        assert!(outputs.is_empty());
        sink.flush().unwrap();

        assert_eq!(sink.take_buffer().len(), 2);
        assert!(sink.take_buffer().is_empty());
    }

    #[test]
    fn test_reactor_shutdown() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let shutdown_handle = reactor.shutdown_handle();
        assert!(!shutdown_handle.load(Ordering::Relaxed));

        let array = Arc::new(Int64Array::from(vec![1]));
        let batch = RecordBatch::try_from_iter(vec![("col", array as _)]).unwrap();

        for i in 0..5 {
            reactor.submit(Event::new(i * 1000, batch.clone())).unwrap();
        }

        reactor.shutdown().unwrap();
        assert!(shutdown_handle.load(Ordering::Relaxed));
        assert_eq!(reactor.queue_size(), 0);
    }
}