Skip to main content

oxios_kernel/
orchestrator.rs

1//! Orchestrator: coordinates the Ouroboros lifecycle for user messages.
2//!
3//! The orchestrator is the "brain" that runs the Ouroboros protocol.
4//! Given a user message:
5//! 1. Conduct the interview (ask clarifying questions if needed)
6//! 2. Generate a seed (via LLM for complex tasks, or ad-hoc for simple tasks)
7//! 3. Execute the agent via the supervisor
8//! 4. Return the result to the user
9//!
10//! The orchestrator does NOT know about channels or HTTP — it only
11//! coordinates Ouroboros + Supervisor + EventBus + StateStore + Scheduler + AccessManager.
12
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use crate::agent_lifecycle::AgentLifecycleManager;
24use crate::event_bus::{EventBus, KernelEvent};
25use crate::git_layer::GitLayer;
26use crate::metrics::get_metrics;
27use crate::scheduler::Priority;
28use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
29use crate::state_store::StateStore;
30use crate::types::AgentId;
31
/// Role of an agent within a group.
///
/// `Worker` is the `Default` variant, so it is the value used when a
/// serialized `SubTask` omits its `role` field (that field is marked
/// `#[serde(default)]`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub enum AgentRole {
    /// Executes a specific subtask.
    #[default]
    Worker,
    /// Coordinates subtasks, synthesizes results.
    Manager,
}
41
/// A subtask within a multi-agent plan.
///
/// Created via `SubTask::new`, optionally tagged with a capability via
/// `SubTask::with_capability`, and handed to the orchestrator's delegation
/// methods, which return the subtasks with `result`/`success` filled in.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubTask {
    /// Unique subtask ID.
    pub id: Uuid,
    /// Human-readable description.
    pub description: String,
    /// Capability required (e.g., "code-review", "testing").
    /// `None` means no particular capability was requested.
    pub required_capability: Option<String>,
    /// Result of the subtask (filled after execution); `None` until then.
    pub result: Option<String>,
    /// Whether this subtask succeeded. Starts out `false` in `SubTask::new`.
    pub success: bool,
    /// Role of the agent assigned to this subtask.
    /// Falls back to `AgentRole::default()` when absent from serialized data.
    #[serde(default)]
    pub role: AgentRole,
}
59
60impl SubTask {
61    /// Create a new subtask with the given description.
62    pub fn new(description: impl Into<String>) -> Self {
63        Self {
64            id: Uuid::new_v4(),
65            description: description.into(),
66            required_capability: None,
67            result: None,
68            success: false,
69            role: AgentRole::default(),
70        }
71    }
72
73    /// Set the required capability for this subtask.
74    pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
75        self.required_capability = Some(cap.into());
76        self
77    }
78}
79
/// The orchestrator coordinates the full Ouroboros lifecycle.
///
/// It owns the per-session interview state and the conversation buffer, and
/// holds handles to the collaborating kernel services. Optional collaborators
/// (`git_layer`, `a2a`, the space manager) start out unset and are installed
/// through the corresponding `set_*` methods.
pub struct Orchestrator {
    /// Ouroboros protocol implementation (interview + seed generation).
    ouroboros: Arc<dyn OuroborosProtocol>,
    /// Event bus used to publish phase and seed events.
    event_bus: EventBus,
    /// Persistent store; seeds are saved here under the "seeds" namespace.
    state_store: Arc<StateStore>,
    /// Git version control layer for auto-commits.
    git_layer: Option<Arc<GitLayer>>,
    /// Active interview sessions, keyed by session ID.
    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
    /// Agent lifecycle manager (fork, register, run, cleanup).
    lifecycle: AgentLifecycleManager,
    /// A2A protocol for inter-agent task delegation.
    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
    /// Space manager for context partitioning.
    space_manager: RwLock<Option<Arc<SpaceManager>>>,
    /// Conversation buffer for topic shift detection.
    conversation_buffer: RwLock<ConversationBuffer>,
    /// Retry/backoff settings for A2A delegation (see `DelegationConfig`).
    delegation_config: DelegationConfig,
    /// A2A circuit breaker for delegation reliability.
    a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
}
102
/// Configuration for A2A delegation retries.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Maximum retry attempts for A2A delegation.
    max_retries: u32,
    /// Base delay for exponential backoff (milliseconds).
    base_delay_ms: u64,
    /// Maximum delay cap for exponential backoff (milliseconds).
    max_delay_ms: u64,
    /// Timeout per delegation attempt (milliseconds).
    // Not read anywhere yet, hence the lint suppression.
    #[allow(dead_code)]
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    /// Defaults: 3 retries, 100 ms base delay, 5 s delay cap, 5 s timeout.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Calculate the exponential backoff delay in milliseconds.
    ///
    /// Returns `base_delay_ms * 2^attempt`, capped at `max_delay_ms`.
    /// The exponent is clamped to 10 and the multiplication saturates, so a
    /// very large `base_delay_ms` yields the cap instead of overflowing
    /// (which would panic in debug builds and wrap in release builds).
    fn backoff_delay(&self, attempt: u32) -> u64 {
        let factor = 2_u64.saturating_pow(attempt.min(10));
        self.base_delay_ms
            .saturating_mul(factor)
            .min(self.max_delay_ms)
    }
}
135
136impl Orchestrator {
137    /// Creates a new orchestrator.
138    pub fn new(
139        ouroboros: Arc<dyn OuroborosProtocol>,
140        event_bus: EventBus,
141        state_store: Arc<StateStore>,
142        lifecycle: AgentLifecycleManager,
143    ) -> Self {
144        Self::with_config(
145            ouroboros,
146            event_bus,
147            state_store,
148            lifecycle,
149            crate::config::OrchestratorConfig::default(),
150        )
151    }
152
153    /// Creates a new orchestrator with custom config (kept for API compat).
154    pub fn with_config(
155        ouroboros: Arc<dyn OuroborosProtocol>,
156        event_bus: EventBus,
157        state_store: Arc<StateStore>,
158        lifecycle: AgentLifecycleManager,
159        _config: crate::config::OrchestratorConfig,
160    ) -> Self {
161        Self {
162            ouroboros,
163            event_bus,
164            state_store,
165            git_layer: None,
166            sessions: RwLock::new(std::collections::HashMap::new()),
167            lifecycle,
168            a2a: None,
169            space_manager: RwLock::new(None),
170            conversation_buffer: RwLock::new(ConversationBuffer::default()),
171            delegation_config: DelegationConfig::default(),
172            a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
173        }
174    }
175
176    /// Set the SpaceManager for context partitioning.
177    pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
178        *self.space_manager.write() = Some(manager);
179    }
180
181    /// Get the current Space ID, if SpaceManager is set.
182    pub fn current_space_id(&self) -> Option<SpaceId> {
183        self.space_manager
184            .read()
185            .as_ref()
186            .map(|m| m.current_space_id())
187    }
188
189    /// Get the current Space name tag for response decoration.
190    pub fn current_space_tag(&self) -> String {
191        self.space_manager
192            .read()
193            .as_ref()
194            .and_then(|m| {
195                m.current_space().map(|s| {
196                    if s.is_default() {
197                        String::new()
198                    } else {
199                        format!("[{} {}]", s.emoji(), s.name)
200                    }
201                })
202            })
203            .unwrap_or_default()
204    }
205
206    /// Set the A2A protocol for inter-agent task delegation.
207    pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
208        self.a2a = Some(a2a);
209    }
210
211    /// Set the GitLayer for auto-commits after state saves.
212    pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
213        self.git_layer = Some(git_layer);
214    }
215
216    /// Commit a file to git if GitLayer is configured and enabled.
217    fn git_commit(&self, rel_path: &str, message: &str) {
218        if let Some(ref gl) = self.git_layer {
219            if gl.is_enabled() {
220                let _ = gl.commit_file(rel_path, message);
221            }
222        }
223    }
224
225    /// Handle a user message through the full Ouroboros loop.
226    ///
227    /// Returns an `OrchestrationResult` with the response and metadata.
228    ///
229    /// If the interview phase needs clarification (ambiguity > 0.2),
230    /// the result will contain the questions and the phase will be
231    /// `Phase::Interview`. The caller should send these questions to
232    /// the user and include the `session_id` in follow-up messages.
233    #[allow(clippy::await_holding_lock)]
234    pub async fn handle_message(
235        &self,
236        user_id: &str,
237        user_message: &str,
238        session_id: Option<&str>,
239    ) -> Result<OrchestrationResult> {
240        tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
241        get_metrics().messages.inc();
242        let orch_start = std::time::Instant::now();
243
244        let session_id = session_id
245            .map(String::from)
246            .unwrap_or_else(|| Uuid::new_v4().to_string());
247
248        tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
249
250        // ── Space Detection ──
251        let space_tag = self.current_space_tag();
252        let (turns, sm_arc) = {
253            let buffer = self.conversation_buffer.read();
254            let sm_guard = self.space_manager.read();
255            let turns: Vec<_> = buffer.turns().iter().cloned().collect();
256            // Extract Arc from guard and drop guard before .await
257            let sm_arc = sm_guard.as_ref().cloned();
258            (turns, sm_arc)
259        };
260        let space_id = if let Some(ref sm) = sm_arc {
261            match sm.detect_or_create(user_message, &turns).await {
262                Ok(id) => {
263                    tracing::info!(space_id = %id, "Space detected/created for message");
264                    id
265                }
266                Err(e) => {
267                    tracing::warn!(error = %e, "Space detection failed, using default");
268                    sm.default_space_id()
269                }
270            }
271        } else {
272            uuid::Uuid::nil()
273        };
274
275        // Record user message in conversation buffer
276        {
277            let mut buffer = self.conversation_buffer.write();
278            buffer.push_user(user_message);
279        }
280
281        // Phase 1: Interview
282        self.publish_phase_started(&session_id, Phase::Interview)
283            .await;
284
285        // Get or create the interview session (pre-fetch to avoid lock across await).
286        let needs_interview;
287        let existing_history: Option<Vec<_>>;
288        {
289            let sessions = self.sessions.read();
290            needs_interview = !sessions.contains_key(&session_id);
291            existing_history = if !needs_interview {
292                sessions
293                    .get(&session_id)
294                    .map(|s| s.interview.conversation_history.clone())
295            } else {
296                None
297            };
298            // Lock dropped here before any .await
299        }
300
301        // Conduct the interview.
302        let interview = {
303            tracing::info!(phase = "interview", "Starting interview phase");
304            if needs_interview {
305                self.ouroboros.interview(user_message).await?
306            } else {
307                // This is a follow-up message in an existing interview.
308                // Build multi-turn context from conversation history.
309                let multi_turn_context = {
310                    let mut context_parts = Vec::new();
311                    if let Some(ref history) = existing_history {
312                        for exchange in history {
313                            context_parts.push(format!(
314                                "User: {}\nAgent: {}",
315                                exchange.user, exchange.agent
316                            ));
317                        }
318                    }
319                    context_parts.push(format!("User: {}", user_message));
320                    context_parts.join("\n\n")
321                };
322
323                // Record user's answer in session for future turns (brief write lock)
324                {
325                    let mut sessions = self.sessions.write();
326                    if let Some(s) = sessions.get_mut(&session_id) {
327                        let last_q = s.interview.questions.last().cloned().unwrap_or_default();
328                        s.interview.add_exchange(&last_q, user_message);
329                    }
330                }
331
332                // Run another interview pass with full conversation history.
333                self.ouroboros.interview(&multi_turn_context).await?
334            }
335        };
336
337        // If this is a non-task message (greeting, small talk), return the chat response directly.
338        if !interview.is_task {
339            tracing::info!(session_id = %session_id, "Chat response (non-task)");
340
341            let response_text = if interview.chat_response.is_empty() {
342                "Hello! How can I help you today?".to_string()
343            } else {
344                interview.chat_response.clone()
345            };
346
347            // Record agent response in conversation buffer
348            {
349                let mut buffer = self.conversation_buffer.write();
350                buffer.push_agent(&response_text, &space_id);
351            }
352
353            // Record exchange in conversation history for multi-turn
354            // and store session so multi-turn works on follow-up messages
355            {
356                let mut sessions = self.sessions.write();
357                if let Some(session) = sessions.get_mut(&session_id) {
358                    tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
359                    session
360                        .interview
361                        .add_to_history(user_message, &response_text);
362                } else {
363                    // First non-task message — create a minimal session for history
364                    let mut interview = InterviewResult::new();
365                    interview.is_task = false;
366                    interview.chat_response = response_text.clone();
367                    interview.add_to_history(user_message, &response_text);
368                    sessions.insert(
369                        session_id.clone(),
370                        InterviewSession {
371                            id: session_id.clone(),
372                            interview,
373                            phase: Phase::Interview,
374                            seed_id: None,
375                            agent_id: None,
376                        },
377                    );
378                }
379            }
380
381            self.publish_phase_completed(&session_id, Phase::Interview, "chat")
382                .await;
383
384            return Ok(OrchestrationResult {
385                session_id: Some(session_id.clone()),
386                space_id: Some(space_id),
387                space_tag: Some(space_tag.clone()),
388                response: response_text,
389                seed_id: None,
390                agent_id: None,
391                phase_reached: Phase::Interview,
392                evaluation_passed: false,
393                output: None,
394            });
395        }
396
397        // If ambiguity is too high, return questions for the user to answer.
398        if !interview.ready_for_seed {
399            // Record this exchange in conversation history and store the interview.
400            {
401                let mut sessions = self.sessions.write();
402                let session =
403                    sessions
404                        .entry(session_id.clone())
405                        .or_insert_with(|| InterviewSession {
406                            id: session_id.clone(),
407                            interview: interview.clone(),
408                            phase: Phase::Interview,
409                            seed_id: None,
410                            agent_id: None,
411                        });
412                // The session already has user's answer recorded via add_exchange above.
413                // Record the questions as the agent's response in history.
414                let questions_text = interview.questions.join("\n");
415                let last_answer = session.interview.answers.last().cloned();
416                if let Some(ref ans) = last_answer {
417                    if !ans.is_empty() {
418                        session.interview.add_to_history(ans, &questions_text);
419                    }
420                }
421            } // Lock dropped before .await
422
423            let questions = interview
424                .questions
425                .iter()
426                .filter(|q| !q.is_empty())
427                .cloned()
428                .collect::<Vec<_>>();
429
430            tracing::info!(
431                session_id = %session_id,
432                ambiguity = interview.ambiguity.ambiguity(),
433                questions = questions.len(),
434                "Interview needs clarification"
435            );
436
437            self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
438                .await;
439
440            return Ok(OrchestrationResult {
441                session_id: Some(session_id.clone()),
442                space_id: Some(space_id),
443                space_tag: Some(space_tag.clone()),
444                response: format_questions(&questions),
445                seed_id: None,
446                agent_id: None,
447                phase_reached: Phase::Interview,
448                evaluation_passed: false,
449                output: None,
450            });
451        }
452
453        // Record agent response in conversation buffer (for topic shift detection)
454        // Note: interview phase returns questions, not a full agent response,
455        // but we record it for completeness.
456        {
457            let mut buffer = self.conversation_buffer.write();
458            buffer.push_agent("[interview: ready]", &space_id);
459        }
460
461        // Interview complete and ready.
462        self.publish_phase_completed(&session_id, Phase::Interview, "ready")
463            .await;
464        self.publish_phase_started(&session_id, Phase::Seed).await;
465
466        // ── Complexity-based routing ──
467        //
468        // "simple" + low ambiguity → create a lightweight Seed from the user
469        // message directly (no LLM call) and skip formal evaluation.
470        // "complex" (or ambiguous simple) → generate a full Seed via LLM.
471        let is_simple = interview.complexity == "simple" && interview.ambiguity.ambiguity() <= 0.3;
472
473        let seed = if is_simple {
474            tracing::info!(
475                phase = "seed",
476                method = "from_message",
477                "Simple task — ad-hoc seed"
478            );
479            Seed::from_message(&interview.original_message)
480        } else {
481            tracing::info!(
482                phase = "seed",
483                method = "llm",
484                "Complex task — LLM-generated seed"
485            );
486            self.ouroboros.generate_seed(&interview).await?
487        };
488
489        // Save seed to state store.
490        self.save_seed(&seed).await?;
491
492        // Publish seed created event.
493        self.event_bus
494            .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
495
496        self.publish_phase_completed(&session_id, Phase::Seed, "generated")
497            .await;
498        self.publish_phase_started(&session_id, Phase::Execute)
499            .await;
500
501        // Check if the seed should be split into multi-agent execution.
502        // When the seed has 3+ acceptance criteria, we treat each criterion
503        // as a distinct subtask and delegate to separate agents.
504        if should_split_seed(&seed) {
505            let subtasks = split_into_subtasks(&seed);
506            if subtasks.len() > 1 {
507                tracing::info!(
508                    phase = "delegate",
509                    subtasks = subtasks.len(),
510                    "Delegating to multi-agent"
511                );
512                let results = self.delegate_subtasks(subtasks, &seed).await?;
513
514                // Combine successful results
515                let combined: String = results
516                    .iter()
517                    .filter(|r| r.success)
518                    .filter_map(|r| r.result.as_deref())
519                    .collect::<Vec<_>>()
520                    .join("\n\n");
521
522                let all_passed = results.iter().all(|r| r.success);
523
524                // Clean up the session.
525                {
526                    let mut sessions = self.sessions.write();
527                    sessions.remove(&session_id);
528                }
529
530                tracing::info!(
531                    session_id = %session_id,
532                    subtasks = results.len(),
533                    passed = all_passed,
534                    "Multi-agent orchestration complete"
535                );
536
537                return Ok(OrchestrationResult {
538                    session_id: Some(session_id),
539                    space_id: Some(space_id),
540                    space_tag: Some(space_tag.clone()),
541                    response: format_result_combined(&combined),
542                    seed_id: Some(seed.id),
543                    agent_id: None,
544                    phase_reached: Phase::Execute,
545                    evaluation_passed: all_passed,
546                    output: Some(combined),
547                });
548            }
549        }
550
551        // Record agent response in conversation buffer (for multi-agent case)
552        {
553            let mut buffer = self.conversation_buffer.write();
554            buffer.push_agent("[multi-agent: complete]", &space_id);
555        }
556
557        // Execute agent via lifecycle manager.
558        tracing::info!(phase = "execute", "Starting execution phase");
559        let exec_result = self
560            .lifecycle
561            .spawn_and_run(&seed, Priority::Normal)
562            .await?;
563
564        // Periodically reap zombie tasks.
565        self.lifecycle.reap_zombies();
566
567        self.publish_phase_completed(&session_id, Phase::Execute, "completed")
568            .await;
569
570        // Clean up the session.
571        {
572            let mut sessions = self.sessions.write();
573            sessions.remove(&session_id);
574        }
575
576        let passed = exec_result.success;
577
578        tracing::info!(
579            session_id = %session_id,
580            passed,
581            "Orchestration complete"
582        );
583
584        // Measure orchestration duration.
585        let metrics = get_metrics();
586        metrics
587            .orch_duration
588            .observe(orch_start.elapsed().as_secs_f64());
589        if passed {
590            metrics.agents_completed.inc();
591        } else {
592            metrics.agents_failed.inc();
593        }
594
595        // Record agent response in conversation buffer (for topic shift detection)
596        {
597            let mut buffer = self.conversation_buffer.write();
598            buffer.push_agent(&seed.goal, &space_id);
599        }
600
601        Ok(OrchestrationResult {
602            session_id: Some(session_id),
603            space_id: Some(space_id),
604            space_tag: Some(space_tag.clone()),
605            response: format_execution_result(&seed, &exec_result),
606            seed_id: Some(seed.id),
607            agent_id: None,
608            phase_reached: Phase::Execute,
609            evaluation_passed: passed,
610            output: Some(exec_result.output.clone()),
611        })
612    }
613
614    /// Save a seed to the state store.
615    async fn save_seed(&self, seed: &Seed) -> Result<()> {
616        let key = seed.id.to_string();
617
618        self.state_store
619            .save_json("seeds", &key, seed)
620            .await
621            .context("failed to save seed to state store")?;
622
623        self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
624
625        Ok(())
626    }
627
628    /// Save an evaluation result to the state store.
629    /// Publish a PhaseStarted event.
630    async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
631        let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
632            session_id: session_id.to_owned(),
633            phase,
634        });
635    }
636
637    /// Publish a PhaseCompleted event.
638    async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
639        let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
640            session_id: session_id.to_owned(),
641            phase,
642            result_summary: result.to_owned(),
643        });
644    }
645
646    /// Execute multiple subtasks using separate agents in parallel.
647    ///
648    /// When A2A is available, the orchestrator delegates tasks through the
649    /// A2A protocol with circuit breaker and retry support.
650    /// Otherwise, falls back to direct lifecycle execution.
651    ///
652    /// Results are collected as they complete using `JoinSet`.
653    pub async fn delegate_subtasks(
654        &self,
655        subtasks: Vec<SubTask>,
656        parent_seed: &Seed,
657    ) -> Result<Vec<SubTask>> {
658        // Single task — execute directly without group overhead.
659        if subtasks.len() == 1 {
660            return self.execute_single_subtask(subtasks, parent_seed).await;
661        }
662
663        // Try A2A-based delegation when the protocol is available.
664        if let Some(ref a2a) = self.a2a {
665            // Check circuit breaker
666            if !self.a2a_breaker.is_allowed() {
667                tracing::warn!(
668                    state = ?self.a2a_breaker.state(),
669                    "A2A circuit breaker open, using lifecycle fallback"
670                );
671                return self.delegate_via_lifecycle(subtasks, parent_seed).await;
672            }
673
674            // Delegate with retry
675            return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
676        }
677
678        // Fallback: direct lifecycle execution (no A2A).
679        self.delegate_via_lifecycle(subtasks, parent_seed).await
680    }
681
682    /// Delegate subtasks via A2A with circuit breaker and retry support.
683    async fn delegate_with_retry(
684        &self,
685        subtasks: Vec<SubTask>,
686        parent_seed: &Seed,
687        a2a: &Arc<crate::a2a::A2AProtocol>,
688    ) -> Result<Vec<SubTask>> {
689        let mut attempt = 0;
690        let max_retries = self.delegation_config.max_retries;
691
692        loop {
693            match self
694                .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
695                .await
696            {
697                Ok(results) => {
698                    self.a2a_breaker.record_success();
699                    return Ok(results);
700                }
701                Err(e) => {
702                    self.a2a_breaker.record_failure();
703                    attempt += 1;
704
705                    if attempt >= max_retries {
706                        tracing::error!(
707                            attempts = attempt,
708                            error = %e,
709                            "A2A delegation exhausted after {} attempts, using lifecycle fallback",
710                            attempt
711                        );
712                        return self.delegate_via_lifecycle(subtasks, parent_seed).await;
713                    }
714
715                    // Exponential backoff
716                    let delay = self.delegation_config.backoff_delay(attempt);
717                    tracing::warn!(
718                        attempt,
719                        delay_ms = delay,
720                        error = %e,
721                        "A2A delegation failed, retrying with backoff"
722                    );
723                    tokio::time::sleep(Duration::from_millis(delay)).await;
724                }
725            }
726        }
727    }
728
729    /// Execute a single subtask directly via lifecycle manager.
730    async fn execute_single_subtask(
731        &self,
732        subtasks: Vec<SubTask>,
733        parent_seed: &Seed,
734    ) -> Result<Vec<SubTask>> {
735        let mut task = subtasks
736            .into_iter()
737            .next()
738            .expect("execute_single_subtask is only called when subtasks is non-empty");
739        let child_seed = Seed {
740            id: Uuid::new_v4(),
741            goal: task.description.clone(),
742            constraints: parent_seed.constraints.clone(),
743            acceptance_criteria: vec!["Task completes successfully".into()],
744            ontology: parent_seed.ontology.clone(),
745            created_at: chrono::Utc::now(),
746            generation: parent_seed.generation + 1,
747            parent_seed_id: Some(parent_seed.id),
748            cspace_hint: None,
749            original_request: parent_seed.original_request.clone(),
750        };
751        match self
752            .lifecycle
753            .spawn_and_run(&child_seed, Priority::Normal)
754            .await
755        {
756            Ok(result) => {
757                task.result = Some(result.output.clone());
758            }
759            Err(e) => {
760                task.result = Some(format!("Failed: {e}"));
761                task.success = false;
762            }
763        }
764        Ok(vec![task])
765    }
766
    /// Delegate subtasks via the A2A protocol's dispatch handler.
    ///
    /// Queries the AgentCardRegistry for agents matching each subtask's
    /// required capability, then calls `execute_delegation` which runs
    /// the task through the registered handler (lifecycle).
    /// Falls back to direct lifecycle execution when no handler is registered.
