Skip to main content

oxios_kernel/
orchestrator.rs

1//! Orchestrator: coordinates the unified intent lifecycle (RFC-027).
2//!
3//! The orchestrator is the "brain" that runs the Ouroboros protocol.
4//! Given a user message:
5//! 1. Conduct the interview (ask clarifying questions if needed)
6//! 2. Generate a seed (via LLM for complex tasks, or ad-hoc for simple tasks)
7//! 3. Execute the agent via the supervisor
8//! 4. Return the result to the user
9//!
//! The orchestrator does NOT know about channels or HTTP — it only
//! coordinates Ouroboros + Supervisor + EventBus + StateStore + Scheduler + AccessManager.
12
13// Legacy helper functions (save_seed, publish_phase_*, format_*) are
14// retained for reference but no longer called after RFC-027 migration.
15#![allow(dead_code)]
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::{Context, Result};
21use chrono;
22use oxios_ouroboros::{ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed};
23use parking_lot::RwLock;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use crate::agent_lifecycle::AgentLifecycleManager;
28use crate::event_bus::{EventBus, KernelEvent};
29use crate::git_layer::GitLayer;
30use crate::metrics::get_metrics;
31use crate::mount::{MountId, MountManager};
32use crate::project::{ConversationBuffer, ProjectManager};
33use crate::scheduler::Priority;
34use crate::state_store::StateStore;
35use crate::types::AgentId;
36
37/// Role of an agent within a group.
38#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
39pub enum AgentRole {
40    /// Executes a specific subtask.
41    #[default]
42    Worker,
43    /// Coordinates subtasks, synthesizes results.
44    Manager,
45}
46
47/// A subtask within a multi-agent plan.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct SubTask {
50    /// Unique subtask ID.
51    pub id: Uuid,
52    /// Human-readable description.
53    pub description: String,
54    /// Capability required (e.g., "code-review", "testing").
55    pub required_capability: Option<String>,
56    /// Result of the subtask (filled after execution).
57    pub result: Option<String>,
58    /// Whether this subtask succeeded.
59    pub success: bool,
60    /// Role of the agent assigned to this subtask.
61    #[serde(default)]
62    pub role: AgentRole,
63}
64
65impl SubTask {
66    /// Create a new subtask with the given description.
67    pub fn new(description: impl Into<String>) -> Self {
68        Self {
69            id: Uuid::new_v4(),
70            description: description.into(),
71            required_capability: None,
72            result: None,
73            success: false,
74            role: AgentRole::default(),
75        }
76    }
77
78    /// Set the required capability for this subtask.
79    pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
80        self.required_capability = Some(cap.into());
81        self
82    }
83}
84
85/// The orchestrator coordinates the full Ouroboros lifecycle.
86pub struct Orchestrator {
87    #[allow(dead_code)]
88    ouroboros: Arc<dyn OuroborosProtocol>,
89    /// IntentEngine for the unified handle() path (RFC-027).
90    /// Lazily available when the kernel wires it; None in legacy constructions.
91    intent_engine: RwLock<Option<Arc<dyn oxios_ouroboros::IntentEngineOps>>>,
92    event_bus: EventBus,
93    state_store: Arc<StateStore>,
94    /// Git version control layer for auto-commits.
95    git_layer: Option<Arc<GitLayer>>,
96    /// Active interview sessions, keyed by session ID.
97    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
98    /// Agent lifecycle manager (fork, register, run, cleanup).
99    lifecycle: AgentLifecycleManager,
100    /// A2A protocol for inter-agent task delegation.
101    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
102    /// Project manager for context partitioning.
103    project_manager: RwLock<Option<Arc<ProjectManager>>>,
104    /// Mount manager for path-alias context (RFC-025).
105    mount_manager: RwLock<Option<Arc<MountManager>>>,
106    /// Conversation buffer for topic shift detection.
107    conversation_buffer: RwLock<ConversationBuffer>,
108    /// Orchestrator configuration (Ouroboros protocol settings).
109    delegation_config: DelegationConfig,
110    /// A2A circuit breaker for delegation reliability.
111    a2a_breaker: Arc<crate::a2a::circuit_breaker::A2ACircuitBreaker>,
112    /// RFC-027 intent config (retry settings, etc).
113    intent_config: RwLock<crate::config::IntentConfig>,
114}
115
/// Configuration for A2A delegation retries.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Maximum retry attempts for A2A delegation.
    max_retries: u32,
    /// Base delay for exponential backoff (milliseconds).
    base_delay_ms: u64,
    /// Maximum delay cap for exponential backoff (milliseconds).
    max_delay_ms: u64,
    /// Timeout per delegation attempt (milliseconds).
    #[allow(dead_code)]
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    /// Defaults: 3 retries, 100 ms base delay, 5 s delay cap, 5 s timeout.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Exponential backoff delay in milliseconds for the given attempt.
    ///
    /// Computes `base_delay_ms * 2^min(attempt, 10)`, capped at
    /// `max_delay_ms`. The multiplication saturates, so a very large
    /// `base_delay_ms` yields the cap instead of overflowing (which would
    /// panic in debug builds and wrap in release builds).
    fn backoff_delay(&self, attempt: u32) -> u64 {
        // The exponent is clamped to 10, so the factor is at most 1024.
        let factor = 2_u64.pow(attempt.min(10));
        self.base_delay_ms
            .saturating_mul(factor)
            .min(self.max_delay_ms)
    }
}
148
149impl Orchestrator {
150    /// Creates a new orchestrator.
151    pub fn new(
152        ouroboros: Arc<dyn OuroborosProtocol>,
153        event_bus: EventBus,
154        state_store: Arc<StateStore>,
155        lifecycle: AgentLifecycleManager,
156    ) -> Self {
157        Self::with_config(
158            ouroboros,
159            event_bus,
160            state_store,
161            lifecycle,
162            crate::config::OrchestratorConfig::default(),
163        )
164    }
165
166    /// Creates a new orchestrator with custom config.
167    pub fn with_config(
168        ouroboros: Arc<dyn OuroborosProtocol>,
169        event_bus: EventBus,
170        state_store: Arc<StateStore>,
171        lifecycle: AgentLifecycleManager,
172        _config: crate::config::OrchestratorConfig,
173    ) -> Self {
174        Self {
175            ouroboros,
176            intent_engine: RwLock::new(None),
177            event_bus,
178            state_store,
179            git_layer: None,
180            sessions: RwLock::new(std::collections::HashMap::new()),
181            lifecycle,
182            a2a: None,
183            project_manager: RwLock::new(None),
184            mount_manager: RwLock::new(None),
185            conversation_buffer: RwLock::new(ConversationBuffer::default()),
186            delegation_config: DelegationConfig::default(),
187            intent_config: RwLock::new(crate::config::IntentConfig::default()),
188            a2a_breaker: Arc::new(crate::a2a::circuit_breaker::A2ACircuitBreaker::new(5, 30)),
189        }
190    }
191
192    /// Wire the IntentEngine for unified handle() calls (RFC-027).
193    /// Called by the kernel assembler after construction.
194    pub fn set_intent_engine(&self, engine: Arc<dyn oxios_ouroboros::IntentEngineOps>) {
195        *self.intent_engine.write() = Some(engine);
196    }
197
198    /// Whether the IntentEngine is wired (unified path available).
199    pub fn has_intent_engine(&self) -> bool {
200        self.intent_engine.read().is_some()
201    }
202
203    /// Set the ProjectManager for context partitioning.
204    pub fn set_project_manager(&self, manager: Arc<ProjectManager>) {
205        *self.project_manager.write() = Some(manager);
206    }
207
208    /// Set the MountManager for path-alias context (RFC-025).
209    pub fn set_mount_manager(&self, manager: Arc<MountManager>) {
210        *self.mount_manager.write() = Some(manager);
211    }
212
213    /// Get a reference to the MountManager, if set (RFC-025).
214    pub fn mount_manager(&self) -> Option<Arc<MountManager>> {
215        self.mount_manager.read().as_ref().cloned()
216    }
217
218    /// Get a reference to the ProjectManager, if set.
219    pub fn project_manager(&self) -> Option<Arc<ProjectManager>> {
220        self.project_manager.read().as_ref().cloned()
221    }
222
223    /// Detect a project from a message, returning tag string.
224    pub fn detect_project_tag(&self, message: &str) -> Option<String> {
225        self.project_manager.read().as_ref().and_then(|pm| {
226            let projects = pm.list_projects();
227            let result = crate::project::detect_project(message, &projects);
228            match result {
229                crate::project::DetectionResult::Found(id) => pm.get_project(id).map(|p| p.tag()),
230                crate::project::DetectionResult::NoMatch { .. } => None,
231            }
232        })
233    }
234
235    /// Resolve the active Mounts for a message (RFC-025).
236    ///
237    /// Parses explicit `mount_ids` ("uuid1,uuid2,...", primary first); when
238    /// none are given, auto-detects from the message. Returns:
239    /// - the ordered list of active [`MountId`]s,
240    /// - the rendered `## Workspace Context` body (without the header),
241    /// - all resolved filesystem paths (primary first),
242    /// - a display tag like `[๐Ÿ”ง oxios + oxi-sdk]`.
243    ///
244    /// Honors the sticky-primary model: when `mount_ids` is explicitly
245    /// provided they are used as-is (detection is skipped). Detection only
246    /// runs when `mount_ids` is `None`, seeding the primary slot โ€” it never
247    /// replaces an explicit primary, only appends a secondary.
248    fn resolve_mount_workspace(
249        &self,
250        mount_ids: Option<&str>,
251        project_ids: Option<&str>,
252        user_message: &str,
253    ) -> (
254        Vec<MountId>,
255        Option<String>,
256        Vec<std::path::PathBuf>,
257        String,
258    ) {
259        use crate::mount::Mount;
260
261        let Some(mm) = self.mount_manager() else {
262            return (Vec::new(), None, Vec::new(), String::new());
263        };
264
265        // Parse explicit mount_ids; otherwise auto-detect (seeds the primary slot).
266        let mut ids: Vec<MountId> = if let Some(ids_str) = mount_ids {
267            ids_str
268                .split(',')
269                .filter_map(|s| MountId::parse_str(s.trim()).ok())
270                .collect()
271        } else {
272            match mm.detect(user_message) {
273                crate::mount::DetectionResult::Found(id) => vec![id],
274                crate::mount::DetectionResult::NoMatch { .. } => vec![],
275            }
276        };
277        // De-duplicate while preserving order (handles non-consecutive dups).
278        let mut seen = std::collections::HashSet::new();
279        ids.retain(|id| seen.insert(*id));
280
281        // โ”€โ”€ Project-referenced Mount activation (RFC-025) โ”€โ”€
282        // When a project_id is provided, auto-activate its referenced Mounts
283        // BEFORE we derive mounts/tag/context/paths, so they are fully
284        // visible in the system prompt and the badge โ€” not just granted
285        // path access. (Previously this ran after the prompt was built, so
286        // project-referenced Mounts were invisible in the context body.)
287        let project_for_instructions: Option<crate::project::Project> = if let Some(project_ids_str) =
288            project_ids
289            && let Some(first_id_str) = project_ids_str.split(',').next()
290            && let Some(pm) = self.project_manager()
291            && let Ok(pid) = Uuid::parse_str(first_id_str.trim())
292        {
293            let proj = pm.get_project(pid);
294            if let Some(ref project) = proj {
295                for mid in &project.mount_ids {
296                    if !ids.contains(mid) {
297                        ids.push(*mid);
298                    }
299                }
300            }
301            proj
302        } else {
303            None
304        };
305
306        if ids.is_empty() {
307            return (Vec::new(), None, Vec::new(), String::new());
308        }
309
310        // Touch each active Mount (record activity) โ€” now includes any
311        // Project-referenced Mounts activated above.
312        for id in &ids {
313            mm.touch(*id);
314        }
315
316        let mounts: Vec<Mount> = mm.get_mounts_ordered(&ids);
317        if mounts.is_empty() {
318            return (Vec::new(), None, Vec::new(), String::new());
319        }
320
321        // Collect all paths (primary first, deduped) over the full Mount set.
322        let mut paths: Vec<std::path::PathBuf> = Vec::new();
323        for m in &mounts {
324            for p in &m.paths {
325                if !paths.contains(p) {
326                    paths.push(p.clone());
327                }
328            }
329        }
330
331        // Legacy fallback (RFC-025 migration window): a Project created
332        // before Mounts may carry explicit `paths` but no `mount_ids`. In
333        // that case grant path access directly so pre-RFC-025 Projects still
334        // resolve a CWD and populate `allowed_paths` (see agent_runtime.rs).
335        if let Some(project) = &project_for_instructions
336            && project.mount_ids.is_empty()
337            && !project.paths.is_empty()
338        {
339            for p in &project.paths {
340                if !paths.contains(p) {
341                    paths.push(p.clone());
342                }
343            }
344        }
345
346        // Display tag.
347        let tag = if mounts.len() == 1 {
348            mounts[0].tag()
349        } else {
350            let names: Vec<&str> = mounts.iter().map(|m| m.name.as_str()).collect();
351            format!("[๐Ÿ”ง {}]", names.join(" + "))
352        };
353
354        let mut context = build_workspace_context_body(&mounts).unwrap_or_default();
355
356        // โ”€โ”€ Project instructions (RFC-025) โ”€โ”€
357        // Inject the project's instructions into the context body. The
358        // "### Active Mounts" header above is only present when there are
359        // actual mount entries in `context`; the Project Instructions section
360        // stands on its own when only instructions exist.
361        if let Some(project) = project_for_instructions {
362            // Cap instructions to stay within the prompt budget (~500 tokens).
363            let instructions = if project.instructions.len() > 2000 {
364                let mut end = 2000;
365                while end > 0 && !project.instructions.is_char_boundary(end) {
366                    end -= 1;
367                }
368                format!("{}...", &project.instructions[..end])
369            } else {
370                project.instructions.clone()
371            };
372            if !instructions.is_empty() {
373                context.push_str(&format!(
374                    "\n### Project Instructions: {}\n{}\n",
375                    project.name, instructions
376                ));
377            }
378        }
379
380        // Enforce a hard prompt budget on the final context body (~1500 tokens).
381        const MAX_CONTEXT_CHARS: usize = 6000;
382        if context.len() > MAX_CONTEXT_CHARS {
383            let mut end = MAX_CONTEXT_CHARS;
384            while end > 0 && !context.is_char_boundary(end) {
385                end -= 1;
386            }
387            context.truncate(end);
388            context.push_str("\n...(context truncated)...\n");
389        }
390
391        let context_opt = if context.is_empty() {
392            None
393        } else {
394            Some(context)
395        };
396        (ids, context_opt, paths, tag)
397    }
398
399    /// Set the A2A protocol for inter-agent task delegation.
400    pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
401        self.a2a = Some(a2a);
402    }
403
404    /// Set the GitLayer for auto-commits after state saves.
405    pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
406        self.git_layer = Some(git_layer);
407    }
408
409    /// Restore sessions from persisted state.
410    ///
411    /// Loads sessions from the `StateStore` that have an `active_seed_id`
412    /// (meaning they are mid-orchestration) and repopulates the in-memory
413    /// interview session map so that follow-up messages can continue
414    /// the conversation.
415    pub async fn restore_sessions(&self) {
416        let summaries = match self.state_store.list_sessions().await {
417            Ok(s) => s,
418            Err(e) => {
419                tracing::warn!(error = %e, "Failed to list sessions for restore");
420                return;
421            }
422        };
423
424        let mut restored = 0usize;
425        for summary in &summaries {
426            // Only restore sessions that are mid-orchestration (have an active seed).
427            let Some(ref seed_id_str) = summary.active_seed_id else {
428                continue;
429            };
430
431            let session_id = crate::state_store::SessionId(summary.id.clone());
432            let session = match self.state_store.load_session(&session_id).await {
433                Ok(Some(s)) => s,
434                Ok(None) => continue,
435                Err(e) => {
436                    tracing::warn!(
437                        session_id = %summary.id,
438                        error = %e,
439                        "Failed to load session for restore"
440                    );
441                    continue;
442                }
443            };
444
445            // Reconstruct an InterviewSession from the persisted data.
446            // The interview result is rebuilt from conversation history so
447            // that multi-turn context is available on follow-up messages.
448            let mut interview = oxios_ouroboros::InterviewResult::new();
449            interview.is_task = true; // Has active seed โ†’ was a task.
450            interview.original_message = session
451                .user_messages
452                .last()
453                .map(|m| m.content.clone())
454                .unwrap_or_default();
455
456            // Rebuild conversation history from user/agent exchanges.
457            // Use an index loop (not zip) so a trailing user message
458            // without a stored agent response (e.g. crash before flush)
459            // is preserved with an empty agent turn instead of dropped.
460            let history: Vec<oxios_ouroboros::interview::Exchange> = session
461                .user_messages
462                .iter()
463                .enumerate()
464                .map(|(i, user)| oxios_ouroboros::interview::Exchange {
465                    user: user.content.clone(),
466                    agent: session
467                        .agent_responses
468                        .get(i)
469                        .map(|a| a.content.clone())
470                        .unwrap_or_default(),
471                })
472                .collect();
473            interview.conversation_history = history;
474
475            let seed_id = seed_id_str.parse::<Uuid>().ok();
476
477            let interview_session = InterviewSession {
478                id: session.id.0.clone(),
479                interview,
480                phase: Phase::Execute,
481                seed_id,
482                agent_id: None,
483            };
484
485            {
486                let mut sessions = self.sessions.write();
487                sessions.insert(session.id.0.clone(), interview_session);
488            }
489
490            restored += 1;
491        }
492
493        if restored > 0 {
494            tracing::info!(restored, total = summaries.len(), "Sessions restored");
495        }
496    }
497
498    /// Commit a file to git if GitLayer is configured and enabled.
499    fn git_commit(&self, rel_path: &str, message: &str) {
500        if let Some(ref gl) = self.git_layer
501            && gl.is_enabled()
502        {
503            let _ = gl.commit_file(rel_path, message);
504        }
505    }
506
507    /// Save a seed to the state store.
508    async fn save_seed(&self, seed: &Seed) -> Result<()> {
509        let key = seed.id.to_string();
510
511        self.state_store
512            .save_json("seeds", &key, seed)
513            .await
514            .context("failed to save seed to state store")?;
515
516        self.git_commit(&format!("seeds/{key}.json"), "ourobors: save seed");
517
518        Ok(())
519    }
520
521    /// Save an evaluation result to the state store.
522    /// Publish a PhaseStarted event.
523    async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
524        let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
525            session_id: session_id.to_owned(),
526            phase,
527        });
528    }
529
530    /// Publish a PhaseCompleted event.
531    async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
532        let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
533            session_id: session_id.to_owned(),
534            phase,
535            result_summary: result.to_owned(),
536        });
537    }
538
539    /// Execute multiple subtasks using separate agents in parallel.
540    ///
541    /// When A2A is available, the orchestrator delegates tasks through the
542    /// A2A protocol with circuit breaker and retry support.
543    /// Otherwise, falls back to direct lifecycle execution.
544    ///
545    /// Results are collected as they complete using `JoinSet`.
546    pub async fn delegate_subtasks(
547        &self,
548        subtasks: Vec<SubTask>,
549        parent_seed: &Seed,
550    ) -> Result<Vec<SubTask>> {
551        // Single task โ€” execute directly without group overhead.
552        if subtasks.len() == 1 {
553            return self.execute_single_subtask(subtasks, parent_seed).await;
554        }
555
556        // Try A2A-based delegation when the protocol is available.
557        if let Some(ref a2a) = self.a2a {
558            // Check circuit breaker
559            if !self.a2a_breaker.is_allowed() {
560                tracing::warn!(
561                    state = ?self.a2a_breaker.state(),
562                    "A2A circuit breaker open, using lifecycle fallback"
563                );
564                return self.delegate_via_lifecycle(subtasks, parent_seed).await;
565            }
566
567            // Delegate with retry
568            return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
569        }
570
571        // Fallback: direct lifecycle execution (no A2A).
572        self.delegate_via_lifecycle(subtasks, parent_seed).await
573    }
574
575    /// Delegate subtasks via A2A with circuit breaker and retry support.
576    async fn delegate_with_retry(
577        &self,
578        subtasks: Vec<SubTask>,
579        parent_seed: &Seed,
580        a2a: &Arc<crate::a2a::A2AProtocol>,
581    ) -> Result<Vec<SubTask>> {
582        let mut attempt = 0;
583        let max_retries = self.delegation_config.max_retries;
584
585        loop {
586            match self
587                .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
588                .await
589            {
590                Ok(results) => {
591                    self.a2a_breaker.record_success();
592                    return Ok(results);
593                }
594                Err(e) => {
595                    self.a2a_breaker.record_failure();
596                    attempt += 1;
597
598                    if attempt >= max_retries {
599                        tracing::error!(
600                            attempts = attempt,
601                            error = %e,
602                            "A2A delegation exhausted after {} attempts, using lifecycle fallback",
603                            attempt
604                        );
605                        return self.delegate_via_lifecycle(subtasks, parent_seed).await;
606                    }
607
608                    // Exponential backoff
609                    let delay = self.delegation_config.backoff_delay(attempt);
610                    tracing::warn!(
611                        attempt,
612                        delay_ms = delay,
613                        error = %e,
614                        "A2A delegation failed, retrying with backoff"
615                    );
616                    tokio::time::sleep(Duration::from_millis(delay)).await;
617                }
618            }
619        }
620    }
621
622    /// Execute a single subtask directly via lifecycle manager.
623    async fn execute_single_subtask(
624        &self,
625        subtasks: Vec<SubTask>,
626        parent_seed: &Seed,
627    ) -> Result<Vec<SubTask>> {
628        let mut task = subtasks.into_iter().next().ok_or_else(|| {
629            anyhow::anyhow!("execute_single_subtask called with an empty subtask list")
630        })?;
631        let child_seed = Seed {
632            id: Uuid::new_v4(),
633            goal: task.description.clone(),
634            constraints: parent_seed.constraints.clone(),
635            acceptance_criteria: vec!["Task completes successfully".into()],
636            ontology: parent_seed.ontology.clone(),
637            created_at: chrono::Utc::now(),
638            generation: parent_seed.generation + 1,
639            parent_seed_id: Some(parent_seed.id),
640            cspace_hint: None,
641            original_request: parent_seed.original_request.clone(),
642            output_schema: None,
643            project_id: parent_seed.project_id,
644            workspace_context: parent_seed.workspace_context.clone(),
645            mount_paths: parent_seed.mount_paths.clone(),
646        };
647        match self
648            .lifecycle
649            .spawn_and_run(&child_seed, Priority::Normal)
650            .await
651        {
652            Ok(result) => {
653                task.result = Some(result.output.clone());
654            }
655            Err(e) => {
656                task.result = Some(format!("Failed: {e}"));
657                task.success = false;
658            }
659        }
660        Ok(vec![task])
661    }
662
663    /// Delegate subtasks via A2A protocol.
664    ///
665    /// Queries the AgentCardRegistry for agents matching each subtask's
666    /// Execute subtasks via A2A dispatch handler.
667    ///
668    /// Queries the AgentCardRegistry for agents matching each subtask's
669    /// required capability, then calls `execute_delegation` which runs
670    /// the task through the registered handler (lifecycle).
671    /// Falls back to direct lifecycle execution when no handler is registered.
672    async fn delegate_via_a2a(
673        &self,
674        subtasks: Vec<SubTask>,
675        parent_seed: &Seed,
676        a2a: &Arc<crate::a2a::A2AProtocol>,
677    ) -> Result<Vec<SubTask>> {
678        use crate::a2a::TaskPriority;
679        use tokio::task::JoinSet;
680
681        tracing::info!(
682            subtasks = subtasks.len(),
683            "Delegating subtasks via A2A protocol"
684        );
685
686        let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
687        let subtask_count = subtasks.len();
688        let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
689
690        for (idx, subtask) in subtasks.into_iter().enumerate() {
691            let capability = subtask.required_capability.clone();
692            let description = subtask.description.clone();
693            let subtask_id = subtask.id;
694            let role = subtask.role.clone();
695            let a2a = Arc::clone(a2a);
696            let parent_seed = parent_seed.clone();
697            let lifecycle = self.lifecycle.clone();
698
699            join_set.spawn(async move {
700                // Find agent with the required capability via A2A registry.
701                let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
702                    a2a.query_capabilities(cap).await.ok()
703                        .and_then(|agents| agents.into_iter().next())
704                } else {
705                    None
706                };
707
708                let (output, success) = if let Some(ref target_card) = target {
709                    let target_id = target_card.agent_id;
710                    tracing::info!(
711                        subtask_index = idx,
712                        target = %target_card.name,
713                        target_id = %target_id,
714                        "A2A dispatching subtask"
715                    );
716
717                    let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
718                        "parent_seed": parent_seed.id.to_string(),
719                        "goal": description,
720                    }))
721                    .with_priority(TaskPriority::Normal);
722
723                    // Enqueue audit trail (fire-and-forget into queue).
724                    let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
725
726                    // Execute through dispatch handler (blocking).
727                    match a2a.execute_delegation(orchestrator_id, target_id, task).await {
728                        Some(Ok(result)) => {
729                            let out = result.get("output")
730                                .and_then(|v| v.as_str())
731                                .unwrap_or("")
732                                .to_string();
733                            let ok = result.get("success")
734                                .and_then(|v| v.as_bool())
735                                .unwrap_or(false);
736                            (out, ok)
737                        }
738                        Some(Err(e)) => {
739                            tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
740                            (format!("Failed: {e}"), false)
741                        }
742                        None => {
743                            // No handler โ€” fallback to lifecycle.
744                            tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
745                            run_via_lifecycle(&lifecycle, &parent_seed, &description).await
746                        }
747                    }
748                } else {
749                    tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
750                    run_via_lifecycle(&lifecycle, &parent_seed, &description).await
751                };
752
753                (idx, SubTask {
754                    id: subtask_id,
755                    description,
756                    required_capability: capability,
757                    result: Some(output),
758                    success,
759                    role,
760                })
761            });
762        }
763
764        let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
765        while let Some(join_result) = join_set.join_next().await {
766            match join_result {
767                Ok((idx, subtask)) => {
768                    results[idx] = Some(subtask);
769                }
770                Err(e) => {
771                    tracing::error!(error = %e, "A2A task panicked");
772                }
773            }
774        }
775
776        // Preserve subtask_count: a `None` slot means the task panicked or
777        // was lost. Previously flatten() dropped them, so `all(success)` on
778        // an empty vec returned true โ€” total failure reported as success.
779        let completed: Vec<SubTask> = results
780            .into_iter()
781            .enumerate()
782            .map(|(idx, opt)| {
783                opt.unwrap_or_else(|| SubTask {
784                    id: Uuid::new_v4(),
785                    description: format!("subtask {idx} (failed)"),
786                    required_capability: None,
787                    result: Some("Task panicked or did not complete".into()),
788                    success: false,
789                    role: AgentRole::default(),
790                })
791            })
792            .collect();
793        tracing::info!(
794            completed = completed.len(),
795            succeeded = completed.iter().filter(|r| r.success).count(),
796            "A2A delegation complete"
797        );
798        Ok(completed)
799    }
800
801    async fn delegate_via_lifecycle(
802        &self,
803        subtasks: Vec<SubTask>,
804        parent_seed: &Seed,
805    ) -> Result<Vec<SubTask>> {
806        use crate::agent_group::OxiosAgentGroup;
807        use tokio::task::JoinSet;
808
809        let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
810        let group = OxiosAgentGroup::new(parent_seed, descriptions);
811        let group_id = group.id;
812
813        self.event_bus.publish(KernelEvent::AgentGroupCreated {
814            group_id,
815            agent_count: group.agents.len(),
816        })?;
817
818        tracing::info!(
819            group_id = %group_id,
820            agent_count = group.agents.len(),
821            "Starting parallel multi-agent execution"
822        );
823
824        let mut join_set: JoinSet<(
825            usize,
826            crate::types::AgentId,
827            Result<oxios_ouroboros::ExecutionResult>,
828        )> = JoinSet::new();
829
830        for (idx, agent_entry) in group.agents.iter().enumerate() {
831            let child_seed = agent_entry.seed.clone();
832            let agent_id = agent_entry.id;
833            let lifecycle = self.lifecycle.clone();
834
835            join_set.spawn(async move {
836                let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
837                (idx, agent_id, result)
838            });
839        }
840
841        let subtask_count = subtasks.len();
842        let mut completed = vec![None; subtask_count];
843        while let Some(join_result) = join_set.join_next().await {
844            match join_result {
845                Ok((idx, agent_id, Ok(exec_result))) => {
846                    let _ = self
847                        .event_bus
848                        .publish(KernelEvent::AgentGroupMemberCompleted {
849                            group_id,
850                            agent_id,
851                            success: exec_result.success,
852                        });
853                    completed[idx] = Some(SubTask {
854                        id: subtasks[idx].id,
855                        description: subtasks[idx].description.clone(),
856                        required_capability: subtasks[idx].required_capability.clone(),
857                        result: Some(exec_result.output.clone()),
858                        success: exec_result.success,
859                        role: subtasks[idx].role.clone(),
860                    });
861                }
862                Ok((idx, agent_id, Err(e))) => {
863                    tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
864                    let _ = self
865                        .event_bus
866                        .publish(KernelEvent::AgentGroupMemberCompleted {
867                            group_id,
868                            agent_id,
869                            success: false,
870                        });
871                    completed[idx] = Some(SubTask {
872                        id: subtasks[idx].id,
873                        description: subtasks[idx].description.clone(),
874                        required_capability: subtasks[idx].required_capability.clone(),
875                        result: Some(format!("Failed: {e}")),
876                        success: false,
877                        role: subtasks[idx].role.clone(),
878                    });
879                }
880                Err(e) => {
881                    tracing::error!(error = %e, "JoinSet task panicked");
882                }
883            }
884        }
885
886        let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
887        let succeeded = completed.iter().filter(|r| r.success).count();
888        let total = completed.len();
889
890        tracing::info!(
891            group_id = %group_id,
892            succeeded,
893            total,
894            "Parallel multi-agent execution complete"
895        );
896
897        // Persist group state
898        let _ = self
899            .state_store
900            .save_json("agent_groups", &group_id.to_string(), &group)
901            .await;
902        self.git_commit(
903            &format!("agent_groups/{group_id}.json"),
904            "orchestrator: save group",
905        );
906
907        Ok(completed)
908    }
909
// ──────────────────────────────────────────────────────────────────
// RFC-027 §3 — Unified intent handler
// ──────────────────────────────────────────────────────────────────
//
// Phase 5 entry point. Routes a single user message through:
//   assess → (crystallize) → execute → (review + retry) → Response
//
// The legacy `handle_message()` and `chat()` paths remain intact;
// this method coexists alongside them until Phase 6 cuts over.
919
920    /// Unified entry point for every user message (RFC-027 ยง3).
921    ///
922    /// One method, one match โ€” depth falls out of [`Scope`]:
923    ///
924    /// 1. [`IntentEngine::assess`] classifies the message (conversation /
925    ///    clarify / task + scope). Always called once.
926    /// 2. For `Assessment::Task(scope)` we build a [`Directive`]:
927    ///    - `Scope::Trivial` โ€” `Directive::from_message(msg)` verbatim.
928    ///    - `Scope::Substantial` โ€” [`IntentEngine::crystallize`] produces a
929    ///      structured directive with goal, constraints, and acceptance
930    ///      criteria.
931    /// 3. The [`ExecEnv`] is resolved from [`MsgCtx`] independently of the
932    ///    directive (Mount workspace context, paths, project ID, cspace hint).
933    /// 4. Execution delegates to the existing [`AgentLifecycleManager`].
934    ///    In Phase 5 the Directive is shimmed into a [`Seed`] for the
935    ///    current pipeline; Phase 6 will replace this with
936    ///    `lifecycle.execute_directive(&Directive, &ExecEnv)`.
937    /// 5. For `Scope::Substantial` we call [`IntentEngine::review`] and, if
938    ///    the verdict fails, retry once with the verdict's gaps folded back
939    ///    into the directive as additional constraints.
940    ///
941    /// Does not modify or call the legacy `handle_message()` / `chat()`
942    /// paths โ€” they remain the canonical entry points until Phase 6 cuts
943    /// the gateway over to `handle()`.
944    ///
945    /// # Parameters
946    /// - `engine` โ€” the LLM-backed intent engine (assess/crystallize/review).
947    /// - `msg` โ€” the user's raw message text.
948    /// - `ctx` โ€” per-message context (session, history, project/mount hints).
949    pub async fn handle(
950        &self,
951        engine: &dyn oxios_ouroboros::IntentEngineOps,
952        msg: &str,
953        ctx: &oxios_ouroboros::MsgCtx,
954    ) -> Result<HandleResponse> {
955        // 1. assess โ€” always called once, routes the message.
956        let assessment = engine.assess(msg, ctx).await?;
957
958        match assessment {
959            oxios_ouroboros::Assessment::Conversation(reply) => Ok(HandleResponse::Reply(reply)),
960
961            oxios_ouroboros::Assessment::Clarify { questions } => {
962                Ok(HandleResponse::Clarify(questions))
963            }
964
965            oxios_ouroboros::Assessment::Task(scope) => {
966                // 2. Build the Directive based on scope.
967                let mut directive = match scope {
968                    oxios_ouroboros::Scope::Trivial => {
969                        oxios_ouroboros::Directive::from_message(msg)
970                    }
971                    oxios_ouroboros::Scope::Substantial => engine.crystallize(msg, ctx).await?,
972                };
973
974                // 3. Resolve the execution environment from MsgCtx.
975                let env = self.resolve_exec_env(ctx, msg);
976
977                // 4. Execute (always). Trivial tasks skip review; substantial
978                //    tasks go through verify_or_retry below.
979                let mut result = self.execute_directive(&directive, &env).await?;
980
981                // 5. Verify + optional retry (Substantial only).
982                let (verdict, evaluation_passed) = match scope {
983                    oxios_ouroboros::Scope::Trivial => (None, None),
984                    oxios_ouroboros::Scope::Substantial => {
985                        let (r, v) = self
986                            .verify_or_retry(engine, &mut directive, &env, result, msg, ctx)
987                            .await?;
988                        result = r;
989                        let passed = v.all_passed();
990                        (Some(v), Some(passed))
991                    }
992                };
993
994                self.lifecycle.reap_zombies();
995
996                Ok(HandleResponse::Task {
997                    scope,
998                    directive: Box::new(directive),
999                    env,
1000                    result: Box::new(result),
1001                    verdict,
1002                    evaluation_passed,
1003                })
1004            }
1005        }
1006    }
1007
1008    /// Unified entry point that accepts legacy-style parameters and returns
1009    /// an `OrchestrationResult` (RFC-027).
1010    ///
1011    /// Builds a [`MsgCtx`] from the session history (if any), then delegates
1012    /// to [`handle`](Self::handle). Falls back to `handle_message` if no
1013    /// `IntentEngine` is wired.
1014    pub async fn handle_unified(
1015        &self,
1016        user_id: &str,
1017        msg: &str,
1018        session_id: Option<&str>,
1019        project_ids: Option<&str>,
1020        mount_ids: Option<&str>,
1021        request_id: &str,
1022    ) -> Result<OrchestrationResult> {
1023        // Get the IntentEngine (always wired by the kernel assembler).
1024        let engine = self
1025            .intent_engine
1026            .read()
1027            .clone()
1028            .expect("IntentEngine not wired โ€” kernel assembler bug");
1029
1030        // Build MsgCtx.
1031        let sid = session_id.unwrap_or(request_id).to_string();
1032        let history = self.load_session_history(&sid).await;
1033        let ctx = oxios_ouroboros::MsgCtx {
1034            session_id: sid.clone(),
1035            history,
1036            project_ids: project_ids.map(String::from),
1037            mount_ids: mount_ids.map(String::from),
1038            user_id: user_id.to_string(),
1039        };
1040
1041        // Call the unified path.
1042        let start = std::time::Instant::now();
1043        let response = self.handle(engine.as_ref(), msg, &ctx).await?;
1044        let duration_ms = start.elapsed().as_millis() as u64;
1045
1046        Ok(self.handle_response_to_orchestration_result(response, &ctx, duration_ms))
1047    }
1048
1049    /// Load conversation history for a session from the state store.
1050    async fn load_session_history(&self, session_id: &str) -> Vec<oxios_ouroboros::Exchange> {
1051        let sid = crate::state_store::SessionId(session_id.to_string());
1052        match self.state_store.load_session(&sid).await {
1053            Ok(Some(session)) => session
1054                .user_messages
1055                .iter()
1056                .zip(session.agent_responses.iter())
1057                .map(|(u, a)| oxios_ouroboros::Exchange {
1058                    user: u.content.clone(),
1059                    agent: a.content.clone(),
1060                })
1061                .collect(),
1062            _ => Vec::new(),
1063        }
1064    }
1065
1066    fn handle_response_to_orchestration_result(
1067        &self,
1068        response: HandleResponse,
1069        ctx: &oxios_ouroboros::MsgCtx,
1070        duration_ms: u64,
1071    ) -> OrchestrationResult {
1072        let metrics = get_metrics();
1073        metrics.orch_duration.observe(duration_ms as f64 / 1000.0);
1074
1075        match response {
1076            HandleResponse::Reply(reply) => OrchestrationResult {
1077                session_id: Some(ctx.session_id.clone()),
1078                primary_project_id: None,
1079                project_tag: None,
1080                active_mount_ids: Vec::new(),
1081                mount_tag: None,
1082                response: reply,
1083                seed_id: None,
1084                agent_id: None,
1085                phase_reached: oxios_ouroboros::Phase::Interview,
1086                evaluation_passed: None,
1087                output: None,
1088                tool_calls: Vec::new(),
1089                interview_questions: None,
1090                interview_round: None,
1091                interview_ambiguity: None,
1092                mode: "unified".to_string(),
1093            },
1094            HandleResponse::Clarify(questions) => {
1095                let questions_text = questions
1096                    .iter()
1097                    .map(|q| q.text.clone())
1098                    .collect::<Vec<_>>()
1099                    .join("\n");
1100                let structured = Some(
1101                    questions
1102                        .iter()
1103                        .map(
1104                            |q| oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput {
1105                                id: q.id.clone(),
1106                                text: q.text.clone(),
1107                                kind: format!("{:?}", q.kind).to_lowercase(),
1108                                options: q
1109                                    .options
1110                                    .iter()
1111                                    .map(|o| {
1112                                        oxios_ouroboros::ouroboros_engine::InterviewOptionOutput {
1113                                            value: o.value.clone(),
1114                                            label: o.label.clone(),
1115                                            description: String::new(),
1116                                        }
1117                                    })
1118                                    .collect(),
1119                            },
1120                        )
1121                        .collect(),
1122                );
1123                OrchestrationResult {
1124                    session_id: Some(ctx.session_id.clone()),
1125                    primary_project_id: None,
1126                    project_tag: None,
1127                    active_mount_ids: Vec::new(),
1128                    mount_tag: None,
1129                    response: questions_text,
1130                    seed_id: None,
1131                    agent_id: None,
1132                    phase_reached: oxios_ouroboros::Phase::Interview,
1133                    evaluation_passed: None,
1134                    output: None,
1135                    tool_calls: Vec::new(),
1136                    interview_questions: structured,
1137                    interview_round: Some(((ctx.history.len() / 2) as u32).max(1)),
1138                    interview_ambiguity: None,
1139                    mode: "unified".to_string(),
1140                }
1141            }
1142            HandleResponse::Task {
1143                scope: _,
1144                directive,
1145                env,
1146                result,
1147                verdict,
1148                evaluation_passed,
1149            } => {
1150                let response_text = if directive.acceptance_criteria.is_empty() {
1151                    result.output.clone()
1152                } else {
1153                    match &verdict {
1154                        Some(v) if v.all_passed() => result.output.clone(),
1155                        Some(v) => format!(
1156                            "{}\n\nโš  Review notes:\n{}",
1157                            result.output,
1158                            v.notes.join("\n")
1159                        ),
1160                        None => result.output.clone(),
1161                    }
1162                };
1163                if evaluation_passed.unwrap_or(false) {
1164                    metrics.agents_completed.inc();
1165                } else {
1166                    metrics.agents_failed.inc();
1167                }
1168                OrchestrationResult {
1169                    session_id: Some(ctx.session_id.clone()),
1170                    primary_project_id: env.project_id,
1171                    project_tag: None,
1172                    active_mount_ids: Vec::new(),
1173                    mount_tag: None,
1174                    response: response_text,
1175                    seed_id: None,
1176                    agent_id: None,
1177                    phase_reached: oxios_ouroboros::Phase::Execute,
1178                    evaluation_passed,
1179                    output: Some(result.output.clone()),
1180                    tool_calls: result.tool_calls.clone(),
1181                    interview_questions: None,
1182                    interview_round: None,
1183                    interview_ambiguity: None,
1184                    mode: "unified".to_string(),
1185                }
1186            }
1187        }
1188    }
1189
1190    /// Resolve an [`ExecEnv`] from the per-message context.
1191    ///
1192    /// Mirrors the Mount workspace resolution done by `handle_message()`
1193    /// and `chat()` but packages the result as the new [`ExecEnv`] type.
1194    /// Independent of the directive โ€” runs whether the task is Trivial
1195    /// or Substantial.
1196    fn resolve_exec_env(
1197        &self,
1198        ctx: &oxios_ouroboros::MsgCtx,
1199        msg: &str,
1200    ) -> oxios_ouroboros::ExecEnv {
1201        let (active_mount_ids, workspace_context, mount_paths, _mount_tag) =
1202            self.resolve_mount_workspace(ctx.mount_ids.as_deref(), ctx.project_ids.as_deref(), msg);
1203        // active_mount_ids + mount_tag are surfaced via the legacy path;
1204        // ExecEnv carries the resolved paths/context/project that the
1205        // agent runtime actually consumes.
1206        let _ = active_mount_ids;
1207
1208        // Resolve a primary project ID (matches handle_message semantics):
1209        // explicit project_ids takes precedence over auto-detection.
1210        let project_id = ctx
1211            .project_ids
1212            .as_deref()
1213            .and_then(|ids| {
1214                ids.split(',')
1215                    .next()
1216                    .and_then(|s| Uuid::parse_str(s.trim()).ok())
1217            })
1218            .or_else(|| {
1219                self.detect_project_tag(msg).and_then(|_tag| {
1220                    self.project_manager().and_then(|pm| {
1221                        let projects = pm.list_projects();
1222                        match crate::project::detect_project(msg, &projects) {
1223                            crate::project::DetectionResult::Found(id) => Some(id),
1224                            crate::project::DetectionResult::NoMatch { .. } => None,
1225                        }
1226                    })
1227                })
1228            });
1229
1230        // Touch the project to record activity (mirrors handle_message).
1231        if let Some(pid) = project_id
1232            && let Some(pm) = self.project_manager()
1233        {
1234            pm.touch(pid);
1235        }
1236
1237        oxios_ouroboros::ExecEnv {
1238            workspace_context,
1239            mount_paths,
1240            project_id,
1241            cspace_hint: None,
1242        }
1243    }
1244
1245    /// Execute a [`Directive`] under an [`ExecEnv`].
1246    ///
1247    /// **Phase 5 stub:** builds a legacy [`Seed`] from the directive and
1248    /// delegates to the existing [`AgentLifecycleManager::spawn_and_run`]
1249    /// pipeline. Phase 6 will replace this with
1250    /// `lifecycle.execute_directive(&directive, &env)` once
1251    /// AgentRuntime/Supervisor/AgentLifecycleManager have their
1252    /// Directive-based methods (see sibling subagents Runtime and
1253    /// RuntimeDirective).
1254    async fn execute_directive(
1255        &self,
1256        directive: &oxios_ouroboros::Directive,
1257        env: &oxios_ouroboros::ExecEnv,
1258    ) -> Result<ExecutionResult> {
1259        let seed = self.directive_to_seed(directive, env);
1260        self.lifecycle.spawn_and_run(&seed, Priority::Normal).await
1261    }
1262
1263    /// Shim a [`Directive`] + [`ExecEnv`] into a legacy [`Seed`].
1264    ///
1265    /// Used only by [`Self::execute_directive`] during the Phase 5 stub
1266    /// window. Carries every field the agent runtime reads: goal,
1267    /// constraints, acceptance criteria, original request, output schema,
1268    /// project, workspace context, mount paths, and cspace hint.
1269    fn directive_to_seed(
1270        &self,
1271        directive: &oxios_ouroboros::Directive,
1272        env: &oxios_ouroboros::ExecEnv,
1273    ) -> Seed {
1274        let mut seed = Seed::new(directive.goal.clone());
1275        seed.original_request = directive.original_request.clone();
1276        seed.constraints = directive.constraints.clone();
1277        seed.acceptance_criteria = directive.acceptance_criteria.clone();
1278        seed.output_schema = directive.output_schema.clone();
1279        seed.project_id = env.project_id;
1280        seed.workspace_context = env.workspace_context.clone();
1281        seed.mount_paths = env.mount_paths.clone();
1282        seed.cspace_hint = env.cspace_hint.clone();
1283        seed
1284    }
1285
1286    /// Review the result against the directive's criteria; on failure,
1287    /// retry once with the verdict's gaps folded back as constraints.
1288    ///
1289    /// Phase 5 caps retries at one explicit attempt; once IntentConfig
1290    /// lands (Phase 5 sibling subtask) this will read the configured
1291    /// `max_retries` instead.
1292    async fn verify_or_retry(
1293        &self,
1294        engine: &dyn oxios_ouroboros::IntentEngineOps,
1295        directive: &mut oxios_ouroboros::Directive,
1296        env: &oxios_ouroboros::ExecEnv,
1297        initial_result: ExecutionResult,
1298        _msg: &str,
1299        _ctx: &oxios_ouroboros::MsgCtx,
1300    ) -> Result<(ExecutionResult, oxios_ouroboros::Verdict)> {
1301        let verdict = engine.review(directive, &initial_result).await?;
1302
1303        if verdict.all_passed() || verdict.gaps.is_empty() {
1304            return Ok((initial_result, verdict));
1305        }
1306
1307        // Check if retry is enabled (RFC-027 Decision 6).
1308        // When disabled, return the initial result with the failed verdict.
1309        let enable_retry = self.intent_config.read().enable_retry;
1310        if !enable_retry {
1311            tracing::info!("Review failed but retry disabled (enable_retry=false)");
1312            return Ok((initial_result, verdict));
1313        }
1314
1315        let metrics = get_metrics();
1316        metrics.retry_attempted.inc();
1317
1318        tracing::info!(
1319            gaps = verdict.gaps.len(),
1320            "Review failed โ€” retrying with feedback"
1321        );
1322
1323        // Execute with feedback: previous output + gaps injected.
1324        let retry_result = self
1325            .lifecycle
1326            .execute_with_feedback(
1327                directive,
1328                env,
1329                &initial_result,
1330                &verdict.gaps,
1331                Priority::Normal,
1332            )
1333            .await?;
1334
1335        // Re-review.
1336        let retry_verdict = engine.review(directive, &retry_result).await?;
1337
1338        // Track retry effectiveness.
1339        if retry_verdict.score > verdict.score {
1340            metrics.retry_improved.inc();
1341        } else if retry_verdict.score < verdict.score {
1342            metrics.retry_degraded.inc();
1343        } else {
1344            metrics.retry_unchanged.inc();
1345        }
1346
1347        // Return best result.
1348        let chosen_result = if retry_verdict.score >= verdict.score {
1349            retry_result
1350        } else {
1351            initial_result
1352        };
1353
1354        Ok((chosen_result, retry_verdict))
1355    }
1356}
1357
1358/// Response envelope for [`Orchestrator::handle`] (RFC-027 ยง3).
1359///
1360/// One variant per terminal state of the unified handler:
1361///
1362/// - [`HandleResponse::Reply`] โ€” conversational answer, no agent spawned.
1363/// - [`HandleResponse::Clarify`] โ€” the message was ambiguous; ask these
1364///   structured questions before acting.
1365/// - [`HandleResponse::Task`] โ€” an agent executed the task. Carries the
1366///   scope, the directive that was run, the resolved environment, the
1367///   execution result, and (for substantial tasks) the review verdict +
1368///   pass/fail.
1369#[derive(Debug, Clone)]
1370pub enum HandleResponse {
1371    /// Conversational reply โ€” no agent was spawned.
1372    Reply(String),
1373    /// Structured clarifying questions to ask before acting.
1374    Clarify(Vec<oxios_ouroboros::Question>),
1375    /// A task was executed by an agent.
1376    Task {
1377        /// The scope decided by `assess` โ€” Trivial skips review.
1378        scope: oxios_ouroboros::Scope,
1379        /// The directive that was executed (post-retry if a retry ran).
1380        directive: Box<oxios_ouroboros::Directive>,
1381        /// The execution environment resolved for this message.
1382        env: oxios_ouroboros::ExecEnv,
1383        /// The execution result.
1384        result: Box<ExecutionResult>,
1385        /// The review verdict โ€” `None` for `Scope::Trivial`.
1386        verdict: Option<oxios_ouroboros::Verdict>,
1387        /// Whether the (final) verdict passed โ€” `None` for `Scope::Trivial`.
1388        evaluation_passed: Option<bool>,
1389    },
1390}
1391
1392/// Active session state for multi-turn interviews.
1393#[derive(Debug, Clone)]
1394#[allow(unused)]
1395struct InterviewSession {
1396    id: String,
1397    interview: InterviewResult,
1398    phase: Phase,
1399    seed_id: Option<Uuid>,
1400    agent_id: Option<AgentId>,
1401}
1402
/// Serde default for `OrchestrationResult::mode` when the field is absent.
fn default_chat_mode() -> String {
    String::from("chat")
}
1406
1407/// Result of a full orchestration cycle.
1408#[derive(Debug, Clone, Serialize, Deserialize)]
1409pub struct OrchestrationResult {
1410    /// Session ID for multi-turn interviews. Pass this on follow-up messages.
1411    #[serde(skip_serializing_if = "Option::is_none")]
1412    pub session_id: Option<String>,
1413    /// The Space ID that handled this message.
1414    #[serde(skip_serializing_if = "Option::is_none")]
1415    pub primary_project_id: Option<Uuid>,
1416    /// Space decoration tag for the response (e.g. "[๐Ÿ”ง oxios]").
1417    #[serde(skip_serializing_if = "Option::is_none")]
1418    pub project_tag: Option<String>,
1419    /// Active Mount IDs for this message (RFC-025), primary first.
1420    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1421    pub active_mount_ids: Vec<MountId>,
1422    /// Mount decoration tag for the response (e.g. "[๐Ÿ”ง oxios + oxi-sdk]").
1423    #[serde(skip_serializing_if = "Option::is_none")]
1424    pub mount_tag: Option<String>,
1425    /// The response to send back to the user.
1426    pub response: String,
1427    /// The seed that was created (if seed phase was reached).
1428    #[serde(skip_serializing_if = "Option::is_none")]
1429    pub seed_id: Option<Uuid>,
1430    /// The agent that executed (if execute phase was reached).
1431    #[serde(skip_serializing_if = "Option::is_none")]
1432    pub agent_id: Option<AgentId>,
1433    /// The furthest phase reached.
1434    pub phase_reached: Phase,
1435    /// Whether evaluation passed.
1436    ///
1437    /// - `None` โ€” evaluation was not applicable (interview, chat, non-task).
1438    /// - `Some(true)` โ€” evaluation passed.
1439    /// - `Some(false)` โ€” evaluation failed or execution unsuccessful.
1440    pub evaluation_passed: Option<bool>,
1441    /// Output or notes from evaluation.
1442    #[serde(skip_serializing_if = "Option::is_none")]
1443    pub output: Option<String>,
1444    /// Tool calls recorded during execution.
1445    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1446    pub tool_calls: Vec<oxios_ouroboros::ToolCallRecord>,
1447    /// Structured interview questions (chat UI redesign โ€” interactive
1448    /// interview). Populated when the interview phase needs clarification
1449    /// and the LLM produced a structured form of the questions. The
1450    /// Gateway forwards this to the WebSocket as an `interview` chunk;
1451    /// the Web UI renders it as interactive widgets (chips, yes/no
1452    /// buttons). When `None`, the frontend falls back to rendering
1453    /// `response` as plain markdown.
1454    #[serde(default, skip_serializing_if = "Option::is_none")]
1455    pub interview_questions:
1456        Option<Vec<oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput>>,
1457    /// Current interview round (1-based). Populated alongside
1458    /// `interview_questions`. Drives the "Round N/M" indicator.
1459    #[serde(default, skip_serializing_if = "Option::is_none")]
1460    pub interview_round: Option<u32>,
1461    /// Current ambiguity score (0.0 = clear, 1.0 = fully ambiguous).
1462    /// Populated alongside `interview_questions`. Drives the progress bar.
1463    #[serde(default, skip_serializing_if = "Option::is_none")]
1464    pub interview_ambiguity: Option<f64>,
1465    /// Execution mode: "chat" (default agent) | "ouroboros" (spec-first pipeline).
1466    #[serde(default = "default_chat_mode")]
1467    pub mode: String,
1468}
1469
/// Render clarifying questions as a numbered list for the user.
///
/// An empty list yields a generic request for clarification; otherwise
/// the questions follow an introductory sentence, numbered from 1, one per
/// line.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return String::from("I need a bit more clarification before I can proceed.");
    }

    let mut numbered = Vec::with_capacity(questions.len());
    for (idx, question) in questions.iter().enumerate() {
        numbered.push(format!("{}. {}", idx + 1, question));
    }

    format!(
        "I'd like to understand your request better. Could you help clarify:\n\n{}",
        numbered.join("\n")
    )
}
1486
1487/// Format the final result for display.
1488/// Format execution result for display to the user.
1489fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1490    let mut lines = Vec::new();
1491
1492    if exec.success {
1493        lines.push(format!("โœ… '{}'", seed.goal));
1494    } else {
1495        lines.push(format!(
1496            "โš ๏ธ '{}'์„(๋ฅผ) ์‹œ๋„ํ–ˆ์ง€๋งŒ ์™„์ „ํžˆ ์„ฑ๊ณตํ•˜์ง€ ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค.",
1497            seed.goal
1498        ));
1499    }
1500
1501    // Show a truncated preview of the output if present.
1502    if !exec.output.is_empty() {
1503        let preview = if exec.output.len() > 500 {
1504            // Char-boundary safe: roll back to avoid splitting a
1505            // multibyte UTF-8 sequence (Korean, CJK, emoji).
1506            let mut end = 500;
1507            while end > 0 && !exec.output.is_char_boundary(end) {
1508                end -= 1;
1509            }
1510            format!("{}...", &exec.output[..end])
1511        } else {
1512            exec.output.clone()
1513        };
1514        lines.push(String::new());
1515        lines.push(preview);
1516    }
1517
1518    lines.join("\n")
1519}
1520
1521/// Check if a seed should be split into subtasks.
1522///
1523/// Simple heuristic: if the seed has 3 or more acceptance criteria,
1524/// it likely contains distinct concerns that can be parallelized.
1525fn should_split_seed(seed: &Seed) -> bool {
1526    // Only split for genuinely complex tasks with many criteria.
1527    // Simple tasks (even with 3-4 criteria) are better handled by a single agent
1528    // to preserve context coherence.
1529    seed.acceptance_criteria.len() >= 5
1530}
1531
1532/// Split a seed into subtasks based on acceptance criteria.
1533///
1534/// Each acceptance criterion becomes a separate subtask with the
1535/// parent seed's goal as context. Infers required capability from
1536/// the goal text using the same heuristic as `build_agent_card`.
1537fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1538    seed.acceptance_criteria
1539        .iter()
1540        .map(|criterion| {
1541            let desc = format!("{}: {}", seed.goal, criterion);
1542            let desc_lower = desc.to_lowercase();
1543
1544            // Infer capability from subtask description.
1545            let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1546                Some("code-review".to_string())
1547            } else if desc_lower.contains("test") {
1548                Some("testing".to_string())
1549            } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1550                Some("refactoring".to_string())
1551            } else if desc_lower.contains("write")
1552                || desc_lower.contains("create")
1553                || desc_lower.contains("implement")
1554            {
1555                Some("code-generation".to_string())
1556            } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1557                Some("debugging".to_string())
1558            } else {
1559                None
1560            };
1561
1562            let mut subtask = SubTask::new(desc);
1563            subtask.required_capability = cap;
1564            subtask
1565        })
1566        .collect()
1567}
1568
/// Wrap the combined output of a multi-agent run for display.
///
/// An empty `combined` string yields a fixed "nothing succeeded" message;
/// anything else is prefixed with a completion header and a blank line.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        body => format!("Multi-agent execution completed:\n\n{body}"),
    }
}
1577
1578/// Execute a subtask via lifecycle manager, returning (output, success).
1579async fn run_via_lifecycle(
1580    lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1581    parent_seed: &Seed,
1582    description: &str,
1583) -> (String, bool) {
1584    let child_seed = Seed {
1585        id: Uuid::new_v4(),
1586        goal: description.to_string(),
1587        constraints: parent_seed.constraints.clone(),
1588        acceptance_criteria: vec!["Task completes successfully".into()],
1589        ontology: parent_seed.ontology.clone(),
1590        created_at: chrono::Utc::now(),
1591        generation: parent_seed.generation + 1,
1592        parent_seed_id: Some(parent_seed.id),
1593        cspace_hint: None,
1594        original_request: parent_seed.original_request.clone(),
1595        output_schema: None,
1596        project_id: parent_seed.project_id,
1597        workspace_context: parent_seed.workspace_context.clone(),
1598        mount_paths: parent_seed.mount_paths.clone(),
1599    };
1600    match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1601        Ok(result) => (result.output, result.success),
1602        Err(e) => (format!("Failed: {e}"), false),
1603    }
1604}
1605
1606/// Render the body of the `## Workspace Context` prompt section (RFC-025).
1607///
1608/// The caller (`build_system_prompt`) wraps this in the `## Workspace
1609/// Context` header. Returns `None` when there are no Mounts to describe.
1610///
1611/// Fill order respects the prompt budget (~1500 tokens soft):
1612/// 1. Primary Mount โ€” full (description + summary + path).
1613/// 2. Secondary Mounts โ€” name + path + one-line summary only.
1614fn build_workspace_context_body(mounts: &[crate::mount::Mount]) -> Option<String> {
1615    if mounts.is_empty() {
1616        return None;
1617    }
1618    let mut out = String::new();
1619    out.push_str("### Active Mounts\n");
1620
1621    for (i, m) in mounts.iter().enumerate() {
1622        let primary = i == 0;
1623        let path = m
1624            .primary_path()
1625            .map(|p| p.to_string_lossy().to_string())
1626            .unwrap_or_else(|| "(no path)".to_string());
1627
1628        if primary {
1629            out.push_str(&format!("- **{}** โ†’ {}\n", m.name, path));
1630            if !m.auto_description.is_empty() {
1631                // First ~3 lines of the agent-written description.
1632                let desc: String = m
1633                    .auto_description
1634                    .lines()
1635                    .take(3)
1636                    .collect::<Vec<_>>()
1637                    .join("\n  ");
1638                out.push_str(&format!("  {}\n", desc));
1639            }
1640            let summary = m.summary_line();
1641            if !summary.is_empty() {
1642                out.push_str(&format!("  _{}_\n", summary));
1643            }
1644            if m.enrichment_pending {
1645                out.push_str("  _(content changed โ€” consider re-scanning this Mount)_\n");
1646            }
1647        } else {
1648            // Secondary: name + path + one-line summary only.
1649            let summary = m.summary_line();
1650            let suffix = if summary.is_empty() {
1651                String::new()
1652            } else {
1653                format!(" โ€” {}", summary)
1654            };
1655            out.push_str(&format!("- **{}** โ†’ {}{}\n", m.name, path, suffix));
1656        }
1657    }
1658
1659    Some(out)
1660}
1661
1662#[cfg(test)]
1663mod mount_workspace_tests {
1664    use super::*;
1665    use crate::mount::{Mount, MountSource};
1666    use std::path::PathBuf;
1667
1668    #[test]
1669    fn test_workspace_context_primary_full_secondary_terse() {
1670        let mut oxios =
1671            Mount::from_name_and_path("oxios", PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
1672        oxios.auto_description = "Agent OS.\nRust + tokio.".to_string();
1673        oxios.auto_meta.summary = "Rust agent OS".to_string();
1674
1675        let mut oxi = Mount::from_name_and_path("oxi", PathBuf::from("/oxi"));
1676        oxi.auto_meta.summary = "SDK".to_string();
1677
1678        let body = build_workspace_context_body(&[oxios, oxi]).unwrap();
1679        assert!(body.contains("### Active Mounts"));
1680        // Primary gets full description.
1681        assert!(body.contains("Agent OS."));
1682        assert!(body.contains("_Rust agent OS_"));
1683        // Secondary is terse.
1684        assert!(body.contains("**oxi** โ†’ /oxi โ€” SDK"));
1685    }
1686
1687    #[test]
1688    fn test_workspace_context_empty_is_none() {
1689        assert!(build_workspace_context_body(&[]).is_none());
1690    }
1691
1692    /// End-to-end: a real MountManager + Orchestrator-less call to
1693    /// `resolve_mount_workspace` proves that detection seeds the primary,
1694    /// builds the context body, and collects all paths (multi-path access).
1695    #[test]
1696    fn test_resolve_mount_workspace_detects_and_collects_paths() {
1697        use crate::mount::MountManager;
1698        use oxios_memory::memory::sqlite::MemoryDatabase;
1699        use std::sync::Arc;
1700
1701        let db = Arc::new(MemoryDatabase::open_in_memory(64).unwrap());
1702        let mm = Arc::new(MountManager::new(db, None).unwrap());
1703
1704        // Register two mounts.
1705        let oxios = mm
1706            .create_mount(
1707                "oxios".to_string(),
1708                vec![PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios")],
1709                MountSource::Manual,
1710            )
1711            .unwrap();
1712        let oxi_sdk = mm
1713            .create_mount(
1714                "oxi-sdk".to_string(),
1715                vec![PathBuf::from("/Users/me/oxi")],
1716                MountSource::Manual,
1717            )
1718            .unwrap();
1719        mm.update_enrichment(oxios.id, Some("Agent OS in Rust.".to_string()), None)
1720            .unwrap();
1721
1722        // Build a minimal Orchestrator-free resolver path: replicate what
1723        // resolve_mount_workspace does, but against the manager directly,
1724        // since the full Orchestrator needs many subsystems.
1725        let mounts = mm.get_mounts_ordered(&[oxios.id, oxi_sdk.id]);
1726        assert_eq!(mounts.len(), 2);
1727
1728        let body = build_workspace_context_body(&mounts).unwrap();
1729        assert!(body.contains("oxios"));
1730        assert!(body.contains("Agent OS in Rust."));
1731        assert!(body.contains("oxi-sdk"));
1732
1733        // Collect paths like the orchestrator does.
1734        let mut paths = Vec::new();
1735        for m in &mounts {
1736            for p in &m.paths {
1737                if !paths.contains(p) {
1738                    paths.push(p.clone());
1739                }
1740            }
1741        }
1742        assert_eq!(paths.len(), 2);
1743        assert_eq!(paths[0], PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
1744        assert_eq!(paths[1], PathBuf::from("/Users/me/oxi"));
1745    }
1746
1747    /// Detection layer 1 (name match) seeds the primary when no explicit
1748    /// mount_ids are given โ€” the core promise of RFC-025.
1749    #[test]
1750    fn test_detection_seeds_primary_on_name_mention() {
1751        use crate::mount::{DetectionResult, detect_mounts};
1752
1753        let oxios =
1754            Mount::from_name_and_path("oxios", PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
1755        let result = detect_mounts("oxios ์ฝ”๋“œ๋ฆฌ๋ทฐํ•ด์ค˜", std::slice::from_ref(&oxios));
1756        assert!(matches!(result, DetectionResult::Found(id) if id == oxios.id));
1757    }
1758}