// laminar_core/tpc/runtime.rs
//! # Thread-Per-Core Runtime
//!
//! Orchestrates multiple cores for parallel event processing with state locality.
//!
//! ## Architecture
//!
//! The runtime manages N reactors (one per core), routing events by key hash
//! to ensure state locality. Each core processes events independently with
//! no cross-core synchronization on the hot path.
//!
//! ## Usage
//!
//! ```rust,ignore
//! use laminar_core::tpc::{TpcConfig, ThreadPerCoreRuntime, KeySpec};
//!
//! // Create runtime with 4 cores
//! let config = TpcConfig::builder()
//!     .num_cores(4)
//!     .key_spec(KeySpec::Columns(vec!["user_id".to_string()]))
//!     .build()?;
//!
//! let mut runtime = ThreadPerCoreRuntime::new(config)?;
//!
//! // Submit events
//! runtime.submit(event)?;
//!
//! // Process and collect outputs
//! let outputs = runtime.poll();
//! ```

use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use crate::operator::{Event, Operator, Output};
use crate::reactor::ReactorConfig;

// OutputBuffer - Pre-allocated buffer for zero-allocation polling

/// A pre-allocated buffer for collecting outputs without allocation.
///
/// This buffer can be reused across multiple poll cycles, avoiding
/// memory allocation on the hot path.
///
/// # Example
///
/// ```rust,ignore
/// use laminar_core::tpc::OutputBuffer;
///
/// // Create buffer once at startup
/// let mut buffer = OutputBuffer::with_capacity(1024);
///
/// // Poll loop - no allocation after warmup
/// loop {
///     let count = runtime.poll_into(&mut buffer, 256);
///     for output in buffer.iter() {
///         process(output);
///     }
///     buffer.clear();
/// }
/// ```
#[derive(Debug)]
pub struct OutputBuffer {
    /// Internal storage (pre-allocated); capacity is preserved across
    /// `clear()` calls so the buffer can be reused without reallocating.
    items: Vec<Output>,
}

68impl OutputBuffer {
69    /// Creates a new output buffer with the given capacity.
70    ///
71    /// The buffer will not allocate until `capacity` items are added.
72    #[must_use]
73    pub fn with_capacity(capacity: usize) -> Self {
74        Self {
75            items: Vec::with_capacity(capacity),
76        }
77    }
78
79    /// Clears the buffer for reuse (no deallocation).
80    ///
81    /// The capacity remains unchanged, allowing zero-allocation reuse.
82    #[inline]
83    pub fn clear(&mut self) {
84        self.items.clear();
85    }
86
87    /// Returns the number of items in the buffer.
88    #[inline]
89    #[must_use]
90    pub fn len(&self) -> usize {
91        self.items.len()
92    }
93
94    /// Returns true if the buffer is empty.
95    #[inline]
96    #[must_use]
97    pub fn is_empty(&self) -> bool {
98        self.items.is_empty()
99    }
100
101    /// Returns the current capacity of the buffer.
102    #[inline]
103    #[must_use]
104    pub fn capacity(&self) -> usize {
105        self.items.capacity()
106    }
107
108    /// Returns the remaining capacity before reallocation.
109    #[inline]
110    #[must_use]
111    pub fn remaining(&self) -> usize {
112        self.items.capacity() - self.items.len()
113    }
114
115    /// Returns a slice of the collected outputs.
116    #[inline]
117    #[must_use]
118    pub fn as_slice(&self) -> &[Output] {
119        &self.items
120    }
121
122    /// Returns an iterator over the outputs.
123    #[inline]
124    pub fn iter(&self) -> impl Iterator<Item = &Output> {
125        self.items.iter()
126    }
127
128    /// Consumes the buffer and returns the inner Vec.
129    #[must_use]
130    pub fn into_vec(self) -> Vec<Output> {
131        self.items
132    }
133
134    /// Extends the buffer with outputs from an iterator.
135    ///
136    /// Note: This may allocate if the iterator produces more items than
137    /// the remaining capacity.
138    #[inline]
139    pub fn extend<I: IntoIterator<Item = Output>>(&mut self, iter: I) {
140        self.items.extend(iter);
141    }
142
143    /// Pushes a single output to the buffer.
144    ///
145    /// Note: This may allocate if the buffer is at capacity.
146    #[inline]
147    pub fn push(&mut self, output: Output) {
148        self.items.push(output);
149    }
150
151    /// Returns a mutable reference to the internal Vec.
152    ///
153    /// This is useful for passing to functions that expect `&mut Vec<Output>`.
154    #[inline]
155    pub fn as_vec_mut(&mut self) -> &mut Vec<Output> {
156        &mut self.items
157    }
158}
159
160impl Default for OutputBuffer {
161    fn default() -> Self {
162        Self::with_capacity(1024)
163    }
164}
165
166impl Deref for OutputBuffer {
167    type Target = [Output];
168
169    fn deref(&self) -> &Self::Target {
170        &self.items
171    }
172}
173
174impl<'a> IntoIterator for &'a OutputBuffer {
175    type Item = &'a Output;
176    type IntoIter = std::slice::Iter<'a, Output>;
177
178    fn into_iter(self) -> Self::IntoIter {
179        self.items.iter()
180    }
181}
182
183impl IntoIterator for OutputBuffer {
184    type Item = Output;
185    type IntoIter = std::vec::IntoIter<Output>;
186
187    fn into_iter(self) -> Self::IntoIter {
188        self.items.into_iter()
189    }
190}
191
use super::core_handle::{CoreConfig, CoreHandle};
use super::router::{KeyRouter, KeySpec};
use super::TpcError;

