Skip to main content

fynd_core/worker_pool/
pool.rs

1//! Worker pool for processing solve tasks.
2//!
3//! The worker pool manages multiple dedicated OS threads for CPU-bound route finding.
4//! Each pool owns multiple SolverWorker instances that compete for tasks from the queue.
5//! A pool is configured with a specific algorithm (by name), allowing multiple pools
6//! with different algorithms to compete via the WorkerPoolRouter.
7//!
8//! Pools can use either a built-in algorithm (by name via [`WorkerPoolBuilder::algorithm`])
9//! or a custom [`Algorithm`](crate::algorithm::Algorithm) implementation (via
10//! [`WorkerPoolBuilder::with_algorithm`]).
11use std::thread::JoinHandle;
12
13use tokio::sync::broadcast;
14use tracing::{error, info};
15
16use crate::{
17    algorithm::AlgorithmConfig,
18    derived::{events::DerivedDataEvent, SharedDerivedDataRef},
19    feed::{
20        events::{MarketEvent, MarketEventHandler},
21        market_data::SharedMarketDataRef,
22    },
23    graph::EdgeWeightUpdaterWithDerived,
24    types::internal::SolveTask,
25    worker_pool::{
26        registry::{
27            spawn_workers_generic, AlgorithmSpawner, SpawnWorkersParams, UnknownAlgorithmError,
28            DEFAULT_ALGORITHM,
29        },
30        task_queue::{TaskQueue, TaskQueueConfig, TaskQueueHandle},
31    },
32};
33
34/// Configuration for the worker pool.
35#[derive(Debug)]
36pub struct WorkerPoolConfig {
37    /// Human-readable name for this pool (used in logging/metrics).
38    /// Can differ from algorithm to distinguish pools with same algorithm but different configs.
39    name: String,
40    /// How to spawn workers — either a built-in registry lookup or a custom factory.
41    spawner: AlgorithmSpawner,
42    /// Number of worker threads.
43    num_workers: usize,
44    /// Configuration for the algorithm used by each worker.
45    algorithm_config: AlgorithmConfig,
46    /// Task queue capacity (maximum number of pending tasks).
47    task_queue_capacity: usize,
48}
49
50impl WorkerPoolConfig {
51    /// Returns the algorithm name for this pool.
52    pub fn algorithm_name(&self) -> &str {
53        self.spawner.algorithm_name()
54    }
55}
56
57impl Default for WorkerPoolConfig {
58    fn default() -> Self {
59        Self {
60            name: DEFAULT_ALGORITHM.to_string(),
61            spawner: AlgorithmSpawner::Registry { algorithm: DEFAULT_ALGORITHM.to_string() },
62            num_workers: num_cpus::get(),
63            algorithm_config: AlgorithmConfig::default(),
64            task_queue_capacity: 1000,
65        }
66    }
67}
68
69/// A pool of worker threads for processing solve tasks.
70///
71/// Each pool is dedicated to a specific algorithm. Workers in the pool
72/// compete for tasks from the shared queue.
73pub struct WorkerPool {
74    /// Human-readable name for this pool.
75    name: String,
76    /// Algorithm name for this pool.
77    algorithm: String,
78    /// Handles to worker threads.
79    workers: Vec<JoinHandle<()>>,
80    /// Shutdown signal sender.
81    shutdown_tx: broadcast::Sender<()>,
82}
83
84impl WorkerPool {
85    /// Spawns a new worker pool.
86    ///
87    /// # Arguments
88    ///
89    /// * `config` - Worker pool configuration
90    /// * `task_rx` - Receiver for tasks from the queue
91    /// * `market_data` - Shared market data reference
92    /// * `derived_data` - Shared derived data reference (pool depths, token prices)
93    /// * `event_rx` - Broadcast receiver for market events (workers subscribe to this)
94    /// * `derived_event_rx` - Broadcast receiver for derived data events (resubscribed per worker)
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the algorithm name in config is not registered.
99    pub fn spawn(
100        config: WorkerPoolConfig,
101        task_rx: async_channel::Receiver<SolveTask>,
102        market_data: SharedMarketDataRef,
103        derived_data: SharedDerivedDataRef,
104        event_rx: broadcast::Receiver<MarketEvent>,
105        derived_event_rx: broadcast::Receiver<DerivedDataEvent>,
106    ) -> Result<Self, UnknownAlgorithmError> {
107        let (shutdown_tx, _) = broadcast::channel(1);
108        let name = config.name.clone();
109        let algorithm = config
110            .spawner
111            .algorithm_name()
112            .to_string();
113
114        // Spawn workers
115        let params = SpawnWorkersParams {
116            algorithm: algorithm.clone(),
117            num_workers: config.num_workers,
118            algorithm_config: config.algorithm_config,
119            task_rx,
120            market_data,
121            derived_data,
122            event_rx,
123            derived_event_rx,
124            shutdown_tx: shutdown_tx.clone(),
125        };
126        let workers = config.spawner.spawn(params)?;
127
128        info!(
129            name = %name,
130            algorithm = %algorithm,
131            num_workers = workers.len(),
132            "worker pool spawned"
133        );
134
135        Ok(Self { name, algorithm, workers, shutdown_tx })
136    }
137
138    /// Returns the pool name.
139    pub fn name(&self) -> &str {
140        &self.name
141    }
142
143    /// Returns the algorithm name for this pool.
144    pub fn algorithm(&self) -> &str {
145        &self.algorithm
146    }
147
148    /// Returns the number of workers.
149    pub fn num_workers(&self) -> usize {
150        self.workers.len()
151    }
152
153    /// Shuts down all workers and waits for them to finish.
154    pub fn shutdown(self) {
155        info!(name = %self.name, "shutting down worker pool");
156
157        // Send shutdown signal
158        let _ = self.shutdown_tx.send(());
159
160        // Wait for all workers to finish
161        for (i, handle) in self.workers.into_iter().enumerate() {
162            if let Err(e) = handle.join() {
163                error!(
164                    name = %self.name,
165                    worker_id = i,
166                    "worker thread panicked: {:?}",
167                    e
168                );
169            }
170        }
171
172        info!(name = %self.name, "worker pool shut down");
173    }
174}
175
176/// Builder for WorkerPool with a fluent API.
177///
178/// # Built-in algorithms
179///
180/// Use [`algorithm`](Self::algorithm) to select a built-in algorithm by name (e.g.,
181/// `"most_liquid"`).
182///
183/// # Custom algorithms
184///
185/// Use [`with_algorithm`](Self::with_algorithm) to plug in any type implementing
186/// [`Algorithm`](crate::algorithm::Algorithm) via a factory closure, bypassing the built-in
187/// registry entirely. See the `custom_algorithm` example for a full walkthrough.
188#[must_use = "a builder does nothing until .build() is called"]
189pub struct WorkerPoolBuilder {
190    config: WorkerPoolConfig,
191}
192
193impl WorkerPoolBuilder {
194    pub fn new() -> Self {
195        Self { config: WorkerPoolConfig::default() }
196    }
197
198    /// Sets the pool name.
199    pub fn name(mut self, name: impl Into<String>) -> Self {
200        self.config.name = name.into();
201        self
202    }
203
204    /// Sets the algorithm by name (built-in registry lookup).
205    ///
206    /// Available built-in algorithms: `"most_liquid"`.
207    pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
208        self.config.spawner = AlgorithmSpawner::Registry { algorithm: algorithm.into() };
209        self
210    }
211
212    /// Sets a custom algorithm implementation via a factory closure.
213    ///
214    /// The `factory` is called once per worker thread to create an algorithm instance.
215    /// This bypasses the built-in registry, so any type implementing
216    /// [`Algorithm`](crate::algorithm::Algorithm) can be used.
217    ///
218    /// # Example
219    ///
220    /// ```ignore
221    /// builder.with_algorithm("my_algo", |config| MyAlgorithm::new(config))
222    /// ```
223    pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
224    where
225        A: crate::algorithm::Algorithm + 'static,
226        A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
227        F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
228    {
229        let name = name.into();
230        let spawner =
231            Box::new(move |params: SpawnWorkersParams| spawn_workers_generic(params, &factory));
232        self.config.spawner = AlgorithmSpawner::Custom { algorithm: name, spawner };
233        self
234    }
235
236    /// Sets the algorithm configuration.
237    pub fn algorithm_config(mut self, config: AlgorithmConfig) -> Self {
238        self.config.algorithm_config = config;
239        self
240    }
241
242    /// Sets the number of worker threads.
243    pub fn num_workers(mut self, n: usize) -> Self {
244        self.config.num_workers = n;
245        self
246    }
247
248    /// Sets the task queue capacity.
249    pub fn task_queue_capacity(mut self, capacity: usize) -> Self {
250        self.config.task_queue_capacity = capacity;
251        self
252    }
253
254    /// Builds and spawns the worker pool.
255    ///
256    /// Creates an internal task queue and returns both the worker pool and a handle
257    /// for enqueueing tasks.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the algorithm name is not registered.
262    pub fn build(
263        self,
264        market_data: SharedMarketDataRef,
265        derived_data: SharedDerivedDataRef,
266        event_rx: broadcast::Receiver<MarketEvent>,
267        derived_event_rx: broadcast::Receiver<DerivedDataEvent>,
268    ) -> Result<(WorkerPool, TaskQueueHandle), UnknownAlgorithmError> {
269        // Create task queue internally
270        let task_queue =
271            TaskQueue::new(TaskQueueConfig { capacity: self.config.task_queue_capacity });
272        let (task_handle, task_rx) = task_queue.split();
273
274        // Spawn worker pool
275        let pool = WorkerPool::spawn(
276            self.config,
277            task_rx,
278            market_data,
279            derived_data,
280            event_rx,
281            derived_event_rx,
282        )?;
283
284        Ok((pool, task_handle))
285    }
286}
287
288impl Default for WorkerPoolBuilder {
289    fn default() -> Self {
290        Self::new()
291    }
292}