776    async fn delegate_via_a2a(
777        &self,
778        subtasks: Vec<SubTask>,
779        parent_seed: &Seed,
780        a2a: &Arc<crate::a2a::A2AProtocol>,
781    ) -> Result<Vec<SubTask>> {
782        use crate::a2a::TaskPriority;
783        use tokio::task::JoinSet;
784
785        tracing::info!(
786            subtasks = subtasks.len(),
787            "Delegating subtasks via A2A protocol"
788        );
789
790        let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
791        let subtask_count = subtasks.len();
792        let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
793
794        for (idx, subtask) in subtasks.into_iter().enumerate() {
795            let capability = subtask.required_capability.clone();
796            let description = subtask.description.clone();
797            let subtask_id = subtask.id;
798            let role = subtask.role.clone();
799            let a2a = Arc::clone(a2a);
800            let parent_seed = parent_seed.clone();
801            let lifecycle = self.lifecycle.clone();
802
803            join_set.spawn(async move {
804                // Find agent with the required capability via A2A registry.
805                let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
806                    a2a.query_capabilities(cap).await.ok()
807                        .and_then(|agents| agents.into_iter().next())
808                } else {
809                    None
810                };
811
812                let (output, success) = if let Some(ref target_card) = target {
813                    let target_id = target_card.agent_id;
814                    tracing::info!(
815                        subtask_index = idx,
816                        target = %target_card.name,
817                        target_id = %target_id,
818                        "A2A dispatching subtask"
819                    );
820
821                    let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
822                        "parent_seed": parent_seed.id.to_string(),
823                        "goal": description,
824                    }))
825                    .with_priority(TaskPriority::Normal);
826
827                    // Enqueue audit trail (fire-and-forget into queue).
828                    let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
829
830                    // Execute through dispatch handler (blocking).
831                    match a2a.execute_delegation(orchestrator_id, target_id, task).await {
832                        Some(Ok(result)) => {
833                            let out = result.get("output")
834                                .and_then(|v| v.as_str())
835                                .unwrap_or("")
836                                .to_string();
837                            let ok = result.get("success")
838                                .and_then(|v| v.as_bool())
839                                .unwrap_or(false);
840                            (out, ok)
841                        }
842                        Some(Err(e)) => {
843                            tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
844                            (format!("Failed: {e}"), false)
845                        }
846                        None => {
847                            // No handler — fallback to lifecycle.
848                            tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
849                            run_via_lifecycle(&lifecycle, &parent_seed, &description).await
850                        }
851                    }
852                } else {
853                    tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
854                    run_via_lifecycle(&lifecycle, &parent_seed, &description).await
855                };
856
857                (idx, SubTask {
858                    id: subtask_id,
859                    description,
860                    required_capability: capability,
861                    result: Some(output),
862                    success,
863                    role,
864                })
865            });
866        }
867
868        let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
869        while let Some(join_result) = join_set.join_next().await {
870            match join_result {
871                Ok((idx, subtask)) => {
872                    results[idx] = Some(subtask);
873                }
874                Err(e) => {
875                    tracing::error!(error = %e, "A2A task panicked");
876                }
877            }
878        }
879
880        let completed: Vec<SubTask> = results.into_iter().flatten().collect();
881        tracing::info!(
882            completed = completed.len(),
883            succeeded = completed.iter().filter(|r| r.success).count(),
884            "A2A delegation complete"
885        );
886        Ok(completed)
887    }
888
889    async fn delegate_via_lifecycle(
890        &self,
891        subtasks: Vec<SubTask>,
892        parent_seed: &Seed,
893    ) -> Result<Vec<SubTask>> {
894        use crate::agent_group::OxiosAgentGroup;
895        use tokio::task::JoinSet;
896
897        let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
898        let group = OxiosAgentGroup::new(parent_seed, descriptions);
899        let group_id = group.id;
900
901        self.event_bus.publish(KernelEvent::AgentGroupCreated {
902            group_id,
903            agent_count: group.agents.len(),
904        })?;
905
906        tracing::info!(
907            group_id = %group_id,
908            agent_count = group.agents.len(),
909            "Starting parallel multi-agent execution"
910        );
911
912        let mut join_set: JoinSet<(
913            usize,
914            crate::types::AgentId,
915            Result<oxios_ouroboros::ExecutionResult>,
916        )> = JoinSet::new();
917
918        for (idx, agent_entry) in group.agents.iter().enumerate() {
919            let child_seed = agent_entry.seed.clone();
920            let agent_id = agent_entry.id;
921            let lifecycle = self.lifecycle.clone();
922
923            join_set.spawn(async move {
924                let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
925                (idx, agent_id, result)
926            });
927        }
928
929        let subtask_count = subtasks.len();
930        let mut completed = vec![None; subtask_count];
931        while let Some(join_result) = join_set.join_next().await {
932            match join_result {
933                Ok((idx, agent_id, Ok(exec_result))) => {
934                    let _ = self
935                        .event_bus
936                        .publish(KernelEvent::AgentGroupMemberCompleted {
937                            group_id,
938                            agent_id,
939                            success: exec_result.success,
940                        });
941                    completed[idx] = Some(SubTask {
942                        id: subtasks[idx].id,
943                        description: subtasks[idx].description.clone(),
944                        required_capability: subtasks[idx].required_capability.clone(),
945                        result: Some(exec_result.output.clone()),
946                        success: exec_result.success,
947                        role: subtasks[idx].role.clone(),
948                    });
949                }
950                Ok((idx, agent_id, Err(e))) => {
951                    tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
952                    let _ = self
953                        .event_bus
954                        .publish(KernelEvent::AgentGroupMemberCompleted {
955                            group_id,
956                            agent_id,
957                            success: false,
958                        });
959                    completed[idx] = Some(SubTask {
960                        id: subtasks[idx].id,
961                        description: subtasks[idx].description.clone(),
962                        required_capability: subtasks[idx].required_capability.clone(),
963                        result: Some(format!("Failed: {e}")),
964                        success: false,
965                        role: subtasks[idx].role.clone(),
966                    });
967                }
968                Err(e) => {
969                    tracing::error!(error = %e, "JoinSet task panicked");
970                }
971            }
972        }
973
974        let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
975        let succeeded = completed.iter().filter(|r| r.success).count();
976        let total = completed.len();
977
978        tracing::info!(
979            group_id = %group_id,
980            succeeded,
981            total,
982            "Parallel multi-agent execution complete"
983        );
984
985        // Persist group state
986        let _ = self
987            .state_store
988            .save_json("agent_groups", &group_id.to_string(), &group)
989            .await;
990        self.git_commit(
991            &format!("agent_groups/{}.json", group_id),
992            "orchestrator: save group",
993        );
994
995        Ok(completed)
996    }
997}
998
/// Active session state for multi-turn interviews.
///
/// NOTE(review): `#[allow(unused)]` silences unused warnings for this type;
/// the reason is not visible in this section — confirm it is still needed.
#[derive(Debug, Clone)]
#[allow(unused)]
struct InterviewSession {
    /// Session identifier (presumably the `session_id` handed back to callers — confirm).
    id: String,
    /// Interview result associated with this session.
    interview: InterviewResult,
    /// Protocol phase recorded for this session.
    phase: Phase,
    /// Seed ID, if one has been recorded for this session.
    seed_id: Option<Uuid>,
    /// Agent ID, if one has been recorded for this session.
    agent_id: Option<AgentId>,
}
1009
/// Result of a full orchestration cycle.
///
/// Every `Option` field is omitted from the serialized form when it is
/// `None` (see the `skip_serializing_if` attributes below).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
    /// Session ID for multi-turn interviews. Pass this on follow-up messages.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// The Space ID that handled this message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub space_id: Option<Uuid>,
    /// Space decoration tag for the response (e.g. "[🔧 oxios]").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub space_tag: Option<String>,
    /// The response to send back to the user.
    pub response: String,
    /// The seed that was created (if seed phase was reached).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seed_id: Option<Uuid>,
    /// The agent that executed (if execute phase was reached).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// The furthest phase reached.
    pub phase_reached: Phase,
    /// Whether evaluation passed (false if evaluation was skipped or failed).
    pub evaluation_passed: bool,
    /// Output or notes from evaluation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<String>,
}
1038
/// Render clarifying questions as a user-facing message.
///
/// With no questions, returns a generic request for clarification.
/// Otherwise returns an introductory line followed by the questions as a
/// 1-based numbered list, one per line.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return "I need a bit more clarification before I can proceed.".to_string();
    }

    let mut message =
        String::from("I'd like to understand your request better. Could you help clarify:\n\n");
    for (index, question) in questions.iter().enumerate() {
        // Newline between entries only, never after the last one.
        if index > 0 {
            message.push('\n');
        }
        message.push_str(&format!("{}. {}", index + 1, question));
    }
    message
}
1055
1056/// Format the final result for display.
1057/// Format execution result for display to the user.
1058fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1059    let mut lines = Vec::new();
1060
1061    if exec.success {
1062        lines.push(format!("✅ '{}'", seed.goal));
1063    } else {
1064        lines.push(format!(
1065            "⚠️ '{}'을(를) 시도했지만 완전히 성공하지 못했습니다.",
1066            seed.goal
1067        ));
1068    }
1069
1070    // Show a truncated preview of the output if present.
1071    if !exec.output.is_empty() {
1072        let preview = if exec.output.len() > 500 {
1073            format!("{}...", &exec.output[..500])
1074        } else {
1075            exec.output.clone()
1076        };
1077        lines.push(String::new());
1078        lines.push(preview);
1079    }
1080
1081    lines.join("\n")
1082}
1083
1084/// Check if a seed should be split into subtasks.
1085///
1086/// Simple heuristic: if the seed has 3 or more acceptance criteria,
1087/// it likely contains distinct concerns that can be parallelized.
1088fn should_split_seed(seed: &Seed) -> bool {
1089    // Only split for genuinely complex tasks with many criteria.
1090    // Simple tasks (even with 3-4 criteria) are better handled by a single agent
1091    // to preserve context coherence.
1092    seed.acceptance_criteria.len() >= 5
1093}
1094
1095/// Split a seed into subtasks based on acceptance criteria.
1096///
1097/// Each acceptance criterion becomes a separate subtask with the
1098/// parent seed's goal as context. Infers required capability from
1099/// the goal text using the same heuristic as `build_agent_card`.
1100fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1101    seed.acceptance_criteria
1102        .iter()
1103        .map(|criterion| {
1104            let desc = format!("{}: {}", seed.goal, criterion);
1105            let desc_lower = desc.to_lowercase();
1106
1107            // Infer capability from subtask description.
1108            let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1109                Some("code-review".to_string())
1110            } else if desc_lower.contains("test") {
1111                Some("testing".to_string())
1112            } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1113                Some("refactoring".to_string())
1114            } else if desc_lower.contains("write")
1115                || desc_lower.contains("create")
1116                || desc_lower.contains("implement")
1117            {
1118                Some("code-generation".to_string())
1119            } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1120                Some("debugging".to_string())
1121            } else {
1122                None
1123            };
1124
1125            let mut subtask = SubTask::new(desc);
1126            subtask.required_capability = cap;
1127            subtask
1128        })
1129        .collect()
1130}
1131
/// Wrap the combined multi-agent output in a user-facing message.
///
/// An empty `combined` string yields a notice that no subtask completed
/// successfully; otherwise the text is prefixed with a completion header.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => "No subtasks completed successfully.".to_string(),
        body => format!("Multi-agent execution completed:\n\n{}", body),
    }
}
1140
1141/// Execute a subtask via lifecycle manager, returning (output, success).
1142async fn run_via_lifecycle(
1143    lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1144    parent_seed: &Seed,
1145    description: &str,
1146) -> (String, bool) {
1147    let child_seed = Seed {
1148        id: Uuid::new_v4(),
1149        goal: description.to_string(),
1150        constraints: parent_seed.constraints.clone(),
1151        acceptance_criteria: vec!["Task completes successfully".into()],
1152        ontology: parent_seed.ontology.clone(),
1153        created_at: chrono::Utc::now(),
1154        generation: parent_seed.generation + 1,
1155        parent_seed_id: Some(parent_seed.id),
1156        cspace_hint: None,
1157        original_request: parent_seed.original_request.clone(),
1158    };
1159    match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1160        Ok(result) => (result.output, result.success),
1161        Err(e) => (format!("Failed: {e}"), false),
1162    }
1163}