/// Configuration for the thread-per-core runtime.
///
/// Build via [`TpcConfig::builder`], [`TpcConfig::auto`], or `Default`;
/// `validate()` rejects zero values for `num_cores` and the queue capacities.
#[derive(Debug, Clone)]
pub struct TpcConfig {
    /// Number of cores to use (must be > 0)
    pub num_cores: usize,
    /// Key specification for routing
    pub key_spec: KeySpec,
    /// Whether to pin cores to CPUs
    pub cpu_pinning: bool,
    /// Starting CPU ID for pinning (cores use `cpu_start`, `cpu_start+1`, ...)
    pub cpu_start: usize,
    /// Inbox queue capacity per core (must be > 0)
    pub inbox_capacity: usize,
    /// Outbox queue capacity per core (must be > 0)
    pub outbox_capacity: usize,
    /// Reactor configuration (applied to all cores)
    pub reactor_config: ReactorConfig,
    /// Enable NUMA-aware memory allocation
    pub numa_aware: bool,
}

217impl Default for TpcConfig {
218    fn default() -> Self {
219        Self {
220            num_cores: num_cpus::get(),
221            key_spec: KeySpec::RoundRobin,
222            cpu_pinning: false,
223            cpu_start: 0,
224            inbox_capacity: 65536,
225            outbox_capacity: 65536,
226            reactor_config: ReactorConfig::default(),
227            numa_aware: false,
228        }
229    }
230}
231
232impl TpcConfig {
233    /// Creates a new configuration builder.
234    #[must_use]
235    pub fn builder() -> TpcConfigBuilder {
236        TpcConfigBuilder::default()
237    }
238
239    /// Creates configuration with automatic detection.
240    ///
241    /// Detects system capabilities and generates an optimal configuration:
242    /// - Uses all available physical cores (minus 1 on systems with >8 cores)
243    /// - Enables CPU pinning on multi-core systems
244    /// - Enables NUMA-aware allocation on multi-socket systems
245    ///
246    /// # Example
247    ///
248    /// ```rust,ignore
249    /// use laminar_core::tpc::TpcConfig;
250    ///
251    /// let config = TpcConfig::auto();
252    /// println!("Using {} cores", config.num_cores);
253    /// ```
254    #[must_use]
255    pub fn auto() -> Self {
256        let caps = crate::detect::SystemCapabilities::detect();
257        let recommended = caps.recommended_config();
258
259        Self {
260            num_cores: recommended.num_cores,
261            key_spec: KeySpec::RoundRobin,
262            cpu_pinning: recommended.cpu_pinning,
263            cpu_start: 0,
264            inbox_capacity: recommended.queue_capacity,
265            outbox_capacity: recommended.queue_capacity,
266            reactor_config: ReactorConfig::default(),
267            numa_aware: recommended.numa_aware,
268        }
269    }
270
271    /// Validates the configuration.
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if the configuration is invalid.
276    pub fn validate(&self) -> Result<(), TpcError> {
277        if self.num_cores == 0 {
278            return Err(TpcError::InvalidConfig("num_cores must be > 0".to_string()));
279        }
280        if self.inbox_capacity == 0 {
281            return Err(TpcError::InvalidConfig(
282                "inbox_capacity must be > 0".to_string(),
283            ));
284        }
285        if self.outbox_capacity == 0 {
286            return Err(TpcError::InvalidConfig(
287                "outbox_capacity must be > 0".to_string(),
288            ));
289        }
290        Ok(())
291    }
292}
293
/// Builder for `TpcConfig`.
///
/// Each field starts as `None`; `build()` substitutes the documented default
/// for any field left unset, then validates the assembled config.
#[derive(Debug, Default)]
pub struct TpcConfigBuilder {
    num_cores: Option<usize>,
    key_spec: Option<KeySpec>,
    cpu_pinning: Option<bool>,
    cpu_start: Option<usize>,
    inbox_capacity: Option<usize>,
    outbox_capacity: Option<usize>,
    reactor_config: Option<ReactorConfig>,
    numa_aware: Option<bool>,
}

