Skip to main content

laminar_core/tpc/
mod.rs

1//! # Thread-Per-Core (TPC) Module
2//!
//! Implements a thread-per-core architecture for linear scaling on multi-core systems.
4//!
5//! ## Architecture
6//!
7//! Each CPU core runs a dedicated [`Reactor`](crate::reactor::Reactor) with its own state
8//! partition. Events are routed to cores based on key hash, ensuring state locality.
9//!
10//! ```text
11//! ┌─────────────────────────────────────────────────────────────┐
12//! │                   ThreadPerCoreRuntime                       │
13//! │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
14//! │  │ Core 0   │  │ Core 1   │  │ Core 2   │  │ Core N   │    │
15//! │  │ Reactor  │  │ Reactor  │  │ Reactor  │  │ Reactor  │    │
16//! │  │ State₀   │  │ State₁   │  │ State₂   │  │ StateN   │    │
17//! │  └────▲─────┘  └────▲─────┘  └────▲─────┘  └────▲─────┘    │
18//! │       │             │             │             │          │
19//! │       └─────────────┴──────┬──────┴─────────────┘          │
20//! │                            │                                │
21//! │                    ┌───────┴───────┐                        │
22//! │                    │  KeyRouter    │                        │
23//! │                    │ hash(key) % N │                        │
24//! │                    └───────┬───────┘                        │
25//! │                            │                                │
26//! └────────────────────────────┼────────────────────────────────┘
27//!                              │
28//!                        Input Events
29//! ```
30//!
31//! ## Components
32//!
33//! - [`SpscQueue`] - Lock-free single-producer single-consumer queue
34//! - [`KeyRouter`] - Routes events to cores based on key hash
35//! - [`CoreHandle`] - Manages a single core's reactor thread
36//! - [`ThreadPerCoreRuntime`] - Orchestrates multi-core processing
37//!
38//! ## Example
39//!
40//! ```rust,ignore
41//! use laminar_core::tpc::{TpcConfig, ThreadPerCoreRuntime, KeySpec};
42//!
43//! // Configure for 4 cores, routing by "user_id" column
44//! let config = TpcConfig::builder()
45//!     .num_cores(4)
46//!     .key_columns(vec!["user_id"])
47//!     .build();
48//!
49//! let mut runtime = ThreadPerCoreRuntime::new(config)?;
50//!
51//! // Submit events - automatically routed by key
52//! runtime.submit(event)?;
53//!
54//! // Poll all cores for outputs
55//! let outputs = runtime.poll();
56//! ```
57//!
58//! ## Performance Targets
59//!
60//! | Metric | Target |
61//! |--------|--------|
62//! | SPSC push/pop | < 50ns |
63//! | Scaling efficiency | > 80% |
64//! | Inter-core latency | < 1μs |
65
66mod backpressure;
67mod core_handle;
68mod router;
69mod runtime;
70mod spsc;
71#[cfg(test)]
72mod zero_alloc_tests;
73
74pub use backpressure::{
75    BackpressureConfig, BackpressureConfigBuilder, CreditAcquireResult, CreditChannel, CreditGate,
76    CreditMetrics, CreditMetricsSnapshot, CreditReceiver, CreditSender, OverflowStrategy,
77};
78pub use core_handle::{CoreConfig, CoreHandle, CoreMessage};
79pub use router::{KeyRouter, KeySpec, RouterError};
80pub use runtime::{OutputBuffer, ThreadPerCoreRuntime, TpcConfig, TpcConfigBuilder};
81pub use spsc::{CachePadded, SpscQueue};
82
83/// Errors that can occur in the TPC runtime.
84#[derive(Debug, thiserror::Error)]
85pub enum TpcError {
86    /// Failed to spawn a core thread
87    #[error("Failed to spawn core {core_id}: {message}")]
88    SpawnFailed {
89        /// The core ID that failed to spawn
90        core_id: usize,
91        /// Error message
92        message: String,
93    },
94
95    /// Failed to set CPU affinity
96    #[error("Failed to set CPU affinity for core {core_id}: {message}")]
97    AffinityFailed {
98        /// The core ID
99        core_id: usize,
100        /// Error message
101        message: String,
102    },
103
104    /// Queue is full, cannot accept more events
105    #[error("Queue full for core {core_id}")]
106    QueueFull {
107        /// The core ID whose queue is full
108        core_id: usize,
109    },
110
111    /// Backpressure active, no credits available
112    #[error("Backpressure active for core {core_id}")]
113    Backpressure {
114        /// The core ID that is backpressured
115        core_id: usize,
116    },
117
118    /// Runtime is not running
119    #[error("Runtime is not running")]
120    NotRunning,
121
122    /// Runtime is already running
123    #[error("Runtime is already running")]
124    AlreadyRunning,
125
126    /// Invalid configuration
127    #[error("Invalid configuration: {0}")]
128    InvalidConfig(String),
129
130    /// Reactor error from a core
131    #[error("Reactor error on core {core_id}: {source}")]
132    ReactorError {
133        /// The core ID
134        core_id: usize,
135        /// The underlying reactor error
136        #[source]
137        source: crate::reactor::ReactorError,
138    },
139
140    /// Key extraction failed
141    #[error("Key extraction failed: {0}")]
142    KeyExtractionFailed(String),
143
144    /// Router error (zero-allocation variant)
145    #[error("Router error: {0}")]
146    RouterError(#[from] RouterError),
147}
148
/// Unit tests pinning the `Display` text of [`TpcError`] variants.
///
/// Only variants that can be built from plain values are covered here;
/// `ReactorError` and `RouterError` wrap types defined in other modules.
#[cfg(test)]
mod tests {
    use super::*;

    /// `QueueFull` interpolates the core index into its message.
    #[test]
    fn test_error_display() {
        let err = TpcError::QueueFull { core_id: 3 };
        assert_eq!(err.to_string(), "Queue full for core 3");
    }

    /// Variants carrying a `core_id` (and optionally a `message`) format both fields.
    #[test]
    fn test_per_core_error_display() {
        let spawn = TpcError::SpawnFailed {
            core_id: 1,
            message: "thread limit reached".to_string(),
        };
        assert_eq!(
            spawn.to_string(),
            "Failed to spawn core 1: thread limit reached"
        );

        let affinity = TpcError::AffinityFailed {
            core_id: 2,
            message: "unsupported platform".to_string(),
        };
        assert_eq!(
            affinity.to_string(),
            "Failed to set CPU affinity for core 2: unsupported platform"
        );

        let backpressure = TpcError::Backpressure { core_id: 5 };
        assert_eq!(backpressure.to_string(), "Backpressure active for core 5");
    }

    /// Field-less lifecycle variants render their fixed messages.
    #[test]
    fn test_lifecycle_error_display() {
        assert_eq!(TpcError::NotRunning.to_string(), "Runtime is not running");
        assert_eq!(
            TpcError::AlreadyRunning.to_string(),
            "Runtime is already running"
        );
    }

    /// Tuple variants append their string payload after a fixed prefix.
    #[test]
    fn test_message_error_display() {
        let config = TpcError::InvalidConfig("num_cores must be > 0".to_string());
        assert_eq!(
            config.to_string(),
            "Invalid configuration: num_cores must be > 0"
        );

        let key = TpcError::KeyExtractionFailed("missing column".to_string());
        assert_eq!(key.to_string(), "Key extraction failed: missing column");
    }
}