1use std::collections::VecDeque;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27
28use crate::alloc::HotPathGuard;
29use crate::budget::TaskBudget;
30use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output};
31use crate::state::{InMemoryStore, StateStore};
32use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
33
34pub trait Sink: Send {
36 fn write(&mut self, outputs: Vec<Output>) -> Result<(), SinkError>;
42
43 fn flush(&mut self) -> Result<(), SinkError>;
49}
50
51#[derive(Debug, thiserror::Error)]
53pub enum SinkError {
54 #[error("Write failed: {0}")]
56 WriteFailed(String),
57
58 #[error("Flush failed: {0}")]
60 FlushFailed(String),
61
62 #[error("Sink is closed")]
64 Closed,
65}
66
/// Tuning knobs for a [`Reactor`].
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    /// Maximum number of queued events processed by a single `poll` call.
    pub batch_size: usize,
    /// CPU core the reactor thread is pinned to by `set_cpu_affinity`; `None` leaves the
    /// thread unpinned.
    pub cpu_affinity: Option<usize>,
    /// Wall-clock budget for one `poll` call, checked after each processed event.
    pub max_iteration_time: Duration,
    /// Capacity of the pending-event queue; submissions beyond it are rejected.
    pub event_buffer_size: usize,
    /// Bound handed to the bounded-out-of-orderness watermark generator — presumably in
    /// the same units as event timestamps (TODO confirm).
    pub max_out_of_orderness: i64,
}
81
82impl Default for ReactorConfig {
83 fn default() -> Self {
84 Self {
85 batch_size: 1024,
86 cpu_affinity: None,
87 max_iteration_time: Duration::from_millis(10),
88 event_buffer_size: 65536,
89 max_out_of_orderness: 1000, }
91 }
92}
93
94pub struct Reactor {
96 config: ReactorConfig,
97 operators: Vec<Box<dyn Operator>>,
98 timer_service: TimerService,
99 event_queue: VecDeque<Event>,
100 output_buffer: Vec<Output>,
101 state_store: Box<dyn StateStore>,
102 watermark_generator: Box<dyn WatermarkGenerator>,
103 current_event_time: i64,
104 start_time: Instant,
105 events_processed: u64,
106 operator_buffer_1: Vec<Output>,
109 operator_buffer_2: Vec<Output>,
110 sink: Option<Box<dyn Sink>>,
112 shutdown: Arc<AtomicBool>,
114}
115
116impl Reactor {
117 pub fn new(config: ReactorConfig) -> Result<Self, ReactorError> {
123 let event_queue = VecDeque::with_capacity(config.event_buffer_size);
124 let watermark_generator = Box::new(BoundedOutOfOrdernessGenerator::new(
125 config.max_out_of_orderness,
126 ));
127
128 Ok(Self {
129 config,
130 operators: Vec::new(),
131 timer_service: TimerService::new(),
132 event_queue,
133 output_buffer: Vec::with_capacity(1024),
134 state_store: Box::new(InMemoryStore::new()),
135 watermark_generator,
136 current_event_time: 0,
137 start_time: Instant::now(),
138 events_processed: 0,
139 operator_buffer_1: Vec::with_capacity(256),
140 operator_buffer_2: Vec::with_capacity(256),
141 sink: None,
142 shutdown: Arc::new(AtomicBool::new(false)),
143 })
144 }
145
146 pub fn add_operator(&mut self, operator: Box<dyn Operator>) {
148 self.operators.push(operator);
149 }
150
151 pub fn set_sink(&mut self, sink: Box<dyn Sink>) {
153 self.sink = Some(sink);
154 }
155
156 #[must_use]
158 pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
159 Arc::clone(&self.shutdown)
160 }
161
162 pub fn submit(&mut self, event: Event) -> Result<(), ReactorError> {
168 if self.event_queue.len() >= self.config.event_buffer_size {
169 return Err(ReactorError::QueueFull {
170 capacity: self.config.event_buffer_size,
171 });
172 }
173
174 self.event_queue.push_back(event);
175 Ok(())
176 }
177
178 pub fn submit_batch(&mut self, events: Vec<Event>) -> Result<(), ReactorError> {
184 let available = self.config.event_buffer_size - self.event_queue.len();
185 if events.len() > available {
186 return Err(ReactorError::QueueFull {
187 capacity: self.config.event_buffer_size,
188 });
189 }
190
191 self.event_queue.extend(events);
192 Ok(())
193 }
194
195 pub fn poll(&mut self) -> Vec<Output> {
198 let _guard = HotPathGuard::enter("Reactor::poll");
200
201 let _iteration_budget = TaskBudget::ring0_iteration();
203
204 let poll_start = Instant::now();
205 let processing_time = self.get_processing_time();
206
207 let fired_timers = self.timer_service.poll_timers(self.current_event_time);
209 for mut timer in fired_timers {
210 if let Some(idx) = timer.operator_index {
211 if let Some(operator) = self.operators.get_mut(idx) {
213 let timer_key = timer.key.take().unwrap_or_default();
214 let timer_for_operator = crate::operator::Timer {
215 key: timer_key,
216 timestamp: timer.timestamp,
217 };
218
219 let mut ctx = OperatorContext {
220 event_time: self.current_event_time,
221 processing_time,
222 timers: &mut self.timer_service,
223 state: self.state_store.as_mut(),
224 watermark_generator: self.watermark_generator.as_mut(),
225 operator_index: idx,
226 };
227
228 let outputs = operator.on_timer(timer_for_operator, &mut ctx);
229 self.output_buffer.extend(outputs);
230 }
231 } else {
232 for (idx, operator) in self.operators.iter_mut().enumerate() {
234 let timer_key = timer.key.take().unwrap_or_default();
236 let timer_for_operator = crate::operator::Timer {
237 key: timer_key,
238 timestamp: timer.timestamp,
239 };
240
241 let mut ctx = OperatorContext {
242 event_time: self.current_event_time,
243 processing_time,
244 timers: &mut self.timer_service,
245 state: self.state_store.as_mut(),
246 watermark_generator: self.watermark_generator.as_mut(),
247 operator_index: idx,
248 };
249
250 let outputs = operator.on_timer(timer_for_operator, &mut ctx);
251 self.output_buffer.extend(outputs);
252 }
253 }
254 }
255
256 let mut events_in_batch = 0;
258 while let Some(event) = self.event_queue.pop_front() {
259 if event.timestamp > self.current_event_time {
261 self.current_event_time = event.timestamp;
262 }
263
264 if let Some(watermark) = self.watermark_generator.on_event(event.timestamp) {
266 self.output_buffer
267 .push(Output::Watermark(watermark.timestamp()));
268 }
269
270 self.operator_buffer_1.clear();
273 self.operator_buffer_1.push(Output::Event(event));
274
275 let mut current_buffer_is_1 = true;
276
277 for (idx, operator) in self.operators.iter_mut().enumerate() {
278 let (current_buffer, next_buffer) = if current_buffer_is_1 {
280 (&mut self.operator_buffer_1, &mut self.operator_buffer_2)
281 } else {
282 (&mut self.operator_buffer_2, &mut self.operator_buffer_1)
283 };
284
285 next_buffer.clear();
286
287 for output in current_buffer.drain(..) {
288 if let Output::Event(event) = output {
289 let mut ctx = OperatorContext {
290 event_time: self.current_event_time,
291 processing_time,
292 timers: &mut self.timer_service,
293 state: self.state_store.as_mut(),
294 watermark_generator: self.watermark_generator.as_mut(),
295 operator_index: idx,
296 };
297
298 let operator_outputs = operator.process(&event, &mut ctx);
299 next_buffer.extend(operator_outputs);
300 } else {
301 next_buffer.push(output);
303 }
304 }
305
306 current_buffer_is_1 = !current_buffer_is_1;
308 }
309
310 let final_buffer = if current_buffer_is_1 {
312 &mut self.operator_buffer_1
313 } else {
314 &mut self.operator_buffer_2
315 };
316 self.output_buffer.append(final_buffer);
317 self.events_processed += 1;
318 events_in_batch += 1;
319
320 if events_in_batch >= self.config.batch_size {
322 break;
323 }
324
325 if poll_start.elapsed() >= self.config.max_iteration_time {
327 break;
328 }
329 }
330
331 std::mem::take(&mut self.output_buffer)
333 }
334
335 pub fn advance_watermark(&mut self, timestamp: i64) {
341 if timestamp > self.current_event_time {
343 self.current_event_time = timestamp;
344 }
345
346 if let Some(watermark) = self.watermark_generator.on_event(timestamp) {
348 self.output_buffer
349 .push(Output::Watermark(watermark.timestamp()));
350 }
351 }
352
353 pub fn trigger_checkpoint(&mut self) -> Vec<OperatorState> {
359 self.operators.iter().map(|op| op.checkpoint()).collect()
360 }
361
362 #[allow(clippy::cast_possible_truncation)] fn get_processing_time(&self) -> i64 {
365 let micros = self.start_time.elapsed().as_micros();
367 if micros > i64::MAX as u128 {
368 i64::MAX
369 } else {
370 micros as i64
371 }
372 }
373
374 #[must_use]
376 pub fn events_processed(&self) -> u64 {
377 self.events_processed
378 }
379
380 #[must_use]
382 pub fn queue_size(&self) -> usize {
383 self.event_queue.len()
384 }
385
386 #[allow(unused_variables)]
392 pub fn set_cpu_affinity(&self) -> Result<(), ReactorError> {
393 if let Some(cpu_id) = self.config.cpu_affinity {
394 #[cfg(target_os = "linux")]
395 {
396 use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
397 use std::mem;
398
399 #[allow(unsafe_code)]
403 unsafe {
404 let mut set: cpu_set_t = mem::zeroed();
405 CPU_ZERO(&mut set);
406 CPU_SET(cpu_id, &mut set);
407
408 let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
409 if result != 0 {
410 return Err(ReactorError::InitializationFailed(format!(
411 "Failed to set CPU affinity to core {}: {}",
412 cpu_id,
413 std::io::Error::last_os_error()
414 )));
415 }
416 }
417 }
418
419 #[cfg(target_os = "windows")]
420 {
421 use winapi::shared::basetsd::DWORD_PTR;
422 use winapi::um::processthreadsapi::GetCurrentThread;
423 use winapi::um::winbase::SetThreadAffinityMask;
424
425 #[allow(unsafe_code)]
429 unsafe {
430 let mask: DWORD_PTR = 1 << cpu_id;
431 let result = SetThreadAffinityMask(GetCurrentThread(), mask);
432 if result == 0 {
433 return Err(ReactorError::InitializationFailed(format!(
434 "Failed to set CPU affinity to core {}: {}",
435 cpu_id,
436 std::io::Error::last_os_error()
437 )));
438 }
439 }
440 }
441
442 #[cfg(not(any(target_os = "linux", target_os = "windows")))]
443 {
444 eprintln!("Warning: CPU affinity is not implemented for this platform");
445 }
446 }
447 Ok(())
448 }
449
450 pub fn run(&mut self) -> Result<(), ReactorError> {
456 self.set_cpu_affinity()?;
457
458 while !self.shutdown.load(Ordering::Relaxed) {
459 let outputs = self.poll();
461
462 if !outputs.is_empty() {
464 if let Some(sink) = &mut self.sink {
465 if let Err(e) = sink.write(outputs) {
466 eprintln!("Failed to write to sink: {e}");
467 }
469 }
470 }
471
472 if self.event_queue.is_empty() {
474 std::thread::yield_now();
475 }
476
477 if self.events_processed.is_multiple_of(1000) && self.shutdown.load(Ordering::Relaxed) {
479 break;
480 }
481 }
482
483 if let Some(sink) = &mut self.sink {
485 if let Err(e) = sink.flush() {
486 eprintln!("Failed to flush sink during shutdown: {e}");
487 }
488 }
489
490 Ok(())
491 }
492
493 pub fn shutdown(&mut self) -> Result<(), ReactorError> {
499 self.shutdown.store(true, Ordering::Relaxed);
501
502 while !self.event_queue.is_empty() {
504 let outputs = self.poll();
505
506 if !outputs.is_empty() {
508 if let Some(sink) = &mut self.sink {
509 if let Err(e) = sink.write(outputs) {
510 eprintln!("Failed to write final outputs during shutdown: {e}");
511 }
512 }
513 }
514 }
515
516 if let Some(sink) = &mut self.sink {
518 if let Err(e) = sink.flush() {
519 eprintln!("Failed to flush sink during shutdown: {e}");
520 }
521 }
522
523 Ok(())
524 }
525}
526
527#[derive(Debug, thiserror::Error)]
529pub enum ReactorError {
530 #[error("Initialization failed: {0}")]
532 InitializationFailed(String),
533
534 #[error("Event processing failed: {0}")]
536 EventProcessingFailed(String),
537
538 #[error("Shutdown failed: {0}")]
540 ShutdownFailed(String),
541
542 #[error("Event queue full (capacity: {capacity})")]
544 QueueFull {
545 capacity: usize,
547 },
548}
549
/// A [`Sink`] that prints one human-readable line per output to standard output.
pub struct StdoutSink;
552
553impl Sink for StdoutSink {
554 fn write(&mut self, outputs: Vec<Output>) -> Result<(), SinkError> {
555 for output in outputs {
556 match output {
557 Output::Event(event) => {
558 println!(
559 "Event: timestamp={}, data={:?}",
560 event.timestamp, event.data
561 );
562 }
563 Output::Watermark(timestamp) => {
564 println!("Watermark: {timestamp}");
565 }
566 Output::LateEvent(event) => {
567 println!(
568 "Late Event (dropped): timestamp={}, data={:?}",
569 event.timestamp, event.data
570 );
571 }
572 Output::SideOutput { name, event } => {
573 println!(
574 "Side Output [{}]: timestamp={}, data={:?}",
575 name, event.timestamp, event.data
576 );
577 }
578 Output::Changelog(record) => {
579 println!(
580 "Changelog: op={:?}, weight={}, emit_ts={}, event_ts={}, data={:?}",
581 record.operation,
582 record.weight,
583 record.emit_timestamp,
584 record.event.timestamp,
585 record.event.data
586 );
587 }
588 Output::CheckpointComplete {
589 checkpoint_id,
590 operator_states,
591 } => {
592 println!(
593 "Checkpoint: id={checkpoint_id}, operators={}",
594 operator_states.len()
595 );
596 }
597 }
598 }
599 Ok(())
600 }
601
602 fn flush(&mut self) -> Result<(), SinkError> {
603 Ok(())
604 }
605}
606
607#[derive(Default)]
609pub struct BufferingSink {
610 buffer: Vec<Output>,
611}
612
613impl BufferingSink {
614 #[must_use]
616 pub fn new() -> Self {
617 Self::default()
618 }
619
620 #[must_use]
622 pub fn take_buffer(&mut self) -> Vec<Output> {
623 std::mem::take(&mut self.buffer)
624 }
625}
626
627impl Sink for BufferingSink {
628 fn write(&mut self, mut outputs: Vec<Output>) -> Result<(), SinkError> {
629 self.buffer.append(&mut outputs);
630 Ok(())
631 }
632
633 fn flush(&mut self) -> Result<(), SinkError> {
634 Ok(())
635 }
636}
637
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::OutputVec;
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;

    /// Operator that forwards every event unchanged and emits nothing on timers.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: crate::operator::Timer,
            _ctx: &mut OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        fn checkpoint(&self) -> crate::operator::OperatorState {
            crate::operator::OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(
            &mut self,
            _state: crate::operator::OperatorState,
        ) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    /// Builds a single-column `Int64` record batch named `column`.
    fn int64_batch(column: &str, values: Vec<i64>) -> RecordBatch {
        let array = Arc::new(Int64Array::from(values));
        RecordBatch::try_from_iter(vec![(column, array as _)]).unwrap()
    }

    /// Counts the `Output::Event` entries in `outputs`.
    fn count_events(outputs: &[Output]) -> usize {
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count()
    }

    #[test]
    fn test_default_config() {
        let config = ReactorConfig::default();
        assert_eq!(config.batch_size, 1024);
        assert_eq!(config.event_buffer_size, 65536);
    }

    #[test]
    fn test_reactor_creation() {
        let config = ReactorConfig::default();
        let reactor = Reactor::new(config);
        assert!(reactor.is_ok());
    }

    #[test]
    fn test_reactor_add_operator() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        reactor.add_operator(Box::new(PassthroughOperator));

        assert_eq!(reactor.operators.len(), 1);
    }

    #[test]
    fn test_reactor_submit_event() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let event = Event::new(12345, int64_batch("col1", vec![1, 2, 3]));

        assert!(reactor.submit(event).is_ok());
        assert_eq!(reactor.queue_size(), 1);
    }

    #[test]
    fn test_reactor_poll_processes_events() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();
        reactor.add_operator(Box::new(PassthroughOperator));

        let event = Event::new(12345, int64_batch("col1", vec![1, 2, 3]));
        reactor.submit(event.clone()).unwrap();

        let outputs = reactor.poll();
        assert!(!outputs.is_empty());
        assert_eq!(reactor.events_processed(), 1);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_poll_without_operators_passes_events_through() {
        let mut reactor = Reactor::new(ReactorConfig::default()).unwrap();
        reactor
            .submit(Event::new(7, int64_batch("col1", vec![1])))
            .unwrap();

        let outputs = reactor.poll();
        assert!(outputs
            .iter()
            .any(|o| matches!(o, Output::Event(e) if e.timestamp == 7)));
        assert_eq!(count_events(&outputs), 1);
    }

    #[test]
    fn test_reactor_operator_chain_emits_one_event_per_input() {
        let mut reactor = Reactor::new(ReactorConfig::default()).unwrap();
        reactor.add_operator(Box::new(PassthroughOperator));
        reactor.add_operator(Box::new(PassthroughOperator));

        reactor
            .submit(Event::new(42, int64_batch("col1", vec![1])))
            .unwrap();

        let outputs = reactor.poll();
        assert_eq!(count_events(&outputs), 1);
        assert_eq!(reactor.events_processed(), 1);
    }

    #[test]
    fn test_reactor_queue_full() {
        let config = ReactorConfig {
            event_buffer_size: 2,
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();
        let batch = int64_batch("col1", vec![1]);

        for i in 0..2 {
            let event = Event::new(i64::from(i), batch.clone());
            assert!(reactor.submit(event).is_ok());
        }

        let event = Event::new(100, batch);
        assert!(matches!(
            reactor.submit(event),
            Err(ReactorError::QueueFull { .. })
        ));
    }

    #[test]
    fn test_reactor_submit_batch_is_all_or_nothing() {
        let config = ReactorConfig {
            event_buffer_size: 3,
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();
        let batch = int64_batch("col1", vec![1]);

        let first: Vec<Event> = (0..2).map(|i| Event::new(i, batch.clone())).collect();
        assert!(reactor.submit_batch(first).is_ok());
        assert_eq!(reactor.queue_size(), 2);

        // Only one slot is left, so a batch of two must be rejected without enqueuing any.
        let second: Vec<Event> = (2..4).map(|i| Event::new(i, batch.clone())).collect();
        assert!(matches!(
            reactor.submit_batch(second),
            Err(ReactorError::QueueFull { capacity: 3 })
        ));
        assert_eq!(reactor.queue_size(), 2);
    }

    #[test]
    fn test_reactor_batch_processing() {
        let config = ReactorConfig {
            batch_size: 2,
            ..ReactorConfig::default()
        };
        let mut reactor = Reactor::new(config).unwrap();
        reactor.add_operator(Box::new(PassthroughOperator));

        let batch = int64_batch("col1", vec![1]);
        for i in 0..5 {
            let event = Event::new(i64::from(i), batch.clone());
            reactor.submit(event).unwrap();
        }

        reactor.poll();
        assert_eq!(reactor.events_processed(), 2);
        assert_eq!(reactor.queue_size(), 3);

        reactor.poll();
        assert_eq!(reactor.events_processed(), 4);
        assert_eq!(reactor.queue_size(), 1);

        reactor.poll();
        assert_eq!(reactor.events_processed(), 5);
        assert_eq!(reactor.queue_size(), 0);
    }

    #[test]
    fn test_reactor_checkpoint_collects_operator_states() {
        let mut reactor = Reactor::new(ReactorConfig::default()).unwrap();
        reactor.add_operator(Box::new(PassthroughOperator));
        reactor.add_operator(Box::new(PassthroughOperator));

        let states = reactor.trigger_checkpoint();
        assert_eq!(states.len(), 2);
        assert!(states.iter().all(|s| s.operator_id == "passthrough"));
    }

    #[test]
    fn test_reactor_with_sink() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        reactor.set_sink(Box::new(BufferingSink::new()));
        reactor.add_operator(Box::new(PassthroughOperator));

        let event = Event::new(1000, int64_batch("value", vec![42]));
        reactor.submit(event).unwrap();

        let outputs = reactor.poll();
        assert!(!outputs.is_empty());
        assert!(outputs.iter().any(|o| matches!(o, Output::Event(_))));
    }

    #[test]
    fn test_reactor_shutdown() {
        let config = ReactorConfig::default();
        let mut reactor = Reactor::new(config).unwrap();

        let shutdown_handle = reactor.shutdown_handle();
        assert!(!shutdown_handle.load(Ordering::Relaxed));

        let batch = int64_batch("col", vec![1]);
        for i in 0..5 {
            reactor.submit(Event::new(i * 1000, batch.clone())).unwrap();
        }

        reactor.shutdown().unwrap();
        assert!(shutdown_handle.load(Ordering::Relaxed));
        assert_eq!(reactor.queue_size(), 0);
    }
}