Skip to main content

laminar_core/tpc/
mod.rs

//! # Thread-Per-Core (TPC) Module
//!
//! Implements a thread-per-core architecture for linear scaling on multi-core systems.
//!
//! ## Components
//!
//! - [`SpscQueue`] - Lock-free single-producer single-consumer queue
//! - [`CoreHandle`] - Manages a single core's reactor thread
//! - [`CreditGate`] - Credit-based backpressure
//! - [`TpcConfig`] / [`OutputBuffer`] - Runtime configuration and output collection
11
12mod backpressure;
13mod core_handle;
14mod partitioned_router;
15mod router;
16mod runtime;
17mod spsc;
18#[cfg(test)]
19mod zero_alloc_tests;
20
21pub use backpressure::{
22    BackpressureConfig, BackpressureConfigBuilder, CreditAcquireResult, CreditGate, CreditMetrics,
23    CreditMetricsSnapshot, OverflowStrategy,
24};
25pub use core_handle::{CoreConfig, CoreHandle, CoreMessage, TaggedOutput};
26pub use partitioned_router::PartitionedRouter;
27pub use router::{KeySpec, RouterError};
28pub use runtime::{OutputBuffer, TpcConfig, TpcConfigBuilder};
29pub use spsc::{CachePadded, SpscQueue};
30
31/// Errors that can occur in the TPC runtime.
32#[derive(Debug, thiserror::Error)]
33pub enum TpcError {
34    /// Failed to spawn a core thread
35    #[error("Failed to spawn core {core_id}: {message}")]
36    SpawnFailed {
37        /// The core ID that failed to spawn
38        core_id: usize,
39        /// Error message
40        message: String,
41    },
42
43    /// Failed to set CPU affinity
44    #[error("Failed to set CPU affinity for core {core_id}: {message}")]
45    AffinityFailed {
46        /// The core ID
47        core_id: usize,
48        /// Error message
49        message: String,
50    },
51
52    /// Queue is full, cannot accept more events
53    #[error("Queue full for core {core_id}")]
54    QueueFull {
55        /// The core ID whose queue is full
56        core_id: usize,
57    },
58
59    /// Backpressure active, no credits available
60    #[error("Backpressure active for core {core_id}")]
61    Backpressure {
62        /// The core ID that is backpressured
63        core_id: usize,
64    },
65
66    /// Runtime is not running
67    #[error("Runtime is not running")]
68    NotRunning,
69
70    /// Runtime is already running
71    #[error("Runtime is already running")]
72    AlreadyRunning,
73
74    /// Invalid configuration
75    #[error("Invalid configuration: {0}")]
76    InvalidConfig(String),
77
78    /// Reactor error from a core
79    #[error("Reactor error on core {core_id}: {source}")]
80    ReactorError {
81        /// The core ID
82        core_id: usize,
83        /// The underlying reactor error
84        #[source]
85        source: crate::reactor::ReactorError,
86    },
87
88    /// Key extraction failed
89    #[error("Key extraction failed: {0}")]
90    KeyExtractionFailed(String),
91
92    /// Router error (zero-allocation variant)
93    #[error("Router error: {0}")]
94    RouterError(#[from] RouterError),
95}
96
#[cfg(test)]
mod tests {
    use super::TpcError;

    /// `QueueFull` renders its core ID into the display message.
    #[test]
    fn test_error_display() {
        let rendered = TpcError::QueueFull { core_id: 3 }.to_string();
        assert_eq!(rendered, "Queue full for core 3");
    }
}
106}