307impl TpcConfigBuilder {
308    /// Sets the number of cores.
309    #[must_use]
310    pub fn num_cores(mut self, n: usize) -> Self {
311        self.num_cores = Some(n);
312        self
313    }
314
315    /// Sets the key specification for routing.
316    #[must_use]
317    pub fn key_spec(mut self, spec: KeySpec) -> Self {
318        self.key_spec = Some(spec);
319        self
320    }
321
322    /// Sets key columns for routing (convenience method).
323    #[must_use]
324    pub fn key_columns(self, columns: Vec<String>) -> Self {
325        self.key_spec(KeySpec::Columns(columns))
326    }
327
328    /// Enables or disables CPU pinning.
329    #[must_use]
330    pub fn cpu_pinning(mut self, enabled: bool) -> Self {
331        self.cpu_pinning = Some(enabled);
332        self
333    }
334
335    /// Sets the starting CPU ID for pinning.
336    #[must_use]
337    pub fn cpu_start(mut self, cpu: usize) -> Self {
338        self.cpu_start = Some(cpu);
339        self
340    }
341
342    /// Sets the inbox capacity per core.
343    #[must_use]
344    pub fn inbox_capacity(mut self, capacity: usize) -> Self {
345        self.inbox_capacity = Some(capacity);
346        self
347    }
348
349    /// Sets the outbox capacity per core.
350    #[must_use]
351    pub fn outbox_capacity(mut self, capacity: usize) -> Self {
352        self.outbox_capacity = Some(capacity);
353        self
354    }
355
356    /// Sets the reactor configuration.
357    #[must_use]
358    pub fn reactor_config(mut self, config: ReactorConfig) -> Self {
359        self.reactor_config = Some(config);
360        self
361    }
362
363    /// Enables or disables NUMA-aware memory allocation.
364    ///
365    /// When enabled, per-core state stores and buffers are allocated
366    /// on the NUMA node local to that core, improving memory access latency.
367    #[must_use]
368    pub fn numa_aware(mut self, enabled: bool) -> Self {
369        self.numa_aware = Some(enabled);
370        self
371    }
372
373    /// Builds the configuration.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if the configuration is invalid.
378    pub fn build(self) -> Result<TpcConfig, TpcError> {
379        let config = TpcConfig {
380            num_cores: self.num_cores.unwrap_or_else(num_cpus::get),
381            key_spec: self.key_spec.unwrap_or_default(),
382            cpu_pinning: self.cpu_pinning.unwrap_or(false),
383            cpu_start: self.cpu_start.unwrap_or(0),
384            inbox_capacity: self.inbox_capacity.unwrap_or(65536),
385            outbox_capacity: self.outbox_capacity.unwrap_or(65536),
386            reactor_config: self.reactor_config.unwrap_or_default(),
387            numa_aware: self.numa_aware.unwrap_or(false),
388        };
389        config.validate()?;
390        Ok(config)
391    }
392}
393
/// Factory for creating operators for each core.
///
/// This trait allows the runtime to create separate operator instances
/// for each core, ensuring no shared state between cores.
pub trait OperatorFactory: Send {
    /// Creates operators for a specific core.
    ///
    /// Invoked once per core during runtime construction; `core_id` is the
    /// index of the core the returned operators will run on.
    fn create(&self, core_id: usize) -> Vec<Box<dyn Operator>>;
}

// Blanket impl: any `Fn(usize) -> Vec<Box<dyn Operator>>` closure can be
// passed directly as a factory (e.g. to `new_with_factory`).
impl<F> OperatorFactory for F
where
    F: Fn(usize) -> Vec<Box<dyn Operator>> + Send,
{
    fn create(&self, core_id: usize) -> Vec<Box<dyn Operator>> {
        self(core_id)
    }
}

/// Thread-per-core runtime for parallel event processing.
///
/// Manages multiple reactor threads, routing events by key hash
/// to ensure state locality.
pub struct ThreadPerCoreRuntime {
    /// Core handles, one per spawned core thread; index == core id
    cores: Vec<CoreHandle>,
    /// Key router mapping an event's key hash to a core index
    router: KeyRouter,
    /// Configuration the runtime was constructed with
    config: TpcConfig,
    /// Running state; cleared by `shutdown()` and in `Drop`
    is_running: Arc<AtomicBool>,
}

