Skip to main content

jamjet_worker/
pool.rs

1//! Worker pool — manages a fleet of concurrent worker Tokio tasks.
2//!
3//! `WorkerPool` spawns N worker tasks, each with its own queue affinity.
4//! Workers share the same `StateBackend` (via `Arc`) and executor registry.
5//!
6//! Queue isolation is achieved by assigning specific `queue_types` to each
7//! worker. For example, model workers only claim from the `["model"]` queue,
8//! while general workers handle `["general", "tool"]`.
9
10use crate::executor::NodeExecutor;
11use crate::worker::Worker;
12use jamjet_state::backend::StateBackend;
13use std::sync::Arc;
14use tokio::task::JoinHandle;
15use tracing::info;
16
17/// Configuration for a worker pool group (workers sharing the same queues).
18#[derive(Clone)]
19pub struct WorkerGroupConfig {
20    /// Prefix for worker IDs (e.g. "model-worker"). IDs become `{prefix}-{n}`.
21    pub id_prefix: String,
22    /// Number of concurrent workers in this group.
23    pub concurrency: usize,
24    /// Queue types this group handles.
25    pub queue_types: Vec<String>,
26}
27
28impl WorkerGroupConfig {
29    pub fn new(id_prefix: impl Into<String>, concurrency: usize, queue_types: Vec<String>) -> Self {
30        Self {
31            id_prefix: id_prefix.into(),
32            concurrency,
33            queue_types,
34        }
35    }
36}
37
38/// A pool of concurrent worker tasks.
39///
40/// Call `spawn()` to start all workers. The returned handles can be awaited
41/// or aborted by the caller (e.g. on shutdown signal).
42pub struct WorkerPool {
43    backend: Arc<dyn StateBackend>,
44    groups: Vec<WorkerGroupConfig>,
45    executors: Vec<(String, Arc<dyn NodeExecutor>)>,
46}
47
48impl WorkerPool {
49    pub fn new(backend: Arc<dyn StateBackend>) -> Self {
50        Self {
51            backend,
52            groups: Vec::new(),
53            executors: Vec::new(),
54        }
55    }
56
57    /// Add a worker group (workers with a specific queue affinity).
58    pub fn with_group(mut self, group: WorkerGroupConfig) -> Self {
59        self.groups.push(group);
60        self
61    }
62
63    /// Register an executor for a node kind tag.
64    pub fn with_executor(
65        mut self,
66        kind: impl Into<String>,
67        executor: Arc<dyn NodeExecutor>,
68    ) -> Self {
69        self.executors.push((kind.into(), executor));
70        self
71    }
72
73    /// Spawn all worker tasks and return their join handles.
74    ///
75    /// Workers run indefinitely until their `JoinHandle` is aborted.
76    pub fn spawn(self) -> Vec<JoinHandle<()>> {
77        let executors: Vec<(String, Arc<dyn NodeExecutor>)> = self.executors;
78        let mut handles = Vec::new();
79
80        for group in &self.groups {
81            for i in 0..group.concurrency {
82                let worker_id = format!("{}-{}", group.id_prefix, i);
83                let queue_types = group.queue_types.clone();
84                let backend = Arc::clone(&self.backend);
85
86                let mut worker = Worker::new(worker_id.clone(), backend, queue_types);
87                for (kind, executor) in &executors {
88                    worker = worker.register_executor(kind.clone(), Arc::clone(executor));
89                }
90
91                info!(
92                    worker_id = %worker_id,
93                    queues = ?group.queue_types,
94                    "Spawning worker"
95                );
96
97                let handle = tokio::spawn(async move {
98                    worker.run().await;
99                });
100                handles.push(handle);
101            }
102        }
103
104        handles
105    }
106}
107
108/// Convenience: build a default pool with standard queue groups.
109///
110/// - 2 general workers (`["general", "tool", "condition"]`)
111/// - 2 model workers (`["model"]`)
112/// - 1 privileged worker (`["privileged"]`)
113pub fn default_pool(backend: Arc<dyn StateBackend>) -> WorkerPool {
114    WorkerPool::new(backend)
115        .with_group(WorkerGroupConfig::new(
116            "general-worker",
117            2,
118            vec![
119                "general".into(),
120                "tool".into(),
121                "condition".into(),
122                "mcp_tool".into(),
123            ],
124        ))
125        .with_group(WorkerGroupConfig::new(
126            "model-worker",
127            2,
128            vec!["model".into()],
129        ))
130        .with_group(WorkerGroupConfig::new(
131            "privileged-worker",
132            1,
133            vec!["privileged".into()],
134        ))
135}