Skip to main content

oxios_kernel/
orchestrator.rs

1//! Orchestrator: coordinates the full Ouroboros lifecycle for user messages.
2//!
3//! The orchestrator is the "brain" that runs the Ouroboros protocol
4//! end-to-end. Given a user message:
5//! 1. Conduct the interview (ask clarifying questions if needed)
6//! 2. Generate a seed when ambiguity is low enough
7//! 3. Fork and execute an agent via the supervisor
8//! 4. Evaluate the result
9//! 5. Evolve and re-execute if evaluation fails
10//!
11//! The orchestrator does NOT know about channels or HTTP — it only
12//! coordinates Ouroboros + Supervisor + EventBus + StateStore + Scheduler + AccessManager.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::{Context, Result};
18use chrono;
19use oxios_ouroboros::{EvaluationResult, InterviewResult, OuroborosProtocol, Phase, Seed};
20use parking_lot::RwLock;
21use serde::{Deserialize, Serialize};
22use uuid::Uuid;
23
24use crate::agent_lifecycle::AgentLifecycleManager;
25use crate::event_bus::{EventBus, KernelEvent};
26use crate::git_layer::GitLayer;
27use crate::metrics::get_metrics;
28use crate::scheduler::Priority;
29use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
30use crate::state_store::StateStore;
31use crate::types::AgentId;
32
33/// Role of an agent within a group.
34#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
35pub enum AgentRole {
36    /// Executes a specific subtask.
37    #[default]
38    Worker,
39    /// Coordinates subtasks, synthesizes results.
40    Manager,
41}
42
43/// A subtask within a multi-agent plan.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct SubTask {
46    /// Unique subtask ID.
47    pub id: Uuid,
48    /// Human-readable description.
49    pub description: String,
50    /// Capability required (e.g., "code-review", "testing").
51    pub required_capability: Option<String>,
52    /// Result of the subtask (filled after execution).
53    pub result: Option<String>,
54    /// Whether this subtask succeeded.
55    pub success: bool,
56    /// Role of the agent assigned to this subtask.
57    #[serde(default)]
58    pub role: AgentRole,
59}
60
61impl SubTask {
62    /// Create a new subtask with the given description.
63    pub fn new(description: impl Into<String>) -> Self {
64        Self {
65            id: Uuid::new_v4(),
66            description: description.into(),
67            required_capability: None,
68            result: None,
69            success: false,
70            role: AgentRole::default(),
71        }
72    }
73
74    /// Set the required capability for this subtask.
75    pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
76        self.required_capability = Some(cap.into());
77        self
78    }
79}
80
81/// The orchestrator coordinates the full Ouroboros lifecycle.
82pub struct Orchestrator {
83    ouroboros: Arc<dyn OuroborosProtocol>,
84    event_bus: EventBus,
85    state_store: Arc<StateStore>,
86    /// Git version control layer for auto-commits.
87    git_layer: Option<Arc<GitLayer>>,
88    /// Active interview sessions, keyed by session ID.
89    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
90    /// Agent lifecycle manager (fork, register, run, cleanup).
91    lifecycle: AgentLifecycleManager,
92    /// A2A protocol for inter-agent task delegation.
93    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
94    /// Space manager for context partitioning.
95    space_manager: RwLock<Option<Arc<SpaceManager>>>,
96    /// Conversation buffer for topic shift detection.
97    conversation_buffer: RwLock<ConversationBuffer>,
98    /// Orchestrator configuration (Ouroboros protocol settings).
99    config: crate::config::OrchestratorConfig,
100    /// Delegation configuration for A2A retries.
101    delegation_config: DelegationConfig,
102    /// A2A circuit breaker for delegation reliability.
103    a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
104}
105
/// Configuration for A2A delegation retries.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Maximum retry attempts for A2A delegation.
    max_retries: u32,
    /// Base delay for exponential backoff (milliseconds).
    base_delay_ms: u64,
    /// Maximum delay cap for exponential backoff (milliseconds).
    max_delay_ms: u64,
    /// Timeout per delegation attempt (milliseconds).
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    /// Defaults: 3 retries, 100 ms base delay, 5 s delay cap, 5 s timeout.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Largest exponent used when doubling the base delay; keeps the
    /// multiplier at or below 2^10 regardless of the attempt number.
    const MAX_BACKOFF_EXPONENT: u32 = 10;

    /// Calculate the exponential backoff delay in milliseconds.
    ///
    /// Returns `base_delay_ms * 2^attempt`, with the exponent clamped to
    /// [`Self::MAX_BACKOFF_EXPONENT`] and the result capped at
    /// `max_delay_ms`. The multiplication saturates, so a very large
    /// `base_delay_ms` yields the cap instead of overflowing.
    fn backoff_delay(&self, attempt: u32) -> u64 {
        let multiplier = 2_u64.saturating_pow(attempt.min(Self::MAX_BACKOFF_EXPONENT));
        self.base_delay_ms
            .saturating_mul(multiplier)
            .min(self.max_delay_ms)
    }
}
137
138impl Orchestrator {
139    /// Creates a new orchestrator.
140    pub fn new(
141        ouroboros: Arc<dyn OuroborosProtocol>,
142        event_bus: EventBus,
143        state_store: Arc<StateStore>,
144        lifecycle: AgentLifecycleManager,
145    ) -> Self {
146        Self::with_config(
147            ouroboros,
148            event_bus,
149            state_store,
150            lifecycle,
151            crate::config::OrchestratorConfig::default(),
152        )
153    }
154
155    /// Creates a new orchestrator with custom config.
156    pub fn with_config(
157        ouroboros: Arc<dyn OuroborosProtocol>,
158        event_bus: EventBus,
159        state_store: Arc<StateStore>,
160        lifecycle: AgentLifecycleManager,
161        config: crate::config::OrchestratorConfig,
162    ) -> Self {
163        Self {
164            ouroboros,
165            event_bus,
166            state_store,
167            git_layer: None,
168            sessions: RwLock::new(std::collections::HashMap::new()),
169            lifecycle,
170            a2a: None,
171            space_manager: RwLock::new(None),
172            conversation_buffer: RwLock::new(ConversationBuffer::default()),
173            config,
174            delegation_config: DelegationConfig::default(),
175            a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
176        }
177    }
178
179    /// Set the SpaceManager for context partitioning.
180    pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
181        *self.space_manager.write() = Some(manager);
182    }
183
184    /// Get the current Space ID, if SpaceManager is set.
185    pub fn current_space_id(&self) -> Option<SpaceId> {
186        self.space_manager
187            .read()
188            .as_ref()
189            .map(|m| m.current_space_id())
190    }
191
192    /// Get the current Space name tag for response decoration.
193    pub fn current_space_tag(&self) -> String {
194        self.space_manager
195            .read()
196            .as_ref()
197            .and_then(|m| {
198                m.current_space().map(|s| {
199                    if s.is_default() {
200                        String::new()
201                    } else {
202                        format!("[{} {}]", s.emoji(), s.name)
203                    }
204                })
205            })
206            .unwrap_or_default()
207    }
208
209    /// Set the A2A protocol for inter-agent task delegation.
210    pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
211        self.a2a = Some(a2a);
212    }
213
214    /// Set the GitLayer for auto-commits after state saves.
215    pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
216        self.git_layer = Some(git_layer);
217    }
218
219    /// Commit a file to git if GitLayer is configured and enabled.
220    fn git_commit(&self, rel_path: &str, message: &str) {
221        if let Some(ref gl) = self.git_layer {
222            if gl.is_enabled() {
223                let _ = gl.commit_file(rel_path, message);
224            }
225        }
226    }
227
228    /// Handle a user message through the full Ouroboros loop.
229    ///
230    /// Returns an `OrchestrationResult` with the response and metadata.
231    ///
232    /// If the interview phase needs clarification (ambiguity > 0.2),
233    /// the result will contain the questions and the phase will be
234    /// `Phase::Interview`. The caller should send these questions to
235    /// the user and include the `session_id` in follow-up messages.
236    #[allow(clippy::await_holding_lock)]
237    pub async fn handle_message(
238        &self,
239        user_id: &str,
240        user_message: &str,
241        session_id: Option<&str>,
242    ) -> Result<OrchestrationResult> {
243        tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
244        get_metrics().messages.inc();
245        let orch_start = std::time::Instant::now();
246
247        let session_id = session_id
248            .map(String::from)
249            .unwrap_or_else(|| Uuid::new_v4().to_string());
250
251        tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
252
253        // ── Space Detection ──
254        let space_tag = self.current_space_tag();
255        let (turns, sm_arc) = {
256            let buffer = self.conversation_buffer.read();
257            let sm_guard = self.space_manager.read();
258            let turns: Vec<_> = buffer.turns().iter().cloned().collect();
259            // Extract Arc from guard and drop guard before .await
260            let sm_arc = sm_guard.as_ref().cloned();
261            (turns, sm_arc)
262        };
263        let space_id = if let Some(ref sm) = sm_arc {
264            match sm.detect_or_create(user_message, &turns).await {
265                Ok(id) => {
266                    tracing::info!(space_id = %id, "Space detected/created for message");
267                    id
268                }
269                Err(e) => {
270                    tracing::warn!(error = %e, "Space detection failed, using default");
271                    sm.default_space_id()
272                }
273            }
274        } else {
275            uuid::Uuid::nil()
276        };
277
278        // Record user message in conversation buffer
279        {
280            let mut buffer = self.conversation_buffer.write();
281            buffer.push_user(user_message);
282        }
283
284        // Phase 1: Interview
285        self.publish_phase_started(&session_id, Phase::Interview)
286            .await;
287
288        // Get or create the interview session (pre-fetch to avoid lock across await).
289        let needs_interview;
290        let existing_history: Option<Vec<_>>;
291        {
292            let sessions = self.sessions.read();
293            needs_interview = !sessions.contains_key(&session_id);
294            existing_history = if !needs_interview {
295                sessions.get(&session_id).map(|s| s.interview.conversation_history.clone())
296            } else {
297                None
298            };
299            // Lock dropped here before any .await
300        }
301
302        // Conduct the interview.
303        let interview = {
304            tracing::info!(phase = "interview", "Starting interview phase");
305            if needs_interview {
306                self.ouroboros.interview(user_message).await?
307            } else {
308                // This is a follow-up message in an existing interview.
309                // Build multi-turn context from conversation history.
310                let multi_turn_context = {
311                    let mut context_parts = Vec::new();
312                    if let Some(ref history) = existing_history {
313                        for exchange in history {
314                            context_parts.push(format!("User: {}\nAgent: {}", exchange.user, exchange.agent));
315                        }
316                    }
317                    context_parts.push(format!("User: {}", user_message));
318                    context_parts.join("\n\n")
319                };
320
321                // Record user's answer in session for future turns (brief write lock)
322                {
323                    let mut sessions = self.sessions.write();
324                    if let Some(s) = sessions.get_mut(&session_id) {
325                        let last_q = s.interview.questions.last().cloned().unwrap_or_default();
326                        s.interview.add_exchange(&last_q, user_message);
327                    }
328                }
329
330                // Run another interview pass with full conversation history.
331                self.ouroboros.interview(&multi_turn_context).await?
332            }
333        };
334
335        // If this is a non-task message (greeting, small talk), return the chat response directly.
336        if !interview.is_task {
337            tracing::info!(session_id = %session_id, "Chat response (non-task)");
338
339            let response_text = if interview.chat_response.is_empty() {
340                "Hello! How can I help you today?".to_string()
341            } else {
342                interview.chat_response.clone()
343            };
344
345            // Record agent response in conversation buffer
346            {
347                let mut buffer = self.conversation_buffer.write();
348                buffer.push_agent(&response_text, &space_id);
349            }
350
351            // Record exchange in conversation history for multi-turn
352            // and store session so multi-turn works on follow-up messages
353            {
354                let mut sessions = self.sessions.write();
355                if let Some(session) = sessions.get_mut(&session_id) {
356                    tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
357                    session.interview.add_to_history(user_message, &response_text);
358                } else {
359                    // First non-task message — create a minimal session for history
360                    let mut interview = InterviewResult::new();
361                    interview.is_task = false;
362                    interview.chat_response = response_text.clone();
363                    interview.add_to_history(user_message, &response_text);
364                    sessions.insert(
365                        session_id.clone(),
366                        InterviewSession {
367                            id: session_id.clone(),
368                            interview,
369                            phase: Phase::Interview,
370                            seed_id: None,
371                            agent_id: None,
372                        },
373                    );
374                }
375            }
376
377            self.publish_phase_completed(&session_id, Phase::Interview, "chat")
378                .await;
379
380            return Ok(OrchestrationResult {
381                session_id: Some(session_id.clone()),
382                space_id: Some(space_id),
383                space_tag: Some(space_tag.clone()),
384                response: response_text,
385                seed_id: None,
386                agent_id: None,
387                phase_reached: Phase::Interview,
388                evaluation_passed: false,
389                output: None,
390            });
391        }
392
393        // If ambiguity is too high, return questions for the user to answer.
394        if !interview.ready_for_seed {
395            // Record this exchange in conversation history and store the interview.
396            {
397                let mut sessions = self.sessions.write();
398                let session = sessions.entry(session_id.clone()).or_insert_with(|| {
399                    InterviewSession {
400                        id: session_id.clone(),
401                        interview: interview.clone(),
402                        phase: Phase::Interview,
403                        seed_id: None,
404                        agent_id: None,
405                    }
406                });
407                // The session already has user's answer recorded via add_exchange above.
408                // Record the questions as the agent's response in history.
409                let questions_text = interview.questions.join("\n");
410                let last_answer = session.interview.answers.last().cloned();
411                if let Some(ref ans) = last_answer {
412                    if !ans.is_empty() {
413                        session.interview.add_to_history(ans, &questions_text);
414                    }
415                }
416            } // Lock dropped before .await
417
418            let questions = interview
419                .questions
420                .iter()
421                .filter(|q| !q.is_empty())
422                .cloned()
423                .collect::<Vec<_>>();
424
425            tracing::info!(
426                session_id = %session_id,
427                ambiguity = interview.ambiguity.ambiguity(),
428                questions = questions.len(),
429                "Interview needs clarification"
430            );
431
432            self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
433                .await;
434
435            return Ok(OrchestrationResult {
436                session_id: Some(session_id.clone()),
437                space_id: Some(space_id),
438                space_tag: Some(space_tag.clone()),
439                response: format_questions(&questions),
440                seed_id: None,
441                agent_id: None,
442                phase_reached: Phase::Interview,
443                evaluation_passed: false,
444                output: None,
445            });
446        }
447
448        // Record agent response in conversation buffer (for topic shift detection)
449        // Note: interview phase returns questions, not a full agent response,
450        // but we record it for completeness.
451        {
452            let mut buffer = self.conversation_buffer.write();
453            buffer.push_agent("[interview: questions]", &space_id);
454        }
455
456        // Interview complete and ready. Proceed to seed generation.
457        self.publish_phase_completed(&session_id, Phase::Interview, "ready for seed")
458            .await;
459        self.publish_phase_started(&session_id, Phase::Seed).await;
460
461        // Phase 2: Generate seed
462        tracing::info!(phase = "seed", "Starting seed generation");
463        let seed = self.ouroboros.generate_seed(&interview).await?;
464
465        // Save seed to state store.
466        self.save_seed(&seed).await?;
467
468        // Publish seed created event.
469        self.event_bus
470            .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
471
472        self.publish_phase_completed(&session_id, Phase::Seed, "generated")
473            .await;
474        self.publish_phase_started(&session_id, Phase::Execute)
475            .await;
476
477        // Check if the seed should be split into multi-agent execution.
478        // When the seed has 3+ acceptance criteria, we treat each criterion
479        // as a distinct subtask and delegate to separate agents.
480        if should_split_seed(&seed) {
481            let subtasks = split_into_subtasks(&seed);
482            if subtasks.len() > 1 {
483                tracing::info!(
484                    phase = "delegate",
485                    subtasks = subtasks.len(),
486                    "Delegating to multi-agent"
487                );
488                let results = self.delegate_subtasks(subtasks, &seed).await?;
489
490                // Combine successful results
491                let combined: String = results
492                    .iter()
493                    .filter(|r| r.success)
494                    .filter_map(|r| r.result.as_deref())
495                    .collect::<Vec<_>>()
496                    .join("\n\n");
497
498                let all_passed = results.iter().all(|r| r.success);
499
500                // Clean up the session.
501                {
502                    let mut sessions = self.sessions.write();
503                    sessions.remove(&session_id);
504                }
505
506                tracing::info!(
507                    session_id = %session_id,
508                    subtasks = results.len(),
509                    passed = all_passed,
510                    "Multi-agent orchestration complete"
511                );
512
513                return Ok(OrchestrationResult {
514                    session_id: Some(session_id),
515                    space_id: Some(space_id),
516                    space_tag: Some(space_tag.clone()),
517                    response: format_result_combined(&combined),
518                    seed_id: Some(seed.id),
519                    agent_id: None,
520                    phase_reached: Phase::Execute,
521                    evaluation_passed: all_passed,
522                    output: Some(combined),
523                });
524            }
525        }
526
527        // Record agent response in conversation buffer (for multi-agent case)
528        {
529            let mut buffer = self.conversation_buffer.write();
530            buffer.push_agent("[multi-agent: complete]", &space_id);
531        }
532
533        // Phase 3: Fork and execute agent via lifecycle manager
534        tracing::info!(phase = "execute", "Starting execution phase");
535        let exec_result = self
536            .lifecycle
537            .spawn_and_run(&seed, Priority::Normal)
538            .await?;
539
540        // Periodically reap zombie tasks.
541        self.lifecycle.reap_zombies();
542
543        // Phase 4: Evaluate
544        self.publish_phase_completed(&session_id, Phase::Execute, "completed")
545            .await;
546        self.publish_phase_started(&session_id, Phase::Evaluate)
547            .await;
548
549        tracing::info!(phase = "evaluate", "Starting evaluation phase");
550        let evaluation = self.ouroboros.evaluate(&seed, &exec_result).await?;
551
552        self.publish_phase_completed(
553            &session_id,
554            Phase::Evaluate,
555            &format!("score={:.2}", evaluation.score),
556        )
557        .await;
558
559        self.event_bus.publish(KernelEvent::EvaluationComplete {
560            seed_id: seed.id,
561            passed: evaluation.all_passed(),
562        })?;
563
564        // Save evaluation to state store for lineage tracking.
565        self.save_evaluation(&seed, &evaluation).await?;
566
567        // Phase 5: Evolve if needed
568        let mut current_seed = Some(seed);
569        let mut current_evaluation = evaluation;
570        let mut iterations = 0;
571
572        while !current_evaluation.all_passed()
573            && current_evaluation.score < self.config.min_evaluation_score
574            && iterations < self.config.max_evolution_iterations
575        {
576            iterations += 1;
577            self.publish_phase_started(&session_id, Phase::Evolve).await;
578
579            tracing::info!(
580                phase = "evolve",
581                iteration = iterations,
582                max_iterations = self.config.max_evolution_iterations,
583                "Starting evolve phase"
584            );
585            let evolve_result = self
586                .ouroboros
587                .evolve(
588                    current_seed.as_ref().expect("seed exists"),
589                    &current_evaluation,
590                )
591                .await?;
592
593            if let Some(evolved) = evolve_result {
594                current_seed = Some(evolved.clone());
595
596                // Save evolved seed.
597                self.save_seed(&evolved).await?;
598                self.event_bus.publish(KernelEvent::SeedCreated {
599                    seed_id: evolved.id,
600                })?;
601
602                self.publish_phase_completed(&session_id, Phase::Evolve, "evolved")
603                    .await;
604                self.publish_phase_started(&session_id, Phase::Execute)
605                    .await;
606
607                tracing::info!(
608                    phase = "re-execute",
609                    iteration = iterations,
610                    "Re-executing with evolved seed"
611                );
612                let new_exec = self
613                    .lifecycle
614                    .spawn_and_run(&evolved, Priority::High)
615                    .await?;
616
617                self.publish_phase_completed(&session_id, Phase::Execute, "completed")
618                    .await;
619                self.publish_phase_started(&session_id, Phase::Evaluate)
620                    .await;
621
622                tracing::info!(
623                    phase = "re-evaluate",
624                    iteration = iterations,
625                    "Re-evaluating evolved result"
626                );
627                let new_eval = self.ouroboros.evaluate(&evolved, &new_exec).await?;
628                current_evaluation = new_eval;
629
630                self.publish_phase_completed(
631                    &session_id,
632                    Phase::Evaluate,
633                    &format!("score={:.2}", current_evaluation.score),
634                )
635                .await;
636                // Save evolved seed evaluation for lineage tracking.
637                self.save_evaluation(&evolved, &current_evaluation).await?;
638            } else {
639                // No evolution possible.
640                self.publish_phase_completed(&session_id, Phase::Evolve, "no evolution")
641                    .await;
642                break;
643            }
644        }
645
646        // Clean up the session.
647        {
648            let mut sessions = self.sessions.write();
649            sessions.remove(&session_id);
650        }
651
652        let final_seed = current_seed.expect("at least one seed exists");
653        let passed = current_evaluation.all_passed() || current_evaluation.score >= 0.7;
654
655        tracing::info!(
656            session_id = %session_id,
657            iterations,
658            score = current_evaluation.score,
659            passed,
660            "Orchestration complete"
661        );
662
663        // Measure orchestration duration.
664        let metrics = get_metrics();
665        metrics
666            .orch_duration
667            .observe(orch_start.elapsed().as_secs_f64());
668        if passed {
669            metrics.agents_completed.inc();
670        } else {
671            metrics.agents_failed.inc();
672        }
673
674        // Record agent response in conversation buffer (for topic shift detection)
675        {
676            let mut buffer = self.conversation_buffer.write();
677            buffer.push_agent(&final_seed.goal, &space_id);
678        }
679
680        Ok(OrchestrationResult {
681            session_id: Some(session_id),
682            space_id: Some(space_id),
683            space_tag: Some(space_tag.clone()),
684            response: format_result(&final_seed, &current_evaluation),
685            seed_id: Some(final_seed.id),
686            agent_id: None,
687            phase_reached: Phase::Evaluate,
688            evaluation_passed: passed,
689            output: Some(current_evaluation.notes.join("; ")),
690        })
691    }
692
693    /// Save a seed to the state store.
694    async fn save_seed(&self, seed: &Seed) -> Result<()> {
695        let key = seed.id.to_string();
696
697        self.state_store
698            .save_json("seeds", &key, seed)
699            .await
700            .context("failed to save seed to state store")?;
701
702        self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
703
704        Ok(())
705    }
706
707    /// Save an evaluation result to the state store.
708    async fn save_evaluation(&self, seed: &Seed, evaluation: &EvaluationResult) -> Result<()> {
709        let key = format!("{}-eval", seed.id);
710
711        self.state_store
712            .save_json("evals", &key, evaluation)
713            .await
714            .context("failed to save evaluation to state store")?;
715
716        self.git_commit(&format!("evals/{}.json", key), "ourobors: save eval");
717
718        Ok(())
719    }
720
721    /// Publish a PhaseStarted event.
722    async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
723        let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
724            session_id: session_id.to_owned(),
725            phase,
726        });
727    }
728
729    /// Publish a PhaseCompleted event.
730    async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
731        let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
732            session_id: session_id.to_owned(),
733            phase,
734            result_summary: result.to_owned(),
735        });
736    }
737
738    /// Execute multiple subtasks using separate agents in parallel.
739    ///
740    /// When A2A is available, the orchestrator delegates tasks through the
741    /// A2A protocol with circuit breaker and retry support.
742    /// Otherwise, falls back to direct lifecycle execution.
743    ///
744    /// Results are collected as they complete using `JoinSet`.
745    pub async fn delegate_subtasks(
746        &self,
747        subtasks: Vec<SubTask>,
748        parent_seed: &Seed,
749    ) -> Result<Vec<SubTask>> {
750        // Single task — execute directly without group overhead.
751        if subtasks.len() == 1 {
752            return self.execute_single_subtask(subtasks, parent_seed).await;
753        }
754
755        // Try A2A-based delegation when the protocol is available.
756        if let Some(ref a2a) = self.a2a {
757            // Check circuit breaker
758            if !self.a2a_breaker.is_allowed() {
759                tracing::warn!(
760                    state = ?self.a2a_breaker.state(),
761                    "A2A circuit breaker open, using lifecycle fallback"
762                );
763                return self.delegate_via_lifecycle(subtasks, parent_seed).await;
764            }
765            
766            // Delegate with retry
767            return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
768        }
769
770        // Fallback: direct lifecycle execution (no A2A).
771        self.delegate_via_lifecycle(subtasks, parent_seed).await
772    }
773
774    /// Delegate subtasks via A2A with circuit breaker and retry support.
775    async fn delegate_with_retry(
776        &self,
777        subtasks: Vec<SubTask>,
778        parent_seed: &Seed,
779        a2a: &Arc<crate::a2a::A2AProtocol>,
780    ) -> Result<Vec<SubTask>> {
781        let mut attempt = 0;
782        let max_retries = self.delegation_config.max_retries;
783        
784        loop {
785            match self.delegate_via_a2a(subtasks.clone(), parent_seed, a2a).await {
786                Ok(results) => {
787                    self.a2a_breaker.record_success();
788                    return Ok(results);
789                }
790                Err(e) => {
791                    self.a2a_breaker.record_failure();
792                    attempt += 1;
793                    
794                    if attempt >= max_retries {
795                        tracing::error!(
796                            attempts = attempt,
797                            error = %e,
798                            "A2A delegation exhausted after {} attempts, using lifecycle fallback",
799                            attempt
800                        );
801                        return self.delegate_via_lifecycle(subtasks, parent_seed).await;
802                    }
803                    
804                    // Exponential backoff
805                    let delay = self.delegation_config.backoff_delay(attempt);
806                    tracing::warn!(
807                        attempt,
808                        delay_ms = delay,
809                        error = %e,
810                        "A2A delegation failed, retrying with backoff"
811                    );
812                    tokio::time::sleep(Duration::from_millis(delay)).await;
813                }
814            }
815        }
816    }
817
818    /// Execute a single subtask directly via lifecycle manager.
819    async fn execute_single_subtask(
820        &self,
821        subtasks: Vec<SubTask>,
822        parent_seed: &Seed,
823    ) -> Result<Vec<SubTask>> {
824        let mut task = subtasks.into_iter().next().unwrap();
825        let child_seed = Seed {
826            id: Uuid::new_v4(),
827            goal: task.description.clone(),
828            constraints: parent_seed.constraints.clone(),
829            acceptance_criteria: vec!["Task completes successfully".into()],
830            ontology: parent_seed.ontology.clone(),
831            created_at: chrono::Utc::now(),
832            generation: parent_seed.generation + 1,
833            parent_seed_id: Some(parent_seed.id),
834            cspace_hint: None,
835        };
836        match self
837            .lifecycle
838            .spawn_and_run(&child_seed, Priority::Normal)
839            .await
840        {
841            Ok(result) => {
842                task.result = Some(result.output.clone());
843            }
844            Err(e) => {
845                task.result = Some(format!("Failed: {e}"));
846                task.success = false;
847            }
848        }
849        Ok(vec![task])
850    }
851
852    /// Delegate subtasks via A2A protocol.
853    ///
854    /// Queries the AgentCardRegistry for agents matching each subtask's
855    /// Execute subtasks via A2A dispatch handler.
856    ///
857    /// Queries the AgentCardRegistry for agents matching each subtask's
858    /// required capability, then calls `execute_delegation` which runs
859    /// the task through the registered handler (lifecycle).
860    /// Falls back to direct lifecycle execution when no handler is registered.
861    async fn delegate_via_a2a(
862        &self,
863        subtasks: Vec<SubTask>,
864        parent_seed: &Seed,
865        a2a: &Arc<crate::a2a::A2AProtocol>,
866    ) -> Result<Vec<SubTask>> {
867        use crate::a2a::TaskPriority;
868        use tokio::task::JoinSet;
869
870        tracing::info!(
871            subtasks = subtasks.len(),
872            "Delegating subtasks via A2A protocol"
873        );
874
875        let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
876        let subtask_count = subtasks.len();
877        let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
878
879        for (idx, subtask) in subtasks.into_iter().enumerate() {
880            let capability = subtask.required_capability.clone();
881            let description = subtask.description.clone();
882            let subtask_id = subtask.id;
883            let role = subtask.role.clone();
884            let a2a = Arc::clone(a2a);
885            let parent_seed = parent_seed.clone();
886            let lifecycle = self.lifecycle.clone();
887
888            join_set.spawn(async move {
889                // Find agent with the required capability via A2A registry.
890                let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
891                    a2a.query_capabilities(cap).await.ok()
892                        .and_then(|agents| agents.into_iter().next())
893                } else {
894                    None
895                };
896
897                let (output, success) = if let Some(ref target_card) = target {
898                    let target_id = target_card.agent_id;
899                    tracing::info!(
900                        subtask_index = idx,
901                        target = %target_card.name,
902                        target_id = %target_id,
903                        "A2A dispatching subtask"
904                    );
905
906                    let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
907                        "parent_seed": parent_seed.id.to_string(),
908                        "goal": description,
909                    }))
910                    .with_priority(TaskPriority::Normal);
911
912                    // Enqueue audit trail (fire-and-forget into queue).
913                    let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
914
915                    // Execute through dispatch handler (blocking).
916                    match a2a.execute_delegation(orchestrator_id, target_id, task).await {
917                        Some(Ok(result)) => {
918                            let out = result.get("output")
919                                .and_then(|v| v.as_str())
920                                .unwrap_or("")
921                                .to_string();
922                            let ok = result.get("success")
923                                .and_then(|v| v.as_bool())
924                                .unwrap_or(false);
925                            (out, ok)
926                        }
927                        Some(Err(e)) => {
928                            tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
929                            (format!("Failed: {e}"), false)
930                        }
931                        None => {
932                            // No handler — fallback to lifecycle.
933                            tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
934                            run_via_lifecycle(&lifecycle, &parent_seed, &description).await
935                        }
936                    }
937                } else {
938                    tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
939                    run_via_lifecycle(&lifecycle, &parent_seed, &description).await
940                };
941
942                (idx, SubTask {
943                    id: subtask_id,
944                    description,
945                    required_capability: capability,
946                    result: Some(output),
947                    success,
948                    role,
949                })
950            });
951        }
952
953        let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
954        while let Some(join_result) = join_set.join_next().await {
955            match join_result {
956                Ok((idx, subtask)) => {
957                    results[idx] = Some(subtask);
958                }
959                Err(e) => {
960                    tracing::error!(error = %e, "A2A task panicked");
961                }
962            }
963        }
964
965        let completed: Vec<SubTask> = results.into_iter().flatten().collect();
966        tracing::info!(
967            completed = completed.len(),
968            succeeded = completed.iter().filter(|r| r.success).count(),
969            "A2A delegation complete"
970        );
971        Ok(completed)
972    }
973
974    async fn delegate_via_lifecycle(
975        &self,
976        subtasks: Vec<SubTask>,
977        parent_seed: &Seed,
978    ) -> Result<Vec<SubTask>> {
979        use crate::agent_group::OxiosAgentGroup;
980        use tokio::task::JoinSet;
981
982        let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
983        let group = OxiosAgentGroup::new(parent_seed, descriptions);
984        let group_id = group.id;
985
986        self.event_bus.publish(KernelEvent::AgentGroupCreated {
987            group_id,
988            agent_count: group.agents.len(),
989        })?;
990
991        tracing::info!(
992            group_id = %group_id,
993            agent_count = group.agents.len(),
994            "Starting parallel multi-agent execution"
995        );
996
997        let mut join_set: JoinSet<(
998            usize,
999            crate::types::AgentId,
1000            Result<oxios_ouroboros::ExecutionResult>,
1001        )> = JoinSet::new();
1002
1003        for (idx, agent_entry) in group.agents.iter().enumerate() {
1004            let child_seed = agent_entry.seed.clone();
1005            let agent_id = agent_entry.id;
1006            let lifecycle = self.lifecycle.clone();
1007
1008            join_set.spawn(async move {
1009                let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
1010                (idx, agent_id, result)
1011            });
1012        }
1013
1014        let subtask_count = subtasks.len();
1015        let mut completed = vec![None; subtask_count];
1016        while let Some(join_result) = join_set.join_next().await {
1017            match join_result {
1018                Ok((idx, agent_id, Ok(exec_result))) => {
1019                    let _ = self
1020                        .event_bus
1021                        .publish(KernelEvent::AgentGroupMemberCompleted {
1022                            group_id,
1023                            agent_id,
1024                            success: exec_result.success,
1025                        });
1026                    completed[idx] = Some(SubTask {
1027                        id: subtasks[idx].id,
1028                        description: subtasks[idx].description.clone(),
1029                        required_capability: subtasks[idx].required_capability.clone(),
1030                        result: Some(exec_result.output.clone()),
1031                        success: exec_result.success,
1032                        role: subtasks[idx].role.clone(),
1033                    });
1034                }
1035                Ok((idx, agent_id, Err(e))) => {
1036                    tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
1037                    let _ = self
1038                        .event_bus
1039                        .publish(KernelEvent::AgentGroupMemberCompleted {
1040                            group_id,
1041                            agent_id,
1042                            success: false,
1043                        });
1044                    completed[idx] = Some(SubTask {
1045                        id: subtasks[idx].id,
1046                        description: subtasks[idx].description.clone(),
1047                        required_capability: subtasks[idx].required_capability.clone(),
1048                        result: Some(format!("Failed: {e}")),
1049                        success: false,
1050                        role: subtasks[idx].role.clone(),
1051                    });
1052                }
1053                Err(e) => {
1054                    tracing::error!(error = %e, "JoinSet task panicked");
1055                }
1056            }
1057        }
1058
1059        let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
1060        let succeeded = completed.iter().filter(|r| r.success).count();
1061        let total = completed.len();
1062
1063        tracing::info!(
1064            group_id = %group_id,
1065            succeeded,
1066            total,
1067            "Parallel multi-agent execution complete"
1068        );
1069
1070        // Persist group state
1071        let _ = self
1072            .state_store
1073            .save_json("agent_groups", &group_id.to_string(), &group)
1074            .await;
1075        self.git_commit(
1076            &format!("agent_groups/{}.json", group_id),
1077            "orchestrator: save group",
1078        );
1079
1080        Ok(completed)
1081    }
1082}
1083
1084/// Active session state for multi-turn interviews.
1085#[derive(Debug, Clone)]
1086#[allow(unused)]
1087struct InterviewSession {
1088    id: String,
1089    interview: InterviewResult,
1090    phase: Phase,
1091    seed_id: Option<Uuid>,
1092    agent_id: Option<AgentId>,
1093}
1094
1095/// Result of a full orchestration cycle.
1096#[derive(Debug, Clone, Serialize, Deserialize)]
1097pub struct OrchestrationResult {
1098    /// Session ID for multi-turn interviews. Pass this on follow-up messages.
1099    #[serde(skip_serializing_if = "Option::is_none")]
1100    pub session_id: Option<String>,
1101    /// The Space ID that handled this message.
1102    #[serde(skip_serializing_if = "Option::is_none")]
1103    pub space_id: Option<Uuid>,
1104    /// Space decoration tag for the response (e.g. "[🔧 oxios]").
1105    #[serde(skip_serializing_if = "Option::is_none")]
1106    pub space_tag: Option<String>,
1107    /// The response to send back to the user.
1108    pub response: String,
1109    /// The seed that was created (if seed phase was reached).
1110    #[serde(skip_serializing_if = "Option::is_none")]
1111    pub seed_id: Option<Uuid>,
1112    /// The agent that executed (if execute phase was reached).
1113    #[serde(skip_serializing_if = "Option::is_none")]
1114    pub agent_id: Option<AgentId>,
1115    /// The furthest phase reached.
1116    pub phase_reached: Phase,
1117    /// Whether evaluation passed (false if evaluation was skipped or failed).
1118    pub evaluation_passed: bool,
1119    /// Output or notes from evaluation.
1120    #[serde(skip_serializing_if = "Option::is_none")]
1121    pub output: Option<String>,
1122}
1123
/// Render clarifying questions as a user-facing message.
///
/// With no questions a generic request for clarification is returned.
/// Otherwise the questions are listed one per line, numbered from 1, below
/// an introductory sentence.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return "I need a bit more clarification before I can proceed.".to_string();
    }

    let mut numbered: Vec<String> = Vec::with_capacity(questions.len());
    for (index, question) in questions.iter().enumerate() {
        numbered.push(format!("{}. {}", index + 1, question));
    }
    let listing = numbered.join("\n");

    format!(
        "I'd like to understand your request better. Could you help clarify:\n\n{}",
        listing
    )
}
1140
1141/// Format the final result for display.
1142fn format_result(seed: &Seed, evaluation: &EvaluationResult) -> String {
1143    let passed = evaluation.all_passed();
1144
1145    let mut lines = Vec::new();
1146    lines.push(format!("Task '{}' completed.", seed.goal));
1147
1148    if passed {
1149        lines.push("✅ Evaluation passed.".to_string());
1150    } else {
1151        lines.push(format!(
1152            "⚠️ Evaluation score: {:.0}%",
1153            evaluation.score * 100.0
1154        ));
1155    }
1156
1157    if !evaluation.notes.is_empty() {
1158        lines.push("\nNotes:".to_string());
1159        for note in &evaluation.notes {
1160            lines.push(format!("- {}", note));
1161        }
1162    }
1163
1164    lines.join("\n")
1165}
1166
1167/// Check if a seed should be split into subtasks.
1168///
1169/// Simple heuristic: if the seed has 3 or more acceptance criteria,
1170/// it likely contains distinct concerns that can be parallelized.
1171fn should_split_seed(seed: &Seed) -> bool {
1172    // Only split for genuinely complex tasks with many criteria.
1173    // Simple tasks (even with 3-4 criteria) are better handled by a single agent
1174    // to preserve context coherence.
1175    seed.acceptance_criteria.len() >= 5
1176}
1177
1178/// Split a seed into subtasks based on acceptance criteria.
1179///
1180/// Each acceptance criterion becomes a separate subtask with the
1181/// parent seed's goal as context. Infers required capability from
1182/// the goal text using the same heuristic as `build_agent_card`.
1183fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1184    seed.acceptance_criteria
1185        .iter()
1186        .map(|criterion| {
1187            let desc = format!("{}: {}", seed.goal, criterion);
1188            let desc_lower = desc.to_lowercase();
1189
1190            // Infer capability from subtask description.
1191            let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1192                Some("code-review".to_string())
1193            } else if desc_lower.contains("test") {
1194                Some("testing".to_string())
1195            } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1196                Some("refactoring".to_string())
1197            } else if desc_lower.contains("write")
1198                || desc_lower.contains("create")
1199                || desc_lower.contains("implement")
1200            {
1201                Some("code-generation".to_string())
1202            } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1203                Some("debugging".to_string())
1204            } else {
1205                None
1206            };
1207
1208            let mut subtask = SubTask::new(desc);
1209            subtask.required_capability = cap;
1210            subtask
1211        })
1212        .collect()
1213}
1214
/// Wrap the combined multi-agent output in a user-facing message.
///
/// An empty `combined` string produces a notice that no subtasks succeeded;
/// anything else is placed under a "Multi-agent execution completed" header.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => "No subtasks completed successfully.".to_string(),
        body => format!("Multi-agent execution completed:\n\n{}", body),
    }
}
1223
1224/// Execute a subtask via lifecycle manager, returning (output, success).
1225async fn run_via_lifecycle(
1226    lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1227    parent_seed: &Seed,
1228    description: &str,
1229) -> (String, bool) {
1230    let child_seed = Seed {
1231        id: Uuid::new_v4(),
1232        goal: description.to_string(),
1233        constraints: parent_seed.constraints.clone(),
1234        acceptance_criteria: vec!["Task completes successfully".into()],
1235        ontology: parent_seed.ontology.clone(),
1236        created_at: chrono::Utc::now(),
1237        generation: parent_seed.generation + 1,
1238        parent_seed_id: Some(parent_seed.id),
1239        cspace_hint: None,
1240    };
1241    match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1242        Ok(result) => (result.output, result.success),
1243        Err(e) => (format!("Failed: {e}"), false),
1244    }
1245}