427impl ThreadPerCoreRuntime {
428    /// Creates a new runtime with the given configuration.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if any core thread cannot be spawned.
433    pub fn new(config: TpcConfig) -> Result<Self, TpcError> {
434        config.validate()?;
435        Self::new_with_factory(config, &|_| Vec::new())
436    }
437
438    /// Creates a new runtime with operators from a factory.
439    ///
440    /// The factory is called once per core to create that core's operators.
441    ///
442    /// # Errors
443    ///
444    /// Returns an error if any core thread cannot be spawned.
445    pub fn new_with_factory<F>(config: TpcConfig, factory: &F) -> Result<Self, TpcError>
446    where
447        F: OperatorFactory,
448    {
449        config.validate()?;
450
451        let router = KeyRouter::new(config.num_cores, config.key_spec.clone());
452        let is_running = Arc::new(AtomicBool::new(true));
453
454        let mut cores = Vec::with_capacity(config.num_cores);
455
456        for core_id in 0..config.num_cores {
457            let cpu_affinity = if config.cpu_pinning {
458                Some(config.cpu_start + core_id)
459            } else {
460                None
461            };
462
463            let core_config = CoreConfig {
464                core_id,
465                cpu_affinity,
466                inbox_capacity: config.inbox_capacity,
467                outbox_capacity: config.outbox_capacity,
468                reactor_config: config.reactor_config.clone(),
469                backpressure: super::backpressure::BackpressureConfig::default(),
470                numa_aware: config.numa_aware,
471                #[cfg(all(target_os = "linux", feature = "io-uring"))]
472                io_uring_config: None,
473            };
474
475            let operators = factory.create(core_id);
476            let handle = CoreHandle::spawn_with_operators(core_config, operators)?;
477            cores.push(handle);
478        }
479
480        Ok(Self {
481            cores,
482            router,
483            config,
484            is_running,
485        })
486    }
487
488    /// Returns the number of cores.
489    #[must_use]
490    pub fn num_cores(&self) -> usize {
491        self.cores.len()
492    }
493
494    /// Returns true if the runtime is running.
495    #[must_use]
496    pub fn is_running(&self) -> bool {
497        self.is_running.load(Ordering::Acquire)
498    }
499
500    /// Submits an event for processing.
501    ///
502    /// The event is routed to a core based on its key.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the target core's queue is full.
507    pub fn submit(&self, event: Event) -> Result<(), TpcError> {
508        if !self.is_running() {
509            return Err(TpcError::NotRunning);
510        }
511
512        let core_id = self.router.route(&event)?;
513        self.cores[core_id].send_event(event)
514    }
515
516    /// Submits an event to a specific core.
517    ///
518    /// Use this when you've already computed the routing.
519    ///
520    /// # Errors
521    ///
522    /// Returns an error if the core's queue is full or the `core_id` is invalid.
523    pub fn submit_to_core(&self, core_id: usize, event: Event) -> Result<(), TpcError> {
524        if !self.is_running() {
525            return Err(TpcError::NotRunning);
526        }
527        if core_id >= self.cores.len() {
528            return Err(TpcError::InvalidConfig(format!(
529                "core_id {} out of range (0..{})",
530                core_id,
531                self.cores.len()
532            )));
533        }
534        self.cores[core_id].send_event(event)
535    }
536
537    /// Submits a batch of events for processing.
538    ///
539    /// Events are routed to cores based on their keys.
540    ///
541    /// # Errors
542    ///
543    /// Returns the number of successfully submitted events and an error
544    /// if any event couldn't be submitted.
545    pub fn submit_batch(&self, events: Vec<Event>) -> (usize, Option<TpcError>) {
546        if !self.is_running() {
547            return (0, Some(TpcError::NotRunning));
548        }
549
550        let mut submitted = 0;
551        for event in events {
552            match self.submit(event) {
553                Ok(()) => submitted += 1,
554                Err(e) => return (submitted, Some(e)),
555            }
556        }
557        (submitted, None)
558    }
559
560    /// Polls all cores for outputs.
561    ///
562    /// Returns all available outputs from all cores.
563    ///
564    /// # Note
565    ///
566    /// This method allocates memory. For zero-allocation polling, use
567    /// [`poll_into`](Self::poll_into) or [`poll_each`](Self::poll_each) instead.
568    #[must_use]
569    pub fn poll(&self) -> Vec<Output> {
570        let mut outputs = Vec::new();
571        for core in &self.cores {
572            outputs.extend(core.poll_outputs(1024));
573        }
574        outputs
575    }
576
577    /// Polls all cores for outputs into a pre-allocated buffer (zero-allocation).
578    ///
579    /// Returns the total number of outputs collected across all cores.
580    ///
581    /// # Arguments
582    ///
583    /// * `buffer` - Pre-allocated buffer to receive outputs. Use [`OutputBuffer`]
584    ///   for optimal performance.
585    /// * `max_per_core` - Maximum outputs to collect from each core.
586    ///
587    /// # Example
588    ///
589    /// ```rust,ignore
590    /// use laminar_core::tpc::OutputBuffer;
591    ///
592    /// // Create buffer once
593    /// let mut buffer = OutputBuffer::with_capacity(4096);
594    ///
595    /// // Poll loop - no allocation after warmup
596    /// loop {
597    ///     let count = runtime.poll_into(&mut buffer, 256);
598    ///
599    ///     for output in buffer.iter() {
600    ///         process(output);
601    ///     }
602    ///
603    ///     buffer.clear(); // Reuse buffer
604    /// }
605    /// ```
606    pub fn poll_into(&self, buffer: &mut OutputBuffer, max_per_core: usize) -> usize {
607        let start_len = buffer.len();
608
609        for core in &self.cores {
610            // Check remaining capacity to avoid overflowing
611            let remaining = buffer.remaining();
612            if remaining == 0 {
613                break;
614            }
615
616            let max = max_per_core.min(remaining);
617            core.poll_outputs_into(buffer.as_vec_mut(), max);
618        }
619
620        buffer.len() - start_len
621    }
622
623    /// Polls all cores with a callback for each output (zero-allocation).
624    ///
625    /// Processing continues until:
626    /// - All cores have been polled up to `max_per_core`
627    /// - The callback returns `ControlFlow::Break`
628    ///
629    /// Returns the total number of outputs processed.
630    ///
631    /// # Arguments
632    ///
633    /// * `max_per_core` - Maximum outputs to process from each core.
634    /// * `f` - Callback function for each output. Return `true` to continue, `false` to stop.
635    ///
636    /// # Example
637    ///
638    /// ```rust,ignore
639    /// // Process outputs without any allocation
640    /// let count = runtime.poll_each(256, |output| {
641    ///     match output {
642    ///         Output::Event(event) => {
643    ///             send_to_sink(event);
644    ///         }
645    ///         _ => {}
646    ///     }
647    ///     true // Continue processing
648    /// });
649    ///
650    /// // Or stop early on condition
651    /// let count = runtime.poll_each(256, |output| {
652    ///     if should_stop() {
653    ///         false // Stop processing
654    ///     } else {
655    ///         process(output);
656    ///         true
657    ///     }
658    /// });
659    /// ```
660    pub fn poll_each<F>(&self, max_per_core: usize, mut f: F) -> usize
661    where
662        F: FnMut(Output) -> bool,
663    {
664        let mut total = 0;
665        let mut should_continue = true;
666
667        for core in &self.cores {
668            if !should_continue {
669                break;
670            }
671
672            let count = core.poll_each(max_per_core, |output| {
673                let result = f(output);
674                if !result {
675                    should_continue = false;
676                }
677                result
678            });
679
680            total += count;
681        }
682
683        total
684    }
685
686    /// Polls a specific core for outputs.
687    ///
688    /// # Note
689    ///
690    /// This method allocates memory. For zero-allocation polling, use
691    /// [`poll_core_into`](Self::poll_core_into) or [`poll_core_each`](Self::poll_core_each) instead.
692    #[must_use]
693    pub fn poll_core(&self, core_id: usize) -> Vec<Output> {
694        if core_id < self.cores.len() {
695            self.cores[core_id].poll_outputs(1024)
696        } else {
697            Vec::new()
698        }
699    }
700
701    /// Polls a specific core into a pre-allocated buffer (zero-allocation).
702    ///
703    /// Returns the number of outputs collected.
704    pub fn poll_core_into(
705        &self,
706        core_id: usize,
707        buffer: &mut OutputBuffer,
708        max_count: usize,
709    ) -> usize {
710        if core_id < self.cores.len() {
711            self.cores[core_id].poll_outputs_into(buffer.as_vec_mut(), max_count)
712        } else {
713            0
714        }
715    }
716
717    /// Polls a specific core with a callback for each output (zero-allocation).
718    ///
719    /// Returns the number of outputs processed.
720    pub fn poll_core_each<F>(&self, core_id: usize, max_count: usize, f: F) -> usize
721    where
722        F: FnMut(Output) -> bool,
723    {
724        if core_id < self.cores.len() {
725            self.cores[core_id].poll_each(max_count, f)
726        } else {
727            0
728        }
729    }
730
731    /// Returns statistics for all cores.
732    #[must_use]
733    pub fn stats(&self) -> RuntimeStats {
734        let core_stats: Vec<CoreStats> = self
735            .cores
736            .iter()
737            .map(|core| CoreStats {
738                core_id: core.core_id(),
739                numa_node: core.numa_node(),
740                events_processed: core.events_processed(),
741                inbox_len: core.inbox_len(),
742                outbox_len: core.outbox_len(),
743                is_running: core.is_running(),
744            })
745            .collect();
746
747        RuntimeStats {
748            num_cores: self.cores.len(),
749            total_events_processed: core_stats.iter().map(|s| s.events_processed).sum(),
750            cores: core_stats,
751        }
752    }
753
754    /// Returns the key router.
755    #[must_use]
756    pub fn router(&self) -> &KeyRouter {
757        &self.router
758    }
759
760    /// Shuts down the runtime gracefully.
761    ///
762    /// Signals all cores to stop and waits for them to finish.
763    ///
764    /// # Errors
765    ///
766    /// Returns an error if any core cannot be joined cleanly.
767    pub fn shutdown(mut self) -> Result<(), TpcError> {
768        self.is_running.store(false, Ordering::Release);
769
770        // Signal all cores to shut down
771        for core in &self.cores {
772            core.shutdown();
773        }
774
775        // Join all cores
776        let cores = std::mem::take(&mut self.cores);
777        for core in cores {
778            core.join()?;
779        }
780
781        Ok(())
782    }
783
784    /// Runs the runtime with a custom output handler.
785    ///
786    /// This is a convenience method that polls outputs and passes them
787    /// to the handler in a loop until shutdown is signaled.
788    ///
789    /// # Arguments
790    ///
791    /// * `handler` - Function called with batches of outputs
792    /// * `shutdown` - Atomic flag to signal shutdown
793    pub fn run_with_handler<F>(&self, mut handler: F, shutdown: &AtomicBool)
794    where
795        F: FnMut(Vec<Output>),
796    {
797        while !shutdown.load(Ordering::Acquire) && self.is_running() {
798            let outputs = self.poll();
799            if outputs.is_empty() {
800                std::thread::yield_now();
801            } else {
802                handler(outputs);
803            }
804        }
805    }
806}
807
impl Drop for ThreadPerCoreRuntime {
    fn drop(&mut self) {
        // Mark as stopped so any concurrent submitter observes `NotRunning`.
        self.is_running.store(false, Ordering::Release);

        // Cores will be dropped and their threads joined in CoreHandle::drop
    }
}

