fynd_core/worker_pool/pool.rs
1//! Worker pool for processing solve tasks.
2//!
3//! The worker pool manages multiple dedicated OS threads for CPU-bound route finding.
4//! Each pool owns multiple SolverWorker instances that compete for tasks from the queue.
5//! A pool is configured with a specific algorithm (by name), allowing multiple pools
6//! with different algorithms to compete via the WorkerPoolRouter.
7//!
8//! Pools can use either a built-in algorithm (by name via [`WorkerPoolBuilder::algorithm`])
9//! or a custom [`Algorithm`](crate::algorithm::Algorithm) implementation (via
10//! [`WorkerPoolBuilder::with_algorithm`]).
11use std::thread::JoinHandle;
12
13use tokio::sync::broadcast;
14use tracing::{error, info};
15
16use crate::{
17 algorithm::AlgorithmConfig,
18 derived::{events::DerivedDataEvent, SharedDerivedDataRef},
19 feed::{
20 events::{MarketEvent, MarketEventHandler},
21 market_data::MarketData,
22 },
23 graph::EdgeWeightUpdaterWithDerived,
24 types::internal::SolveTask,
25 worker_pool::{
26 registry::{
27 spawn_workers_generic, AlgorithmSpawner, SpawnWorkersParams, UnknownAlgorithmError,
28 DEFAULT_ALGORITHM,
29 },
30 task_queue::{TaskQueue, TaskQueueConfig, TaskQueueHandle},
31 },
32};
33
34/// Configuration for the worker pool.
35#[derive(Debug)]
36pub struct WorkerPoolConfig {
37 /// Human-readable name for this pool (used in logging/metrics).
38 /// Can differ from algorithm to distinguish pools with same algorithm but different configs.
39 name: String,
40 /// How to spawn workers — either a built-in registry lookup or a custom factory.
41 spawner: AlgorithmSpawner,
42 /// Number of worker threads.
43 num_workers: usize,
44 /// Configuration for the algorithm used by each worker.
45 algorithm_config: AlgorithmConfig,
46 /// Task queue capacity (maximum number of pending tasks).
47 task_queue_capacity: usize,
48}
49
50impl WorkerPoolConfig {
51 /// Returns the algorithm name for this pool.
52 pub fn algorithm_name(&self) -> &str {
53 self.spawner.algorithm_name()
54 }
55}
56
57impl Default for WorkerPoolConfig {
58 fn default() -> Self {
59 Self {
60 name: DEFAULT_ALGORITHM.to_string(),
61 spawner: AlgorithmSpawner::Registry { algorithm: DEFAULT_ALGORITHM.to_string() },
62 num_workers: num_cpus::get(),
63 algorithm_config: AlgorithmConfig::default(),
64 task_queue_capacity: 1000,
65 }
66 }
67}
68
69/// A pool of worker threads for processing solve tasks.
70///
71/// Each pool is dedicated to a specific algorithm. Workers in the pool
72/// compete for tasks from the shared queue.
73pub struct WorkerPool {
74 /// Human-readable name for this pool.
75 name: String,
76 /// Algorithm name for this pool.
77 algorithm: String,
78 /// Handles to worker threads.
79 workers: Vec<JoinHandle<()>>,
80 /// Shutdown signal sender.
81 shutdown_tx: broadcast::Sender<()>,
82}
83
84impl WorkerPool {
85 /// Spawns a new worker pool.
86 ///
87 /// # Arguments
88 ///
89 /// * `config` - Worker pool configuration
90 /// * `task_rx` - Receiver for tasks from the queue
91 /// * `market_data` - Shared market data reference
92 /// * `derived_data` - Shared derived data reference (pool depths, token prices)
93 /// * `event_rx` - Broadcast receiver for market events (workers subscribe to this)
94 /// * `derived_event_rx` - Broadcast receiver for derived data events (resubscribed per worker)
95 ///
96 /// # Errors
97 ///
98 /// Returns an error if the algorithm name in config is not registered.
99 pub fn spawn(
100 config: WorkerPoolConfig,
101 task_rx: async_channel::Receiver<SolveTask>,
102 market_data: MarketData,
103 derived_data: SharedDerivedDataRef,
104 event_rx: broadcast::Receiver<MarketEvent>,
105 derived_event_rx: broadcast::Receiver<DerivedDataEvent>,
106 ) -> Result<Self, UnknownAlgorithmError> {
107 let (shutdown_tx, _) = broadcast::channel(1);
108 let name = config.name.clone();
109 let algorithm = config
110 .spawner
111 .algorithm_name()
112 .to_string();
113
114 // Spawn workers
115 let params = SpawnWorkersParams {
116 algorithm: algorithm.clone(),
117 num_workers: config.num_workers,
118 algorithm_config: config.algorithm_config,
119 task_rx,
120 market_data,
121 derived_data,
122 event_rx,
123 derived_event_rx,
124 shutdown_tx: shutdown_tx.clone(),
125 };
126 let workers = config.spawner.spawn(params)?;
127
128 info!(
129 name = %name,
130 algorithm = %algorithm,
131 num_workers = workers.len(),
132 "worker pool spawned"
133 );
134
135 Ok(Self { name, algorithm, workers, shutdown_tx })
136 }
137
138 /// Returns the pool name.
139 pub fn name(&self) -> &str {
140 &self.name
141 }
142
143 /// Returns the algorithm name for this pool.
144 pub fn algorithm(&self) -> &str {
145 &self.algorithm
146 }
147
148 /// Returns the number of workers.
149 pub fn num_workers(&self) -> usize {
150 self.workers.len()
151 }
152
153 /// Shuts down all workers and waits for them to finish.
154 pub fn shutdown(self) {
155 info!(name = %self.name, "shutting down worker pool");
156
157 // Send shutdown signal
158 let _ = self.shutdown_tx.send(());
159
160 // Wait for all workers to finish
161 for (i, handle) in self.workers.into_iter().enumerate() {
162 if let Err(e) = handle.join() {
163 error!(
164 name = %self.name,
165 worker_id = i,
166 "worker thread panicked: {:?}",
167 e
168 );
169 }
170 }
171
172 info!(name = %self.name, "worker pool shut down");
173 }
174}
175
176/// Builder for WorkerPool with a fluent API.
177///
178/// # Built-in algorithms
179///
180/// Use [`algorithm`](Self::algorithm) to select a built-in algorithm by name (e.g.,
181/// `"most_liquid"`).
182///
183/// # Custom algorithms
184///
185/// Use [`with_algorithm`](Self::with_algorithm) to plug in any type implementing
186/// [`Algorithm`](crate::algorithm::Algorithm) via a factory closure, bypassing the built-in
187/// registry entirely. See the `custom_algorithm` example for a full walkthrough.
188#[must_use = "a builder does nothing until .build() is called"]
189pub struct WorkerPoolBuilder {
190 config: WorkerPoolConfig,
191}
192
193impl WorkerPoolBuilder {
194 /// Create a builder with default configuration values.
195 pub fn new() -> Self {
196 Self { config: WorkerPoolConfig::default() }
197 }
198
199 /// Sets the pool name.
200 pub fn name(mut self, name: impl Into<String>) -> Self {
201 self.config.name = name.into();
202 self
203 }
204
205 /// Sets the algorithm by name (built-in registry lookup).
206 ///
207 /// Available built-in algorithms: `"most_liquid"`.
208 pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
209 self.config.spawner = AlgorithmSpawner::Registry { algorithm: algorithm.into() };
210 self
211 }
212
213 /// Sets a custom algorithm implementation via a factory closure.
214 ///
215 /// The `factory` is called once per worker thread to create an algorithm instance.
216 /// This bypasses the built-in registry, so any type implementing
217 /// [`Algorithm`](crate::algorithm::Algorithm) can be used.
218 ///
219 /// # Example
220 ///
221 /// ```ignore
222 /// builder.with_algorithm("my_algo", |config| MyAlgorithm::new(config))
223 /// ```
224 pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
225 where
226 A: crate::algorithm::Algorithm + 'static,
227 A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
228 F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
229 {
230 let name = name.into();
231 let spawner =
232 Box::new(move |params: SpawnWorkersParams| spawn_workers_generic(params, &factory));
233 self.config.spawner = AlgorithmSpawner::Custom { algorithm: name, spawner };
234 self
235 }
236
237 /// Sets the algorithm configuration.
238 pub fn algorithm_config(mut self, config: AlgorithmConfig) -> Self {
239 self.config.algorithm_config = config;
240 self
241 }
242
243 /// Sets the number of worker threads.
244 pub fn num_workers(mut self, n: usize) -> Self {
245 self.config.num_workers = n;
246 self
247 }
248
249 /// Sets the task queue capacity.
250 pub fn task_queue_capacity(mut self, capacity: usize) -> Self {
251 self.config.task_queue_capacity = capacity;
252 self
253 }
254
255 /// Builds and spawns the worker pool.
256 ///
257 /// Creates an internal task queue and returns both the worker pool and a handle
258 /// for enqueueing tasks.
259 ///
260 /// # Errors
261 ///
262 /// Returns an error if the algorithm name is not registered.
263 pub fn build(
264 self,
265 market_data: MarketData,
266 derived_data: SharedDerivedDataRef,
267 event_rx: broadcast::Receiver<MarketEvent>,
268 derived_event_rx: broadcast::Receiver<DerivedDataEvent>,
269 ) -> Result<(WorkerPool, TaskQueueHandle), UnknownAlgorithmError> {
270 // Create task queue internally
271 let task_queue =
272 TaskQueue::new(TaskQueueConfig { capacity: self.config.task_queue_capacity });
273 let (task_handle, task_rx) = task_queue.split();
274
275 // Spawn worker pool
276 let pool = WorkerPool::spawn(
277 self.config,
278 task_rx,
279 market_data,
280 derived_data,
281 event_rx,
282 derived_event_rx,
283 )?;
284
285 Ok((pool, task_handle))
286 }
287}
288
289impl Default for WorkerPoolBuilder {
290 fn default() -> Self {
291 Self::new()
292 }
293}