1use std::collections::HashSet;
25use std::path::PathBuf;
26use std::time::Duration;
27
28use crate::{
29 config::{AoConfig, ProjectConfig},
30 error::{AoError, Result},
31 orchestrator_prompt::{generate_orchestrator_prompt, OrchestratorPromptConfig},
32 session_manager::SessionManager,
33 traits::{Agent, Runtime, Workspace},
34 types::{now_ms, Session, SessionId, SessionStatus, WorkspaceCreateConfig},
35};
36
/// Inputs consumed by [`spawn_orchestrator`] when launching an orchestrator
/// session for a single project.
pub struct OrchestratorSpawnConfig<'a> {
    /// Project identifier: used to list existing sessions, recorded on the new
    /// session, and exported to the agent process as `AO_PROJECT_ID`.
    pub project_id: &'a str,
    /// Per-project settings; `session_prefix`, `symlinks`, `post_create` and
    /// `agent_config` are read from it during spawn.
    pub project_config: &'a ProjectConfig,
    /// Global configuration, forwarded to orchestrator prompt generation.
    pub config: &'a AoConfig,
    /// When set, exported to the agent process as `AO_CONFIG_PATH`.
    pub config_path: Option<PathBuf>,
    /// Passed to prompt generation as the dashboard port and exported to the
    /// agent process as `AO_PORT`.
    pub port: u16,
    /// Agent name recorded on the created session.
    pub agent_name: &'a str,
    /// Runtime name recorded on the created session.
    pub runtime_name: &'a str,
    /// Repository path handed to workspace creation.
    pub repo_path: PathBuf,
    /// Default branch name handed to workspace creation.
    pub default_branch: String,
    /// When `true`, the generated system prompt is not sent to the runtime
    /// after launch (it is still stored on the session).
    pub no_prompt: bool,
}
54
55pub fn reserve_orchestrator_identity(project_prefix: &str, existing: &[Session]) -> Result<String> {
61 let orch_prefix = format!("{project_prefix}-orchestrator");
62 let search = format!("{orch_prefix}-");
63 let mut used: HashSet<u32> = HashSet::new();
64
65 for s in existing {
66 if let Some(rest) = s.id.0.strip_prefix(&search) {
67 if let Ok(n) = rest.parse::<u32>() {
68 used.insert(n);
69 }
70 }
71 }
72
73 for n in 1..=10_000u32 {
74 if !used.contains(&n) {
75 return Ok(format!("{orch_prefix}-{n}"));
76 }
77 }
78
79 Err(AoError::Runtime(format!(
80 "failed to reserve orchestrator id after 10000 attempts (prefix: {orch_prefix})"
81 )))
82}
83
84pub fn is_orchestrator_session(s: &Session) -> bool {
90 let id = &s.id.0;
91 if id.ends_with("-orchestrator") {
92 return true;
93 }
94 if let Some(pos) = id.rfind("-orchestrator-") {
95 let suffix = &id[pos + "-orchestrator-".len()..];
96 return !suffix.is_empty() && suffix.chars().all(|c| c.is_ascii_digit());
97 }
98 false
99}
100
/// Creates and launches a new orchestrator session for `cfg.project_id`.
///
/// Steps, in order:
/// 1. Reserve a fresh `{prefix}-orchestrator-{n}` id, considering both the
///    project's listed sessions and its archived sessions.
/// 2. Generate the orchestrator system prompt.
/// 3. Create a workspace on branch `orchestrator/{session_id}`.
/// 4. Persist the session as `Spawning`, start the agent through `runtime`,
///    then persist it again as `Working` with the runtime handle attached.
/// 5. Unless `cfg.no_prompt` is set, wait 2.5 s and send the system prompt
///    to the runtime.
///
/// Returns the session as last saved (status `Working`, runtime handle set).
///
/// # Errors
///
/// Errors from listing sessions, id reservation, prompt generation and
/// workspace creation are returned directly. An error in any later step
/// triggers cleanup (destroy the workspace, delete the session record) whose
/// own failures are ignored; the original error is then returned.
pub async fn spawn_orchestrator(
    cfg: OrchestratorSpawnConfig<'_>,
    sessions: &SessionManager,
    workspace: &dyn Workspace,
    agent: &dyn Agent,
    runtime: &dyn Runtime,
) -> Result<Session> {
    // Fall back to the project id when no explicit session prefix is configured.
    let project_prefix = cfg
        .project_config
        .session_prefix
        .as_deref()
        .unwrap_or(cfg.project_id);

    let mut pool = sessions.list_for_project(cfg.project_id).await?;
    // Archived sessions are included so their ids are not handed out again.
    // A failure to list them is tolerated and treated as "no archived sessions".
    pool.extend(
        sessions
            .list_archived(cfg.project_id)
            .await
            .unwrap_or_default(),
    );
    let session_id_str = reserve_orchestrator_identity(project_prefix, &pool)?;
    let session_id = SessionId(session_id_str.clone());
    let branch = format!("orchestrator/{session_id_str}");

    let system_prompt = generate_orchestrator_prompt(OrchestratorPromptConfig {
        config: cfg.config,
        project_id: cfg.project_id,
        project: cfg.project_config,
        dashboard_port: cfg.port,
    })?;

    let workspace_cfg = WorkspaceCreateConfig {
        project_id: cfg.project_id.to_string(),
        session_id: session_id_str.clone(),
        branch: branch.clone(),
        repo_path: cfg.repo_path.clone(),
        default_branch: cfg.default_branch.clone(),
        symlinks: cfg.project_config.symlinks.clone(),
        post_create: cfg.project_config.post_create.clone(),
    };
    // Nothing needs cleaning up if this fails, so the error propagates directly.
    let workspace_path = workspace.create(&workspace_cfg).await?;

    // Everything after workspace creation runs inside one async block so that
    // any `?` failure lands in the cleanup `match` below instead of returning
    // straight out of the function.
    let spawn_result = async {
        let mut session = Session {
            id: session_id.clone(),
            project_id: cfg.project_id.to_string(),
            status: SessionStatus::Spawning,
            agent: cfg.agent_name.to_string(),
            agent_config: cfg.project_config.agent_config.clone(),
            branch: branch.clone(),
            task: "orchestrator".to_string(),
            workspace_path: Some(workspace_path.clone()),
            runtime_handle: None,
            runtime: cfg.runtime_name.to_string(),
            activity: None,
            created_at: now_ms(),
            cost: None,
            issue_id: None,
            issue_url: None,
            claimed_pr_number: None,
            claimed_pr_url: None,
            // The same prompt is also sent to the runtime below unless `no_prompt` is set.
            initial_prompt_override: Some(system_prompt.clone()),
            spawned_by: None,
            last_merge_conflict_dispatched: None,
            last_review_backlog_fingerprint: None,
        };
        // First save: record the session as `Spawning` before the process exists.
        sessions.save(&session).await?;

        // Start from the agent's own environment, then append the AO_* variables.
        let launch_command = agent.launch_command(&session);
        let mut env = agent.environment(&session);
        env.push(("AO_CALLER_TYPE".into(), "orchestrator".into()));
        env.push(("AO_SESSION".into(), session_id_str.clone()));
        env.push(("AO_SESSION_NAME".into(), session_id_str.clone()));
        env.push((
            "AO_DATA_DIR".into(),
            sessions.base_dir().to_string_lossy().into_owned(),
        ));
        env.push(("AO_PROJECT_ID".into(), cfg.project_id.to_string()));
        if let Some(cp) = cfg.config_path.as_ref() {
            env.push(("AO_CONFIG_PATH".into(), cp.to_string_lossy().into_owned()));
        }
        env.push(("AO_PORT".into(), cfg.port.to_string()));

        let handle = runtime
            .create(&session_id_str, &workspace_path, &launch_command, &env)
            .await?;
        // Second save: attach the runtime handle and mark the session `Working`.
        session.runtime_handle = Some(handle.clone());
        session.status = SessionStatus::Working;
        sessions.save(&session).await?;

        if !cfg.no_prompt {
            // NOTE(review): fixed 2.5 s delay, presumably to let the agent finish
            // starting before it receives input — confirm.
            tokio::time::sleep(Duration::from_millis(2500)).await;
            runtime.send_message(&handle, &system_prompt).await?;
        }

        Ok::<Session, AoError>(session)
    }
    .await;

    match spawn_result {
        Ok(s) => Ok(s),
        Err(e) => {
            // Best-effort rollback; cleanup errors are deliberately discarded so
            // the caller sees the original failure.
            // NOTE(review): if the failure happened after `runtime.create`
            // succeeded (second save or `send_message`), the runtime handle is
            // not torn down here — confirm whether something else reaps it.
            let _ = workspace.destroy(&workspace_path).await;
            let _ = sessions.delete(cfg.project_id, &session_id).await;
            Err(e)
        }
    }
}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{now_ms, SessionId, SessionStatus};

    /// Fixture: a `Session` with the given id and project id; every other
    /// field is a fixed placeholder or `None`.
    fn session_with_id(id: &str, project: &str) -> Session {
        Session {
            id: SessionId(id.into()),
            project_id: project.into(),
            status: SessionStatus::Working,
            agent: "claude-code".into(),
            agent_config: None,
            branch: "orchestrator/x".into(),
            task: "orchestrator".into(),
            workspace_path: None,
            runtime_handle: None,
            runtime: "tmux".into(),
            activity: None,
            created_at: now_ms(),
            cost: None,
            issue_id: None,
            issue_url: None,
            claimed_pr_number: None,
            claimed_pr_url: None,
            initial_prompt_override: None,
            spawned_by: None,
            last_merge_conflict_dispatched: None,
            last_review_backlog_fingerprint: None,
        }
    }

    /// Reserves an id under prefix `app` against sessions built from
    /// `(session id, project id)` pairs.
    fn reserve_for_app(existing: &[(&str, &str)]) -> String {
        let pool: Vec<Session> = existing
            .iter()
            .map(|(id, project)| session_with_id(id, project))
            .collect();
        reserve_orchestrator_identity("app", &pool).unwrap()
    }

    /// Classifies a bare id by wrapping it in a fixture session.
    fn is_orchestrator_id(id: &str, project: &str) -> bool {
        is_orchestrator_session(&session_with_id(id, project))
    }

    #[test]
    fn reserve_starts_at_one_when_no_existing() {
        assert_eq!(reserve_for_app(&[]), "app-orchestrator-1");
    }

    #[test]
    fn reserve_skips_used_numbers_in_order() {
        // A gap at 2 is filled before moving past the highest used number.
        let with_gap = [("app-orchestrator-1", "app"), ("app-orchestrator-3", "app")];
        assert_eq!(reserve_for_app(&with_gap), "app-orchestrator-2");

        // With no gap, the next number after the run is chosen.
        let contiguous = [("app-orchestrator-1", "app"), ("app-orchestrator-2", "app")];
        assert_eq!(reserve_for_app(&contiguous), "app-orchestrator-3");
    }

    #[test]
    fn reserve_ignores_other_projects_and_worker_ids() {
        let unrelated = [
            // Worker-style id under the same prefix.
            ("app-1", "app"),
            // Orchestrator id under a different prefix.
            ("other-orchestrator-1", "other"),
            // Right prefix, non-numeric suffix.
            ("app-orchestrator-abc", "app"),
        ];
        assert_eq!(reserve_for_app(&unrelated), "app-orchestrator-1");
    }

    #[test]
    fn is_orchestrator_session_matches_numbered_pattern() {
        assert!(is_orchestrator_id("app-orchestrator-1", "app"));
        assert!(is_orchestrator_id("my-project-orchestrator-42", "my-project"));
    }

    #[test]
    fn is_orchestrator_session_matches_suffix_only() {
        assert!(is_orchestrator_id("app-orchestrator", "app"));
    }

    #[test]
    fn is_orchestrator_session_rejects_workers_and_unrelated() {
        assert!(!is_orchestrator_id("app-1", "app"));
        assert!(!is_orchestrator_id("deadbeef-aaaa-bbbb", "app"));
        assert!(!is_orchestrator_id("app-orchestrator-abc", "app"));
        assert!(!is_orchestrator_id("app-orchestrator-", "app"));
    }
}