// Manual Debug: summarizes the runtime rather than dumping every field.
// NOTE(review): presumably hand-written because `CoreHandle`/`KeyRouter`
// don't derive `Debug` — confirm against their definitions.
impl std::fmt::Debug for ThreadPerCoreRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ThreadPerCoreRuntime")
            .field("num_cores", &self.cores.len())
            .field("is_running", &self.is_running())
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

/// Statistics for the runtime.
///
/// A point-in-time snapshot produced by [`ThreadPerCoreRuntime::stats`].
#[derive(Debug, Clone)]
pub struct RuntimeStats {
    /// Number of cores
    pub num_cores: usize,
    /// Total events processed across all cores (sum of per-core counters)
    pub total_events_processed: u64,
    /// Per-core statistics
    pub cores: Vec<CoreStats>,
}

/// Statistics for a single core.
///
/// Values are sampled when [`ThreadPerCoreRuntime::stats`] is called.
#[derive(Debug, Clone)]
pub struct CoreStats {
    /// Core ID
    pub core_id: usize,
    /// NUMA node for this core
    pub numa_node: usize,
    /// Events processed by this core
    pub events_processed: u64,
    /// Current inbox queue length
    pub inbox_len: usize,
    /// Current outbox queue length
    pub outbox_len: usize,
    /// Whether the core is running
    pub is_running: bool,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::{OperatorState, OutputVec, Timer};
    use arrow_array::{Int64Array, RecordBatch};
    use std::sync::Arc;
    use std::time::Duration;

    // Simple passthrough operator for testing: echoes every event back out.
    struct PassthroughOperator {
        #[allow(dead_code)]
        core_id: usize,
    }

    impl Operator for PassthroughOperator {
        // Emits the incoming event unchanged as a single output.
        fn process(
            &mut self,
            event: &Event,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            let mut output = OutputVec::new();
            output.push(Output::Event(event.clone()));
            output
        }

        // Timers are unused by this operator.
        fn on_timer(
            &mut self,
            _timer: Timer,
            _ctx: &mut crate::operator::OperatorContext,
        ) -> OutputVec {
            OutputVec::new()
        }

