1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
31use std::sync::Arc;
32use std::thread::{self, JoinHandle};
33
34#[cfg(all(target_os = "linux", feature = "io-uring"))]
35use crate::io_uring::{CoreRingManager, IoUringConfig};
36
37use crate::alloc::HotPathGuard;
38use crate::budget::TaskBudget;
39use crate::numa::{NumaAllocator, NumaTopology};
40use crate::operator::{Event, Operator, Output};
41use crate::reactor::{Reactor, ReactorConfig};
42
43use super::backpressure::{
44 BackpressureConfig, CreditAcquireResult, CreditGate, CreditMetrics, OverflowStrategy,
45};
46use super::spsc::SpscQueue;
47use super::TpcError;
48
/// Control and data messages delivered to a core worker thread through its
/// SPSC inbox.
#[derive(Debug)]
pub enum CoreMessage {
    /// A data event to submit to the core's reactor.
    Event(Event),
    /// Advance the reactor's watermark to this timestamp.
    Watermark(i64),
    /// Trigger a checkpoint with this id; the worker replies with an
    /// `Output::CheckpointComplete` on its outbox.
    CheckpointRequest(u64),
    /// Ask the worker to stop draining the current batch and shut down.
    Shutdown,
}
61
/// Configuration for a single core worker thread.
#[derive(Debug, Clone)]
pub struct CoreConfig {
    /// Logical id of this core (used in thread names and error reports).
    pub core_id: usize,
    /// CPU to pin the worker thread to, or `None` for no pinning.
    pub cpu_affinity: Option<usize>,
    /// Capacity of the inbound SPSC message queue.
    pub inbox_capacity: usize,
    /// Capacity of the outbound SPSC output queue.
    pub outbox_capacity: usize,
    /// Configuration forwarded to the per-core reactor.
    pub reactor_config: ReactorConfig,
    /// Credit-based flow-control settings for the inbox.
    pub backpressure: BackpressureConfig,
    /// When true, log NUMA placement and set up a NUMA-aware allocator.
    pub numa_aware: bool,
    /// Optional io_uring configuration (Linux + `io-uring` feature only).
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    pub io_uring_config: Option<IoUringConfig>,
}
83
84impl Default for CoreConfig {
85 fn default() -> Self {
86 Self {
87 core_id: 0,
88 cpu_affinity: None,
89 inbox_capacity: 65536,
90 outbox_capacity: 65536,
91 reactor_config: ReactorConfig::default(),
92 backpressure: BackpressureConfig::default(),
93 numa_aware: false,
94 #[cfg(all(target_os = "linux", feature = "io-uring"))]
95 io_uring_config: None,
96 }
97 }
98}
99
/// Owner-side handle to a spawned core worker thread.
///
/// Provides message submission (with credit-based backpressure), output
/// polling, metrics, and shutdown/join. Dropping the handle also shuts the
/// worker down and joins it.
pub struct CoreHandle {
    // Logical id of the core this handle controls.
    core_id: usize,
    // NUMA node the worker was placed on (derived at spawn time).
    numa_node: usize,
    // Producer side of the worker's inbound message queue.
    inbox: Arc<SpscQueue<CoreMessage>>,
    // Consumer side of the worker's outbound output queue.
    outbox: Arc<SpscQueue<Output>>,
    // Flow-control gate shared with the worker thread.
    credit_gate: Arc<CreditGate>,
    // Join handle; `None` once the thread has been joined.
    thread: Option<JoinHandle<Result<(), TpcError>>>,
    // Set to request worker shutdown; polled by the worker loop.
    shutdown: Arc<AtomicBool>,
    // NOTE(review): incremented per reactor *output*, not per inbound event
    // — see `core_thread_main`.
    events_processed: Arc<AtomicU64>,
    // Count of outputs discarded because the outbox was full.
    outputs_dropped: Arc<AtomicU64>,
    // True while the worker loop is running.
    is_running: Arc<AtomicBool>,
}
126
127impl CoreHandle {
128 pub fn spawn(config: CoreConfig) -> Result<Self, TpcError> {
134 Self::spawn_with_operators(config, Vec::new())
135 }
136
137 #[allow(clippy::needless_pass_by_value)]
143 pub fn spawn_with_operators(
144 config: CoreConfig,
145 operators: Vec<Box<dyn Operator>>,
146 ) -> Result<Self, TpcError> {
147 let core_id = config.core_id;
148 let cpu_affinity = config.cpu_affinity;
149 let reactor_config = config.reactor_config.clone();
150
151 let topology = NumaTopology::detect();
153 let numa_node =
154 cpu_affinity.map_or_else(|| topology.current_node(), |cpu| topology.node_for_cpu(cpu));
155
156 let inbox = Arc::new(SpscQueue::new(config.inbox_capacity));
157 let outbox = Arc::new(SpscQueue::new(config.outbox_capacity));
158 let credit_gate = Arc::new(CreditGate::new(config.backpressure.clone()));
159 let shutdown = Arc::new(AtomicBool::new(false));
160 let events_processed = Arc::new(AtomicU64::new(0));
161 let outputs_dropped = Arc::new(AtomicU64::new(0));
162 let is_running = Arc::new(AtomicBool::new(false));
163
164 let thread_context = CoreThreadContext {
165 core_id,
166 cpu_affinity,
167 reactor_config,
168 numa_aware: config.numa_aware,
169 numa_node,
170 inbox: Arc::clone(&inbox),
171 outbox: Arc::clone(&outbox),
172 credit_gate: Arc::clone(&credit_gate),
173 shutdown: Arc::clone(&shutdown),
174 events_processed: Arc::clone(&events_processed),
175 outputs_dropped: Arc::clone(&outputs_dropped),
176 is_running: Arc::clone(&is_running),
177 #[cfg(all(target_os = "linux", feature = "io-uring"))]
178 io_uring_config: config.io_uring_config,
179 };
180
181 let thread = thread::Builder::new()
182 .name(format!("laminar-core-{core_id}"))
183 .spawn(move || core_thread_main(&thread_context, operators))
184 .map_err(|e| TpcError::SpawnFailed {
185 core_id,
186 message: e.to_string(),
187 })?;
188
189 while !is_running.load(Ordering::Acquire) {
191 thread::yield_now();
192 }
193
194 Ok(Self {
195 core_id,
196 numa_node,
197 inbox,
198 outbox,
199 credit_gate,
200 thread: Some(thread),
201 shutdown,
202 events_processed,
203 outputs_dropped,
204 is_running,
205 })
206 }
207
208 #[must_use]
210 pub fn core_id(&self) -> usize {
211 self.core_id
212 }
213
214 #[must_use]
216 pub fn numa_node(&self) -> usize {
217 self.numa_node
218 }
219
220 #[must_use]
222 pub fn is_running(&self) -> bool {
223 self.is_running.load(Ordering::Acquire)
224 }
225
226 #[must_use]
228 pub fn events_processed(&self) -> u64 {
229 self.events_processed.load(Ordering::Relaxed)
230 }
231
232 #[must_use]
234 pub fn outputs_dropped(&self) -> u64 {
235 self.outputs_dropped.load(Ordering::Relaxed)
236 }
237
238 pub fn send(&self, message: CoreMessage) -> Result<(), TpcError> {
249 match self.credit_gate.try_acquire() {
251 CreditAcquireResult::Acquired => {
252 self.inbox.push(message).map_err(|_| TpcError::QueueFull {
254 core_id: self.core_id,
255 })
256 }
257 CreditAcquireResult::WouldBlock => {
258 if self.credit_gate.config().overflow_strategy == OverflowStrategy::Block {
260 self.credit_gate.acquire_blocking(1);
262 self.inbox.push(message).map_err(|_| TpcError::QueueFull {
263 core_id: self.core_id,
264 })
265 } else {
266 Err(TpcError::Backpressure {
268 core_id: self.core_id,
269 })
270 }
271 }
272 CreditAcquireResult::Dropped => {
273 Ok(())
275 }
276 }
277 }
278
279 pub fn try_send(&self, message: CoreMessage) -> Result<(), TpcError> {
288 match self.credit_gate.try_acquire() {
289 CreditAcquireResult::Acquired => {
290 self.inbox.push(message).map_err(|_| TpcError::QueueFull {
291 core_id: self.core_id,
292 })
293 }
294 CreditAcquireResult::WouldBlock | CreditAcquireResult::Dropped => {
295 Err(TpcError::Backpressure {
296 core_id: self.core_id,
297 })
298 }
299 }
300 }
301
302 pub fn send_event(&self, event: Event) -> Result<(), TpcError> {
308 self.send(CoreMessage::Event(event))
309 }
310
311 pub fn try_send_event(&self, event: Event) -> Result<(), TpcError> {
317 self.try_send(CoreMessage::Event(event))
318 }
319
320 #[must_use]
329 pub fn poll_outputs(&self, max_count: usize) -> Vec<Output> {
330 self.outbox.pop_batch(max_count)
331 }
332
333 #[inline]
356 pub fn poll_outputs_into(&self, buffer: &mut Vec<Output>, max_count: usize) -> usize {
357 let start_len = buffer.len();
358
359 self.outbox.pop_each(max_count, |output| {
360 buffer.push(output);
361 true
362 });
363
364 buffer.len() - start_len
365 }
366
367 #[inline]
389 pub fn poll_each<F>(&self, max_count: usize, f: F) -> usize
390 where
391 F: FnMut(Output) -> bool,
392 {
393 self.outbox.pop_each(max_count, f)
394 }
395
396 #[must_use]
398 pub fn poll_output(&self) -> Option<Output> {
399 self.outbox.pop()
400 }
401
402 #[must_use]
404 pub fn inbox_len(&self) -> usize {
405 self.inbox.len()
406 }
407
408 #[must_use]
410 pub fn outbox_len(&self) -> usize {
411 self.outbox.len()
412 }
413
414 #[must_use]
416 pub fn is_backpressured(&self) -> bool {
417 self.credit_gate.is_backpressured()
418 }
419
420 #[must_use]
422 pub fn available_credits(&self) -> usize {
423 self.credit_gate.available()
424 }
425
426 #[must_use]
428 pub fn max_credits(&self) -> usize {
429 self.credit_gate.max_credits()
430 }
431
432 #[must_use]
434 pub fn credit_metrics(&self) -> &CreditMetrics {
435 self.credit_gate.metrics()
436 }
437
438 pub fn shutdown(&self) {
440 self.shutdown.store(true, Ordering::Release);
441 let _ = self.inbox.push(CoreMessage::Shutdown);
443 }
444
445 pub fn join(mut self) -> Result<(), TpcError> {
451 if let Some(handle) = self.thread.take() {
452 handle.join().map_err(|_| TpcError::SpawnFailed {
453 core_id: self.core_id,
454 message: "Thread panicked".to_string(),
455 })?
456 } else {
457 Ok(())
458 }
459 }
460
461 pub fn shutdown_and_join(self) -> Result<(), TpcError> {
467 self.shutdown();
468 self.join()
469 }
470}
471
impl Drop for CoreHandle {
    /// Best-effort teardown: signal shutdown, nudge the worker with a
    /// `Shutdown` message, then join it, ignoring errors (a `Drop` impl
    /// cannot propagate them). The flag is set *before* the push so the
    /// worker exits even if the inbox is full and the push fails.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let _ = self.inbox.push(CoreMessage::Shutdown);

