1use std::ops::Deref;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::Arc;
34
35use crate::operator::{Event, Operator, Output};
36use crate::reactor::ReactorConfig;
37
38#[derive(Debug)]
63pub struct OutputBuffer {
64 items: Vec<Output>,
66}
67
68impl OutputBuffer {
69 #[must_use]
73 pub fn with_capacity(capacity: usize) -> Self {
74 Self {
75 items: Vec::with_capacity(capacity),
76 }
77 }
78
79 #[inline]
83 pub fn clear(&mut self) {
84 self.items.clear();
85 }
86
87 #[inline]
89 #[must_use]
90 pub fn len(&self) -> usize {
91 self.items.len()
92 }
93
94 #[inline]
96 #[must_use]
97 pub fn is_empty(&self) -> bool {
98 self.items.is_empty()
99 }
100
101 #[inline]
103 #[must_use]
104 pub fn capacity(&self) -> usize {
105 self.items.capacity()
106 }
107
108 #[inline]
110 #[must_use]
111 pub fn remaining(&self) -> usize {
112 self.items.capacity() - self.items.len()
113 }
114
115 #[inline]
117 #[must_use]
118 pub fn as_slice(&self) -> &[Output] {
119 &self.items
120 }
121
122 #[inline]
124 pub fn iter(&self) -> impl Iterator<Item = &Output> {
125 self.items.iter()
126 }
127
128 #[must_use]
130 pub fn into_vec(self) -> Vec<Output> {
131 self.items
132 }
133
134 #[inline]
139 pub fn extend<I: IntoIterator<Item = Output>>(&mut self, iter: I) {
140 self.items.extend(iter);
141 }
142
143 #[inline]
147 pub fn push(&mut self, output: Output) {
148 self.items.push(output);
149 }
150
151 #[inline]
155 pub fn as_vec_mut(&mut self) -> &mut Vec<Output> {
156 &mut self.items
157 }
158}
159
160impl Default for OutputBuffer {
161 fn default() -> Self {
162 Self::with_capacity(1024)
163 }
164}
165
166impl Deref for OutputBuffer {
167 type Target = [Output];
168
169 fn deref(&self) -> &Self::Target {
170 &self.items
171 }
172}
173
174impl<'a> IntoIterator for &'a OutputBuffer {
175 type Item = &'a Output;
176 type IntoIter = std::slice::Iter<'a, Output>;
177
178 fn into_iter(self) -> Self::IntoIter {
179 self.items.iter()
180 }
181}
182
183impl IntoIterator for OutputBuffer {
184 type Item = Output;
185 type IntoIter = std::vec::IntoIter<Output>;
186
187 fn into_iter(self) -> Self::IntoIter {
188 self.items.into_iter()
189 }
190}
191
192use super::core_handle::{CoreConfig, CoreHandle};
193use super::router::{KeyRouter, KeySpec};
194use super::TpcError;
195
196#[derive(Debug, Clone)]
198pub struct TpcConfig {
199 pub num_cores: usize,
201 pub key_spec: KeySpec,
203 pub cpu_pinning: bool,
205 pub cpu_start: usize,
207 pub inbox_capacity: usize,
209 pub outbox_capacity: usize,
211 pub reactor_config: ReactorConfig,
213 pub numa_aware: bool,
215}
216
217impl Default for TpcConfig {
218 fn default() -> Self {
219 Self {
220 num_cores: num_cpus::get(),
221 key_spec: KeySpec::RoundRobin,
222 cpu_pinning: false,
223 cpu_start: 0,
224 inbox_capacity: 65536,
225 outbox_capacity: 65536,
226 reactor_config: ReactorConfig::default(),
227 numa_aware: false,
228 }
229 }
230}
231
232impl TpcConfig {
233 #[must_use]
235 pub fn builder() -> TpcConfigBuilder {
236 TpcConfigBuilder::default()
237 }
238
239 #[must_use]
255 pub fn auto() -> Self {
256 let caps = crate::detect::SystemCapabilities::detect();
257 let recommended = caps.recommended_config();
258
259 Self {
260 num_cores: recommended.num_cores,
261 key_spec: KeySpec::RoundRobin,
262 cpu_pinning: recommended.cpu_pinning,
263 cpu_start: 0,
264 inbox_capacity: recommended.queue_capacity,
265 outbox_capacity: recommended.queue_capacity,
266 reactor_config: ReactorConfig::default(),
267 numa_aware: recommended.numa_aware,
268 }
269 }
270
271 pub fn validate(&self) -> Result<(), TpcError> {
277 if self.num_cores == 0 {
278 return Err(TpcError::InvalidConfig("num_cores must be > 0".to_string()));
279 }
280 if self.inbox_capacity == 0 {
281 return Err(TpcError::InvalidConfig(
282 "inbox_capacity must be > 0".to_string(),
283 ));
284 }
285 if self.outbox_capacity == 0 {
286 return Err(TpcError::InvalidConfig(
287 "outbox_capacity must be > 0".to_string(),
288 ));
289 }
290 Ok(())
291 }
292}
293
294#[derive(Debug, Default)]
296pub struct TpcConfigBuilder {
297 num_cores: Option<usize>,
298 key_spec: Option<KeySpec>,
299 cpu_pinning: Option<bool>,
300 cpu_start: Option<usize>,
301 inbox_capacity: Option<usize>,
302 outbox_capacity: Option<usize>,
303 reactor_config: Option<ReactorConfig>,
304 numa_aware: Option<bool>,
305}
306
307impl TpcConfigBuilder {
308 #[must_use]
310 pub fn num_cores(mut self, n: usize) -> Self {
311 self.num_cores = Some(n);
312 self
313 }
314
315 #[must_use]
317 pub fn key_spec(mut self, spec: KeySpec) -> Self {
318 self.key_spec = Some(spec);
319 self
320 }
321
322 #[must_use]
324 pub fn key_columns(self, columns: Vec<String>) -> Self {
325 self.key_spec(KeySpec::Columns(columns))
326 }
327
328 #[must_use]
330 pub fn cpu_pinning(mut self, enabled: bool) -> Self {
331 self.cpu_pinning = Some(enabled);
332 self
333 }
334
335 #[must_use]
337 pub fn cpu_start(mut self, cpu: usize) -> Self {
338 self.cpu_start = Some(cpu);
339 self
340 }
341
342 #[must_use]
344 pub fn inbox_capacity(mut self, capacity: usize) -> Self {
345 self.inbox_capacity = Some(capacity);
346 self
347 }
348
349 #[must_use]
351 pub fn outbox_capacity(mut self, capacity: usize) -> Self {
352 self.outbox_capacity = Some(capacity);
353 self
354 }
355
356 #[must_use]
358 pub fn reactor_config(mut self, config: ReactorConfig) -> Self {
359 self.reactor_config = Some(config);
360 self
361 }
362
363 #[must_use]
368 pub fn numa_aware(mut self, enabled: bool) -> Self {
369 self.numa_aware = Some(enabled);
370 self
371 }
372
373 pub fn build(self) -> Result<TpcConfig, TpcError> {
379 let config = TpcConfig {
380 num_cores: self.num_cores.unwrap_or_else(num_cpus::get),
381 key_spec: self.key_spec.unwrap_or_default(),
382 cpu_pinning: self.cpu_pinning.unwrap_or(false),
383 cpu_start: self.cpu_start.unwrap_or(0),
384 inbox_capacity: self.inbox_capacity.unwrap_or(65536),
385 outbox_capacity: self.outbox_capacity.unwrap_or(65536),
386 reactor_config: self.reactor_config.unwrap_or_default(),
387 numa_aware: self.numa_aware.unwrap_or(false),
388 };
389 config.validate()?;
390 Ok(config)
391 }
392}
393
394pub trait OperatorFactory: Send {
399 fn create(&self, core_id: usize) -> Vec<Box<dyn Operator>>;
401}
402
403impl<F> OperatorFactory for F
404where
405 F: Fn(usize) -> Vec<Box<dyn Operator>> + Send,
406{
407 fn create(&self, core_id: usize) -> Vec<Box<dyn Operator>> {
408 self(core_id)
409 }
410}
411
412pub struct ThreadPerCoreRuntime {
417 cores: Vec<CoreHandle>,
419 router: KeyRouter,
421 config: TpcConfig,
423 is_running: Arc<AtomicBool>,
425}
426
427impl ThreadPerCoreRuntime {
428 pub fn new(config: TpcConfig) -> Result<Self, TpcError> {
434 config.validate()?;
435 Self::new_with_factory(config, &|_| Vec::new())
436 }
437
438 pub fn new_with_factory<F>(config: TpcConfig, factory: &F) -> Result<Self, TpcError>
446 where
447 F: OperatorFactory,
448 {
449 config.validate()?;
450
451 let router = KeyRouter::new(config.num_cores, config.key_spec.clone());
452 let is_running = Arc::new(AtomicBool::new(true));
453
454 let mut cores = Vec::with_capacity(config.num_cores);
455
456 for core_id in 0..config.num_cores {
457 let cpu_affinity = if config.cpu_pinning {
458 Some(config.cpu_start + core_id)
459 } else {
460 None
461 };
462
463 let core_config = CoreConfig {
464 core_id,
465 cpu_affinity,
466 inbox_capacity: config.inbox_capacity,
467 outbox_capacity: config.outbox_capacity,
468 reactor_config: config.reactor_config.clone(),
469 backpressure: super::backpressure::BackpressureConfig::default(),
470 numa_aware: config.numa_aware,
471 #[cfg(all(target_os = "linux", feature = "io-uring"))]
472 io_uring_config: None,
473 };
474
475 let operators = factory.create(core_id);
476 let handle = CoreHandle::spawn_with_operators(core_config, operators)?;
477 cores.push(handle);
478 }
479
480 Ok(Self {
481 cores,
482 router,
483 config,
484 is_running,
485 })
486 }
487
488 #[must_use]
490 pub fn num_cores(&self) -> usize {
491 self.cores.len()
492 }
493
494 #[must_use]
496 pub fn is_running(&self) -> bool {
497 self.is_running.load(Ordering::Acquire)
498 }
499
500 pub fn submit(&self, event: Event) -> Result<(), TpcError> {
508 if !self.is_running() {
509 return Err(TpcError::NotRunning);
510 }
511
512 let core_id = self.router.route(&event)?;
513 self.cores[core_id].send_event(event)
514 }
515
516 pub fn submit_to_core(&self, core_id: usize, event: Event) -> Result<(), TpcError> {
524 if !self.is_running() {
525 return Err(TpcError::NotRunning);
526 }
527 if core_id >= self.cores.len() {
528 return Err(TpcError::InvalidConfig(format!(
529 "core_id {} out of range (0..{})",
530 core_id,
531 self.cores.len()
532 )));
533 }
534 self.cores[core_id].send_event(event)
535 }
536
537 pub fn submit_batch(&self, events: Vec<Event>) -> (usize, Option<TpcError>) {
546 if !self.is_running() {
547 return (0, Some(TpcError::NotRunning));
548 }
549
550 let mut submitted = 0;
551 for event in events {
552 match self.submit(event) {
553 Ok(()) => submitted += 1,
554 Err(e) => return (submitted, Some(e)),
555 }
556 }
557 (submitted, None)
558 }
559
560 #[must_use]
569 pub fn poll(&self) -> Vec<Output> {
570 let mut outputs = Vec::new();
571 for core in &self.cores {
572 outputs.extend(core.poll_outputs(1024));
573 }
574 outputs
575 }
576
577 pub fn poll_into(&self, buffer: &mut OutputBuffer, max_per_core: usize) -> usize {
607 let start_len = buffer.len();
608
609 for core in &self.cores {
610 let remaining = buffer.remaining();
612 if remaining == 0 {
613 break;
614 }
615
616 let max = max_per_core.min(remaining);
617 core.poll_outputs_into(buffer.as_vec_mut(), max);
618 }
619
620 buffer.len() - start_len
621 }
622
623 pub fn poll_each<F>(&self, max_per_core: usize, mut f: F) -> usize
661 where
662 F: FnMut(Output) -> bool,
663 {
664 let mut total = 0;
665 let mut should_continue = true;
666
667 for core in &self.cores {
668 if !should_continue {
669 break;
670 }
671
672 let count = core.poll_each(max_per_core, |output| {
673 let result = f(output);
674 if !result {
675 should_continue = false;
676 }
677 result
678 });
679
680 total += count;
681 }
682
683 total
684 }
685
686 #[must_use]
693 pub fn poll_core(&self, core_id: usize) -> Vec<Output> {
694 if core_id < self.cores.len() {
695 self.cores[core_id].poll_outputs(1024)
696 } else {
697 Vec::new()
698 }
699 }
700
701 pub fn poll_core_into(
705 &self,
706 core_id: usize,
707 buffer: &mut OutputBuffer,
708 max_count: usize,
709 ) -> usize {
710 if core_id < self.cores.len() {
711 self.cores[core_id].poll_outputs_into(buffer.as_vec_mut(), max_count)
712 } else {
713 0
714 }
715 }
716
717 pub fn poll_core_each<F>(&self, core_id: usize, max_count: usize, f: F) -> usize
721 where
722 F: FnMut(Output) -> bool,
723 {
724 if core_id < self.cores.len() {
725 self.cores[core_id].poll_each(max_count, f)
726 } else {
727 0
728 }
729 }
730
731 #[must_use]
733 pub fn stats(&self) -> RuntimeStats {
734 let core_stats: Vec<CoreStats> = self
735 .cores
736 .iter()
737 .map(|core| CoreStats {
738 core_id: core.core_id(),
739 numa_node: core.numa_node(),
740 events_processed: core.events_processed(),
741 inbox_len: core.inbox_len(),
742 outbox_len: core.outbox_len(),
743 is_running: core.is_running(),
744 })
745 .collect();
746
747 RuntimeStats {
748 num_cores: self.cores.len(),
749 total_events_processed: core_stats.iter().map(|s| s.events_processed).sum(),
750 cores: core_stats,
751 }
752 }
753
754 #[must_use]
756 pub fn router(&self) -> &KeyRouter {
757 &self.router
758 }
759
760 pub fn shutdown(mut self) -> Result<(), TpcError> {
768 self.is_running.store(false, Ordering::Release);
769
770 for core in &self.cores {
772 core.shutdown();
773 }
774
775 let cores = std::mem::take(&mut self.cores);
777 for core in cores {
778 core.join()?;
779 }
780
781 Ok(())
782 }
783
784 pub fn run_with_handler<F>(&self, mut handler: F, shutdown: &AtomicBool)
794 where
795 F: FnMut(Vec<Output>),
796 {
797 while !shutdown.load(Ordering::Acquire) && self.is_running() {
798 let outputs = self.poll();
799 if outputs.is_empty() {
800 std::thread::yield_now();
801 } else {
802 handler(outputs);
803 }
804 }
805 }
806}
807
808impl Drop for ThreadPerCoreRuntime {
809 fn drop(&mut self) {
810 self.is_running.store(false, Ordering::Release);
811
812 }
814}
815
816impl std::fmt::Debug for ThreadPerCoreRuntime {
817 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
818 f.debug_struct("ThreadPerCoreRuntime")
819 .field("num_cores", &self.cores.len())
820 .field("is_running", &self.is_running())
821 .field("config", &self.config)
822 .finish_non_exhaustive()
823 }
824}
825
826#[derive(Debug, Clone)]
828pub struct RuntimeStats {
829 pub num_cores: usize,
831 pub total_events_processed: u64,
833 pub cores: Vec<CoreStats>,
835}
836
837#[derive(Debug, Clone)]
839pub struct CoreStats {
840 pub core_id: usize,
842 pub numa_node: usize,
844 pub events_processed: u64,
846 pub inbox_len: usize,
848 pub outbox_len: usize,
850 pub is_running: bool,
852}
853
854#[cfg(test)]
855mod tests {
856 use super::*;
857 use crate::operator::{OperatorState, OutputVec, Timer};
858 use arrow_array::{Int64Array, RecordBatch};
859 use std::sync::Arc;
860 use std::time::Duration;
861
862 struct PassthroughOperator {
864 #[allow(dead_code)]
865 core_id: usize,
866 }
867
868 impl Operator for PassthroughOperator {
869 fn process(
870 &mut self,
871 event: &Event,
872 _ctx: &mut crate::operator::OperatorContext,
873 ) -> OutputVec {
874 let mut output = OutputVec::new();
875 output.push(Output::Event(event.clone()));
876 output
877 }
878
879 fn on_timer(
880 &mut self,
881 _timer: Timer,
882 _ctx: &mut crate::operator::OperatorContext,
883 ) -> OutputVec {
884 OutputVec::new()
885 }
886
887 fn checkpoint(&self) -> OperatorState {
888 OperatorState {
889 operator_id: "passthrough".to_string(),
890 data: vec![],
891 }
892 }
893
894 fn restore(&mut self, _state: OperatorState) -> Result<(), crate::operator::OperatorError> {
895 Ok(())
896 }
897 }
898
899 fn make_event(user_id: i64, timestamp: i64) -> Event {
900 let user_ids = Arc::new(Int64Array::from(vec![user_id]));
901 let batch = RecordBatch::try_from_iter(vec![("user_id", user_ids as _)]).unwrap();
902 Event::new(timestamp, batch)
903 }
904
905 #[test]
906 fn test_config_builder() {
907 let config = TpcConfig::builder()
908 .num_cores(4)
909 .key_columns(vec!["user_id".to_string()])
910 .cpu_pinning(false)
911 .inbox_capacity(1024)
912 .build()
913 .unwrap();
914
915 assert_eq!(config.num_cores, 4);
916 assert!(!config.cpu_pinning);
917 assert_eq!(config.inbox_capacity, 1024);
918 }
919
920 #[test]
921 fn test_config_validation() {
922 let result = TpcConfig::builder().num_cores(0).build();
924 assert!(result.is_err());
925
926 let result = TpcConfig::builder().inbox_capacity(0).build();
928 assert!(result.is_err());
929 }
930
931 #[test]
932 fn test_runtime_creation() {
933 let config = TpcConfig::builder()
934 .num_cores(2)
935 .cpu_pinning(false)
936 .build()
937 .unwrap();
938
939 let runtime = ThreadPerCoreRuntime::new(config).unwrap();
940 assert_eq!(runtime.num_cores(), 2);
941 assert!(runtime.is_running());
942
943 runtime.shutdown().unwrap();
944 }
945
946 #[test]
947 fn test_runtime_with_factory() {
948 let config = TpcConfig::builder()
949 .num_cores(2)
950 .cpu_pinning(false)
951 .build()
952 .unwrap();
953
954 let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
955 vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
956 })
957 .unwrap();
958
959 assert_eq!(runtime.num_cores(), 2);
960
961 for i in 0..10 {
963 runtime.submit(make_event(i, i * 1000)).unwrap();
964 }
965
966 std::thread::sleep(Duration::from_millis(100));
968
969 let outputs = runtime.poll();
971 assert!(!outputs.is_empty());
972
973 runtime.shutdown().unwrap();
974 }
975
976 #[test]
977 fn test_key_based_routing() {
978 let config = TpcConfig::builder()
979 .num_cores(4)
980 .key_columns(vec!["user_id".to_string()])
981 .cpu_pinning(false)
982 .build()
983 .unwrap();
984
985 let runtime = ThreadPerCoreRuntime::new(config).unwrap();
986
987 let event1 = make_event(100, 1000);
989 let event2 = make_event(100, 2000);
990
991 let core1 = runtime.router().route(&event1).unwrap();
992 let core2 = runtime.router().route(&event2).unwrap();
993
994 assert_eq!(core1, core2);
995 assert!(core1 < 4);
996
997 runtime.shutdown().unwrap();
998 }
999
1000 #[test]
1001 fn test_submit_batch() {
1002 let config = TpcConfig::builder()
1003 .num_cores(2)
1004 .cpu_pinning(false)
1005 .build()
1006 .unwrap();
1007
1008 let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1009
1010 let events: Vec<Event> = (0..100).map(|i| make_event(i, i * 1000)).collect();
1011
1012 let (submitted, error) = runtime.submit_batch(events);
1013 assert_eq!(submitted, 100);
1014 assert!(error.is_none());
1015
1016 runtime.shutdown().unwrap();
1017 }
1018
1019 #[test]
1020 fn test_runtime_stats() {
1021 let config = TpcConfig::builder()
1022 .num_cores(2)
1023 .cpu_pinning(false)
1024 .build()
1025 .unwrap();
1026
1027 let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1028 vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1029 })
1030 .unwrap();
1031
1032 for i in 0..50 {
1034 runtime.submit(make_event(i, i * 1000)).unwrap();
1035 }
1036
1037 std::thread::sleep(Duration::from_millis(100));
1039
1040 let _ = runtime.poll();
1042
1043 let stats = runtime.stats();
1044 assert_eq!(stats.num_cores, 2);
1045 assert!(stats.total_events_processed > 0);
1046
1047 for core_stat in &stats.cores {
1048 assert!(core_stat.is_running);
1049 }
1050
1051 runtime.shutdown().unwrap();
1052 }
1053
1054 #[test]
1055 fn test_submit_to_specific_core() {
1056 let config = TpcConfig::builder()
1057 .num_cores(4)
1058 .cpu_pinning(false)
1059 .build()
1060 .unwrap();
1061
1062 let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1063
1064 runtime.submit_to_core(0, make_event(1, 1000)).unwrap();
1066 runtime.submit_to_core(1, make_event(2, 2000)).unwrap();
1067 runtime.submit_to_core(2, make_event(3, 3000)).unwrap();
1068 runtime.submit_to_core(3, make_event(4, 4000)).unwrap();
1069
1070 let result = runtime.submit_to_core(10, make_event(5, 5000));
1072 assert!(result.is_err());
1073
1074 runtime.shutdown().unwrap();
1075 }
1076
1077 #[test]
1078 fn test_poll_specific_core() {
1079 let config = TpcConfig::builder()
1080 .num_cores(2)
1081 .cpu_pinning(false)
1082 .build()
1083 .unwrap();
1084
1085 let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1086 vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1087 })
1088 .unwrap();
1089
1090 runtime.submit_to_core(0, make_event(1, 1000)).unwrap();
1092
1093 std::thread::sleep(Duration::from_millis(100));
1095
1096 let outputs = runtime.poll_core(0);
1098 assert!(!outputs.is_empty());
1099
1100 runtime.shutdown().unwrap();
1101 }
1102
1103 #[test]
1104 fn test_runtime_debug() {
1105 let config = TpcConfig::builder()
1106 .num_cores(2)
1107 .cpu_pinning(false)
1108 .build()
1109 .unwrap();
1110
1111 let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1112
1113 let debug_str = format!("{runtime:?}");
1114 assert!(debug_str.contains("ThreadPerCoreRuntime"));
1115 assert!(debug_str.contains("num_cores"));
1116
1117 runtime.shutdown().unwrap();
1118 }
1119
1120 #[test]
1121 fn test_shutdown_stops_submission() {
1122 let config = TpcConfig::builder()
1123 .num_cores(2)
1124 .cpu_pinning(false)
1125 .build()
1126 .unwrap();
1127
1128 let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1129
1130 drop(runtime);
1132
1133 let config = TpcConfig::builder()
1135 .num_cores(2)
1136 .cpu_pinning(false)
1137 .build()
1138 .unwrap();
1139
1140 let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1141 runtime.shutdown().unwrap();
1142 }
1143
1144 #[test]
1145 fn test_numa_aware_config() {
1146 let config = TpcConfig::builder()
1147 .num_cores(2)
1148 .cpu_pinning(false)
1149 .numa_aware(true)
1150 .build()
1151 .unwrap();
1152
1153 assert!(config.numa_aware);
1154
1155 let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1156 assert_eq!(runtime.num_cores(), 2);
1157
1158 let stats = runtime.stats();
1160 for core_stat in &stats.cores {
1161 assert!(core_stat.numa_node < 64);
1163 }
1164
1165 runtime.shutdown().unwrap();
1166 }
1167
1168 #[test]
1169 fn test_output_buffer_basic() {
1170 let mut buffer = OutputBuffer::with_capacity(100);
1171
1172 assert!(buffer.is_empty());
1173 assert_eq!(buffer.len(), 0);
1174 assert_eq!(buffer.capacity(), 100);
1175 assert_eq!(buffer.remaining(), 100);
1176
1177 let event = make_event(1, 1000);
1179 buffer.push(Output::Event(event));
1180
1181 assert!(!buffer.is_empty());
1182 assert_eq!(buffer.len(), 1);
1183 assert_eq!(buffer.remaining(), 99);
1184
1185 buffer.clear();
1187 assert!(buffer.is_empty());
1188 assert_eq!(buffer.capacity(), 100); }
1190
1191 #[test]
1192 fn test_output_buffer_iteration() {
1193 let mut buffer = OutputBuffer::with_capacity(10);
1194
1195 for i in 0..5 {
1196 buffer.push(Output::Event(make_event(i, i * 1000)));
1197 }
1198
1199 let count = buffer.iter().count();
1201 assert_eq!(count, 5);
1202
1203 assert_eq!(buffer.as_slice().len(), 5);
1205
1206 let mut ref_count = 0;
1208 for _ in &buffer {
1209 ref_count += 1;
1210 }
1211 assert_eq!(ref_count, 5);
1212 }
1213
1214 #[test]
1215 fn test_output_buffer_into_vec() {
1216 let mut buffer = OutputBuffer::with_capacity(10);
1217
1218 for i in 0..3 {
1219 buffer.push(Output::Event(make_event(i, i * 1000)));
1220 }
1221
1222 let vec = buffer.into_vec();
1223 assert_eq!(vec.len(), 3);
1224 }
1225
1226 #[test]
1227 fn test_poll_into() {
1228 let config = TpcConfig::builder()
1229 .num_cores(2)
1230 .cpu_pinning(false)
1231 .build()
1232 .unwrap();
1233
1234 let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1235 vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1236 })
1237 .unwrap();
1238
1239 for i in 0..20 {
1241 runtime.submit(make_event(i, i * 1000)).unwrap();
1242 }
1243
1244 std::thread::sleep(Duration::from_millis(100));
1246
1247 let mut buffer = OutputBuffer::with_capacity(100);
1249 let count = runtime.poll_into(&mut buffer, 256);
1250
1251 assert!(count > 0);
1252 assert_eq!(buffer.len(), count);
1253
1254 let cap_before = buffer.capacity();
1256 buffer.clear();
1257 let _ = runtime.poll_into(&mut buffer, 256);
1258 assert_eq!(buffer.capacity(), cap_before);
1259
1260 runtime.shutdown().unwrap();
1261 }
1262
1263 #[test]
1264 fn test_poll_each() {
1265 let config = TpcConfig::builder()
1266 .num_cores(2)
1267 .cpu_pinning(false)
1268 .build()
1269 .unwrap();
1270
1271 let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1272 vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1273 })
1274 .unwrap();
1275
1276 for i in 0..20 {
1278 runtime.submit(make_event(i, i * 1000)).unwrap();
1279 }
1280
1281 std::thread::sleep(Duration::from_millis(100));
1283
1284 let mut event_count = 0;
1286 let count = runtime.poll_each(256, |output| {
1287 if matches!(output, Output::Event(_)) {
1288 event_count += 1;
1289 }
1290 true
1291 });
1292
1293 assert!(count > 0);
1294 assert!(event_count > 0);
1295
1296 runtime.shutdown().unwrap();
1297 }
1298
1299 #[test]
1300 fn test_poll_each_early_stop() {
1301 let config = TpcConfig::builder()
1302 .num_cores(2)
1303 .cpu_pinning(false)
1304 .build()
1305 .unwrap();
1306
1307 let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1308 vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1309 })
1310 .unwrap();
1311
1312 for i in 0..50 {
1314 runtime.submit(make_event(i, i * 1000)).unwrap();
1315 }
1316
1317 std::thread::sleep(Duration::from_millis(100));
1319
1320 let mut processed = 0;
1322 let count = runtime.poll_each(256, |_| {
1323 processed += 1;
1324 processed < 10 });
1326
1327 assert_eq!(count, 10);
1328 assert_eq!(processed, 10);
1329
1330 runtime.shutdown().unwrap();
1331 }
1332
1333 #[test]
1334 fn test_poll_core_into() {
1335 let config = TpcConfig::builder()
1336 .num_cores(2)
1337 .cpu_pinning(false)
1338 .build()
1339 .unwrap();
1340
1341 let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1342 vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1343 })
1344 .unwrap();
1345
1346 runtime.submit_to_core(0, make_event(1, 1000)).unwrap();
1348 runtime.submit_to_core(0, make_event(2, 2000)).unwrap();
1349
1350 std::thread::sleep(Duration::from_millis(100));
1352
1353 let mut buffer = OutputBuffer::with_capacity(100);
1355 let count = runtime.poll_core_into(0, &mut buffer, 100);
1356
1357 assert!(count > 0);
1358 assert_eq!(buffer.len(), count);
1359
1360 let count = runtime.poll_core_into(99, &mut buffer, 100);
1362 assert_eq!(count, 0);
1363
1364 runtime.shutdown().unwrap();
1365 }
1366
1367 #[test]
1368 fn test_poll_core_each() {
1369 let config = TpcConfig::builder()
1370 .num_cores(2)
1371 .cpu_pinning(false)
1372 .build()
1373 .unwrap();
1374
1375 let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1376 vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1377 })
1378 .unwrap();
1379
1380 runtime.submit_to_core(1, make_event(1, 1000)).unwrap();
1382
1383 std::thread::sleep(Duration::from_millis(100));
1385
1386 let mut event_count = 0;
1388 let count = runtime.poll_core_each(1, 100, |output| {
1389 if matches!(output, Output::Event(_)) {
1390 event_count += 1;
1391 }
1392 true
1393 });
1394
1395 assert!(count > 0);
1396 assert!(event_count > 0);
1397
1398 let count = runtime.poll_core_each(99, 100, |_| true);
1400 assert_eq!(count, 0);
1401
1402 runtime.shutdown().unwrap();
1403 }
1404
1405 #[test]
1406 fn test_tpc_config_auto() {
1407 let config = TpcConfig::auto();
1408
1409 assert!(config.num_cores >= 1);
1411 assert!(config.inbox_capacity > 0);
1412 assert!(config.outbox_capacity > 0);
1413
1414 if config.num_cores > 1 {
1416 assert!(config.cpu_pinning);
1417 }
1418
1419 assert!(config.validate().is_ok());
1421 }
1422
1423 #[test]
1424 fn test_tpc_config_auto_creates_runtime() {
1425 let mut config = TpcConfig::auto();
1428 config.cpu_pinning = false; config.num_cores = config.num_cores.min(2); let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1432 assert!(runtime.is_running());
1433
1434 runtime.shutdown().unwrap();
1435 }
1436}