        // Stateless: checkpoint carries no payload.
        fn checkpoint(&self) -> OperatorState {
            OperatorState {
                operator_id: "passthrough".to_string(),
                data: vec![],
            }
        }

        // Stateless: nothing to restore.
        fn restore(&mut self, _state: OperatorState) -> Result<(), crate::operator::OperatorError> {
            Ok(())
        }
    }

    // Builds a single-row event with one Int64 `user_id` column.
    fn make_event(user_id: i64, timestamp: i64) -> Event {
        let user_ids = Arc::new(Int64Array::from(vec![user_id]));
        let batch = RecordBatch::try_from_iter(vec![("user_id", user_ids as _)]).unwrap();
        Event::new(timestamp, batch)
    }

    // Builder round-trip: explicitly set fields come back unchanged.
    #[test]
    fn test_config_builder() {
        let config = TpcConfig::builder()
            .num_cores(4)
            .key_columns(vec!["user_id".to_string()])
            .cpu_pinning(false)
            .inbox_capacity(1024)
            .build()
            .unwrap();

        assert_eq!(config.num_cores, 4);
        assert!(!config.cpu_pinning);
        assert_eq!(config.inbox_capacity, 1024);
    }

    // `build()` must reject zero-valued core count and queue capacities.
    #[test]
    fn test_config_validation() {
        // Zero cores should fail
        let result = TpcConfig::builder().num_cores(0).build();
        assert!(result.is_err());

        // Zero inbox capacity should fail
        let result = TpcConfig::builder().inbox_capacity(0).build();
        assert!(result.is_err());
    }

    // Runtime spawns the requested cores and starts in the running state.
    #[test]
    fn test_runtime_creation() {
        let config = TpcConfig::builder()
            .num_cores(2)
            .cpu_pinning(false)
            .build()
            .unwrap();

        let runtime = ThreadPerCoreRuntime::new(config).unwrap();
        assert_eq!(runtime.num_cores(), 2);
        assert!(runtime.is_running());

        runtime.shutdown().unwrap();
    }

    // End-to-end: factory-created passthrough operators echo submitted events.
    #[test]
    fn test_runtime_with_factory() {
        let config = TpcConfig::builder()
            .num_cores(2)
            .cpu_pinning(false)
            .build()
            .unwrap();

        let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
            vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
        })
        .unwrap();

        assert_eq!(runtime.num_cores(), 2);

        // Submit events
        for i in 0..10 {
            runtime.submit(make_event(i, i * 1000)).unwrap();
        }

        // Wait for processing
        std::thread::sleep(Duration::from_millis(100));

        // Poll outputs
        let outputs = runtime.poll();
        assert!(!outputs.is_empty());

        runtime.shutdown().unwrap();
    }

    // Routing must be deterministic: identical keys map to the same core.
    #[test]
    fn test_key_based_routing() {
        let config = TpcConfig::builder()
            .num_cores(4)
            .key_columns(vec!["user_id".to_string()])
            .cpu_pinning(false)
            .build()
            .unwrap();

        let runtime = ThreadPerCoreRuntime::new(config).unwrap();

        // Same user_id should always route to same core
        let event1 = make_event(100, 1000);
        let event2 = make_event(100, 2000);

        let core1 = runtime.router().route(&event1).unwrap();
        let core2 = runtime.router().route(&event2).unwrap();

        assert_eq!(core1, core2);
        assert!(core1 < 4);

        runtime.shutdown().unwrap();
    }

    // A full batch below queue capacity submits without error.
    #[test]
    fn test_submit_batch() {
        let config = TpcConfig::builder()
            .num_cores(2)
            .cpu_pinning(false)
            .build()
            .unwrap();

        let runtime = ThreadPerCoreRuntime::new(config).unwrap();

        let events: Vec<Event> = (0..100).map(|i| make_event(i, i * 1000)).collect();

        let (submitted, error) = runtime.submit_batch(events);
        assert_eq!(submitted, 100);
        assert!(error.is_none());

        runtime.shutdown().unwrap();
    }

    // Stats reflect processed events and report all cores as running.
    #[test]
    fn test_runtime_stats() {
        let config = TpcConfig::builder()
            .num_cores(2)
            .cpu_pinning(false)
            .build()
            .unwrap();

        let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
            vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
        })
        .unwrap();

        // Submit some events
        for i in 0..50 {
            runtime.submit(make_event(i, i * 1000)).unwrap();
        }

        // Wait for processing
        std::thread::sleep(Duration::from_millis(100));

        // Poll to clear outboxes
        let _ = runtime.poll();

        let stats = runtime.stats();
        assert_eq!(stats.num_cores, 2);
        assert!(stats.total_events_processed > 0);

        for core_stat in &stats.cores {
            assert!(core_stat.is_running);
        }

        runtime.shutdown().unwrap();
    }

    // Direct submission accepts valid core ids and rejects out-of-range ones.
    #[test]
    fn test_submit_to_specific_core() {
        let config = TpcConfig::builder()
            .num_cores(4)
            .cpu_pinning(false)
            .build()
            .unwrap();

        let runtime = ThreadPerCoreRuntime::new(config).unwrap();

        // Submit to specific cores
        runtime.submit_to_core(0, make_event(1, 1000)).unwrap();
        runtime.submit_to_core(1, make_event(2, 2000)).unwrap();
        runtime.submit_to_core(2, make_event(3, 3000)).unwrap();
        runtime.submit_to_core(3, make_event(4, 4000)).unwrap();

        // Invalid core should fail
        let result = runtime.submit_to_core(10, make_event(5, 5000));
        assert!(result.is_err());

        runtime.shutdown().unwrap();
    }

