1use crate::executor::NodeExecutor;
11use crate::worker::Worker;
12use jamjet_state::backend::StateBackend;
13use std::sync::Arc;
14use tokio::task::JoinHandle;
15use tracing::info;
16
/// Configuration for one group of identically configured workers.
///
/// `WorkerPool::spawn` starts `concurrency` workers for each group, naming
/// them `"{id_prefix}-{index}"` and handing every one a copy of
/// `queue_types`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkerGroupConfig {
    /// Prefix of each worker's id; the worker's zero-based index within the
    /// group is appended after a `-`.
    pub id_prefix: String,
    /// Number of workers spawned for this group. A value of `0` spawns none.
    pub concurrency: usize,
    /// Queue type names passed to every worker in the group.
    // NOTE(review): presumably the queues the worker polls — confirm against `Worker`.
    pub queue_types: Vec<String>,
}

impl WorkerGroupConfig {
    /// Builds a group configuration from its three parts.
    ///
    /// `id_prefix` accepts anything convertible into a `String`, so string
    /// literals can be passed directly.
    pub fn new(id_prefix: impl Into<String>, concurrency: usize, queue_types: Vec<String>) -> Self {
        Self {
            id_prefix: id_prefix.into(),
            concurrency,
            queue_types,
        }
    }
}
37
38pub struct WorkerPool {
43 backend: Arc<dyn StateBackend>,
44 groups: Vec<WorkerGroupConfig>,
45 executors: Vec<(String, Arc<dyn NodeExecutor>)>,
46}
47
48impl WorkerPool {
49 pub fn new(backend: Arc<dyn StateBackend>) -> Self {
50 Self {
51 backend,
52 groups: Vec::new(),
53 executors: Vec::new(),
54 }
55 }
56
57 pub fn with_group(mut self, group: WorkerGroupConfig) -> Self {
59 self.groups.push(group);
60 self
61 }
62
63 pub fn with_executor(
65 mut self,
66 kind: impl Into<String>,
67 executor: Arc<dyn NodeExecutor>,
68 ) -> Self {
69 self.executors.push((kind.into(), executor));
70 self
71 }
72
73 pub fn spawn(self) -> Vec<JoinHandle<()>> {
77 let executors: Vec<(String, Arc<dyn NodeExecutor>)> = self.executors;
78 let mut handles = Vec::new();
79
80 for group in &self.groups {
81 for i in 0..group.concurrency {
82 let worker_id = format!("{}-{}", group.id_prefix, i);
83 let queue_types = group.queue_types.clone();
84 let backend = Arc::clone(&self.backend);
85
86 let mut worker = Worker::new(worker_id.clone(), backend, queue_types);
87 for (kind, executor) in &executors {
88 worker = worker.register_executor(kind.clone(), Arc::clone(executor));
89 }
90
91 info!(
92 worker_id = %worker_id,
93 queues = ?group.queue_types,
94 "Spawning worker"
95 );
96
97 let handle = tokio::spawn(async move {
98 worker.run().await;
99 });
100 handles.push(handle);
101 }
102 }
103
104 handles
105 }
106}
107
108pub fn default_pool(backend: Arc<dyn StateBackend>) -> WorkerPool {
114 WorkerPool::new(backend)
115 .with_group(WorkerGroupConfig::new(
116 "general-worker",
117 2,
118 vec![
119 "general".into(),
120 "tool".into(),
121 "condition".into(),
122 "mcp_tool".into(),
123 ],
124 ))
125 .with_group(WorkerGroupConfig::new(
126 "model-worker",
127 2,
128 vec!["model".into()],
129 ))
130 .with_group(WorkerGroupConfig::new(
131 "privileged-worker",
132 1,
133 vec!["privileged".into()],
134 ))
135}