1use std::collections::VecDeque;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27
28use crate::alloc::HotPathGuard;
29use crate::budget::TaskBudget;
30use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output};
31use crate::state::{InMemoryStore, StateStore};
32use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
33
34pub trait Sink: Send {
36 fn write(&mut self, outputs: Vec<Output>) -> Result<(), SinkError>;
42
43 fn flush(&mut self) -> Result<(), SinkError>;
49}
50
51#[derive(Debug, thiserror::Error)]
53pub enum SinkError {
54 #[error("Write failed: {0}")]
56 WriteFailed(String),
57
58 #[error("Flush failed: {0}")]
60 FlushFailed(String),
61
62 #[error("Sink is closed")]
64 Closed,
65}
66
/// Tunables for a [`Reactor`].
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    /// Upper bound on the number of events handled by a single `poll` call.
    pub batch_size: usize,
    /// CPU core index the reactor thread is pinned to by `set_cpu_affinity`;
    /// `None` leaves thread placement untouched.
    pub cpu_affinity: Option<usize>,
    /// Time budget for the event-processing loop of a single `poll` call.
    pub max_iteration_time: Duration,
    /// Maximum number of events the input queue may hold; submissions beyond
    /// it are rejected.
    pub event_buffer_size: usize,
    /// Bound handed to the out-of-orderness watermark generator.
    // NOTE(review): units are not visible here; presumably the same unit as
    // event timestamps. Confirm against `BoundedOutOfOrdernessGenerator`.
    pub max_out_of_orderness: i64,
}
81
82impl Default for ReactorConfig {
83 fn default() -> Self {
84 Self {
85 batch_size: 1024,
86 cpu_affinity: None,
87 max_iteration_time: Duration::from_millis(10),
88 event_buffer_size: 65536,
89 max_out_of_orderness: 1000, }
91 }
92}
93
94pub struct Reactor {
96 config: ReactorConfig,
97 operators: Vec<Box<dyn Operator>>,
98 timer_service: TimerService,
99 event_queue: VecDeque<Event>,
100 output_buffer: Vec<Output>,
101 state_store: Box<dyn StateStore>,
102 watermark_generator: Box<dyn WatermarkGenerator>,
103 current_event_time: i64,
104 start_time: Instant,
105 events_processed: u64,
106 operator_buffer_1: Vec<Output>,
109 operator_buffer_2: Vec<Output>,
110 sink: Option<Box<dyn Sink>>,
112 shutdown: Arc<AtomicBool>,
114}
115
116impl Reactor {
117 pub fn new(config: ReactorConfig) -> Result<Self, ReactorError> {
123 let event_queue = VecDeque::with_capacity(config.event_buffer_size);
124 let watermark_generator = Box::new(BoundedOutOfOrdernessGenerator::new(
125 config.max_out_of_orderness,
126 ));
127
128 Ok(Self {
129 config,
130 operators: Vec::new(),
131 timer_service: TimerService::new(),
132 event_queue,
133 output_buffer: Vec::with_capacity(1024),
134 state_store: Box::new(InMemoryStore::new()),
135 watermark_generator,
136 current_event_time: 0,
137 start_time: Instant::now(),
138 events_processed: 0,
139 operator_buffer_1: Vec::with_capacity(256),
140 operator_buffer_2: Vec::with_capacity(256),
141 sink: None,
142 shutdown: Arc::new(AtomicBool::new(false)),
143 })
144 }
145
146 pub fn add_operator(&mut self, operator: Box<dyn Operator>) {
148 self.operators.push(operator);
149 }
150
151 pub fn set_sink(&mut self, sink: Box<dyn Sink>) {
153 self.sink = Some(sink);
154 }
155
156 #[must_use]
158 pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
159 Arc::clone(&self.shutdown)
160 }
161
162 pub fn submit(&mut self, event: Event) -> Result<(), ReactorError> {
168 if self.event_queue.len() >= self.config.event_buffer_size {
169 return Err(ReactorError::QueueFull {
170 capacity: self.config.event_buffer_size,
171 });
172 }
173
174 self.event_queue.push_back(event);
175 Ok(())
176 }
177
178 pub fn submit_batch(&mut self, events: Vec<Event>) -> Result<(), ReactorError> {
184 let available = self.config.event_buffer_size - self.event_queue.len();
185 if events.len() > available {
186 return Err(ReactorError::QueueFull {
187 capacity: self.config.event_buffer_size,
188 });
189 }
190
191 self.event_queue.extend(events);
192 Ok(())
193 }
194
195 pub fn poll(&mut self) -> Vec<Output> {
198 let _guard = HotPathGuard::enter("Reactor::poll");
200
201 let _iteration_budget = TaskBudget::ring0_iteration();
203
204 let poll_start = Instant::now();
205 let processing_time = self.get_processing_time();
206
207 let fired_timers = self.timer_service.poll_timers(self.current_event_time);
209 for mut timer in fired_timers {
210 if let Some(idx) = timer.operator_index {
211 if let Some(operator) = self.operators.get_mut(idx) {
213 let timer_key = timer.key.take().unwrap_or_default();
214 let timer_for_operator = crate::operator::Timer {
215 key: timer_key,
216 timestamp: timer.timestamp,
217 };
218
219 let mut ctx = OperatorContext {
220 event_time: self.current_event_time,
221 processing_time,
222 timers: &mut self.timer_service,
223 state: self.state_store.as_mut(),
224 watermark_generator: self.watermark_generator.as_mut(),
225 operator_index: idx,
226 };
227
228 let outputs = operator.on_timer(timer_for_operator, &mut ctx);
229 self.output_buffer.extend(outputs);
230 }
231 } else {
232 for (idx, operator) in self.operators.iter_mut().enumerate() {
234 let timer_key = timer.key.take().unwrap_or_default();
236 let timer_for_operator = crate::operator::Timer {
237 key: timer_key,
238 timestamp: timer.timestamp,
239 };
240
241 let mut ctx = OperatorContext {
242 event_time: self.current_event_time,
243 processing_time,
244 timers: &mut self.timer_service,
245 state: self.state_store.as_mut(),
246 watermark_generator: self.watermark_generator.as_mut(),
247 operator_index: idx,
248 };
249
250 let outputs = operator.on_timer(timer_for_operator, &mut ctx);
251 self.output_buffer.extend(outputs);
252 }
253 }
254 }
255
256 let mut events_in_batch = 0;
258 while let Some(event) = self.event_queue.pop_front() {
259 if event.timestamp > self.current_event_time {
261 self.current_event_time = event.timestamp;
262 }
263
264 if let Some(watermark) = self.watermark_generator.on_event(event.timestamp) {
266 self.output_buffer
267 .push(Output::Watermark(watermark.timestamp()));
268 }
269
270 self.operator_buffer_1.clear();
273 self.operator_buffer_1.push(Output::Event(event));
274
275 let mut current_buffer_is_1 = true;
276
277 for (idx, operator) in self.operators.iter_mut().enumerate() {
278 let (current_buffer, next_buffer) = if current_buffer_is_1 {
280 (&mut self.operator_buffer_1, &mut self.operator_buffer_2)
281 } else {
282 (&mut self.operator_buffer_2, &mut self.operator_buffer_1)
283 };
284
285 next_buffer.clear();
286
287 for output in current_buffer.drain(..) {
288 if let Output::Event(event) = output {
289 let mut ctx = OperatorContext {
290 event_time: self.current_event_time,
291 processing_time,
292 timers: &mut self.timer_service,
293 state: self.state_store.as_mut(),
294 watermark_generator: self.watermark_generator.as_mut(),
295 operator_index: idx,
296 };
297
298 let operator_outputs = operator.process(&event, &mut ctx);
299 next_buffer.extend(operator_outputs);
300 } else {
301 next_buffer.push(output);
303 }
304 }
305
306 current_buffer_is_1 = !current_buffer_is_1;
308 }
309
310 let final_buffer = if current_buffer_is_1 {
312 &mut self.operator_buffer_1
313 } else {
314 &mut self.operator_buffer_2
315 };
316 self.output_buffer.append(final_buffer);
317 self.events_processed += 1;
318 events_in_batch += 1;
319
320 if events_in_batch >= self.config.batch_size {
322 break;
323 }
324
325 if poll_start.elapsed() >= self.config.max_iteration_time {
327 break;
328 }
329 }
330
331 std::mem::take(&mut self.output_buffer)
333 }
334
335 pub fn advance_watermark(&mut self, timestamp: i64) {
341 if timestamp > self.current_event_time {
343 self.current_event_time = timestamp;
344 }
345
346 if let Some(watermark) = self.watermark_generator.on_event(timestamp) {
348 self.output_buffer
349 .push(Output::Watermark(watermark.timestamp()));
350 }
351 }
352
353 pub fn trigger_checkpoint(&mut self) -> Vec<OperatorState> {
359 self.operators.iter().map(|op| op.checkpoint()).collect()
360 }
361
362 #[allow(clippy::cast_possible_truncation)] fn get_processing_time(&self) -> i64 {
365 let micros = self.start_time.elapsed().as_micros();
367 if micros > i64::MAX as u128 {
368 i64::MAX
369 } else {
370 micros as i64
371 }
372 }
373
374 #[must_use]
376 pub fn events_processed(&self) -> u64 {
377 self.events_processed
378 }
379
380 #[must_use]
382 pub fn queue_size(&self) -> usize {
383 self.event_queue.len()
384 }
385
386 #[allow(unused_variables)]
392 pub fn set_cpu_affinity(&self) -> Result<(), ReactorError> {
393 if let Some(cpu_id) = self.config.cpu_affinity {
394 #[cfg(target_os = "linux")]
395 {
396 use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
397 use std::mem;
398
399 #[allow(unsafe_code)]
403 unsafe {
404 let mut set: cpu_set_t = mem::zeroed();
405 CPU_ZERO(&mut set);
406 CPU_SET(cpu_id, &mut set);
407
408 let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
409 if result != 0 {
410 return Err(ReactorError::InitializationFailed(format!(
411 "Failed to set CPU affinity to core {}: {}",
412 cpu_id,
413 std::io::Error::last_os_error()
414 )));
415 }
416 }
417 }
418
419 #[cfg(target_os = "windows")]
420 {
421 use winapi::shared::basetsd::DWORD_PTR;
422 use winapi::um::processthreadsapi::GetCurrentThread;
423 use winapi::um::winbase::SetThreadAffinityMask;
424
425 #[allow(unsafe_code)]
429 unsafe {
430 let mask: DWORD_PTR = 1 << cpu_id;
431 let result = SetThreadAffinityMask(GetCurrentThread(), mask);
432 if result == 0 {
433 return Err(ReactorError::InitializationFailed(format!(
434 "Failed to set CPU affinity to core {}: {}",
435 cpu_id,
436 std::io::Error::last_os_error()
437 )));
438 }
439 }
440 }
441
442 #[cfg(not(any(target_os = "linux", target_os = "windows")))]
443 {
444 tracing::warn!("CPU affinity is not implemented for this platform");
445 }
446 }
447 Ok(())
448 }
449
450 pub fn run(&mut self) -> Result<(), ReactorError> {
456 self.set_cpu_affinity()?;
457
458 while !self.shutdown.load(Ordering::Relaxed) {
459 let outputs = self.poll();
461
462 if !outputs.is_empty() {
464 if let Some(sink) = &mut self.sink {
465 if let Err(e) = sink.write(outputs) {
466 tracing::error!("Failed to write to sink: {e}");
467 }
469 }
470 }
471
472 if self.event_queue.is_empty() {
477 std::hint::spin_loop();
478 }
479
480 if self.events_processed.is_multiple_of(1000) && self.shutdown.load(Ordering::Relaxed) {
482 break;
483 }
484 }
485
486 if let Some(sink) = &mut self.sink {
488 if let Err(e) = sink.flush() {
489 tracing::error!("Failed to flush sink during shutdown: {e}");
490 }
491 }
492
493 Ok(())
494 }
495
496 pub fn shutdown(&mut self) -> Result<(), ReactorError> {
502 self.shutdown.store(true, Ordering::Relaxed);
504
505 while !self.event_queue.is_empty() {
507 let outputs = self.poll();
508
509 if !outputs.is_empty() {
511 if let Some(sink) = &mut self.sink {
512 if let Err(e) = sink.write(outputs) {
513 tracing::error!("Failed to write final outputs during shutdown: {e}");
514 }
515 }
516 }
517 }
518
519 if let Some(sink) = &mut self.sink {
521 if let Err(e) = sink.flush() {
522 tracing::error!("Failed to flush sink during shutdown: {e}");
523 }
524 }
525
526 Ok(())
527 }
528}
529
530#[derive(Debug, thiserror::Error)]
532pub enum ReactorError {
533 #[error("Initialization failed: {0}")]
535 InitializationFailed(String),
536
537 #[error("Event processing failed: {0}")]
539 EventProcessingFailed(String),
540
541 #[error("Shutdown failed: {0}")]
543 ShutdownFailed(String),
544
545 #[error("Event queue full (capacity: {capacity})")]
547 QueueFull {
548 capacity: usize,
550 },
551}
552
553pub struct StdoutSink;
555
556impl Sink for StdoutSink {
557 fn write(&mut self, outputs: Vec<Output>) -> Result<(), SinkError> {
558 for output in outputs {
559 match output {
560 Output::Event(event) => {
561 println!(
562 "Event: timestamp={}, data={:?}",
563 event.timestamp, event.data
564 );
565 }
566 Output::Watermark(timestamp) => {
567 println!("Watermark: {timestamp}");
568 }
569 Output::LateEvent(event) => {
570 println!(
571 "Late Event (dropped): timestamp={}, data={:?}",
572 event.timestamp, event.data
573 );
574 }
575 Output::SideOutput { name, event } => {
576 println!(
577 "Side Output [{}]: timestamp={}, data={:?}",
578 name, event.timestamp, event.data
579 );
580 }
581 Output::Changelog(record) => {
582 println!(
583 "Changelog: op={:?}, weight={}, emit_ts={}, event_ts={}, data={:?}",
584 record.operation,
585 record.weight,
586 record.emit_timestamp,
587 record.event.timestamp,
588 record.event.data
589 );
590 }
591 Output::CheckpointComplete {
592 checkpoint_id,
593 operator_states,
594 } => {
595 println!(
596 "Checkpoint: id={checkpoint_id}, operators={}",
597 operator_states.len()
598 );
599 }
600 }
601 }
602 Ok(())
603 }
604
605 fn flush(&mut self) -> Result<(), SinkError> {
606 Ok(())
607 }
608}
609
610#[derive(Default)]
612pub struct BufferingSink {
613 buffer: Vec<Output>,
614}
615
616impl BufferingSink {
617 #[must_use]
619 pub fn new() -> Self {
620 Self::default()
621 }
622
623 #[must_use]
625 pub fn take_buffer(&mut self) -> Vec<Output> {
626 std::mem::take(&mut self.buffer)
627 }
628}
629
630impl Sink for BufferingSink {
631 fn write(&mut self, mut outputs: Vec<Output>) -> Result<(), SinkError> {
632 self.buffer.append(&mut outputs);
633 Ok(())
634 }
635
636 fn flush(&mut self) -> Result<(), SinkError> {
637 Ok(())
638 }
639}
640
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::OutputVec;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// Operator that re-emits every event unchanged and ignores timers.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
            let mut forwarded = OutputVec::new();
            forwarded.push(Output::Event(event.clone()));
            forwarded
        }

        fn on_timer(
            &mut self,
            _timer: crate::operator::Timer,
            _ctx: &mut OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> crate::operator::OperatorState {
            crate::operator::OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(
            &mut self,
            _state: crate::operator::OperatorState,
        ) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    /// Builds a record batch with a single `Int64` column.
    fn int_batch(column: &str, values: Vec<i64>) -> RecordBatch {
        let array = Arc::new(Int64Array::from(values));
        RecordBatch::try_from_iter(vec![(column, array as _)]).unwrap()
    }

    /// Builds a reactor from the default configuration.
    fn default_reactor() -> Reactor {
        Reactor::new(ReactorConfig::default()).unwrap()
    }

    #[test]
    fn test_default_config() {
        let defaults = ReactorConfig::default();
        assert_eq!(defaults.batch_size, 1024);
        assert_eq!(defaults.event_buffer_size, 65536);
    }

    #[test]
    fn test_reactor_creation() {
        let created = Reactor::new(ReactorConfig::default());
        assert!(created.is_ok());
    }

    #[test]
    fn test_reactor_add_operator() {
        let mut reactor = default_reactor();

        reactor.add_operator(Box::new(PassthroughOperator));

        assert_eq!(reactor.operators.len(), 1);
    }

    #[test]
    fn test_reactor_submit_event() {
        let mut reactor = default_reactor();
        let event = Event::new(12345, int_batch("col1", vec![1, 2, 3]));

        assert!(reactor.submit(event).is_ok());
        assert_eq!(reactor.queue_size(), 1);
    }

    #[test]
    fn test_reactor_poll_processes_events() {
        let mut reactor = default_reactor();
        reactor.add_operator(Box::new(PassthroughOperator));

        let event = Event::new(12345, int_batch("col1", vec![1, 2, 3]));
        reactor.submit(event).unwrap();

        let outputs = reactor.poll();

        assert!(!outputs.is_empty());
        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_queue_full() {
        let config = ReactorConfig {
            event_buffer_size: 2,
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();
        let batch = int_batch("col1", vec![1]);

        // Fill the queue up to its capacity.
        for timestamp in 0..2_i64 {
            let event = Event::new(timestamp, batch.clone());
            assert!(reactor.submit(event).is_ok());
        }

        // One more event must be rejected.
        let overflow = Event::new(100, batch);
        assert!(matches!(
            reactor.submit(overflow),
            Err(ReactorError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_reactor_batch_processing() {
        let config = ReactorConfig {
            batch_size: 2,
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();
        reactor.add_operator(Box::new(PassthroughOperator));

        let batch = int_batch("col1", vec![1]);
        for timestamp in 0..5_i64 {
            reactor.submit(Event::new(timestamp, batch.clone())).unwrap();
        }

        // Each poll handles at most `batch_size` events.
        reactor.poll();
        assert_eq!(reactor.events_processed(), 2);
        assert_eq!(reactor.queue_size(), 3);

        reactor.poll();
        assert_eq!(reactor.events_processed(), 4);
        assert_eq!(reactor.queue_size(), 1);

        reactor.poll();
        assert_eq!(reactor.events_processed(), 5);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_with_sink() {
        let mut reactor = default_reactor();
        reactor.set_sink(Box::new(BufferingSink::new()));
        reactor.add_operator(Box::new(PassthroughOperator));

        let event = Event::new(1000, int_batch("value", vec![42]));
        reactor.submit(event).unwrap();

        let outputs = reactor.poll();

        assert!(!outputs.is_empty());
        assert!(outputs.iter().any(|o| matches!(o, Output::Event(_))));
    }

    #[test]
    fn test_reactor_shutdown() {
        let mut reactor = default_reactor();

        let shutdown_handle = reactor.shutdown_handle();
        assert!(!shutdown_handle.load(Ordering::Relaxed));

        let batch = int_batch("col", vec![1]);
        for i in 0..5 {
            reactor.submit(Event::new(i * 1000, batch.clone())).unwrap();
        }

        reactor.shutdown().unwrap();

        assert!(shutdown_handle.load(Ordering::Relaxed));
        assert_eq!(reactor.queue_size(), 0);
    }
}