1077    #[test]
1078    fn test_poll_specific_core() {
1079        let config = TpcConfig::builder()
1080            .num_cores(2)
1081            .cpu_pinning(false)
1082            .build()
1083            .unwrap();
1084
1085        let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1086            vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1087        })
1088        .unwrap();
1089
1090        // Submit to core 0
1091        runtime.submit_to_core(0, make_event(1, 1000)).unwrap();
1092
1093        // Wait for processing
1094        std::thread::sleep(Duration::from_millis(100));
1095
1096        // Poll core 0
1097        let outputs = runtime.poll_core(0);
1098        assert!(!outputs.is_empty());
1099
1100        runtime.shutdown().unwrap();
1101    }
1102
1103    #[test]
1104    fn test_runtime_debug() {
1105        let config = TpcConfig::builder()
1106            .num_cores(2)
1107            .cpu_pinning(false)
1108            .build()
1109            .unwrap();
1110
1111        let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1112
1113        let debug_str = format!("{runtime:?}");
1114        assert!(debug_str.contains("ThreadPerCoreRuntime"));
1115        assert!(debug_str.contains("num_cores"));
1116
1117        runtime.shutdown().unwrap();
1118    }
1119
1120    #[test]
1121    fn test_shutdown_stops_submission() {
1122        let config = TpcConfig::builder()
1123            .num_cores(2)
1124            .cpu_pinning(false)
1125            .build()
1126            .unwrap();
1127
1128        let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1129
1130        // Shutdown via drop
1131        drop(runtime);
1132
1133        // Create new runtime and shutdown properly
1134        let config = TpcConfig::builder()
1135            .num_cores(2)
1136            .cpu_pinning(false)
1137            .build()
1138            .unwrap();
1139
1140        let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1141        runtime.shutdown().unwrap();
1142    }
1143
1144    #[test]
1145    fn test_numa_aware_config() {
1146        let config = TpcConfig::builder()
1147            .num_cores(2)
1148            .cpu_pinning(false)
1149            .numa_aware(true)
1150            .build()
1151            .unwrap();
1152
1153        assert!(config.numa_aware);
1154
1155        let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1156        assert_eq!(runtime.num_cores(), 2);
1157
1158        // Stats should include NUMA node info
1159        let stats = runtime.stats();
1160        for core_stat in &stats.cores {
1161            // On any system, numa_node should be valid (0 on non-NUMA)
1162            assert!(core_stat.numa_node < 64);
1163        }
1164
1165        runtime.shutdown().unwrap();
1166    }
1167
1168    #[test]
1169    fn test_output_buffer_basic() {
1170        let mut buffer = OutputBuffer::with_capacity(100);
1171
1172        assert!(buffer.is_empty());
1173        assert_eq!(buffer.len(), 0);
1174        assert_eq!(buffer.capacity(), 100);
1175        assert_eq!(buffer.remaining(), 100);
1176
1177        // Push some items
1178        let event = make_event(1, 1000);
1179        buffer.push(Output::Event(event));
1180
1181        assert!(!buffer.is_empty());
1182        assert_eq!(buffer.len(), 1);
1183        assert_eq!(buffer.remaining(), 99);
1184
1185        // Clear and reuse
1186        buffer.clear();
1187        assert!(buffer.is_empty());
1188        assert_eq!(buffer.capacity(), 100); // Capacity preserved
1189    }
1190
1191    #[test]
1192    fn test_output_buffer_iteration() {
1193        let mut buffer = OutputBuffer::with_capacity(10);
1194
1195        for i in 0..5 {
1196            buffer.push(Output::Event(make_event(i, i * 1000)));
1197        }
1198
1199        // Test iter()
1200        let count = buffer.iter().count();
1201        assert_eq!(count, 5);
1202
1203        // Test Deref to slice
1204        assert_eq!(buffer.as_slice().len(), 5);
1205
1206        // Test IntoIterator for reference
1207        let mut ref_count = 0;
1208        for _ in &buffer {
1209            ref_count += 1;
1210        }
1211        assert_eq!(ref_count, 5);
1212    }
1213
1214    #[test]
1215    fn test_output_buffer_into_vec() {
1216        let mut buffer = OutputBuffer::with_capacity(10);
1217
1218        for i in 0..3 {
1219            buffer.push(Output::Event(make_event(i, i * 1000)));
1220        }
1221
1222        let vec = buffer.into_vec();
1223        assert_eq!(vec.len(), 3);
1224    }
1225
1226    #[test]
1227    fn test_poll_into() {
1228        let config = TpcConfig::builder()
1229            .num_cores(2)
1230            .cpu_pinning(false)
1231            .build()
1232            .unwrap();
1233
1234        let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1235            vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1236        })
1237        .unwrap();
1238
1239        // Submit events
1240        for i in 0..20 {
1241            runtime.submit(make_event(i, i * 1000)).unwrap();
1242        }
1243
1244        // Wait for processing
1245        std::thread::sleep(Duration::from_millis(100));
1246
1247        // Poll into buffer
1248        let mut buffer = OutputBuffer::with_capacity(100);
1249        let count = runtime.poll_into(&mut buffer, 256);
1250
1251        assert!(count > 0);
1252        assert_eq!(buffer.len(), count);
1253
1254        // Reuse buffer - no new allocation
1255        let cap_before = buffer.capacity();
1256        buffer.clear();
1257        let _ = runtime.poll_into(&mut buffer, 256);
1258        assert_eq!(buffer.capacity(), cap_before);
1259
1260        runtime.shutdown().unwrap();
1261    }
1262
1263    #[test]
1264    fn test_poll_each() {
1265        let config = TpcConfig::builder()
1266            .num_cores(2)
1267            .cpu_pinning(false)
1268            .build()
1269            .unwrap();
1270
1271        let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1272            vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1273        })
1274        .unwrap();
1275
1276        // Submit events
1277        for i in 0..20 {
1278            runtime.submit(make_event(i, i * 1000)).unwrap();
1279        }
1280
1281        // Wait for processing
1282        std::thread::sleep(Duration::from_millis(100));
1283
1284        // Poll with callback
1285        let mut event_count = 0;
1286        let count = runtime.poll_each(256, |output| {
1287            if matches!(output, Output::Event(_)) {
1288                event_count += 1;
1289            }
1290            true
1291        });
1292
1293        assert!(count > 0);
1294        assert!(event_count > 0);
1295
1296        runtime.shutdown().unwrap();
1297    }
1298
1299    #[test]
1300    fn test_poll_each_early_stop() {
1301        let config = TpcConfig::builder()
1302            .num_cores(2)
1303            .cpu_pinning(false)
1304            .build()
1305            .unwrap();
1306
1307        let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1308            vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1309        })
1310        .unwrap();
1311
1312        // Submit many events
1313        for i in 0..50 {
1314            runtime.submit(make_event(i, i * 1000)).unwrap();
1315        }
1316
1317        // Wait for processing
1318        std::thread::sleep(Duration::from_millis(100));
1319
1320        // Poll with early stop after 10 items
1321        let mut processed = 0;
1322        let count = runtime.poll_each(256, |_| {
1323            processed += 1;
1324            processed < 10 // Stop after 10
1325        });
1326
1327        assert_eq!(count, 10);
1328        assert_eq!(processed, 10);
1329
1330        runtime.shutdown().unwrap();
1331    }
1332
1333    #[test]
1334    fn test_poll_core_into() {
1335        let config = TpcConfig::builder()
1336            .num_cores(2)
1337            .cpu_pinning(false)
1338            .build()
1339            .unwrap();
1340
1341        let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1342            vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1343        })
1344        .unwrap();
1345
1346        // Submit to specific core
1347        runtime.submit_to_core(0, make_event(1, 1000)).unwrap();
1348        runtime.submit_to_core(0, make_event(2, 2000)).unwrap();
1349
1350        // Wait for processing
1351        std::thread::sleep(Duration::from_millis(100));
1352
1353        // Poll core 0 into buffer
1354        let mut buffer = OutputBuffer::with_capacity(100);
1355        let count = runtime.poll_core_into(0, &mut buffer, 100);
1356
1357        assert!(count > 0);
1358        assert_eq!(buffer.len(), count);
1359
1360        // Invalid core returns 0
1361        let count = runtime.poll_core_into(99, &mut buffer, 100);
1362        assert_eq!(count, 0);
1363
1364        runtime.shutdown().unwrap();
1365    }
1366
1367    #[test]
1368    fn test_poll_core_each() {
1369        let config = TpcConfig::builder()
1370            .num_cores(2)
1371            .cpu_pinning(false)
1372            .build()
1373            .unwrap();
1374
1375        let runtime = ThreadPerCoreRuntime::new_with_factory(config, &|core_id| {
1376            vec![Box::new(PassthroughOperator { core_id }) as Box<dyn Operator>]
1377        })
1378        .unwrap();
1379
1380        // Submit to specific core
1381        runtime.submit_to_core(1, make_event(1, 1000)).unwrap();
1382
1383        // Wait for processing
1384        std::thread::sleep(Duration::from_millis(100));
1385
1386        // Poll core 1 with callback
1387        let mut event_count = 0;
1388        let count = runtime.poll_core_each(1, 100, |output| {
1389            if matches!(output, Output::Event(_)) {
1390                event_count += 1;
1391            }
1392            true
1393        });
1394
1395        assert!(count > 0);
1396        assert!(event_count > 0);
1397
1398        // Invalid core returns 0
1399        let count = runtime.poll_core_each(99, 100, |_| true);
1400        assert_eq!(count, 0);
1401
1402        runtime.shutdown().unwrap();
1403    }
1404
1405    #[test]
1406    fn test_tpc_config_auto() {
1407        let config = TpcConfig::auto();
1408
1409        // Auto config should have valid values
1410        assert!(config.num_cores >= 1);
1411        assert!(config.inbox_capacity > 0);
1412        assert!(config.outbox_capacity > 0);
1413
1414        // On multi-core systems, cpu_pinning should be enabled
1415        if config.num_cores > 1 {
1416            assert!(config.cpu_pinning);
1417        }
1418
1419        // Validation should pass
1420        assert!(config.validate().is_ok());
1421    }
1422
1423    #[test]
1424    fn test_tpc_config_auto_creates_runtime() {
1425        // Auto config should be usable to create a runtime
1426        // (but with cpu_pinning disabled to avoid permission issues in tests)
1427        let mut config = TpcConfig::auto();
1428        config.cpu_pinning = false; // Disable for test environments
1429        config.num_cores = config.num_cores.min(2); // Limit cores for faster test
1430
1431        let runtime = ThreadPerCoreRuntime::new(config).unwrap();
1432        assert!(runtime.is_running());
1433
1434        runtime.shutdown().unwrap();
1435    }
1436}