Skip to main content

oxios_kernel/
orchestrator.rs

1//! Orchestrator: coordinates the full Ouroboros lifecycle for user messages.
2//!
3//! The orchestrator is the "brain" that runs the Ouroboros protocol
4//! end-to-end. Given a user message:
5//! 1. Conduct the interview (ask clarifying questions if needed)
6//! 2. Generate a seed when ambiguity is low enough
7//! 3. Fork and execute an agent via the supervisor
8//! 4. Evaluate the result
9//! 5. Evolve and re-execute if evaluation fails
10//!
11//! The orchestrator does NOT know about channels or HTTP — it only
12//! coordinates Ouroboros + Supervisor + EventBus + StateStore + Scheduler + AccessManager.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::{Context, Result};
18use chrono;
19use oxios_ouroboros::{EvaluationResult, InterviewResult, OuroborosProtocol, Phase, Seed};
20use parking_lot::RwLock;
21use serde::{Deserialize, Serialize};
22use uuid::Uuid;
23
24use crate::agent_lifecycle::AgentLifecycleManager;
25use crate::event_bus::{EventBus, KernelEvent};
26use crate::git_layer::GitLayer;
27use crate::metrics::get_metrics;
28use crate::scheduler::Priority;
29use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
30use crate::state_store::StateStore;
31use crate::types::AgentId;
32
33/// Role of an agent within a group.
34#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
35pub enum AgentRole {
36    /// Executes a specific subtask.
37    #[default]
38    Worker,
39    /// Coordinates subtasks, synthesizes results.
40    Manager,
41}
42
43/// A subtask within a multi-agent plan.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct SubTask {
46    /// Unique subtask ID.
47    pub id: Uuid,
48    /// Human-readable description.
49    pub description: String,
50    /// Capability required (e.g., "code-review", "testing").
51    pub required_capability: Option<String>,
52    /// Result of the subtask (filled after execution).
53    pub result: Option<String>,
54    /// Whether this subtask succeeded.
55    pub success: bool,
56    /// Role of the agent assigned to this subtask.
57    #[serde(default)]
58    pub role: AgentRole,
59}
60
61impl SubTask {
62    /// Create a new subtask with the given description.
63    pub fn new(description: impl Into<String>) -> Self {
64        Self {
65            id: Uuid::new_v4(),
66            description: description.into(),
67            required_capability: None,
68            result: None,
69            success: false,
70            role: AgentRole::default(),
71        }
72    }
73
74    /// Set the required capability for this subtask.
75    pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
76        self.required_capability = Some(cap.into());
77        self
78    }
79}
80
81/// The orchestrator coordinates the full Ouroboros lifecycle.
82pub struct Orchestrator {
83    ouroboros: Arc<dyn OuroborosProtocol>,
84    event_bus: EventBus,
85    state_store: Arc<StateStore>,
86    /// Git version control layer for auto-commits.
87    git_layer: Option<Arc<GitLayer>>,
88    /// Active interview sessions, keyed by session ID.
89    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
90    /// Agent lifecycle manager (fork, register, run, cleanup).
91    lifecycle: AgentLifecycleManager,
92    /// A2A protocol for inter-agent task delegation.
93    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
94    /// Space manager for context partitioning.
95    space_manager: RwLock<Option<Arc<SpaceManager>>>,
96    /// Conversation buffer for topic shift detection.
97    conversation_buffer: RwLock<ConversationBuffer>,
98    /// Orchestrator configuration (Ouroboros protocol settings).
99    config: crate::config::OrchestratorConfig,
100    /// Delegation configuration for A2A retries.
101    delegation_config: DelegationConfig,
102    /// A2A circuit breaker for delegation reliability.
103    a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
104}
105
/// Retry and backoff tuning for A2A delegation.
#[derive(Clone, Debug)]
struct DelegationConfig {
    /// Upper bound on A2A delegation attempts. The retry loop gives up once
    /// its attempt counter reaches this value.
    max_retries: u32,
    /// Starting delay for exponential backoff, in milliseconds.
    base_delay_ms: u64,
    /// Ceiling applied to every backoff delay, in milliseconds.
    max_delay_ms: u64,
    /// Per-attempt delegation timeout, in milliseconds.
    /// NOTE(review): currently never read (hence the `dead_code` allowance).
    #[allow(dead_code)]
    timeout_ms: u64,
}
119
120impl Default for DelegationConfig {
121    fn default() -> Self {
122        Self {
123            max_retries: 3,
124            base_delay_ms: 100,
125            max_delay_ms: 5000,
126            timeout_ms: 5000,
127        }
128    }
129}
130
131impl DelegationConfig {
132    /// Calculate exponential backoff delay.
133    fn backoff_delay(&self, attempt: u32) -> u64 {
134        let delay = self.base_delay_ms * 2_u64.saturating_pow(attempt.min(10));
135        delay.min(self.max_delay_ms)
136    }
137}
138
139impl Orchestrator {
140    /// Creates a new orchestrator.
141    pub fn new(
142        ouroboros: Arc<dyn OuroborosProtocol>,
143        event_bus: EventBus,
144        state_store: Arc<StateStore>,
145        lifecycle: AgentLifecycleManager,
146    ) -> Self {
147        Self::with_config(
148            ouroboros,
149            event_bus,
150            state_store,
151            lifecycle,
152            crate::config::OrchestratorConfig::default(),
153        )
154    }
155
156    /// Creates a new orchestrator with custom config.
157    pub fn with_config(
158        ouroboros: Arc<dyn OuroborosProtocol>,
159        event_bus: EventBus,
160        state_store: Arc<StateStore>,
161        lifecycle: AgentLifecycleManager,
162        config: crate::config::OrchestratorConfig,
163    ) -> Self {
164        Self {
165            ouroboros,
166            event_bus,
167            state_store,
168            git_layer: None,
169            sessions: RwLock::new(std::collections::HashMap::new()),
170            lifecycle,
171            a2a: None,
172            space_manager: RwLock::new(None),
173            conversation_buffer: RwLock::new(ConversationBuffer::default()),
174            config,
175            delegation_config: DelegationConfig::default(),
176            a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
177        }
178    }
179
180    /// Set the SpaceManager for context partitioning.
181    pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
182        *self.space_manager.write() = Some(manager);
183    }
184
185    /// Get the current Space ID, if SpaceManager is set.
186    pub fn current_space_id(&self) -> Option<SpaceId> {
187        self.space_manager
188            .read()
189            .as_ref()
190            .map(|m| m.current_space_id())
191    }
192
193    /// Get the current Space name tag for response decoration.
194    pub fn current_space_tag(&self) -> String {
195        self.space_manager
196            .read()
197            .as_ref()
198            .and_then(|m| {
199                m.current_space().map(|s| {
200                    if s.is_default() {
201                        String::new()
202                    } else {
203                        format!("[{} {}]", s.emoji(), s.name)
204                    }
205                })
206            })
207            .unwrap_or_default()
208    }
209
210    /// Set the A2A protocol for inter-agent task delegation.
211    pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
212        self.a2a = Some(a2a);
213    }
214
215    /// Set the GitLayer for auto-commits after state saves.
216    pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
217        self.git_layer = Some(git_layer);
218    }
219
220    /// Commit a file to git if GitLayer is configured and enabled.
221    fn git_commit(&self, rel_path: &str, message: &str) {
222        if let Some(ref gl) = self.git_layer {
223            if gl.is_enabled() {
224                let _ = gl.commit_file(rel_path, message);
225            }
226        }
227    }
228
229    /// Handle a user message through the full Ouroboros loop.
230    ///
231    /// Returns an `OrchestrationResult` with the response and metadata.
232    ///
233    /// If the interview phase needs clarification (ambiguity > 0.2),
234    /// the result will contain the questions and the phase will be
235    /// `Phase::Interview`. The caller should send these questions to
236    /// the user and include the `session_id` in follow-up messages.
237    #[allow(clippy::await_holding_lock)]
238    pub async fn handle_message(
239        &self,
240        user_id: &str,
241        user_message: &str,
242        session_id: Option<&str>,
243    ) -> Result<OrchestrationResult> {
244        tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
245        get_metrics().messages.inc();
246        let orch_start = std::time::Instant::now();
247
248        let session_id = session_id
249            .map(String::from)
250            .unwrap_or_else(|| Uuid::new_v4().to_string());
251
252        tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
253
254        // ── Space Detection ──
255        let space_tag = self.current_space_tag();
256        let (turns, sm_arc) = {
257            let buffer = self.conversation_buffer.read();
258            let sm_guard = self.space_manager.read();
259            let turns: Vec<_> = buffer.turns().iter().cloned().collect();
260            // Extract Arc from guard and drop guard before .await
261            let sm_arc = sm_guard.as_ref().cloned();
262            (turns, sm_arc)
263        };
264        let space_id = if let Some(ref sm) = sm_arc {
265            match sm.detect_or_create(user_message, &turns).await {
266                Ok(id) => {
267                    tracing::info!(space_id = %id, "Space detected/created for message");
268                    id
269                }
270                Err(e) => {
271                    tracing::warn!(error = %e, "Space detection failed, using default");
272                    sm.default_space_id()
273                }
274            }
275        } else {
276            uuid::Uuid::nil()
277        };
278
279        // Record user message in conversation buffer
280        {
281            let mut buffer = self.conversation_buffer.write();
282            buffer.push_user(user_message);
283        }
284
285        // Phase 1: Interview
286        self.publish_phase_started(&session_id, Phase::Interview)
287            .await;
288
289        // Get or create the interview session (pre-fetch to avoid lock across await).
290        let needs_interview;
291        let existing_history: Option<Vec<_>>;
292        {
293            let sessions = self.sessions.read();
294            needs_interview = !sessions.contains_key(&session_id);
295            existing_history = if !needs_interview {
296                sessions
297                    .get(&session_id)
298                    .map(|s| s.interview.conversation_history.clone())
299            } else {
300                None
301            };
302            // Lock dropped here before any .await
303        }
304
305        // Conduct the interview.
306        let interview = {
307            tracing::info!(phase = "interview", "Starting interview phase");
308            if needs_interview {
309                self.ouroboros.interview(user_message).await?
310            } else {
311                // This is a follow-up message in an existing interview.
312                // Build multi-turn context from conversation history.
313                let multi_turn_context = {
314                    let mut context_parts = Vec::new();
315                    if let Some(ref history) = existing_history {
316                        for exchange in history {
317                            context_parts.push(format!(
318                                "User: {}\nAgent: {}",
319                                exchange.user, exchange.agent
320                            ));
321                        }
322                    }
323                    context_parts.push(format!("User: {}", user_message));
324                    context_parts.join("\n\n")
325                };
326
327                // Record user's answer in session for future turns (brief write lock)
328                {
329                    let mut sessions = self.sessions.write();
330                    if let Some(s) = sessions.get_mut(&session_id) {
331                        let last_q = s.interview.questions.last().cloned().unwrap_or_default();
332                        s.interview.add_exchange(&last_q, user_message);
333                    }
334                }
335
336                // Run another interview pass with full conversation history.
337                self.ouroboros.interview(&multi_turn_context).await?
338            }
339        };
340
341        // If this is a non-task message (greeting, small talk), return the chat response directly.
342        if !interview.is_task {
343            tracing::info!(session_id = %session_id, "Chat response (non-task)");
344
345            let response_text = if interview.chat_response.is_empty() {
346                "Hello! How can I help you today?".to_string()
347            } else {
348                interview.chat_response.clone()
349            };
350
351            // Record agent response in conversation buffer
352            {
353                let mut buffer = self.conversation_buffer.write();
354                buffer.push_agent(&response_text, &space_id);
355            }
356
357            // Record exchange in conversation history for multi-turn
358            // and store session so multi-turn works on follow-up messages
359            {
360                let mut sessions = self.sessions.write();
361                if let Some(session) = sessions.get_mut(&session_id) {
362                    tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
363                    session
364                        .interview
365                        .add_to_history(user_message, &response_text);
366                } else {
367                    // First non-task message — create a minimal session for history
368                    let mut interview = InterviewResult::new();
369                    interview.is_task = false;
370                    interview.chat_response = response_text.clone();
371                    interview.add_to_history(user_message, &response_text);
372                    sessions.insert(
373                        session_id.clone(),
374                        InterviewSession {
375                            id: session_id.clone(),
376                            interview,
377                            phase: Phase::Interview,
378                            seed_id: None,
379                            agent_id: None,
380                        },
381                    );
382                }
383            }
384
385            self.publish_phase_completed(&session_id, Phase::Interview, "chat")
386                .await;
387
388            return Ok(OrchestrationResult {
389                session_id: Some(session_id.clone()),
390                space_id: Some(space_id),
391                space_tag: Some(space_tag.clone()),
392                response: response_text,
393                seed_id: None,
394                agent_id: None,
395                phase_reached: Phase::Interview,
396                evaluation_passed: false,
397                output: None,
398            });
399        }
400
401        // If ambiguity is too high, return questions for the user to answer.
402        if !interview.ready_for_seed {
403            // Record this exchange in conversation history and store the interview.
404            {
405                let mut sessions = self.sessions.write();
406                let session =
407                    sessions
408                        .entry(session_id.clone())
409                        .or_insert_with(|| InterviewSession {
410                            id: session_id.clone(),
411                            interview: interview.clone(),
412                            phase: Phase::Interview,
413                            seed_id: None,
414                            agent_id: None,
415                        });
416                // The session already has user's answer recorded via add_exchange above.
417                // Record the questions as the agent's response in history.
418                let questions_text = interview.questions.join("\n");
419                let last_answer = session.interview.answers.last().cloned();
420                if let Some(ref ans) = last_answer {
421                    if !ans.is_empty() {
422                        session.interview.add_to_history(ans, &questions_text);
423                    }
424                }
425            } // Lock dropped before .await
426
427            let questions = interview
428                .questions
429                .iter()
430                .filter(|q| !q.is_empty())
431                .cloned()
432                .collect::<Vec<_>>();
433
434            tracing::info!(
435                session_id = %session_id,
436                ambiguity = interview.ambiguity.ambiguity(),
437                questions = questions.len(),
438                "Interview needs clarification"
439            );
440
441            self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
442                .await;
443
444            return Ok(OrchestrationResult {
445                session_id: Some(session_id.clone()),
446                space_id: Some(space_id),
447                space_tag: Some(space_tag.clone()),
448                response: format_questions(&questions),
449                seed_id: None,
450                agent_id: None,
451                phase_reached: Phase::Interview,
452                evaluation_passed: false,
453                output: None,
454            });
455        }
456
457        // Record agent response in conversation buffer (for topic shift detection)
458        // Note: interview phase returns questions, not a full agent response,
459        // but we record it for completeness.
460        {
461            let mut buffer = self.conversation_buffer.write();
462            buffer.push_agent("[interview: questions]", &space_id);
463        }
464
465        // Interview complete and ready. Proceed to seed generation.
466        self.publish_phase_completed(&session_id, Phase::Interview, "ready for seed")
467            .await;
468        self.publish_phase_started(&session_id, Phase::Seed).await;
469
470        // Phase 2: Generate seed
471        tracing::info!(phase = "seed", "Starting seed generation");
472        let seed = self.ouroboros.generate_seed(&interview).await?;
473
474        // Save seed to state store.
475        self.save_seed(&seed).await?;
476
477        // Publish seed created event.
478        self.event_bus
479            .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
480
481        self.publish_phase_completed(&session_id, Phase::Seed, "generated")
482            .await;
483        self.publish_phase_started(&session_id, Phase::Execute)
484            .await;
485
486        // Check if the seed should be split into multi-agent execution.
487        // When the seed has 3+ acceptance criteria, we treat each criterion
488        // as a distinct subtask and delegate to separate agents.
489        if should_split_seed(&seed) {
490            let subtasks = split_into_subtasks(&seed);
491            if subtasks.len() > 1 {
492                tracing::info!(
493                    phase = "delegate",
494                    subtasks = subtasks.len(),
495                    "Delegating to multi-agent"
496                );
497                let results = self.delegate_subtasks(subtasks, &seed).await?;
498
499                // Combine successful results
500                let combined: String = results
501                    .iter()
502                    .filter(|r| r.success)
503                    .filter_map(|r| r.result.as_deref())
504                    .collect::<Vec<_>>()
505                    .join("\n\n");
506
507                let all_passed = results.iter().all(|r| r.success);
508
509                // Clean up the session.
510                {
511                    let mut sessions = self.sessions.write();
512                    sessions.remove(&session_id);
513                }
514
515                tracing::info!(
516                    session_id = %session_id,
517                    subtasks = results.len(),
518                    passed = all_passed,
519                    "Multi-agent orchestration complete"
520                );
521
522                return Ok(OrchestrationResult {
523                    session_id: Some(session_id),
524                    space_id: Some(space_id),
525                    space_tag: Some(space_tag.clone()),
526                    response: format_result_combined(&combined),
527                    seed_id: Some(seed.id),
528                    agent_id: None,
529                    phase_reached: Phase::Execute,
530                    evaluation_passed: all_passed,
531                    output: Some(combined),
532                });
533            }
534        }
535
536        // Record agent response in conversation buffer (for multi-agent case)
537        {
538            let mut buffer = self.conversation_buffer.write();
539            buffer.push_agent("[multi-agent: complete]", &space_id);
540        }
541
542        // Phase 3: Fork and execute agent via lifecycle manager
543        tracing::info!(phase = "execute", "Starting execution phase");
544        let exec_result = self
545            .lifecycle
546            .spawn_and_run(&seed, Priority::Normal)
547            .await?;
548
549        // Periodically reap zombie tasks.
550        self.lifecycle.reap_zombies();
551
552        // Phase 4: Evaluate
553        self.publish_phase_completed(&session_id, Phase::Execute, "completed")
554            .await;
555        self.publish_phase_started(&session_id, Phase::Evaluate)
556            .await;
557
558        tracing::info!(phase = "evaluate", "Starting evaluation phase");
559        let evaluation = self.ouroboros.evaluate(&seed, &exec_result).await?;
560
561        self.publish_phase_completed(
562            &session_id,
563            Phase::Evaluate,
564            &format!("score={:.2}", evaluation.score),
565        )
566        .await;
567
568        self.event_bus.publish(KernelEvent::EvaluationComplete {
569            seed_id: seed.id,
570            passed: evaluation.all_passed(),
571        })?;
572
573        // Save evaluation to state store for lineage tracking.
574        self.save_evaluation(&seed, &evaluation).await?;
575
576        // Phase 5: Evolve if needed
577        let mut current_seed = Some(seed);
578        let mut current_evaluation = evaluation;
579        let mut iterations = 0;
580
581        while !current_evaluation.all_passed()
582            && current_evaluation.score < self.config.min_evaluation_score
583            && iterations < self.config.max_evolution_iterations
584        {
585            iterations += 1;
586            self.publish_phase_started(&session_id, Phase::Evolve).await;
587
588            tracing::info!(
589                phase = "evolve",
590                iteration = iterations,
591                max_iterations = self.config.max_evolution_iterations,
592                "Starting evolve phase"
593            );
594            let evolve_result = self
595                .ouroboros
596                .evolve(
597                    current_seed.as_ref().expect("seed exists"),
598                    &current_evaluation,
599                )
600                .await?;
601
602            if let Some(evolved) = evolve_result {
603                current_seed = Some(evolved.clone());
604
605                // Save evolved seed.
606                self.save_seed(&evolved).await?;
607                self.event_bus.publish(KernelEvent::SeedCreated {
608                    seed_id: evolved.id,
609                })?;
610
611                self.publish_phase_completed(&session_id, Phase::Evolve, "evolved")
612                    .await;
613                self.publish_phase_started(&session_id, Phase::Execute)
614                    .await;
615
616                tracing::info!(
617                    phase = "re-execute",
618                    iteration = iterations,
619                    "Re-executing with evolved seed"
620                );
621                let new_exec = self
622                    .lifecycle
623                    .spawn_and_run(&evolved, Priority::High)
624                    .await?;
625
626                self.publish_phase_completed(&session_id, Phase::Execute, "completed")
627                    .await;
628                self.publish_phase_started(&session_id, Phase::Evaluate)
629                    .await;
630
631                tracing::info!(
632                    phase = "re-evaluate",
633                    iteration = iterations,
634                    "Re-evaluating evolved result"
635                );
636                let new_eval = self.ouroboros.evaluate(&evolved, &new_exec).await?;
637                current_evaluation = new_eval;
638
639                self.publish_phase_completed(
640                    &session_id,
641                    Phase::Evaluate,
642                    &format!("score={:.2}", current_evaluation.score),
643                )
644                .await;
645                // Save evolved seed evaluation for lineage tracking.
646                self.save_evaluation(&evolved, &current_evaluation).await?;
647            } else {
648                // No evolution possible.
649                self.publish_phase_completed(&session_id, Phase::Evolve, "no evolution")
650                    .await;
651                break;
652            }
653        }
654
655        // Clean up the session.
656        {
657            let mut sessions = self.sessions.write();
658            sessions.remove(&session_id);
659        }
660
661        let final_seed = current_seed.expect("at least one seed exists");
662        let passed = current_evaluation.all_passed() || current_evaluation.score >= 0.7;
663
664        tracing::info!(
665            session_id = %session_id,
666            iterations,
667            score = current_evaluation.score,
668            passed,
669            "Orchestration complete"
670        );
671
672        // Measure orchestration duration.
673        let metrics = get_metrics();
674        metrics
675            .orch_duration
676            .observe(orch_start.elapsed().as_secs_f64());
677        if passed {
678            metrics.agents_completed.inc();
679        } else {
680            metrics.agents_failed.inc();
681        }
682
683        // Record agent response in conversation buffer (for topic shift detection)
684        {
685            let mut buffer = self.conversation_buffer.write();
686            buffer.push_agent(&final_seed.goal, &space_id);
687        }
688
689        Ok(OrchestrationResult {
690            session_id: Some(session_id),
691            space_id: Some(space_id),
692            space_tag: Some(space_tag.clone()),
693            response: format_result(&final_seed, &current_evaluation),
694            seed_id: Some(final_seed.id),
695            agent_id: None,
696            phase_reached: Phase::Evaluate,
697            evaluation_passed: passed,
698            output: Some(current_evaluation.notes.join("; ")),
699        })
700    }
701
702    /// Save a seed to the state store.
703    async fn save_seed(&self, seed: &Seed) -> Result<()> {
704        let key = seed.id.to_string();
705
706        self.state_store
707            .save_json("seeds", &key, seed)
708            .await
709            .context("failed to save seed to state store")?;
710
711        self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
712
713        Ok(())
714    }
715
716    /// Save an evaluation result to the state store.
717    async fn save_evaluation(&self, seed: &Seed, evaluation: &EvaluationResult) -> Result<()> {
718        let key = format!("{}-eval", seed.id);
719
720        self.state_store
721            .save_json("evals", &key, evaluation)
722            .await
723            .context("failed to save evaluation to state store")?;
724
725        self.git_commit(&format!("evals/{}.json", key), "ourobors: save eval");
726
727        Ok(())
728    }
729
730    /// Publish a PhaseStarted event.
731    async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
732        let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
733            session_id: session_id.to_owned(),
734            phase,
735        });
736    }
737
738    /// Publish a PhaseCompleted event.
739    async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
740        let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
741            session_id: session_id.to_owned(),
742            phase,
743            result_summary: result.to_owned(),
744        });
745    }
746
747    /// Execute multiple subtasks using separate agents in parallel.
748    ///
749    /// When A2A is available, the orchestrator delegates tasks through the
750    /// A2A protocol with circuit breaker and retry support.
751    /// Otherwise, falls back to direct lifecycle execution.
752    ///
753    /// Results are collected as they complete using `JoinSet`.
754    pub async fn delegate_subtasks(
755        &self,
756        subtasks: Vec<SubTask>,
757        parent_seed: &Seed,
758    ) -> Result<Vec<SubTask>> {
759        // Single task — execute directly without group overhead.
760        if subtasks.len() == 1 {
761            return self.execute_single_subtask(subtasks, parent_seed).await;
762        }
763
764        // Try A2A-based delegation when the protocol is available.
765        if let Some(ref a2a) = self.a2a {
766            // Check circuit breaker
767            if !self.a2a_breaker.is_allowed() {
768                tracing::warn!(
769                    state = ?self.a2a_breaker.state(),
770                    "A2A circuit breaker open, using lifecycle fallback"
771                );
772                return self.delegate_via_lifecycle(subtasks, parent_seed).await;
773            }
774
775            // Delegate with retry
776            return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
777        }
778
779        // Fallback: direct lifecycle execution (no A2A).
780        self.delegate_via_lifecycle(subtasks, parent_seed).await
781    }
782
783    /// Delegate subtasks via A2A with circuit breaker and retry support.
784    async fn delegate_with_retry(
785        &self,
786        subtasks: Vec<SubTask>,
787        parent_seed: &Seed,
788        a2a: &Arc<crate::a2a::A2AProtocol>,
789    ) -> Result<Vec<SubTask>> {
790        let mut attempt = 0;
791        let max_retries = self.delegation_config.max_retries;
792
793        loop {
794            match self
795                .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
796                .await
797            {
798                Ok(results) => {
799                    self.a2a_breaker.record_success();
800                    return Ok(results);
801                }
802                Err(e) => {
803                    self.a2a_breaker.record_failure();
804                    attempt += 1;
805
806                    if attempt >= max_retries {
807                        tracing::error!(
808                            attempts = attempt,
809                            error = %e,
810                            "A2A delegation exhausted after {} attempts, using lifecycle fallback",
811                            attempt
812                        );
813                        return self.delegate_via_lifecycle(subtasks, parent_seed).await;
814                    }
815
816                    // Exponential backoff
817                    let delay = self.delegation_config.backoff_delay(attempt);
818                    tracing::warn!(
819                        attempt,
820                        delay_ms = delay,
821                        error = %e,
822                        "A2A delegation failed, retrying with backoff"
823                    );
824                    tokio::time::sleep(Duration::from_millis(delay)).await;
825                }
826            }
827        }
828    }
829
830    /// Execute a single subtask directly via lifecycle manager.
831    async fn execute_single_subtask(
832        &self,
833        subtasks: Vec<SubTask>,
834        parent_seed: &Seed,
835    ) -> Result<Vec<SubTask>> {
836        let mut task = subtasks
837            .into_iter()
838            .next()
839            .expect("execute_single_subtask is only called when subtasks is non-empty");
840        let child_seed = Seed {
841            id: Uuid::new_v4(),
842            goal: task.description.clone(),
843            constraints: parent_seed.constraints.clone(),
844            acceptance_criteria: vec!["Task completes successfully".into()],
845            ontology: parent_seed.ontology.clone(),
846            created_at: chrono::Utc::now(),
847            generation: parent_seed.generation + 1,
848            parent_seed_id: Some(parent_seed.id),
849            cspace_hint: None,
850        };
851        match self
852            .lifecycle
853            .spawn_and_run(&child_seed, Priority::Normal)
854            .await
855        {
856            Ok(result) => {
857                task.result = Some(result.output.clone());
858            }
859            Err(e) => {
860                task.result = Some(format!("Failed: {e}"));
861                task.success = false;
862            }
863        }
864        Ok(vec![task])
865    }
866
867    /// Delegate subtasks via A2A protocol.
868    ///
869    /// Queries the AgentCardRegistry for agents matching each subtask's
870    /// Execute subtasks via A2A dispatch handler.
871    ///
872    /// Queries the AgentCardRegistry for agents matching each subtask's
873    /// required capability, then calls `execute_delegation` which runs
874    /// the task through the registered handler (lifecycle).
875    /// Falls back to direct lifecycle execution when no handler is registered.
876    async fn delegate_via_a2a(
877        &self,
878        subtasks: Vec<SubTask>,
879        parent_seed: &Seed,
880        a2a: &Arc<crate::a2a::A2AProtocol>,
881    ) -> Result<Vec<SubTask>> {
882        use crate::a2a::TaskPriority;
883        use tokio::task::JoinSet;
884
885        tracing::info!(
886            subtasks = subtasks.len(),
887            "Delegating subtasks via A2A protocol"
888        );
889
890        let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
891        let subtask_count = subtasks.len();
892        let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
893
894        for (idx, subtask) in subtasks.into_iter().enumerate() {
895            let capability = subtask.required_capability.clone();
896            let description = subtask.description.clone();
897            let subtask_id = subtask.id;
898            let role = subtask.role.clone();
899            let a2a = Arc::clone(a2a);
900            let parent_seed = parent_seed.clone();
901            let lifecycle = self.lifecycle.clone();
902
903            join_set.spawn(async move {
904                // Find agent with the required capability via A2A registry.
905                let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
906                    a2a.query_capabilities(cap).await.ok()
907                        .and_then(|agents| agents.into_iter().next())
908                } else {
909                    None
910                };
911
912                let (output, success) = if let Some(ref target_card) = target {
913                    let target_id = target_card.agent_id;
914                    tracing::info!(
915                        subtask_index = idx,
916                        target = %target_card.name,
917                        target_id = %target_id,
918                        "A2A dispatching subtask"
919                    );
920
921                    let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
922                        "parent_seed": parent_seed.id.to_string(),
923                        "goal": description,
924                    }))
925                    .with_priority(TaskPriority::Normal);
926
927                    // Enqueue audit trail (fire-and-forget into queue).
928                    let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
929
930                    // Execute through dispatch handler (blocking).
931                    match a2a.execute_delegation(orchestrator_id, target_id, task).await {
932                        Some(Ok(result)) => {
933                            let out = result.get("output")
934                                .and_then(|v| v.as_str())
935                                .unwrap_or("")
936                                .to_string();
937                            let ok = result.get("success")
938                                .and_then(|v| v.as_bool())
939                                .unwrap_or(false);
940                            (out, ok)
941                        }
942                        Some(Err(e)) => {
943                            tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
944                            (format!("Failed: {e}"), false)
945                        }
946                        None => {
947                            // No handler — fallback to lifecycle.
948                            tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
949                            run_via_lifecycle(&lifecycle, &parent_seed, &description).await
950                        }
951                    }
952                } else {
953                    tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
954                    run_via_lifecycle(&lifecycle, &parent_seed, &description).await
955                };
956
957                (idx, SubTask {
958                    id: subtask_id,
959                    description,
960                    required_capability: capability,
961                    result: Some(output),
962                    success,
963                    role,
964                })
965            });
966        }
967
968        let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
969        while let Some(join_result) = join_set.join_next().await {
970            match join_result {
971                Ok((idx, subtask)) => {
972                    results[idx] = Some(subtask);
973                }
974                Err(e) => {
975                    tracing::error!(error = %e, "A2A task panicked");
976                }
977            }
978        }
979
980        let completed: Vec<SubTask> = results.into_iter().flatten().collect();
981        tracing::info!(
982            completed = completed.len(),
983            succeeded = completed.iter().filter(|r| r.success).count(),
984            "A2A delegation complete"
985        );
986        Ok(completed)
987    }
988
989    async fn delegate_via_lifecycle(
990        &self,
991        subtasks: Vec<SubTask>,
992        parent_seed: &Seed,
993    ) -> Result<Vec<SubTask>> {
994        use crate::agent_group::OxiosAgentGroup;
995        use tokio::task::JoinSet;
996
997        let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
998        let group = OxiosAgentGroup::new(parent_seed, descriptions);
999        let group_id = group.id;
1000
1001        self.event_bus.publish(KernelEvent::AgentGroupCreated {
1002            group_id,
1003            agent_count: group.agents.len(),
1004        })?;
1005
1006        tracing::info!(
1007            group_id = %group_id,
1008            agent_count = group.agents.len(),
1009            "Starting parallel multi-agent execution"
1010        );
1011
1012        let mut join_set: JoinSet<(
1013            usize,
1014            crate::types::AgentId,
1015            Result<oxios_ouroboros::ExecutionResult>,
1016        )> = JoinSet::new();
1017
1018        for (idx, agent_entry) in group.agents.iter().enumerate() {
1019            let child_seed = agent_entry.seed.clone();
1020            let agent_id = agent_entry.id;
1021            let lifecycle = self.lifecycle.clone();
1022
1023            join_set.spawn(async move {
1024                let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
1025                (idx, agent_id, result)
1026            });
1027        }
1028
1029        let subtask_count = subtasks.len();
1030        let mut completed = vec![None; subtask_count];
1031        while let Some(join_result) = join_set.join_next().await {
1032            match join_result {
1033                Ok((idx, agent_id, Ok(exec_result))) => {
1034                    let _ = self
1035                        .event_bus
1036                        .publish(KernelEvent::AgentGroupMemberCompleted {
1037                            group_id,
1038                            agent_id,
1039                            success: exec_result.success,
1040                        });
1041                    completed[idx] = Some(SubTask {
1042                        id: subtasks[idx].id,
1043                        description: subtasks[idx].description.clone(),
1044                        required_capability: subtasks[idx].required_capability.clone(),
1045                        result: Some(exec_result.output.clone()),
1046                        success: exec_result.success,
1047                        role: subtasks[idx].role.clone(),
1048                    });
1049                }
1050                Ok((idx, agent_id, Err(e))) => {
1051                    tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
1052                    let _ = self
1053                        .event_bus
1054                        .publish(KernelEvent::AgentGroupMemberCompleted {
1055                            group_id,
1056                            agent_id,
1057                            success: false,
1058                        });
1059                    completed[idx] = Some(SubTask {
1060                        id: subtasks[idx].id,
1061                        description: subtasks[idx].description.clone(),
1062                        required_capability: subtasks[idx].required_capability.clone(),
1063                        result: Some(format!("Failed: {e}")),
1064                        success: false,
1065                        role: subtasks[idx].role.clone(),
1066                    });
1067                }
1068                Err(e) => {
1069                    tracing::error!(error = %e, "JoinSet task panicked");
1070                }
1071            }
1072        }
1073
1074        let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
1075        let succeeded = completed.iter().filter(|r| r.success).count();
1076        let total = completed.len();
1077
1078        tracing::info!(
1079            group_id = %group_id,
1080            succeeded,
1081            total,
1082            "Parallel multi-agent execution complete"
1083        );
1084
1085        // Persist group state
1086        let _ = self
1087            .state_store
1088            .save_json("agent_groups", &group_id.to_string(), &group)
1089            .await;
1090        self.git_commit(
1091            &format!("agent_groups/{}.json", group_id),
1092            "orchestrator: save group",
1093        );
1094
1095        Ok(completed)
1096    }
1097}
1098
/// Active session state for multi-turn interviews.
// NOTE(review): `#[allow(unused)]` silences dead-code warnings for fields that
// are not read in this part of the file — confirm which fields are still used
// before removing the attribute.
#[derive(Debug, Clone)]
#[allow(unused)]
struct InterviewSession {
    /// Session identifier (presumably the value handed back to callers as
    /// `OrchestrationResult::session_id` — verify against callers).
    id: String,
    /// Interview state accumulated for this session.
    interview: InterviewResult,
    /// Ouroboros phase recorded for this session.
    phase: Phase,
    /// ID of the seed generated for this session, if any.
    seed_id: Option<Uuid>,
    /// ID of the agent spawned for this session, if any.
    agent_id: Option<AgentId>,
}
1109
/// Result of a full orchestration cycle.
///
/// Serialized with serde; every `Option` field is omitted from the serialized
/// output when it is `None` (`skip_serializing_if = "Option::is_none"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
    /// Session ID for multi-turn interviews. Pass this on follow-up messages.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// The ID of the Space that handled this message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub space_id: Option<Uuid>,
    /// Space decoration tag for the response (e.g. "[🔧 oxios]").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub space_tag: Option<String>,
    /// The response to send back to the user.
    pub response: String,
    /// ID of the seed that was created (if the seed phase was reached).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seed_id: Option<Uuid>,
    /// ID of the agent that executed (if the execute phase was reached).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// The furthest phase reached.
    pub phase_reached: Phase,
    /// Whether evaluation passed (false if evaluation was skipped or failed).
    pub evaluation_passed: bool,
    /// Output or notes from evaluation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<String>,
}
1138
/// Render clarifying questions as a numbered list for the user.
///
/// An empty slice produces a generic "need more clarification" message.
/// Otherwise the questions are numbered from 1, one per line, under an intro
/// sentence.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return "I need a bit more clarification before I can proceed.".to_string();
    }

    let numbered: Vec<String> = questions
        .iter()
        .zip(1..)
        .map(|(question, number)| format!("{number}. {question}"))
        .collect();

    format!(
        "I'd like to understand your request better. Could you help clarify:\n\n{}",
        numbered.join("\n")
    )
}
1155
1156/// Format the final result for display.
1157fn format_result(seed: &Seed, evaluation: &EvaluationResult) -> String {
1158    let passed = evaluation.all_passed();
1159
1160    let mut lines = Vec::new();
1161    lines.push(format!("Task '{}' completed.", seed.goal));
1162
1163    if passed {
1164        lines.push("✅ Evaluation passed.".to_string());
1165    } else {
1166        lines.push(format!(
1167            "⚠️ Evaluation score: {:.0}%",
1168            evaluation.score * 100.0
1169        ));
1170    }
1171
1172    if !evaluation.notes.is_empty() {
1173        lines.push("\nNotes:".to_string());
1174        for note in &evaluation.notes {
1175            lines.push(format!("- {}", note));
1176        }
1177    }
1178
1179    lines.join("\n")
1180}
1181
1182/// Check if a seed should be split into subtasks.
1183///
1184/// Simple heuristic: if the seed has 3 or more acceptance criteria,
1185/// it likely contains distinct concerns that can be parallelized.
1186fn should_split_seed(seed: &Seed) -> bool {
1187    // Only split for genuinely complex tasks with many criteria.
1188    // Simple tasks (even with 3-4 criteria) are better handled by a single agent
1189    // to preserve context coherence.
1190    seed.acceptance_criteria.len() >= 5
1191}
1192
1193/// Split a seed into subtasks based on acceptance criteria.
1194///
1195/// Each acceptance criterion becomes a separate subtask with the
1196/// parent seed's goal as context. Infers required capability from
1197/// the goal text using the same heuristic as `build_agent_card`.
1198fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1199    seed.acceptance_criteria
1200        .iter()
1201        .map(|criterion| {
1202            let desc = format!("{}: {}", seed.goal, criterion);
1203            let desc_lower = desc.to_lowercase();
1204
1205            // Infer capability from subtask description.
1206            let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1207                Some("code-review".to_string())
1208            } else if desc_lower.contains("test") {
1209                Some("testing".to_string())
1210            } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1211                Some("refactoring".to_string())
1212            } else if desc_lower.contains("write")
1213                || desc_lower.contains("create")
1214                || desc_lower.contains("implement")
1215            {
1216                Some("code-generation".to_string())
1217            } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1218                Some("debugging".to_string())
1219            } else {
1220                None
1221            };
1222
1223            let mut subtask = SubTask::new(desc);
1224            subtask.required_capability = cap;
1225            subtask
1226        })
1227        .collect()
1228}
1229
/// Wrap the concatenated multi-agent output in a user-facing header.
///
/// An empty `combined` string means nothing succeeded, and produces a fixed
/// notice instead.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => "No subtasks completed successfully.".to_string(),
        body => format!("Multi-agent execution completed:\n\n{}", body),
    }
}
1238
1239/// Execute a subtask via lifecycle manager, returning (output, success).
1240async fn run_via_lifecycle(
1241    lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1242    parent_seed: &Seed,
1243    description: &str,
1244) -> (String, bool) {
1245    let child_seed = Seed {
1246        id: Uuid::new_v4(),
1247        goal: description.to_string(),
1248        constraints: parent_seed.constraints.clone(),
1249        acceptance_criteria: vec!["Task completes successfully".into()],
1250        ontology: parent_seed.ontology.clone(),
1251        created_at: chrono::Utc::now(),
1252        generation: parent_seed.generation + 1,
1253        parent_seed_id: Some(parent_seed.id),
1254        cspace_hint: None,
1255    };
1256    match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1257        Ok(result) => (result.output, result.success),
1258        Err(e) => (format!("Failed: {e}"), false),
1259    }
1260}