Skip to main content

oxios_kernel/
orchestrator.rs

1//! Orchestrator: coordinates the full Ouroboros lifecycle for user messages.
2//!
3//! The orchestrator is the "brain" that runs the Ouroboros protocol
4//! end-to-end. Given a user message:
5//! 1. Conduct the interview (ask clarifying questions if needed)
6//! 2. Generate a seed when ambiguity is low enough
7//! 3. Fork and execute an agent via the supervisor
8//! 4. Evaluate the result
9//! 5. Evolve and re-execute if evaluation fails
10//!
11//! The orchestrator does NOT know about channels or HTTP — it only
12//! coordinates Ouroboros + Supervisor + EventBus + StateStore + Scheduler + AccessManager.
13
14use std::sync::Arc;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{EvaluationResult, InterviewResult, OuroborosProtocol, Phase, Seed};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use crate::agent_lifecycle::AgentLifecycleManager;
24use crate::event_bus::{EventBus, KernelEvent};
25use crate::git_layer::GitLayer;
26use crate::metrics::get_metrics;
27use crate::scheduler::Priority;
28use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
29use crate::state_store::StateStore;
30use crate::types::AgentId;
31
32/// Role of an agent within a group.
33#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
34pub enum AgentRole {
35    /// Executes a specific subtask.
36    #[default]
37    Worker,
38    /// Coordinates subtasks, synthesizes results.
39    Manager,
40}
41
42/// A subtask within a multi-agent plan.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SubTask {
45    /// Unique subtask ID.
46    pub id: Uuid,
47    /// Human-readable description.
48    pub description: String,
49    /// Capability required (e.g., "code-review", "testing").
50    pub required_capability: Option<String>,
51    /// Result of the subtask (filled after execution).
52    pub result: Option<String>,
53    /// Whether this subtask succeeded.
54    pub success: bool,
55    /// Role of the agent assigned to this subtask.
56    #[serde(default)]
57    pub role: AgentRole,
58}
59
60impl SubTask {
61    /// Create a new subtask with the given description.
62    pub fn new(description: impl Into<String>) -> Self {
63        Self {
64            id: Uuid::new_v4(),
65            description: description.into(),
66            required_capability: None,
67            result: None,
68            success: false,
69            role: AgentRole::default(),
70        }
71    }
72
73    /// Set the required capability for this subtask.
74    pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
75        self.required_capability = Some(cap.into());
76        self
77    }
78}
79
80/// Maximum number of Ouroboros loops before giving up.
81const MAX_EVOLUTION_ITERATIONS: usize = 3;
82
83/// The orchestrator coordinates the full Ouroboros lifecycle.
84pub struct Orchestrator {
85    ouroboros: Arc<dyn OuroborosProtocol>,
86    event_bus: EventBus,
87    state_store: Arc<StateStore>,
88    /// Git version control layer for auto-commits.
89    git_layer: Option<Arc<GitLayer>>,
90    /// Active interview sessions, keyed by session ID.
91    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
92    /// Agent lifecycle manager (fork, register, run, cleanup).
93    lifecycle: AgentLifecycleManager,
94    /// A2A protocol for inter-agent task delegation.
95    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
96    /// Space manager for context partitioning.
97    space_manager: RwLock<Option<Arc<SpaceManager>>>,
98    /// Conversation buffer for topic shift detection.
99    conversation_buffer: RwLock<ConversationBuffer>,
100}
101
102impl Orchestrator {
103    /// Creates a new orchestrator.
104    pub fn new(
105        ouroboros: Arc<dyn OuroborosProtocol>,
106        event_bus: EventBus,
107        state_store: Arc<StateStore>,
108        lifecycle: AgentLifecycleManager,
109    ) -> Self {
110        Self {
111            ouroboros,
112            event_bus,
113            state_store,
114            git_layer: None,
115            sessions: RwLock::new(std::collections::HashMap::new()),
116            lifecycle,
117            a2a: None,
118            space_manager: RwLock::new(None),
119            conversation_buffer: RwLock::new(ConversationBuffer::default()),
120        }
121    }
122
123    /// Set the SpaceManager for context partitioning.
124    pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
125        *self.space_manager.write() = Some(manager);
126    }
127
128    /// Get the current Space ID, if SpaceManager is set.
129    pub fn current_space_id(&self) -> Option<SpaceId> {
130        self.space_manager
131            .read()
132            .as_ref()
133            .map(|m| m.current_space_id())
134    }
135
136    /// Get the current Space name tag for response decoration.
137    pub fn current_space_tag(&self) -> String {
138        self.space_manager
139            .read()
140            .as_ref()
141            .and_then(|m| {
142                m.current_space().map(|s| {
143                    if s.is_default() {
144                        String::new()
145                    } else {
146                        format!("[{} {}]", s.emoji(), s.name)
147                    }
148                })
149            })
150            .unwrap_or_default()
151    }
152
153    /// Set the A2A protocol for inter-agent task delegation.
154    pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
155        self.a2a = Some(a2a);
156    }
157
158    /// Set the GitLayer for auto-commits after state saves.
159    pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
160        self.git_layer = Some(git_layer);
161    }
162
163    /// Commit a file to git if GitLayer is configured and enabled.
164    fn git_commit(&self, rel_path: &str, message: &str) {
165        if let Some(ref gl) = self.git_layer {
166            if gl.is_enabled() {
167                let _ = gl.commit_file(rel_path, message);
168            }
169        }
170    }
171
172    /// Handle a user message through the full Ouroboros loop.
173    ///
174    /// Returns an `OrchestrationResult` with the response and metadata.
175    ///
176    /// If the interview phase needs clarification (ambiguity > 0.2),
177    /// the result will contain the questions and the phase will be
178    /// `Phase::Interview`. The caller should send these questions to
179    /// the user and include the `session_id` in follow-up messages.
180    #[allow(clippy::await_holding_lock)]
181    pub async fn handle_message(
182        &self,
183        user_id: &str,
184        user_message: &str,
185        session_id: Option<&str>,
186    ) -> Result<OrchestrationResult> {
187        tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
188        get_metrics().messages.inc();
189        let orch_start = std::time::Instant::now();
190
191        let session_id = session_id
192            .map(String::from)
193            .unwrap_or_else(|| Uuid::new_v4().to_string());
194
195        tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
196
197        // ── Space Detection ──
198        let space_tag = self.current_space_tag();
199        let space_id = {
200            let buffer = self.conversation_buffer.read();
201            let space_manager = self.space_manager.read();
202            if let Some(ref sm) = *space_manager {
203                match sm.detect_or_create(user_message, &buffer).await {
204                    Ok(id) => {
205                        tracing::info!(space_id = %id, "Space detected/created for message");
206                        id
207                    }
208                    Err(e) => {
209                        tracing::warn!(error = %e, "Space detection failed, using default");
210                        sm.default_space_id()
211                    }
212                }
213            } else {
214                // No SpaceManager set — use zero ID
215                uuid::Uuid::nil()
216            }
217        };
218
219        // Record user message in conversation buffer
220        {
221            let mut buffer = self.conversation_buffer.write();
222            buffer.push_user(user_message);
223        }
224
225        // Phase 1: Interview
226        self.publish_phase_started(&session_id, Phase::Interview)
227            .await;
228
229        // Get or create the interview session.
230        let needs_interview = {
231            let sessions = self.sessions.read();
232            !sessions.contains_key(&session_id)
233        };
234
235        // Conduct the interview.
236        let interview = {
237            tracing::info!(phase = "interview", "Starting interview phase");
238            if needs_interview {
239                self.ouroboros.interview(user_message).await?
240            } else {
241                // This is a follow-up message in an existing interview.
242                // Record the user's answer in the session and extract the Q&A context.
243                let qa_context = {
244                    let mut sessions = self.sessions.write();
245                    if let Some(session) = sessions.get_mut(&session_id) {
246                        session.interview.add_exchange("", user_message);
247                    }
248                    // Extract Q&A context while holding the write lock, then drop.
249                    let sessions = self.sessions.read();
250                    let session = sessions.get(&session_id).expect("session exists");
251                    session
252                        .interview
253                        .questions
254                        .iter()
255                        .zip(session.interview.answers.iter())
256                        .map(|(q, a)| format!("Q: {}\nA: {}", q, a))
257                        .collect::<Vec<_>>()
258                        .join("\n\n")
259                };
260
261                // Run another interview pass with accumulated context.
262                self.ouroboros.interview(&qa_context).await?
263            }
264        };
265
266        // If ambiguity is too high, return questions for the user to answer.
267        if !interview.ready_for_seed {
268            // Store the interview so follow-up messages continue it.
269            {
270                let mut sessions = self.sessions.write();
271                sessions.insert(
272                    session_id.clone(),
273                    InterviewSession {
274                        id: session_id.clone(),
275                        interview: interview.clone(),
276                        phase: Phase::Interview,
277                        seed_id: None,
278                        agent_id: None,
279                    },
280                );
281            }
282
283            let questions = interview
284                .questions
285                .iter()
286                .filter(|q| !q.is_empty())
287                .cloned()
288                .collect::<Vec<_>>();
289
290            tracing::info!(
291                session_id = %session_id,
292                ambiguity = interview.ambiguity.ambiguity(),
293                questions = questions.len(),
294                "Interview needs clarification"
295            );
296
297            self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
298                .await;
299
300            return Ok(OrchestrationResult {
301                session_id: Some(session_id.clone()),
302                space_id: Some(space_id),
303                space_tag: Some(space_tag.clone()),
304                response: format_questions(&questions),
305                seed_id: None,
306                agent_id: None,
307                phase_reached: Phase::Interview,
308                evaluation_passed: false,
309                output: None,
310            });
311        }
312
313        // Record agent response in conversation buffer (for topic shift detection)
314        // Note: interview phase returns questions, not a full agent response,
315        // but we record it for completeness.
316        {
317            let mut buffer = self.conversation_buffer.write();
318            buffer.push_agent("[interview: questions]", &space_id);
319        }
320
321        // Interview complete and ready. Proceed to seed generation.
322        self.publish_phase_completed(&session_id, Phase::Interview, "ready for seed")
323            .await;
324        self.publish_phase_started(&session_id, Phase::Seed).await;
325
326        // Phase 2: Generate seed
327        tracing::info!(phase = "seed", "Starting seed generation");
328        let seed = self.ouroboros.generate_seed(&interview).await?;
329
330        // Save seed to state store.
331        self.save_seed(&seed).await?;
332
333        // Publish seed created event.
334        self.event_bus
335            .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
336
337        self.publish_phase_completed(&session_id, Phase::Seed, "generated")
338            .await;
339        self.publish_phase_started(&session_id, Phase::Execute)
340            .await;
341
342        // Check if the seed should be split into multi-agent execution.
343        // When the seed has 3+ acceptance criteria, we treat each criterion
344        // as a distinct subtask and delegate to separate agents.
345        if should_split_seed(&seed) {
346            let subtasks = split_into_subtasks(&seed);
347            if subtasks.len() > 1 {
348                tracing::info!(
349                    phase = "delegate",
350                    subtasks = subtasks.len(),
351                    "Delegating to multi-agent"
352                );
353                let results = self.delegate_subtasks(subtasks, &seed).await?;
354
355                // Combine successful results
356                let combined: String = results
357                    .iter()
358                    .filter(|r| r.success)
359                    .filter_map(|r| r.result.as_deref())
360                    .collect::<Vec<_>>()
361                    .join("\n\n");
362
363                let all_passed = results.iter().all(|r| r.success);
364
365                // Clean up the session.
366                {
367                    let mut sessions = self.sessions.write();
368                    sessions.remove(&session_id);
369                }
370
371                tracing::info!(
372                    session_id = %session_id,
373                    subtasks = results.len(),
374                    passed = all_passed,
375                    "Multi-agent orchestration complete"
376                );
377
378                return Ok(OrchestrationResult {
379                    session_id: Some(session_id),
380                    space_id: Some(space_id),
381                    space_tag: Some(space_tag.clone()),
382                    response: format_result_combined(&combined),
383                    seed_id: Some(seed.id),
384                    agent_id: None,
385                    phase_reached: Phase::Execute,
386                    evaluation_passed: all_passed,
387                    output: Some(combined),
388                });
389            }
390        }
391
392        // Record agent response in conversation buffer (for multi-agent case)
393        {
394            let mut buffer = self.conversation_buffer.write();
395            buffer.push_agent("[multi-agent: complete]", &space_id);
396        }
397
398        // Phase 3: Fork and execute agent via lifecycle manager
399        tracing::info!(phase = "execute", "Starting execution phase");
400        let exec_result = self
401            .lifecycle
402            .spawn_and_run(&seed, Priority::Normal)
403            .await?;
404
405        // Periodically reap zombie tasks.
406        self.lifecycle.reap_zombies();
407
408        // Phase 4: Evaluate
409        self.publish_phase_completed(&session_id, Phase::Execute, "completed")
410            .await;
411        self.publish_phase_started(&session_id, Phase::Evaluate)
412            .await;
413
414        tracing::info!(phase = "evaluate", "Starting evaluation phase");
415        let evaluation = self.ouroboros.evaluate(&seed, &exec_result).await?;
416
417        self.publish_phase_completed(
418            &session_id,
419            Phase::Evaluate,
420            &format!("score={:.2}", evaluation.score),
421        )
422        .await;
423
424        self.event_bus.publish(KernelEvent::EvaluationComplete {
425            seed_id: seed.id,
426            passed: evaluation.all_passed(),
427        })?;
428
429        // Save evaluation to state store for lineage tracking.
430        self.save_evaluation(&seed, &evaluation).await?;
431
432        // Phase 5: Evolve if needed
433        let mut current_seed = Some(seed);
434        let mut current_evaluation = evaluation;
435        let mut iterations = 0;
436
437        while !current_evaluation.all_passed()
438            && current_evaluation.score < 0.8
439            && iterations < MAX_EVOLUTION_ITERATIONS
440        {
441            iterations += 1;
442            self.publish_phase_started(&session_id, Phase::Evolve).await;
443
444            tracing::info!(
445                phase = "evolve",
446                iteration = iterations,
447                "Starting evolve phase"
448            );
449            let evolve_result = self
450                .ouroboros
451                .evolve(
452                    current_seed.as_ref().expect("seed exists"),
453                    &current_evaluation,
454                )
455                .await?;
456
457            if let Some(evolved) = evolve_result {
458                current_seed = Some(evolved.clone());
459
460                // Save evolved seed.
461                self.save_seed(&evolved).await?;
462                self.event_bus.publish(KernelEvent::SeedCreated {
463                    seed_id: evolved.id,
464                })?;
465
466                self.publish_phase_completed(&session_id, Phase::Evolve, "evolved")
467                    .await;
468                self.publish_phase_started(&session_id, Phase::Execute)
469                    .await;
470
471                tracing::info!(
472                    phase = "re-execute",
473                    iteration = iterations,
474                    "Re-executing with evolved seed"
475                );
476                let new_exec = self
477                    .lifecycle
478                    .spawn_and_run(&evolved, Priority::High)
479                    .await?;
480
481                self.publish_phase_completed(&session_id, Phase::Execute, "completed")
482                    .await;
483                self.publish_phase_started(&session_id, Phase::Evaluate)
484                    .await;
485
486                tracing::info!(
487                    phase = "re-evaluate",
488                    iteration = iterations,
489                    "Re-evaluating evolved result"
490                );
491                let new_eval = self.ouroboros.evaluate(&evolved, &new_exec).await?;
492                current_evaluation = new_eval;
493
494                self.publish_phase_completed(
495                    &session_id,
496                    Phase::Evaluate,
497                    &format!("score={:.2}", current_evaluation.score),
498                )
499                .await;
500                // Save evolved seed evaluation for lineage tracking.
501                self.save_evaluation(&evolved, &current_evaluation).await?;
502            } else {
503                // No evolution possible.
504                self.publish_phase_completed(&session_id, Phase::Evolve, "no evolution")
505                    .await;
506                break;
507            }
508        }
509
510        // Clean up the session.
511        {
512            let mut sessions = self.sessions.write();
513            sessions.remove(&session_id);
514        }
515
516        let final_seed = current_seed.expect("at least one seed exists");
517        let passed = current_evaluation.all_passed();
518
519        tracing::info!(
520            session_id = %session_id,
521            iterations,
522            score = current_evaluation.score,
523            passed,
524            "Orchestration complete"
525        );
526
527        // Measure orchestration duration.
528        let metrics = get_metrics();
529        metrics
530            .orch_duration
531            .observe(orch_start.elapsed().as_secs_f64());
532        if passed {
533            metrics.agents_completed.inc();
534        } else {
535            metrics.agents_failed.inc();
536        }
537
538        // Record agent response in conversation buffer (for topic shift detection)
539        {
540            let mut buffer = self.conversation_buffer.write();
541            buffer.push_agent(&final_seed.goal, &space_id);
542        }
543
544        Ok(OrchestrationResult {
545            session_id: Some(session_id),
546            space_id: Some(space_id),
547            space_tag: Some(space_tag.clone()),
548            response: format_result(&final_seed, &current_evaluation),
549            seed_id: Some(final_seed.id),
550            agent_id: None,
551            phase_reached: Phase::Evaluate,
552            evaluation_passed: passed,
553            output: Some(current_evaluation.notes.join("; ")),
554        })
555    }
556
557    /// Save a seed to the state store.
558    async fn save_seed(&self, seed: &Seed) -> Result<()> {
559        let key = seed.id.to_string();
560
561        self.state_store
562            .save_json("seeds", &key, seed)
563            .await
564            .context("failed to save seed to state store")?;
565
566        self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
567
568        Ok(())
569    }
570
571    /// Save an evaluation result to the state store.
572    async fn save_evaluation(&self, seed: &Seed, evaluation: &EvaluationResult) -> Result<()> {
573        let key = format!("{}-eval", seed.id);
574
575        self.state_store
576            .save_json("evals", &key, evaluation)
577            .await
578            .context("failed to save evaluation to state store")?;
579
580        self.git_commit(&format!("evals/{}.json", key), "ourobors: save eval");
581
582        Ok(())
583    }
584
585    /// Publish a PhaseStarted event.
586    async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
587        let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
588            session_id: session_id.to_owned(),
589            phase,
590        });
591    }
592
593    /// Publish a PhaseCompleted event.
594    async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
595        let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
596            session_id: session_id.to_owned(),
597            phase,
598            result_summary: result.to_owned(),
599        });
600    }
601
602    /// Execute multiple subtasks using separate agents in parallel.
603    ///
604    /// When A2A is available, the orchestrator delegates tasks through the
605    /// A2A protocol — finding capable agents via the AgentCardRegistry.
606    /// Otherwise, falls back to direct lifecycle execution.
607    ///
608    /// Results are collected as they complete using `JoinSet`.
609    pub async fn delegate_subtasks(
610        &self,
611        subtasks: Vec<SubTask>,
612        parent_seed: &Seed,
613    ) -> Result<Vec<SubTask>> {
614        // Single task — execute directly without group overhead.
615        if subtasks.len() == 1 {
616            return self.execute_single_subtask(subtasks, parent_seed).await;
617        }
618
619        // Try A2A-based delegation when the protocol is available.
620        if let Some(ref a2a) = self.a2a {
621            return self.delegate_via_a2a(subtasks, parent_seed, a2a).await;
622        }
623
624        // Fallback: direct lifecycle execution (no A2A).
625        self.delegate_via_lifecycle(subtasks, parent_seed).await
626    }
627
628    /// Execute a single subtask directly via lifecycle manager.
629    async fn execute_single_subtask(
630        &self,
631        subtasks: Vec<SubTask>,
632        parent_seed: &Seed,
633    ) -> Result<Vec<SubTask>> {
634        let mut task = subtasks.into_iter().next().unwrap();
635        let child_seed = Seed {
636            id: Uuid::new_v4(),
637            goal: task.description.clone(),
638            constraints: parent_seed.constraints.clone(),
639            acceptance_criteria: vec!["Task completes successfully".into()],
640            ontology: parent_seed.ontology.clone(),
641            created_at: chrono::Utc::now(),
642            generation: parent_seed.generation + 1,
643            parent_seed_id: Some(parent_seed.id),
644            cspace_hint: None,
645        };
646        match self
647            .lifecycle
648            .spawn_and_run(&child_seed, Priority::Normal)
649            .await
650        {
651            Ok(result) => {
652                task.result = Some(result.output.clone());
653            }
654            Err(e) => {
655                task.result = Some(format!("Failed: {e}"));
656                task.success = false;
657            }
658        }
659        Ok(vec![task])
660    }
661
662    /// Delegate subtasks via A2A protocol.
663    ///
664    /// Queries the AgentCardRegistry for agents matching each subtask's
665    /// Execute subtasks via A2A dispatch handler.
666    ///
667    /// Queries the AgentCardRegistry for agents matching each subtask's
668    /// required capability, then calls `execute_delegation` which runs
669    /// the task through the registered handler (lifecycle).
670    /// Falls back to direct lifecycle execution when no handler is registered.
671    async fn delegate_via_a2a(
672        &self,
673        subtasks: Vec<SubTask>,
674        parent_seed: &Seed,
675        a2a: &Arc<crate::a2a::A2AProtocol>,
676    ) -> Result<Vec<SubTask>> {
677        use crate::a2a::TaskPriority;
678        use tokio::task::JoinSet;
679
680        tracing::info!(
681            subtasks = subtasks.len(),
682            "Delegating subtasks via A2A protocol"
683        );
684
685        let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
686        let subtask_count = subtasks.len();
687        let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
688
689        for (idx, subtask) in subtasks.into_iter().enumerate() {
690            let capability = subtask.required_capability.clone();
691            let description = subtask.description.clone();
692            let subtask_id = subtask.id;
693            let role = subtask.role.clone();
694            let a2a = Arc::clone(a2a);
695            let parent_seed = parent_seed.clone();
696            let lifecycle = self.lifecycle.clone();
697
698            join_set.spawn(async move {
699                // Find agent with the required capability via A2A registry.
700                let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
701                    a2a.query_capabilities(cap).await.ok()
702                        .and_then(|agents| agents.into_iter().next())
703                } else {
704                    None
705                };
706
707                let (output, success) = if let Some(ref target_card) = target {
708                    let target_id = target_card.agent_id;
709                    tracing::info!(
710                        subtask_index = idx,
711                        target = %target_card.name,
712                        target_id = %target_id,
713                        "A2A dispatching subtask"
714                    );
715
716                    let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
717                        "parent_seed": parent_seed.id.to_string(),
718                        "goal": description,
719                    }))
720                    .with_priority(TaskPriority::Normal);
721
722                    // Enqueue audit trail (fire-and-forget into queue).
723                    let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
724
725                    // Execute through dispatch handler (blocking).
726                    match a2a.execute_delegation(orchestrator_id, target_id, task).await {
727                        Some(Ok(result)) => {
728                            let out = result.get("output")
729                                .and_then(|v| v.as_str())
730                                .unwrap_or("")
731                                .to_string();
732                            let ok = result.get("success")
733                                .and_then(|v| v.as_bool())
734                                .unwrap_or(false);
735                            (out, ok)
736                        }
737                        Some(Err(e)) => {
738                            tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
739                            (format!("Failed: {e}"), false)
740                        }
741                        None => {
742                            // No handler — fallback to lifecycle.
743                            tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
744                            run_via_lifecycle(&lifecycle, &parent_seed, &description).await
745                        }
746                    }
747                } else {
748                    tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
749                    run_via_lifecycle(&lifecycle, &parent_seed, &description).await
750                };
751
752                (idx, SubTask {
753                    id: subtask_id,
754                    description,
755                    required_capability: capability,
756                    result: Some(output),
757                    success,
758                    role,
759                })
760            });
761        }
762
763        let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
764        while let Some(join_result) = join_set.join_next().await {
765            match join_result {
766                Ok((idx, subtask)) => {
767                    results[idx] = Some(subtask);
768                }
769                Err(e) => {
770                    tracing::error!(error = %e, "A2A task panicked");
771                }
772            }
773        }
774
775        let completed: Vec<SubTask> = results.into_iter().flatten().collect();
776        tracing::info!(
777            completed = completed.len(),
778            succeeded = completed.iter().filter(|r| r.success).count(),
779            "A2A delegation complete"
780        );
781        Ok(completed)
782    }
783
784    async fn delegate_via_lifecycle(
785        &self,
786        subtasks: Vec<SubTask>,
787        parent_seed: &Seed,
788    ) -> Result<Vec<SubTask>> {
789        use crate::agent_group::OxiosAgentGroup;
790        use tokio::task::JoinSet;
791
792        let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
793        let group = OxiosAgentGroup::new(parent_seed, descriptions);
794        let group_id = group.id;
795
796        self.event_bus.publish(KernelEvent::AgentGroupCreated {
797            group_id,
798            agent_count: group.agents.len(),
799        })?;
800
801        tracing::info!(
802            group_id = %group_id,
803            agent_count = group.agents.len(),
804            "Starting parallel multi-agent execution"
805        );
806
807        let mut join_set: JoinSet<(
808            usize,
809            crate::types::AgentId,
810            Result<oxios_ouroboros::ExecutionResult>,
811        )> = JoinSet::new();
812
813        for (idx, agent_entry) in group.agents.iter().enumerate() {
814            let child_seed = agent_entry.seed.clone();
815            let agent_id = agent_entry.id;
816            let lifecycle = self.lifecycle.clone();
817
818            join_set.spawn(async move {
819                let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
820                (idx, agent_id, result)
821            });
822        }
823
824        let subtask_count = subtasks.len();
825        let mut completed = vec![None; subtask_count];
826        while let Some(join_result) = join_set.join_next().await {
827            match join_result {
828                Ok((idx, agent_id, Ok(exec_result))) => {
829                    let _ = self
830                        .event_bus
831                        .publish(KernelEvent::AgentGroupMemberCompleted {
832                            group_id,
833                            agent_id,
834                            success: exec_result.success,
835                        });
836                    completed[idx] = Some(SubTask {
837                        id: subtasks[idx].id,
838                        description: subtasks[idx].description.clone(),
839                        required_capability: subtasks[idx].required_capability.clone(),
840                        result: Some(exec_result.output.clone()),
841                        success: exec_result.success,
842                        role: subtasks[idx].role.clone(),
843                    });
844                }
845                Ok((idx, agent_id, Err(e))) => {
846                    tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
847                    let _ = self
848                        .event_bus
849                        .publish(KernelEvent::AgentGroupMemberCompleted {
850                            group_id,
851                            agent_id,
852                            success: false,
853                        });
854                    completed[idx] = Some(SubTask {
855                        id: subtasks[idx].id,
856                        description: subtasks[idx].description.clone(),
857                        required_capability: subtasks[idx].required_capability.clone(),
858                        result: Some(format!("Failed: {e}")),
859                        success: false,
860                        role: subtasks[idx].role.clone(),
861                    });
862                }
863                Err(e) => {
864                    tracing::error!(error = %e, "JoinSet task panicked");
865                }
866            }
867        }
868
869        let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
870        let succeeded = completed.iter().filter(|r| r.success).count();
871        let total = completed.len();
872
873        tracing::info!(
874            group_id = %group_id,
875            succeeded,
876            total,
877            "Parallel multi-agent execution complete"
878        );
879
880        // Persist group state
881        let _ = self
882            .state_store
883            .save_json("agent_groups", &group_id.to_string(), &group)
884            .await;
885        self.git_commit(
886            &format!("agent_groups/{}.json", group_id),
887            "orchestrator: save group",
888        );
889
890        Ok(completed)
891    }
892}
893
894/// Active session state for multi-turn interviews.
895#[derive(Debug, Clone)]
896#[allow(unused)]
897struct InterviewSession {
898    id: String,
899    interview: InterviewResult,
900    phase: Phase,
901    seed_id: Option<Uuid>,
902    agent_id: Option<AgentId>,
903}
904
905/// Result of a full orchestration cycle.
906#[derive(Debug, Clone, Serialize, Deserialize)]
907pub struct OrchestrationResult {
908    /// Session ID for multi-turn interviews. Pass this on follow-up messages.
909    #[serde(skip_serializing_if = "Option::is_none")]
910    pub session_id: Option<String>,
911    /// The Space ID that handled this message.
912    #[serde(skip_serializing_if = "Option::is_none")]
913    pub space_id: Option<Uuid>,
914    /// Space decoration tag for the response (e.g. "[🔧 oxios]").
915    #[serde(skip_serializing_if = "Option::is_none")]
916    pub space_tag: Option<String>,
917    /// The response to send back to the user.
918    pub response: String,
919    /// The seed that was created (if seed phase was reached).
920    #[serde(skip_serializing_if = "Option::is_none")]
921    pub seed_id: Option<Uuid>,
922    /// The agent that executed (if execute phase was reached).
923    #[serde(skip_serializing_if = "Option::is_none")]
924    pub agent_id: Option<AgentId>,
925    /// The furthest phase reached.
926    pub phase_reached: Phase,
927    /// Whether evaluation passed (false if evaluation was skipped or failed).
928    pub evaluation_passed: bool,
929    /// Output or notes from evaluation.
930    #[serde(skip_serializing_if = "Option::is_none")]
931    pub output: Option<String>,
932}
933
934/// Format clarifying questions for display.
935fn format_questions(questions: &[String]) -> String {
936    if questions.is_empty() {
937        "I need a bit more clarification before I can proceed.".to_string()
938    } else {
939        format!(
940            "I'd like to understand your request better. Could you help clarify:\n\n{}",
941            questions
942                .iter()
943                .enumerate()
944                .map(|(i, q)| format!("{}. {}", i + 1, q))
945                .collect::<Vec<_>>()
946                .join("\n")
947        )
948    }
949}
950
951/// Format the final result for display.
952fn format_result(seed: &Seed, evaluation: &EvaluationResult) -> String {
953    let passed = evaluation.all_passed();
954
955    let mut lines = Vec::new();
956    lines.push(format!("Task '{}' completed.", seed.goal));
957
958    if passed {
959        lines.push("✅ Evaluation passed.".to_string());
960    } else {
961        lines.push(format!(
962            "⚠️ Evaluation score: {:.0}%",
963            evaluation.score * 100.0
964        ));
965    }
966
967    if !evaluation.notes.is_empty() {
968        lines.push("\nNotes:".to_string());
969        for note in &evaluation.notes {
970            lines.push(format!("- {}", note));
971        }
972    }
973
974    lines.join("\n")
975}
976
977/// Check if a seed should be split into subtasks.
978///
979/// Simple heuristic: if the seed has 3 or more acceptance criteria,
980/// it likely contains distinct concerns that can be parallelized.
981fn should_split_seed(seed: &Seed) -> bool {
982    seed.acceptance_criteria.len() >= 3
983}
984
985/// Split a seed into subtasks based on acceptance criteria.
986///
987/// Each acceptance criterion becomes a separate subtask with the
988/// parent seed's goal as context. Infers required capability from
989/// the goal text using the same heuristic as `build_agent_card`.
990fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
991    seed.acceptance_criteria
992        .iter()
993        .map(|criterion| {
994            let desc = format!("{}: {}", seed.goal, criterion);
995            let desc_lower = desc.to_lowercase();
996
997            // Infer capability from subtask description.
998            let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
999                Some("code-review".to_string())
1000            } else if desc_lower.contains("test") {
1001                Some("testing".to_string())
1002            } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1003                Some("refactoring".to_string())
1004            } else if desc_lower.contains("write")
1005                || desc_lower.contains("create")
1006                || desc_lower.contains("implement")
1007            {
1008                Some("code-generation".to_string())
1009            } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1010                Some("debugging".to_string())
1011            } else {
1012                None
1013            };
1014
1015            let mut subtask = SubTask::new(desc);
1016            subtask.required_capability = cap;
1017            subtask
1018        })
1019        .collect()
1020}
1021
1022/// Format combined results from multi-agent execution.
1023fn format_result_combined(combined: &str) -> String {
1024    if combined.is_empty() {
1025        "No subtasks completed successfully.".to_string()
1026    } else {
1027        format!("Multi-agent execution completed:\n\n{}", combined)
1028    }
1029}
1030
1031/// Execute a subtask via lifecycle manager, returning (output, success).
1032async fn run_via_lifecycle(
1033    lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1034    parent_seed: &Seed,
1035    description: &str,
1036) -> (String, bool) {
1037    let child_seed = Seed {
1038        id: Uuid::new_v4(),
1039        goal: description.to_string(),
1040        constraints: parent_seed.constraints.clone(),
1041        acceptance_criteria: vec!["Task completes successfully".into()],
1042        ontology: parent_seed.ontology.clone(),
1043        created_at: chrono::Utc::now(),
1044        generation: parent_seed.generation + 1,
1045        parent_seed_id: Some(parent_seed.id),
1046        cspace_hint: None,
1047    };
1048    match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1049        Ok(result) => (result.output, result.success),
1050        Err(e) => (format!("Failed: {e}"), false),
1051    }
1052}