laminar_core/tpc/mod.rs
1//! # Thread-Per-Core (TPC) Module
2//!
3//! Implements thread-per-core architecture for linear scaling on multi-core systems.
4//!
5//! ## Architecture
6//!
7//! Each CPU core runs a dedicated [`Reactor`](crate::reactor::Reactor) with its own state
8//! partition. Events are routed to cores based on key hash, ensuring state locality.
9//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                    ThreadPerCoreRuntime                     │
//! │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐     │
//! │  │  Core 0  │  │  Core 1  │  │  Core 2  │  │  Core N  │     │
//! │  │ Reactor  │  │ Reactor  │  │ Reactor  │  │ Reactor  │     │
//! │  │  State₀  │  │  State₁  │  │  State₂  │  │  StateN  │     │
//! │  └────▲─────┘  └────▲─────┘  └────▲─────┘  └────▲─────┘     │
//! │       │             │             │             │           │
//! │       └─────────────┴──────┬──────┴─────────────┘           │
//! │                            │                                │
//! │                    ┌───────┴───────┐                        │
//! │                    │   KeyRouter   │                        │
//! │                    │ hash(key) % N │                        │
//! │                    └───────┬───────┘                        │
//! │                            │                                │
//! └────────────────────────────┼────────────────────────────────┘
//!                              │
//!                        Input Events
//! ```
30//!
31//! ## Components
32//!
33//! - [`SpscQueue`] - Lock-free single-producer single-consumer queue
34//! - [`KeyRouter`] - Routes events to cores based on key hash
35//! - [`CoreHandle`] - Manages a single core's reactor thread
36//! - [`ThreadPerCoreRuntime`] - Orchestrates multi-core processing
37//!
38//! ## Example
39//!
40//! ```rust,ignore
41//! use laminar_core::tpc::{TpcConfig, ThreadPerCoreRuntime, KeySpec};
42//!
43//! // Configure for 4 cores, routing by "user_id" column
//! let config = TpcConfig::builder()
//!     .num_cores(4)
//!     .key_columns(vec!["user_id"])
//!     .build();
48//!
49//! let mut runtime = ThreadPerCoreRuntime::new(config)?;
50//!
51//! // Submit events - automatically routed by key
52//! runtime.submit(event)?;
53//!
54//! // Poll all cores for outputs
55//! let outputs = runtime.poll();
56//! ```
57//!
58//! ## Performance Targets
59//!
60//! | Metric | Target |
61//! |--------|--------|
62//! | SPSC push/pop | < 50ns |
63//! | Scaling efficiency | > 80% |
64//! | Inter-core latency | < 1μs |
65
66mod backpressure;
67mod core_handle;
68mod router;
69mod runtime;
70mod spsc;
71#[cfg(test)]
72mod zero_alloc_tests;
73
74pub use backpressure::{
75 BackpressureConfig, BackpressureConfigBuilder, CreditAcquireResult, CreditChannel, CreditGate,
76 CreditMetrics, CreditMetricsSnapshot, CreditReceiver, CreditSender, OverflowStrategy,
77};
78pub use core_handle::{CoreConfig, CoreHandle, CoreMessage};
79pub use router::{KeyRouter, KeySpec, RouterError};
80pub use runtime::{OutputBuffer, ThreadPerCoreRuntime, TpcConfig, TpcConfigBuilder};
81pub use spsc::{CachePadded, SpscQueue};
82
83/// Errors that can occur in the TPC runtime.
84#[derive(Debug, thiserror::Error)]
85pub enum TpcError {
86 /// Failed to spawn a core thread
87 #[error("Failed to spawn core {core_id}: {message}")]
88 SpawnFailed {
89 /// The core ID that failed to spawn
90 core_id: usize,
91 /// Error message
92 message: String,
93 },
94
95 /// Failed to set CPU affinity
96 #[error("Failed to set CPU affinity for core {core_id}: {message}")]
97 AffinityFailed {
98 /// The core ID
99 core_id: usize,
100 /// Error message
101 message: String,
102 },
103
104 /// Queue is full, cannot accept more events
105 #[error("Queue full for core {core_id}")]
106 QueueFull {
107 /// The core ID whose queue is full
108 core_id: usize,
109 },
110
111 /// Backpressure active, no credits available
112 #[error("Backpressure active for core {core_id}")]
113 Backpressure {
114 /// The core ID that is backpressured
115 core_id: usize,
116 },
117
118 /// Runtime is not running
119 #[error("Runtime is not running")]
120 NotRunning,
121
122 /// Runtime is already running
123 #[error("Runtime is already running")]
124 AlreadyRunning,
125
126 /// Invalid configuration
127 #[error("Invalid configuration: {0}")]
128 InvalidConfig(String),
129
130 /// Reactor error from a core
131 #[error("Reactor error on core {core_id}: {source}")]
132 ReactorError {
133 /// The core ID
134 core_id: usize,
135 /// The underlying reactor error
136 #[source]
137 source: crate::reactor::ReactorError,
138 },
139
140 /// Key extraction failed
141 #[error("Key extraction failed: {0}")]
142 KeyExtractionFailed(String),
143
144 /// Router error (zero-allocation variant)
145 #[error("Router error: {0}")]
146 RouterError(#[from] RouterError),
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    /// `QueueFull` renders its `core_id` into the `Display` message.
    #[test]
    fn test_error_display() {
        let rendered = TpcError::QueueFull { core_id: 3 }.to_string();
        assert_eq!(rendered, "Queue full for core 3");
    }
}
158}