Skip to main content

ao_core/
orchestrator_spawn.rs

1//! Orchestrator spawn helper (TS `spawnOrchestrator` equivalent).
2//!
3//! Slice 2 of issue #165 — creates a new orchestrator session in its own
4//! worktree (`orchestrator/<prefix>-orchestrator-N`) and launches the
5//! configured agent with a rendered orchestrator system prompt.
6//!
7//! Kept deliberately plugin-generic: callers pass plugin instances as trait
8//! objects so ao-core doesn't need to know about tmux, worktree,
9//! claude-code, ...
10//!
11//! Parity notes:
12//! - Identity reservation mirrors
13//!   `reserveNextOrchestratorIdentity` in
14//!   `packages/core/src/session-manager.ts` — smallest `N` not used by any
15//!   active or archived session (limit 10k).
16//! - Classification (`is_orchestrator_session`) mirrors
17//!   `isOrchestratorSessionRecord` with the Rust port's simpler session
18//!   shape (no `metadata.role` field yet, so the id pattern is the sole
19//!   signal).
20//! - Env vars follow the ao-ts orchestrator launch: `AO_CALLER_TYPE`,
21//!   `AO_SESSION`, `AO_DATA_DIR`, `AO_PROJECT_ID`, `AO_CONFIG_PATH`,
22//!   `AO_PORT`.
23
24use std::collections::HashSet;
25use std::path::PathBuf;
26use std::time::Duration;
27
28use crate::{
29    config::{AoConfig, ProjectConfig},
30    error::{AoError, Result},
31    orchestrator_prompt::{generate_orchestrator_prompt, OrchestratorPromptConfig},
32    session_manager::SessionManager,
33    traits::{Agent, Runtime, Workspace},
34    types::{now_ms, Session, SessionId, SessionStatus, WorkspaceCreateConfig},
35};
36
37/// Inputs for `spawn_orchestrator`. Borrows everything so callers can
38/// keep ownership of config/project structs across multiple spawns.
39pub struct OrchestratorSpawnConfig<'a> {
40    pub project_id: &'a str,
41    pub project_config: &'a ProjectConfig,
42    pub config: &'a AoConfig,
43    /// Path to the loaded `ao-rs.yaml`, passed through as `AO_CONFIG_PATH`.
44    pub config_path: Option<PathBuf>,
45    /// Dashboard port — rendered into the orchestrator prompt and exported as `AO_PORT`.
46    pub port: u16,
47    pub agent_name: &'a str,
48    pub runtime_name: &'a str,
49    pub repo_path: PathBuf,
50    pub default_branch: String,
51    /// Skip sending the rendered orchestrator prompt after launch.
52    pub no_prompt: bool,
53}
54
55/// Reserve the next unused `<prefix>-orchestrator-N` id for a project.
56///
57/// Scans `existing` (typically `list_for_project` ∪ `list_archived`) for
58/// ids matching the orchestrator pattern and returns the smallest `N`
59/// not already taken. Fails after 10k attempts — mirrors the TS cap.
60pub fn reserve_orchestrator_identity(project_prefix: &str, existing: &[Session]) -> Result<String> {
61    let orch_prefix = format!("{project_prefix}-orchestrator");
62    let search = format!("{orch_prefix}-");
63    let mut used: HashSet<u32> = HashSet::new();
64
65    for s in existing {
66        if let Some(rest) = s.id.0.strip_prefix(&search) {
67            if let Ok(n) = rest.parse::<u32>() {
68                used.insert(n);
69            }
70        }
71    }
72
73    for n in 1..=10_000u32 {
74        if !used.contains(&n) {
75            return Ok(format!("{orch_prefix}-{n}"));
76        }
77    }
78
79    Err(AoError::Runtime(format!(
80        "failed to reserve orchestrator id after 10000 attempts (prefix: {orch_prefix})"
81    )))
82}
83
84/// Classify a session as an orchestrator based on its id.
85///
86/// The Rust session YAML doesn't yet carry a `role` field, so we rely
87/// entirely on the id pattern — matches either a literal `-orchestrator`
88/// suffix or the standard `<prefix>-orchestrator-<digits>` form.
89pub fn is_orchestrator_session(s: &Session) -> bool {
90    let id = &s.id.0;
91    if id.ends_with("-orchestrator") {
92        return true;
93    }
94    if let Some(pos) = id.rfind("-orchestrator-") {
95        let suffix = &id[pos + "-orchestrator-".len()..];
96        return !suffix.is_empty() && suffix.chars().all(|c| c.is_ascii_digit());
97    }
98    false
99}
100
101/// Spawn a new orchestrator session: reserve id, create worktree, launch
102/// runtime, and deliver the rendered orchestrator prompt.
103///
104/// On error after the worktree is created, the worktree is destroyed
105/// (best-effort) so a failed spawn doesn't leave dangling state on disk.
106pub async fn spawn_orchestrator(
107    cfg: OrchestratorSpawnConfig<'_>,
108    sessions: &SessionManager,
109    workspace: &dyn Workspace,
110    agent: &dyn Agent,
111    runtime: &dyn Runtime,
112) -> Result<Session> {
113    let project_prefix = cfg
114        .project_config
115        .session_prefix
116        .as_deref()
117        .unwrap_or(cfg.project_id);
118
119    // Combine active + archived so a reused N can't collide with a historical
120    // session yaml sitting in `.archive/`.
121    let mut pool = sessions.list_for_project(cfg.project_id).await?;
122    pool.extend(
123        sessions
124            .list_archived(cfg.project_id)
125            .await
126            .unwrap_or_default(),
127    );
128    let session_id_str = reserve_orchestrator_identity(project_prefix, &pool)?;
129    let session_id = SessionId(session_id_str.clone());
130    let branch = format!("orchestrator/{session_id_str}");
131
132    let system_prompt = generate_orchestrator_prompt(OrchestratorPromptConfig {
133        config: cfg.config,
134        project_id: cfg.project_id,
135        project: cfg.project_config,
136        dashboard_port: cfg.port,
137    })?;
138
139    // Create the worktree. From this point on any failure must clean it up.
140    let workspace_cfg = WorkspaceCreateConfig {
141        project_id: cfg.project_id.to_string(),
142        session_id: session_id_str.clone(),
143        branch: branch.clone(),
144        repo_path: cfg.repo_path.clone(),
145        default_branch: cfg.default_branch.clone(),
146        symlinks: cfg.project_config.symlinks.clone(),
147        post_create: cfg.project_config.post_create.clone(),
148    };
149    let workspace_path = workspace.create(&workspace_cfg).await?;
150
151    let spawn_result = async {
152        let mut session = Session {
153            id: session_id.clone(),
154            project_id: cfg.project_id.to_string(),
155            status: SessionStatus::Spawning,
156            agent: cfg.agent_name.to_string(),
157            agent_config: cfg.project_config.agent_config.clone(),
158            branch: branch.clone(),
159            task: "orchestrator".to_string(),
160            workspace_path: Some(workspace_path.clone()),
161            runtime_handle: None,
162            runtime: cfg.runtime_name.to_string(),
163            activity: None,
164            created_at: now_ms(),
165            cost: None,
166            issue_id: None,
167            issue_url: None,
168            claimed_pr_number: None,
169            claimed_pr_url: None,
170            initial_prompt_override: Some(system_prompt.clone()),
171            spawned_by: None,
172            last_merge_conflict_dispatched: None,
173            last_review_backlog_fingerprint: None,
174        };
175        sessions.save(&session).await?;
176
177        let launch_command = agent.launch_command(&session);
178        let mut env = agent.environment(&session);
179        env.push(("AO_CALLER_TYPE".into(), "orchestrator".into()));
180        env.push(("AO_SESSION".into(), session_id_str.clone()));
181        env.push(("AO_SESSION_NAME".into(), session_id_str.clone()));
182        env.push((
183            "AO_DATA_DIR".into(),
184            sessions.base_dir().to_string_lossy().into_owned(),
185        ));
186        env.push(("AO_PROJECT_ID".into(), cfg.project_id.to_string()));
187        if let Some(cp) = cfg.config_path.as_ref() {
188            env.push(("AO_CONFIG_PATH".into(), cp.to_string_lossy().into_owned()));
189        }
190        env.push(("AO_PORT".into(), cfg.port.to_string()));
191
192        let handle = runtime
193            .create(&session_id_str, &workspace_path, &launch_command, &env)
194            .await?;
195        session.runtime_handle = Some(handle.clone());
196        session.status = SessionStatus::Working;
197        sessions.save(&session).await?;
198
199        if !cfg.no_prompt {
200            tokio::time::sleep(Duration::from_millis(2500)).await;
201            runtime.send_message(&handle, &system_prompt).await?;
202        }
203
204        Ok::<Session, AoError>(session)
205    }
206    .await;
207
208    match spawn_result {
209        Ok(s) => Ok(s),
210        Err(e) => {
211            // Best-effort cleanup; swallow any secondary error so the user
212            // sees the original failure cause.
213            let _ = workspace.destroy(&workspace_path).await;
214            let _ = sessions.delete(cfg.project_id, &session_id).await;
215            Err(e)
216        }
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use super::*;
223    use crate::types::{now_ms, SessionId, SessionStatus};
224
225    fn session_with_id(id: &str, project: &str) -> Session {
226        Session {
227            id: SessionId(id.into()),
228            project_id: project.into(),
229            status: SessionStatus::Working,
230            agent: "claude-code".into(),
231            agent_config: None,
232            branch: "orchestrator/x".into(),
233            task: "orchestrator".into(),
234            workspace_path: None,
235            runtime_handle: None,
236            runtime: "tmux".into(),
237            activity: None,
238            created_at: now_ms(),
239            cost: None,
240            issue_id: None,
241            issue_url: None,
242            claimed_pr_number: None,
243            claimed_pr_url: None,
244            initial_prompt_override: None,
245            spawned_by: None,
246            last_merge_conflict_dispatched: None,
247            last_review_backlog_fingerprint: None,
248        }
249    }
250
251    #[test]
252    fn reserve_starts_at_one_when_no_existing() {
253        let got = reserve_orchestrator_identity("app", &[]).unwrap();
254        assert_eq!(got, "app-orchestrator-1");
255    }
256
257    #[test]
258    fn reserve_skips_used_numbers_in_order() {
259        let existing = vec![
260            session_with_id("app-orchestrator-1", "app"),
261            session_with_id("app-orchestrator-3", "app"),
262        ];
263        let got = reserve_orchestrator_identity("app", &existing).unwrap();
264        assert_eq!(got, "app-orchestrator-2");
265
266        let existing_full = vec![
267            session_with_id("app-orchestrator-1", "app"),
268            session_with_id("app-orchestrator-2", "app"),
269        ];
270        let got = reserve_orchestrator_identity("app", &existing_full).unwrap();
271        assert_eq!(got, "app-orchestrator-3");
272    }
273
274    #[test]
275    fn reserve_ignores_other_projects_and_worker_ids() {
276        let existing = vec![
277            session_with_id("app-1", "app"), // worker with uuid-style id
278            session_with_id("other-orchestrator-1", "other"), // other project
279            session_with_id("app-orchestrator-abc", "app"), // non-numeric
280        ];
281        let got = reserve_orchestrator_identity("app", &existing).unwrap();
282        assert_eq!(got, "app-orchestrator-1");
283    }
284
285    #[test]
286    fn is_orchestrator_session_matches_numbered_pattern() {
287        assert!(is_orchestrator_session(&session_with_id(
288            "app-orchestrator-1",
289            "app"
290        )));
291        assert!(is_orchestrator_session(&session_with_id(
292            "my-project-orchestrator-42",
293            "my-project"
294        )));
295    }
296
297    #[test]
298    fn is_orchestrator_session_matches_suffix_only() {
299        assert!(is_orchestrator_session(&session_with_id(
300            "app-orchestrator",
301            "app"
302        )));
303    }
304
305    #[test]
306    fn is_orchestrator_session_rejects_workers_and_unrelated() {
307        assert!(!is_orchestrator_session(&session_with_id("app-1", "app")));
308        assert!(!is_orchestrator_session(&session_with_id(
309            "deadbeef-aaaa-bbbb",
310            "app"
311        )));
312        assert!(!is_orchestrator_session(&session_with_id(
313            "app-orchestrator-abc",
314            "app"
315        )));
316        assert!(!is_orchestrator_session(&session_with_id(
317            "app-orchestrator-",
318            "app"
319        )));
320    }
321}