        // `thread` is `None` if `join()` already consumed the handle.
        if let Some(handle) = self.thread.take() {
            let _ = handle.join();
        }
    }
}
485
486impl std::fmt::Debug for CoreHandle {
487 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
488 f.debug_struct("CoreHandle")
489 .field("core_id", &self.core_id)
490 .field("numa_node", &self.numa_node)
491 .field("is_running", &self.is_running())
492 .field("events_processed", &self.events_processed())
493 .field("outputs_dropped", &self.outputs_dropped())
494 .field("inbox_len", &self.inbox_len())
495 .field("outbox_len", &self.outbox_len())
496 .field("available_credits", &self.available_credits())
497 .field("is_backpressured", &self.is_backpressured())
498 .finish_non_exhaustive()
499 }
500}
501
/// Everything a core worker thread needs, bundled so it can be moved into
/// the spawned thread in one piece. The `Arc` fields are shared with the
/// owning `CoreHandle`.
struct CoreThreadContext {
    core_id: usize,
    /// CPU to pin the worker to, if any.
    cpu_affinity: Option<usize>,
    reactor_config: ReactorConfig,
    /// When true, log NUMA placement and construct a NUMA allocator.
    numa_aware: bool,
    numa_node: usize,
    /// Consumer side of the inbound message queue.
    inbox: Arc<SpscQueue<CoreMessage>>,
    /// Producer side of the outbound output queue.
    outbox: Arc<SpscQueue<Output>>,
    /// Credits are released here as inbox messages are consumed.
    credit_gate: Arc<CreditGate>,
    /// Polled each loop iteration; set by the handle to request exit.
    shutdown: Arc<AtomicBool>,
    events_processed: Arc<AtomicU64>,
    outputs_dropped: Arc<AtomicU64>,
    /// Set true once initialization succeeds; the spawner waits on this.
    is_running: Arc<AtomicBool>,
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    io_uring_config: Option<IoUringConfig>,
}
519
520fn init_core_thread(
522 ctx: &CoreThreadContext,
523 operators: Vec<Box<dyn Operator>>,
524) -> Result<Reactor, TpcError> {
525 if let Some(cpu_id) = ctx.cpu_affinity {
527 set_cpu_affinity(ctx.core_id, cpu_id)?;
528 }
529
530 if ctx.numa_aware {
532 tracing::info!(
533 "Core {} starting on NUMA node {}",
534 ctx.core_id,
535 ctx.numa_node
536 );
537 }
538
539 if ctx.numa_aware {
541 let topology = NumaTopology::detect();
542 let _numa_allocator = NumaAllocator::new(&topology);
543 }
544
545 #[cfg(all(target_os = "linux", feature = "io-uring"))]
547 let _ring_manager = if let Some(ref io_uring_config) = ctx.io_uring_config {
548 match CoreRingManager::new(ctx.core_id, io_uring_config) {
549 Ok(manager) => Some(manager),
550 Err(e) => {
551 eprintln!(
552 "Core {}: Failed to initialize io_uring ring: {e}. Falling back to standard I/O.",
553 ctx.core_id
554 );
555 None
556 }
557 }
558 } else {
559 None
560 };
561
562 let mut reactor_config = ctx.reactor_config.clone();
564 reactor_config.cpu_affinity = ctx.cpu_affinity;
565
566 let mut reactor = Reactor::new(reactor_config).map_err(|e| TpcError::ReactorError {
567 core_id: ctx.core_id,
568 source: e,
569 })?;
570
571 for op in operators {
573 reactor.add_operator(op);
574 }
575
576 Ok(reactor)
577}
578
579fn core_thread_main(
581 ctx: &CoreThreadContext,
582 operators: Vec<Box<dyn Operator>>,
583) -> Result<(), TpcError> {
584 let mut reactor = init_core_thread(ctx, operators)?;
585
586 ctx.is_running.store(true, Ordering::Release);
588
589 loop {
591 if ctx.shutdown.load(Ordering::Acquire) {
593 break;
594 }
595
596 let _guard = HotPathGuard::enter("CoreThread::process_inbox");
598
599 let batch_budget = TaskBudget::ring0_batch();
601
602 let mut had_work = false;
604 let mut messages_processed = 0usize;
605
606 while let Some(message) = ctx.inbox.pop() {
607 match message {
608 CoreMessage::Event(event) => {
609 if let Err(e) = reactor.submit(event) {
610 eprintln!("Core {}: Failed to submit event: {e}", ctx.core_id);
611 }
612 messages_processed += 1;
613 had_work = true;
614 }
615 CoreMessage::Watermark(timestamp) => {
616 reactor.advance_watermark(timestamp);
619 messages_processed += 1;
620 had_work = true;
621 }
622 CoreMessage::CheckpointRequest(checkpoint_id) => {
623 let operator_states = reactor.trigger_checkpoint();
626 let checkpoint_output = Output::CheckpointComplete {
627 checkpoint_id,
628 operator_states,
629 };
630 if ctx.outbox.push(checkpoint_output).is_err() {
631 ctx.outputs_dropped.fetch_add(1, Ordering::Relaxed);
632 }
633 messages_processed += 1;
634 had_work = true;
635 }
636 CoreMessage::Shutdown => {
637 if messages_processed > 0 {
639 ctx.credit_gate.release(messages_processed);
640 }
641 break;
642 }
643 }
644
645 if batch_budget.almost_exceeded() {
648 break;
649 }
650 }
651
652 if messages_processed > 0 {
655 ctx.credit_gate.release(messages_processed);
656 }
657
658 let outputs = reactor.poll();
660 ctx.events_processed
661 .fetch_add(outputs.len() as u64, Ordering::Relaxed);
662
663 for output in outputs {
665 if ctx.outbox.push(output).is_err() {
666 ctx.outputs_dropped.fetch_add(1, Ordering::Relaxed);
667 }
668 had_work = true;
669 }
670
671 if !had_work {
673 thread::yield_now();
674 }
675 }
676
677 let outputs = reactor.poll();
679 for output in outputs {
680 let _ = ctx.outbox.push(output);
681 }
682
683 ctx.is_running.store(false, Ordering::Release);
684 Ok(())
685}
686
/// Pins the *calling* thread to the CPU `cpu_id`.
///
/// On Linux this uses `sched_setaffinity`; on Windows,
/// `SetThreadAffinityMask`. On other platforms it is a no-op that returns
/// `Ok(())`. `core_id` is used only for error reporting.
///
/// # Errors
///
/// Returns `TpcError::AffinityFailed` if the OS rejects the affinity
/// request (e.g. `cpu_id` does not exist or is not permitted).
fn set_cpu_affinity(core_id: usize, cpu_id: usize) -> Result<(), TpcError> {
    #[cfg(target_os = "linux")]
    {
        use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
        use std::mem;

        #[allow(unsafe_code)]
        // SAFETY: `set` is a plain-old-data cpu_set_t, valid when zeroed.
        // CPU_ZERO/CPU_SET write only within `set`, and sched_setaffinity
        // with pid 0 targets the calling thread, reading `set` for
        // `size_of::<cpu_set_t>()` bytes — exactly its allocation.
        unsafe {
            let mut set: cpu_set_t = mem::zeroed();
            CPU_ZERO(&mut set);
            CPU_SET(cpu_id, &mut set);

            let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &raw const set);
            if result != 0 {
                return Err(TpcError::AffinityFailed {
                    core_id,
                    message: format!(
                        "sched_setaffinity failed: {}",
                        std::io::Error::last_os_error()
                    ),
                });
            }
        }
    }

    #[cfg(target_os = "windows")]
    {
        use winapi::shared::basetsd::DWORD_PTR;
        use winapi::um::processthreadsapi::GetCurrentThread;
        use winapi::um::winbase::SetThreadAffinityMask;

        #[allow(unsafe_code)]
        // SAFETY: GetCurrentThread returns a pseudo-handle that is always
        // valid for the calling thread; SetThreadAffinityMask only reads
        // the mask value. NOTE(review): `1 << cpu_id` overflows for
        // cpu_id >= 64 — confirm upstream validation of cpu_id.
        unsafe {
            let mask: DWORD_PTR = 1 << cpu_id;
            let result = SetThreadAffinityMask(GetCurrentThread(), mask);
            if result == 0 {
                return Err(TpcError::AffinityFailed {
                    core_id,
                    message: format!(
                        "SetThreadAffinityMask failed: {}",
                        std::io::Error::last_os_error()
                    ),
                });
            }
        }
    }

    // Other platforms: affinity is unsupported; silently succeed.
    #[cfg(not(any(target_os = "linux", target_os = "windows")))]
    {
        let _ = (core_id, cpu_id);
    }

    Ok(())
}
749
750#[cfg(test)]
751mod tests {
752 use super::*;
753 use crate::operator::{OperatorState, OutputVec, Timer};
754 use arrow_array::{Int64Array, RecordBatch};
755 use std::sync::Arc;
756 use std::time::Duration;
757
    /// Test operator that echoes every event straight to its output.
    struct PassthroughOperator;

    impl Operator for PassthroughOperator {
        fn process(
            &mut self,
            event: &Event,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            // Forward the event unchanged: exactly one output per input.
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        fn on_timer(
            &mut self,
            _timer: Timer,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            // Timers are unused by this operator.
            OutputVec::new()
        }

        fn checkpoint(&self) -> OperatorState {
            // Stateless: the checkpoint carries only the operator id.
            OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        fn restore(&mut self, _state: OperatorState) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }
791
792 fn make_event(value: i64) -> Event {
793 let array = Arc::new(Int64Array::from(vec![value]));
794 let batch = RecordBatch::try_from_iter(vec![("value", array as _)]).unwrap();
795 Event::new(value, batch)
796 }
797
798 #[test]
799 fn test_core_handle_spawn() {
800 let config = CoreConfig {
801 core_id: 0,
802 cpu_affinity: None, inbox_capacity: 1024,
804 outbox_capacity: 1024,
805 reactor_config: ReactorConfig::default(),
806 backpressure: super::BackpressureConfig::default(),
807 numa_aware: false,
808 #[cfg(all(target_os = "linux", feature = "io-uring"))]
809 io_uring_config: None,
810 };
811
812 let handle = CoreHandle::spawn(config).unwrap();
813 assert!(handle.is_running());
814 assert_eq!(handle.core_id(), 0);
815
816 handle.shutdown_and_join().unwrap();
817 }
818
819 #[test]
820 fn test_core_handle_with_operator() {
821 let config = CoreConfig {
822 core_id: 0,
823 cpu_affinity: None,
824 inbox_capacity: 1024,
825 outbox_capacity: 1024,
826 reactor_config: ReactorConfig::default(),
827 backpressure: super::BackpressureConfig::default(),
828 numa_aware: false,
829 #[cfg(all(target_os = "linux", feature = "io-uring"))]
830 io_uring_config: None,
831 };
832
833 let handle =
834 CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();
835
836 let event = make_event(42);
838 handle.send_event(event).unwrap();
839
840 thread::sleep(Duration::from_millis(50));
842
843 let outputs = handle.poll_outputs(10);
845 assert!(!outputs.is_empty());
846
847 handle.shutdown_and_join().unwrap();
848 }
849
850 #[test]
851 fn test_core_handle_multiple_events() {
852 let config = CoreConfig {
853 core_id: 1,
854 cpu_affinity: None,
855 inbox_capacity: 1024,
856 outbox_capacity: 1024,
857 reactor_config: ReactorConfig::default(),
858 backpressure: super::BackpressureConfig::default(),
859 numa_aware: false,
860 #[cfg(all(target_os = "linux", feature = "io-uring"))]
861 io_uring_config: None,
862 };
863
864 let handle =
865 CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();
866
867 for i in 0..100 {
869 handle.send_event(make_event(i)).unwrap();
870 }
871
872 thread::sleep(Duration::from_millis(100));
874
875 let mut total_outputs = 0;
877 loop {
878 let outputs = handle.poll_outputs(1000);
879 if outputs.is_empty() {
880 break;
881 }
882 total_outputs += outputs.len();
883 }
884
885 assert!(total_outputs >= 100);
887
888 handle.shutdown_and_join().unwrap();
889 }
890
891 #[test]
892 fn test_core_handle_shutdown() {
893 let config = CoreConfig::default();
894 let handle = CoreHandle::spawn(config).unwrap();
895
896 assert!(handle.is_running());
897
898 handle.shutdown();
899
900 thread::sleep(Duration::from_millis(100));
902
903 assert!(!handle.is_running());
905 }
906
907 #[test]
908 fn test_core_handle_debug() {
909 let config = CoreConfig {
910 core_id: 42,
911 ..Default::default()
912 };
913 let handle = CoreHandle::spawn(config).unwrap();
914
915 let debug_str = format!("{handle:?}");
916 assert!(debug_str.contains("CoreHandle"));
917 assert!(debug_str.contains("42"));
918
919 handle.shutdown_and_join().unwrap();
920 }
921
922 #[test]
923 fn test_core_config_default() {
924 let config = CoreConfig::default();
925 assert_eq!(config.core_id, 0);
926 assert!(config.cpu_affinity.is_none());
927 assert_eq!(config.inbox_capacity, 65536);
928 assert_eq!(config.outbox_capacity, 65536);
929 assert!(!config.numa_aware);
930 }
931
932 #[test]
933 fn test_core_handle_numa_node() {
934 let config = CoreConfig {
935 core_id: 0,
936 cpu_affinity: None,
937 numa_aware: true,
938 ..Default::default()
939 };
940
941 let handle = CoreHandle::spawn(config).unwrap();
942 assert!(handle.numa_node() < 64);
944
945 handle.shutdown_and_join().unwrap();
946 }
947
948 #[test]
949 fn test_poll_outputs_into() {
950 let config = CoreConfig {
951 core_id: 0,
952 cpu_affinity: None,
953 inbox_capacity: 1024,
954 outbox_capacity: 1024,
955 reactor_config: ReactorConfig::default(),
956 backpressure: super::BackpressureConfig::default(),
957 numa_aware: false,
958 #[cfg(all(target_os = "linux", feature = "io-uring"))]
959 io_uring_config: None,
960 };
961
962 let handle =
963 CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();
964
965 for i in 0..10 {
967 handle.send_event(make_event(i)).unwrap();
968 }
969
970 thread::sleep(Duration::from_millis(100));
972
973 let mut buffer = Vec::with_capacity(100);
975 let count = handle.poll_outputs_into(&mut buffer, 100);
976
977 assert!(count > 0);
978 assert_eq!(buffer.len(), count);
979
980 let cap_before = buffer.capacity();
982 buffer.clear();
983 let _ = handle.poll_outputs_into(&mut buffer, 100);
984 assert_eq!(buffer.capacity(), cap_before); handle.shutdown_and_join().unwrap();
987 }
988
989 #[test]
990 fn test_poll_each() {
991 let config = CoreConfig {
992 core_id: 0,
993 cpu_affinity: None,
994 inbox_capacity: 1024,
995 outbox_capacity: 1024,
996 reactor_config: ReactorConfig::default(),
997 backpressure: super::BackpressureConfig::default(),
998 numa_aware: false,
999 #[cfg(all(target_os = "linux", feature = "io-uring"))]
1000 io_uring_config: None,
1001 };
1002
1003 let handle =
1004 CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();
1005
1006 for i in 0..10 {
1008 handle.send_event(make_event(i)).unwrap();
1009 }
1010
1011 thread::sleep(Duration::from_millis(100));
1013
1014 let mut event_count = 0;
1016 let count = handle.poll_each(100, |output| {
1017 if matches!(output, Output::Event(_)) {
1018 event_count += 1;
1019 }
1020 true
1021 });
1022
1023 assert!(count > 0);
1024 assert!(event_count > 0);
1025
1026 handle.shutdown_and_join().unwrap();
1027 }
1028
1029 #[test]
1030 fn test_poll_each_early_stop() {
1031 let config = CoreConfig {
1032 core_id: 0,
1033 cpu_affinity: None,
1034 inbox_capacity: 1024,
1035 outbox_capacity: 1024,
1036 reactor_config: ReactorConfig::default(),
1037 backpressure: super::BackpressureConfig::default(),
1038 numa_aware: false,
1039 #[cfg(all(target_os = "linux", feature = "io-uring"))]
1040 io_uring_config: None,
1041 };
1042
1043 let handle =
1044 CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();
1045
1046 for i in 0..20 {
1048 handle.send_event(make_event(i)).unwrap();
1049 }
1050
1051 thread::sleep(Duration::from_millis(100));
1053
1054 let mut processed = 0;
1056 let count = handle.poll_each(100, |_| {
1057 processed += 1;
1058 processed < 5 });
1060
1061 assert_eq!(count, 5);
1062 assert_eq!(processed, 5);
1063
1064 let remaining = handle.outbox_len();
1066 assert!(remaining > 0);
1067
1068 handle.shutdown_and_join().unwrap();
1069 }
1070
1071 #[test]
1072 fn test_watermark_propagation() {
1073 let config = CoreConfig {
1074 core_id: 0,
1075 cpu_affinity: None,
1076 inbox_capacity: 1024,
1077 outbox_capacity: 1024,
1078 reactor_config: ReactorConfig::default(),
1079 backpressure: super::BackpressureConfig::default(),
1080 numa_aware: false,
1081 #[cfg(all(target_os = "linux", feature = "io-uring"))]
1082 io_uring_config: None,
1083 };
1084
1085 let handle =
1086 CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();
1087
1088 handle.send(CoreMessage::Watermark(5000)).unwrap();
1090
1091 thread::sleep(Duration::from_millis(50));
1093
1094 let outputs = handle.poll_outputs(100);
1096 let has_watermark = outputs.iter().any(|o| matches!(o, Output::Watermark(_)));
1097 assert!(
1098 has_watermark,
1099 "Expected watermark output after Watermark message"
1100 );
1101
1102 handle.shutdown_and_join().unwrap();
1103 }
1104
1105 #[test]
1106 fn test_checkpoint_triggering() {
1107 let config = CoreConfig {
1108 core_id: 0,
1109 cpu_affinity: None,
1110 inbox_capacity: 1024,
1111 outbox_capacity: 1024,
1112 reactor_config: ReactorConfig::default(),
1113 backpressure: super::BackpressureConfig::default(),
1114 numa_aware: false,
1115 #[cfg(all(target_os = "linux", feature = "io-uring"))]
1116 io_uring_config: None,
1117 };
1118
1119 let handle =
1120 CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();
1121
1122 handle.send(CoreMessage::CheckpointRequest(42)).unwrap();
1124
1125 thread::sleep(Duration::from_millis(50));
1127
1128 let outputs = handle.poll_outputs(100);
1130 let checkpoint = outputs
1131 .iter()
1132 .find(|o| matches!(o, Output::CheckpointComplete { .. }));
1133 assert!(checkpoint.is_some(), "Expected CheckpointComplete output");
1134
1135 if let Some(Output::CheckpointComplete {
1136 checkpoint_id,
1137 operator_states,
1138 }) = checkpoint
1139 {
1140 assert_eq!(*checkpoint_id, 42);
1141 assert_eq!(operator_states.len(), 1);
1143 assert_eq!(operator_states[0].operator_id, "passthrough");
1144 }
1145
1146 handle.shutdown_and_join().unwrap();
1147 }
1148
1149 #[test]
1150 fn test_outputs_dropped_counter() {
1151 let config = CoreConfig {
1152 core_id: 0,
1153 cpu_affinity: None,
1154 inbox_capacity: 1024,
1155 outbox_capacity: 4, reactor_config: ReactorConfig::default(),
1157 backpressure: super::BackpressureConfig::default(),
1158 numa_aware: false,
1159 #[cfg(all(target_os = "linux", feature = "io-uring"))]
1160 io_uring_config: None,
1161 };
1162
1163 let handle =
1164 CoreHandle::spawn_with_operators(config, vec![Box::new(PassthroughOperator)]).unwrap();
1165
1166 for i in 0..100 {
1168 let _ = handle.send_event(make_event(i));
1169 }
1170
1171 thread::sleep(Duration::from_millis(200));
1173
1174 let dropped = handle.outputs_dropped();
1176 assert!(
1177 dropped > 0,
1178 "Expected some outputs to be dropped with outbox_capacity=4"
1179 );
1180
1181 handle.shutdown_and_join().unwrap();
1182 }
1183}