Skip to main content

oxios_kernel/
orchestrator.rs

1//! Orchestrator: coordinates the Ouroboros lifecycle for user messages.
2//!
3//! The orchestrator is the "brain" that runs the Ouroboros protocol.
4//! Given a user message:
5//! 1. Conduct the interview (ask clarifying questions if needed)
6//! 2. Generate a seed (via LLM for complex tasks, or ad-hoc for simple tasks)
7//! 3. Execute the agent via the supervisor
8//! 4. Return the result to the user
9//!
10//! The orchestrator does NOT know about channels or HTTP — it only
11//! coordinates Ouroboros + Supervisor + EventBus + StateStore + Scheduler + AccessManager.
12
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use crate::agent_lifecycle::AgentLifecycleManager;
24use crate::event_bus::{EventBus, KernelEvent};
25use crate::git_layer::GitLayer;
26use crate::metrics::get_metrics;
27use crate::scheduler::Priority;
28use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
29use crate::state_store::StateStore;
30use crate::types::AgentId;
31
32/// Role of an agent within a group.
33#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
34pub enum AgentRole {
35    /// Executes a specific subtask.
36    #[default]
37    Worker,
38    /// Coordinates subtasks, synthesizes results.
39    Manager,
40}
41
42/// A subtask within a multi-agent plan.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SubTask {
45    /// Unique subtask ID.
46    pub id: Uuid,
47    /// Human-readable description.
48    pub description: String,
49    /// Capability required (e.g., "code-review", "testing").
50    pub required_capability: Option<String>,
51    /// Result of the subtask (filled after execution).
52    pub result: Option<String>,
53    /// Whether this subtask succeeded.
54    pub success: bool,
55    /// Role of the agent assigned to this subtask.
56    #[serde(default)]
57    pub role: AgentRole,
58}
59
60impl SubTask {
61    /// Create a new subtask with the given description.
62    pub fn new(description: impl Into<String>) -> Self {
63        Self {
64            id: Uuid::new_v4(),
65            description: description.into(),
66            required_capability: None,
67            result: None,
68            success: false,
69            role: AgentRole::default(),
70        }
71    }
72
73    /// Set the required capability for this subtask.
74    pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
75        self.required_capability = Some(cap.into());
76        self
77    }
78}
79
/// The orchestrator coordinates the full Ouroboros lifecycle.
///
/// It owns the handles needed to interview the user, generate and persist a
/// seed, and execute it — either through the lifecycle manager directly or by
/// delegating subtasks over A2A.
pub struct Orchestrator {
    /// Ouroboros protocol implementation used for interviews and seed generation.
    ouroboros: Arc<dyn OuroborosProtocol>,
    /// Event bus on which phase and seed events are published.
    event_bus: EventBus,
    /// Persistent store; seeds are saved here under the `"seeds"` namespace.
    state_store: Arc<StateStore>,
    /// Git version control layer for auto-commits.
    /// `None` until `set_git_layer` is called.
    git_layer: Option<Arc<GitLayer>>,
    /// Active interview sessions, keyed by session ID.
    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
    /// Agent lifecycle manager (fork, register, run, cleanup).
    lifecycle: AgentLifecycleManager,
    /// A2A protocol for inter-agent task delegation.
    /// `None` until `set_a2a` is called; delegation then falls back to the lifecycle manager.
    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
    /// Space manager for context partitioning.
    /// `None` until `set_space_manager` is called.
    space_manager: RwLock<Option<Arc<SpaceManager>>>,
    /// Conversation buffer for topic shift detection.
    conversation_buffer: RwLock<ConversationBuffer>,
    /// Retry and backoff settings applied to A2A delegation attempts.
    delegation_config: DelegationConfig,
    /// A2A circuit breaker for delegation reliability.
    a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
}
102
/// Configuration for A2A delegation retries.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Maximum number of A2A delegation attempts before falling back.
    max_retries: u32,
    /// Base delay for exponential backoff (milliseconds).
    base_delay_ms: u64,
    /// Maximum delay cap for exponential backoff (milliseconds).
    max_delay_ms: u64,
    /// Timeout per delegation attempt (milliseconds).
    // Not read by any code in this block yet; kept so the setting has a home.
    #[allow(dead_code)]
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    /// Defaults: 3 attempts, 100 ms base delay, 5 s delay cap, 5 s timeout.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Calculate the exponential backoff delay, in milliseconds, for `attempt`.
    ///
    /// The delay is `base_delay_ms * 2^attempt`, with the exponent clamped to
    /// 10 and the result capped at `max_delay_ms`. The multiplication
    /// saturates, so a very large `base_delay_ms` yields the cap instead of
    /// overflowing (which would panic in debug builds and wrap in release).
    fn backoff_delay(&self, attempt: u32) -> u64 {
        // Clamp the exponent so the multiplier never exceeds 2^10.
        let multiplier = 2_u64.pow(attempt.min(10));
        self.base_delay_ms
            .saturating_mul(multiplier)
            .min(self.max_delay_ms)
    }
}
135
136impl Orchestrator {
137    /// Creates a new orchestrator.
138    pub fn new(
139        ouroboros: Arc<dyn OuroborosProtocol>,
140        event_bus: EventBus,
141        state_store: Arc<StateStore>,
142        lifecycle: AgentLifecycleManager,
143    ) -> Self {
144        Self::with_config(
145            ouroboros,
146            event_bus,
147            state_store,
148            lifecycle,
149            crate::config::OrchestratorConfig::default(),
150        )
151    }
152
153    /// Creates a new orchestrator with custom config (kept for API compat).
154    pub fn with_config(
155        ouroboros: Arc<dyn OuroborosProtocol>,
156        event_bus: EventBus,
157        state_store: Arc<StateStore>,
158        lifecycle: AgentLifecycleManager,
159        _config: crate::config::OrchestratorConfig,
160    ) -> Self {
161        Self {
162            ouroboros,
163            event_bus,
164            state_store,
165            git_layer: None,
166            sessions: RwLock::new(std::collections::HashMap::new()),
167            lifecycle,
168            a2a: None,
169            space_manager: RwLock::new(None),
170            conversation_buffer: RwLock::new(ConversationBuffer::default()),
171            delegation_config: DelegationConfig::default(),
172            a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
173        }
174    }
175
176    /// Set the SpaceManager for context partitioning.
177    pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
178        *self.space_manager.write() = Some(manager);
179    }
180
181    /// Get the current Space ID, if SpaceManager is set.
182    pub fn current_space_id(&self) -> Option<SpaceId> {
183        self.space_manager
184            .read()
185            .as_ref()
186            .map(|m| m.current_space_id())
187    }
188
189    /// Get the current Space name tag for response decoration.
190    pub fn current_space_tag(&self) -> String {
191        self.space_manager
192            .read()
193            .as_ref()
194            .and_then(|m| {
195                m.current_space().map(|s| {
196                    if s.is_default() {
197                        String::new()
198                    } else {
199                        format!("[{} {}]", s.emoji(), s.name)
200                    }
201                })
202            })
203            .unwrap_or_default()
204    }
205
206    /// Set the A2A protocol for inter-agent task delegation.
207    pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
208        self.a2a = Some(a2a);
209    }
210
211    /// Set the GitLayer for auto-commits after state saves.
212    pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
213        self.git_layer = Some(git_layer);
214    }
215
216    /// Commit a file to git if GitLayer is configured and enabled.
217    fn git_commit(&self, rel_path: &str, message: &str) {
218        if let Some(ref gl) = self.git_layer {
219            if gl.is_enabled() {
220                let _ = gl.commit_file(rel_path, message);
221            }
222        }
223    }
224
225    /// Handle a user message through the full Ouroboros loop.
226    ///
227    /// Returns an `OrchestrationResult` with the response and metadata.
228    ///
229    /// If the interview phase needs clarification (ambiguity > 0.2),
230    /// the result will contain the questions and the phase will be
231    /// `Phase::Interview`. The caller should send these questions to
232    /// the user and include the `session_id` in follow-up messages.
233    #[allow(clippy::await_holding_lock)]
234    pub async fn handle_message(
235        &self,
236        user_id: &str,
237        user_message: &str,
238        session_id: Option<&str>,
239    ) -> Result<OrchestrationResult> {
240        tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
241        get_metrics().messages.inc();
242        let orch_start = std::time::Instant::now();
243
244        let session_id = session_id
245            .map(String::from)
246            .unwrap_or_else(|| Uuid::new_v4().to_string());
247
248        tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
249
250        // ── Space Detection ──
251        let space_tag = self.current_space_tag();
252        let (turns, sm_arc) = {
253            let buffer = self.conversation_buffer.read();
254            let sm_guard = self.space_manager.read();
255            let turns: Vec<_> = buffer.turns().iter().cloned().collect();
256            // Extract Arc from guard and drop guard before .await
257            let sm_arc = sm_guard.as_ref().cloned();
258            (turns, sm_arc)
259        };
260        let space_id = if let Some(ref sm) = sm_arc {
261            match sm.detect_or_create(user_message, &turns).await {
262                Ok(id) => {
263                    tracing::info!(space_id = %id, "Space detected/created for message");
264                    id
265                }
266                Err(e) => {
267                    tracing::warn!(error = %e, "Space detection failed, using default");
268                    sm.default_space_id()
269                }
270            }
271        } else {
272            uuid::Uuid::nil()
273        };
274
275        // Record user message in conversation buffer
276        {
277            let mut buffer = self.conversation_buffer.write();
278            buffer.push_user(user_message);
279        }
280
281        // Phase 1: Interview
282        self.publish_phase_started(&session_id, Phase::Interview)
283            .await;
284
285        // Get or create the interview session (pre-fetch to avoid lock across await).
286        let needs_interview;
287        let existing_history: Option<Vec<_>>;
288        {
289            let sessions = self.sessions.read();
290            needs_interview = !sessions.contains_key(&session_id);
291            existing_history = if !needs_interview {
292                sessions
293                    .get(&session_id)
294                    .map(|s| s.interview.conversation_history.clone())
295            } else {
296                None
297            };
298            // Lock dropped here before any .await
299        }
300
301        // Conduct the interview.
302        let interview = {
303            tracing::info!(phase = "interview", "Starting interview phase");
304            if needs_interview {
305                self.ouroboros.interview(user_message).await?
306            } else {
307                // This is a follow-up message in an existing interview.
308                // Build multi-turn context from conversation history.
309                let multi_turn_context = {
310                    let mut context_parts = Vec::new();
311                    if let Some(ref history) = existing_history {
312                        for exchange in history {
313                            context_parts.push(format!(
314                                "User: {}\nAgent: {}",
315                                exchange.user, exchange.agent
316                            ));
317                        }
318                    }
319                    context_parts.push(format!("User: {}", user_message));
320                    context_parts.join("\n\n")
321                };
322
323                // Record user's answer in session for future turns (brief write lock)
324                {
325                    let mut sessions = self.sessions.write();
326                    if let Some(s) = sessions.get_mut(&session_id) {
327                        let last_q = s.interview.questions.last().cloned().unwrap_or_default();
328                        s.interview.add_exchange(&last_q, user_message);
329                    }
330                }
331
332                // Run another interview pass with full conversation history.
333                self.ouroboros.interview(&multi_turn_context).await?
334            }
335        };
336
337        // If this is a non-task message (greeting, small talk), return the chat response directly.
338        if !interview.is_task {
339            tracing::info!(session_id = %session_id, "Chat response (non-task)");
340
341            let response_text = if interview.chat_response.is_empty() {
342                "Hello! How can I help you today?".to_string()
343            } else {
344                interview.chat_response.clone()
345            };
346
347            // Record agent response in conversation buffer
348            {
349                let mut buffer = self.conversation_buffer.write();
350                buffer.push_agent(&response_text, &space_id);
351            }
352
353            // Record exchange in conversation history for multi-turn
354            // and store session so multi-turn works on follow-up messages
355            {
356                let mut sessions = self.sessions.write();
357                if let Some(session) = sessions.get_mut(&session_id) {
358                    tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
359                    session
360                        .interview
361                        .add_to_history(user_message, &response_text);
362                } else {
363                    // First non-task message — create a minimal session for history
364                    let mut interview = InterviewResult::new();
365                    interview.is_task = false;
366                    interview.chat_response = response_text.clone();
367                    interview.add_to_history(user_message, &response_text);
368                    sessions.insert(
369                        session_id.clone(),
370                        InterviewSession {
371                            id: session_id.clone(),
372                            interview,
373                            phase: Phase::Interview,
374                            seed_id: None,
375                            agent_id: None,
376                        },
377                    );
378                }
379            }
380
381            self.publish_phase_completed(&session_id, Phase::Interview, "chat")
382                .await;
383
384            return Ok(OrchestrationResult {
385                session_id: Some(session_id.clone()),
386                space_id: Some(space_id),
387                space_tag: Some(space_tag.clone()),
388                response: response_text,
389                seed_id: None,
390                agent_id: None,
391                phase_reached: Phase::Interview,
392                evaluation_passed: false,
393                output: None,
394            });
395        }
396
397        // If ambiguity is too high, return questions for the user to answer.
398        if !interview.ready_for_seed {
399            // Record this exchange in conversation history and store the interview.
400            {
401                let mut sessions = self.sessions.write();
402                let session =
403                    sessions
404                        .entry(session_id.clone())
405                        .or_insert_with(|| InterviewSession {
406                            id: session_id.clone(),
407                            interview: interview.clone(),
408                            phase: Phase::Interview,
409                            seed_id: None,
410                            agent_id: None,
411                        });
412                // The session already has user's answer recorded via add_exchange above.
413                // Record the questions as the agent's response in history.
414                let questions_text = interview.questions.join("\n");
415                let last_answer = session.interview.answers.last().cloned();
416                if let Some(ref ans) = last_answer {
417                    if !ans.is_empty() {
418                        session.interview.add_to_history(ans, &questions_text);
419                    }
420                }
421            } // Lock dropped before .await
422
423            let questions = interview
424                .questions
425                .iter()
426                .filter(|q| !q.is_empty())
427                .cloned()
428                .collect::<Vec<_>>();
429
430            tracing::info!(
431                session_id = %session_id,
432                ambiguity = interview.ambiguity.ambiguity(),
433                questions = questions.len(),
434                "Interview needs clarification"
435            );
436
437            self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
438                .await;
439
440            return Ok(OrchestrationResult {
441                session_id: Some(session_id.clone()),
442                space_id: Some(space_id),
443                space_tag: Some(space_tag.clone()),
444                response: format_questions(&questions),
445                seed_id: None,
446                agent_id: None,
447                phase_reached: Phase::Interview,
448                evaluation_passed: false,
449                output: None,
450            });
451        }
452
453        // Record agent response in conversation buffer (for topic shift detection)
454        // Note: interview phase returns questions, not a full agent response,
455        // but we record it for completeness.
456        {
457            let mut buffer = self.conversation_buffer.write();
458            buffer.push_agent("[interview: ready]", &space_id);
459        }
460
461        // Interview complete and ready.
462        self.publish_phase_completed(&session_id, Phase::Interview, "ready")
463            .await;
464        self.publish_phase_started(&session_id, Phase::Seed).await;
465
466        // ── Complexity-based routing ──
467        //
468        // "simple" + low ambiguity → create a lightweight Seed from the user
469        // message directly (no LLM call) and skip formal evaluation.
470        // "complex" (or ambiguous simple) → generate a full Seed via LLM.
471        let is_simple = interview.complexity == "simple"
472            && interview.ambiguity.ambiguity() <= 0.3;
473
474        let seed = if is_simple {
475            tracing::info!(phase = "seed", method = "from_message", "Simple task — ad-hoc seed");
476            Seed::from_message(&interview.original_message)
477        } else {
478            tracing::info!(phase = "seed", method = "llm", "Complex task — LLM-generated seed");
479            self.ouroboros.generate_seed(&interview).await?
480        };
481
482        // Save seed to state store.
483        self.save_seed(&seed).await?;
484
485        // Publish seed created event.
486        self.event_bus
487            .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
488
489        self.publish_phase_completed(&session_id, Phase::Seed, "generated")
490            .await;
491        self.publish_phase_started(&session_id, Phase::Execute)
492            .await;
493
494        // Check if the seed should be split into multi-agent execution.
495        // When the seed has 3+ acceptance criteria, we treat each criterion
496        // as a distinct subtask and delegate to separate agents.
497        if should_split_seed(&seed) {
498            let subtasks = split_into_subtasks(&seed);
499            if subtasks.len() > 1 {
500                tracing::info!(
501                    phase = "delegate",
502                    subtasks = subtasks.len(),
503                    "Delegating to multi-agent"
504                );
505                let results = self.delegate_subtasks(subtasks, &seed).await?;
506
507                // Combine successful results
508                let combined: String = results
509                    .iter()
510                    .filter(|r| r.success)
511                    .filter_map(|r| r.result.as_deref())
512                    .collect::<Vec<_>>()
513                    .join("\n\n");
514
515                let all_passed = results.iter().all(|r| r.success);
516
517                // Clean up the session.
518                {
519                    let mut sessions = self.sessions.write();
520                    sessions.remove(&session_id);
521                }
522
523                tracing::info!(
524                    session_id = %session_id,
525                    subtasks = results.len(),
526                    passed = all_passed,
527                    "Multi-agent orchestration complete"
528                );
529
530                return Ok(OrchestrationResult {
531                    session_id: Some(session_id),
532                    space_id: Some(space_id),
533                    space_tag: Some(space_tag.clone()),
534                    response: format_result_combined(&combined),
535                    seed_id: Some(seed.id),
536                    agent_id: None,
537                    phase_reached: Phase::Execute,
538                    evaluation_passed: all_passed,
539                    output: Some(combined),
540                });
541            }
542        }
543
544        // Record agent response in conversation buffer (for multi-agent case)
545        {
546            let mut buffer = self.conversation_buffer.write();
547            buffer.push_agent("[multi-agent: complete]", &space_id);
548        }
549
550        // Execute agent via lifecycle manager.
551        tracing::info!(phase = "execute", "Starting execution phase");
552        let exec_result = self
553            .lifecycle
554            .spawn_and_run(&seed, Priority::Normal)
555            .await?;
556
557        // Periodically reap zombie tasks.
558        self.lifecycle.reap_zombies();
559
560        self.publish_phase_completed(&session_id, Phase::Execute, "completed")
561            .await;
562
563        // Clean up the session.
564        {
565            let mut sessions = self.sessions.write();
566            sessions.remove(&session_id);
567        }
568
569        let passed = exec_result.success;
570
571        tracing::info!(
572            session_id = %session_id,
573            passed,
574            "Orchestration complete"
575        );
576
577        // Measure orchestration duration.
578        let metrics = get_metrics();
579        metrics
580            .orch_duration
581            .observe(orch_start.elapsed().as_secs_f64());
582        if passed {
583            metrics.agents_completed.inc();
584        } else {
585            metrics.agents_failed.inc();
586        }
587
588        // Record agent response in conversation buffer (for topic shift detection)
589        {
590            let mut buffer = self.conversation_buffer.write();
591            buffer.push_agent(&seed.goal, &space_id);
592        }
593
594        Ok(OrchestrationResult {
595            session_id: Some(session_id),
596            space_id: Some(space_id),
597            space_tag: Some(space_tag.clone()),
598            response: format_execution_result(&seed, &exec_result),
599            seed_id: Some(seed.id),
600            agent_id: None,
601            phase_reached: Phase::Execute,
602            evaluation_passed: passed,
603            output: Some(exec_result.output.clone()),
604        })
605    }
606
607    /// Save a seed to the state store.
608    async fn save_seed(&self, seed: &Seed) -> Result<()> {
609        let key = seed.id.to_string();
610
611        self.state_store
612            .save_json("seeds", &key, seed)
613            .await
614            .context("failed to save seed to state store")?;
615
616        self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
617
618        Ok(())
619    }
620
621    /// Save an evaluation result to the state store.
622    /// Publish a PhaseStarted event.
623    async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
624        let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
625            session_id: session_id.to_owned(),
626            phase,
627        });
628    }
629
630    /// Publish a PhaseCompleted event.
631    async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
632        let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
633            session_id: session_id.to_owned(),
634            phase,
635            result_summary: result.to_owned(),
636        });
637    }
638
639    /// Execute multiple subtasks using separate agents in parallel.
640    ///
641    /// When A2A is available, the orchestrator delegates tasks through the
642    /// A2A protocol with circuit breaker and retry support.
643    /// Otherwise, falls back to direct lifecycle execution.
644    ///
645    /// Results are collected as they complete using `JoinSet`.
646    pub async fn delegate_subtasks(
647        &self,
648        subtasks: Vec<SubTask>,
649        parent_seed: &Seed,
650    ) -> Result<Vec<SubTask>> {
651        // Single task — execute directly without group overhead.
652        if subtasks.len() == 1 {
653            return self.execute_single_subtask(subtasks, parent_seed).await;
654        }
655
656        // Try A2A-based delegation when the protocol is available.
657        if let Some(ref a2a) = self.a2a {
658            // Check circuit breaker
659            if !self.a2a_breaker.is_allowed() {
660                tracing::warn!(
661                    state = ?self.a2a_breaker.state(),
662                    "A2A circuit breaker open, using lifecycle fallback"
663                );
664                return self.delegate_via_lifecycle(subtasks, parent_seed).await;
665            }
666
667            // Delegate with retry
668            return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
669        }
670
671        // Fallback: direct lifecycle execution (no A2A).
672        self.delegate_via_lifecycle(subtasks, parent_seed).await
673    }
674
675    /// Delegate subtasks via A2A with circuit breaker and retry support.
676    async fn delegate_with_retry(
677        &self,
678        subtasks: Vec<SubTask>,
679        parent_seed: &Seed,
680        a2a: &Arc<crate::a2a::A2AProtocol>,
681    ) -> Result<Vec<SubTask>> {
682        let mut attempt = 0;
683        let max_retries = self.delegation_config.max_retries;
684
685        loop {
686            match self
687                .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
688                .await
689            {
690                Ok(results) => {
691                    self.a2a_breaker.record_success();
692                    return Ok(results);
693                }
694                Err(e) => {
695                    self.a2a_breaker.record_failure();
696                    attempt += 1;
697
698                    if attempt >= max_retries {
699                        tracing::error!(
700                            attempts = attempt,
701                            error = %e,
702                            "A2A delegation exhausted after {} attempts, using lifecycle fallback",
703                            attempt
704                        );
705                        return self.delegate_via_lifecycle(subtasks, parent_seed).await;
706                    }
707
708                    // Exponential backoff
709                    let delay = self.delegation_config.backoff_delay(attempt);
710                    tracing::warn!(
711                        attempt,
712                        delay_ms = delay,
713                        error = %e,
714                        "A2A delegation failed, retrying with backoff"
715                    );
716                    tokio::time::sleep(Duration::from_millis(delay)).await;
717                }
718            }
719        }
720    }
721
722    /// Execute a single subtask directly via lifecycle manager.
723    async fn execute_single_subtask(
724        &self,
725        subtasks: Vec<SubTask>,
726        parent_seed: &Seed,
727    ) -> Result<Vec<SubTask>> {
728        let mut task = subtasks
729            .into_iter()
730            .next()
731            .expect("execute_single_subtask is only called when subtasks is non-empty");
732        let child_seed = Seed {
733            id: Uuid::new_v4(),
734            goal: task.description.clone(),
735            constraints: parent_seed.constraints.clone(),
736            acceptance_criteria: vec!["Task completes successfully".into()],
737            ontology: parent_seed.ontology.clone(),
738            created_at: chrono::Utc::now(),
739            generation: parent_seed.generation + 1,
740            parent_seed_id: Some(parent_seed.id),
741            cspace_hint: None,
742            original_request: parent_seed.original_request.clone(),
743        };
744        match self
745            .lifecycle
746            .spawn_and_run(&child_seed, Priority::Normal)
747            .await
748        {
749            Ok(result) => {
750                task.result = Some(result.output.clone());
751            }
752            Err(e) => {
753                task.result = Some(format!("Failed: {e}"));
754                task.success = false;
755            }
756        }
757        Ok(vec![task])
758    }
759
    /// Delegate subtasks via the A2A protocol's dispatch handler.
764    ///
765    /// Queries the AgentCardRegistry for agents matching each subtask's
766    /// required capability, then calls `execute_delegation` which runs
767    /// the task through the registered handler (lifecycle).
768    /// Falls back to direct lifecycle execution when no handler is registered.
769    async fn delegate_via_a2a(
770        &self,
771        subtasks: Vec<SubTask>,
772        parent_seed: &Seed,
773        a2a: &Arc<crate::a2a::A2AProtocol>,
774    ) -> Result<Vec<SubTask>> {
775        use crate::a2a::TaskPriority;
776        use tokio::task::JoinSet;
777
778        tracing::info!(
779            subtasks = subtasks.len(),
780            "Delegating subtasks via A2A protocol"
781        );
782
783        let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
784        let subtask_count = subtasks.len();
785        let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
786
787        for (idx, subtask) in subtasks.into_iter().enumerate() {
788            let capability = subtask.required_capability.clone();
789            let description = subtask.description.clone();
790            let subtask_id = subtask.id;
791            let role = subtask.role.clone();
792            let a2a = Arc::clone(a2a);
793            let parent_seed = parent_seed.clone();
794            let lifecycle = self.lifecycle.clone();
795
796            join_set.spawn(async move {
797                // Find agent with the required capability via A2A registry.
798                let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
799                    a2a.query_capabilities(cap).await.ok()
800                        .and_then(|agents| agents.into_iter().next())
801                } else {
802                    None
803                };
804
805                let (output, success) = if let Some(ref target_card) = target {
806                    let target_id = target_card.agent_id;
807                    tracing::info!(
808                        subtask_index = idx,
809                        target = %target_card.name,
810                        target_id = %target_id,
811                        "A2A dispatching subtask"
812                    );
813
814                    let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
815                        "parent_seed": parent_seed.id.to_string(),
816                        "goal": description,
817                    }))
818                    .with_priority(TaskPriority::Normal);
819
820                    // Enqueue audit trail (fire-and-forget into queue).
821                    let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
822
823                    // Execute through dispatch handler (blocking).
824                    match a2a.execute_delegation(orchestrator_id, target_id, task).await {
825                        Some(Ok(result)) => {
826                            let out = result.get("output")
827                                .and_then(|v| v.as_str())
828                                .unwrap_or("")
829                                .to_string();
830                            let ok = result.get("success")
831                                .and_then(|v| v.as_bool())
832                                .unwrap_or(false);
833                            (out, ok)
834                        }
835                        Some(Err(e)) => {
836                            tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
837                            (format!("Failed: {e}"), false)
838                        }
839                        None => {
840                            // No handler — fallback to lifecycle.
841                            tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
842                            run_via_lifecycle(&lifecycle, &parent_seed, &description).await
843                        }
844                    }
845                } else {
846                    tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
847                    run_via_lifecycle(&lifecycle, &parent_seed, &description).await
848                };
849
850                (idx, SubTask {
851                    id: subtask_id,
852                    description,
853                    required_capability: capability,
854                    result: Some(output),
855                    success,
856                    role,
857                })
858            });
859        }
860
861        let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
862        while let Some(join_result) = join_set.join_next().await {
863            match join_result {
864                Ok((idx, subtask)) => {
865                    results[idx] = Some(subtask);
866                }
867                Err(e) => {
868                    tracing::error!(error = %e, "A2A task panicked");
869                }
870            }
871        }
872
873        let completed: Vec<SubTask> = results.into_iter().flatten().collect();
874        tracing::info!(
875            completed = completed.len(),
876            succeeded = completed.iter().filter(|r| r.success).count(),
877            "A2A delegation complete"
878        );
879        Ok(completed)
880    }
881
882    async fn delegate_via_lifecycle(
883        &self,
884        subtasks: Vec<SubTask>,
885        parent_seed: &Seed,
886    ) -> Result<Vec<SubTask>> {
887        use crate::agent_group::OxiosAgentGroup;
888        use tokio::task::JoinSet;
889
890        let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
891        let group = OxiosAgentGroup::new(parent_seed, descriptions);
892        let group_id = group.id;
893
894        self.event_bus.publish(KernelEvent::AgentGroupCreated {
895            group_id,
896            agent_count: group.agents.len(),
897        })?;
898
899        tracing::info!(
900            group_id = %group_id,
901            agent_count = group.agents.len(),
902            "Starting parallel multi-agent execution"
903        );
904
905        let mut join_set: JoinSet<(
906            usize,
907            crate::types::AgentId,
908            Result<oxios_ouroboros::ExecutionResult>,
909        )> = JoinSet::new();
910
911        for (idx, agent_entry) in group.agents.iter().enumerate() {
912            let child_seed = agent_entry.seed.clone();
913            let agent_id = agent_entry.id;
914            let lifecycle = self.lifecycle.clone();
915
916            join_set.spawn(async move {
917                let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
918                (idx, agent_id, result)
919            });
920        }
921
922        let subtask_count = subtasks.len();
923        let mut completed = vec![None; subtask_count];
924        while let Some(join_result) = join_set.join_next().await {
925            match join_result {
926                Ok((idx, agent_id, Ok(exec_result))) => {
927                    let _ = self
928                        .event_bus
929                        .publish(KernelEvent::AgentGroupMemberCompleted {
930                            group_id,
931                            agent_id,
932                            success: exec_result.success,
933                        });
934                    completed[idx] = Some(SubTask {
935                        id: subtasks[idx].id,
936                        description: subtasks[idx].description.clone(),
937                        required_capability: subtasks[idx].required_capability.clone(),
938                        result: Some(exec_result.output.clone()),
939                        success: exec_result.success,
940                        role: subtasks[idx].role.clone(),
941                    });
942                }
943                Ok((idx, agent_id, Err(e))) => {
944                    tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
945                    let _ = self
946                        .event_bus
947                        .publish(KernelEvent::AgentGroupMemberCompleted {
948                            group_id,
949                            agent_id,
950                            success: false,
951                        });
952                    completed[idx] = Some(SubTask {
953                        id: subtasks[idx].id,
954                        description: subtasks[idx].description.clone(),
955                        required_capability: subtasks[idx].required_capability.clone(),
956                        result: Some(format!("Failed: {e}")),
957                        success: false,
958                        role: subtasks[idx].role.clone(),
959                    });
960                }
961                Err(e) => {
962                    tracing::error!(error = %e, "JoinSet task panicked");
963                }
964            }
965        }
966
967        let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
968        let succeeded = completed.iter().filter(|r| r.success).count();
969        let total = completed.len();
970
971        tracing::info!(
972            group_id = %group_id,
973            succeeded,
974            total,
975            "Parallel multi-agent execution complete"
976        );
977
978        // Persist group state
979        let _ = self
980            .state_store
981            .save_json("agent_groups", &group_id.to_string(), &group)
982            .await;
983        self.git_commit(
984            &format!("agent_groups/{}.json", group_id),
985            "orchestrator: save group",
986        );
987
988        Ok(completed)
989    }
990}
991
992/// Active session state for multi-turn interviews.
993#[derive(Debug, Clone)]
994#[allow(unused)]
995struct InterviewSession {
996    id: String,
997    interview: InterviewResult,
998    phase: Phase,
999    seed_id: Option<Uuid>,
1000    agent_id: Option<AgentId>,
1001}
1002
1003/// Result of a full orchestration cycle.
1004#[derive(Debug, Clone, Serialize, Deserialize)]
1005pub struct OrchestrationResult {
1006    /// Session ID for multi-turn interviews. Pass this on follow-up messages.
1007    #[serde(skip_serializing_if = "Option::is_none")]
1008    pub session_id: Option<String>,
1009    /// The Space ID that handled this message.
1010    #[serde(skip_serializing_if = "Option::is_none")]
1011    pub space_id: Option<Uuid>,
1012    /// Space decoration tag for the response (e.g. "[🔧 oxios]").
1013    #[serde(skip_serializing_if = "Option::is_none")]
1014    pub space_tag: Option<String>,
1015    /// The response to send back to the user.
1016    pub response: String,
1017    /// The seed that was created (if seed phase was reached).
1018    #[serde(skip_serializing_if = "Option::is_none")]
1019    pub seed_id: Option<Uuid>,
1020    /// The agent that executed (if execute phase was reached).
1021    #[serde(skip_serializing_if = "Option::is_none")]
1022    pub agent_id: Option<AgentId>,
1023    /// The furthest phase reached.
1024    pub phase_reached: Phase,
1025    /// Whether evaluation passed (false if evaluation was skipped or failed).
1026    pub evaluation_passed: bool,
1027    /// Output or notes from evaluation.
1028    #[serde(skip_serializing_if = "Option::is_none")]
1029    pub output: Option<String>,
1030}
1031
/// Render clarifying questions as a user-facing message.
///
/// With no questions a generic request for clarification is returned.
/// Otherwise the questions follow an introductory sentence as a 1-based
/// numbered list, one question per line.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return String::from("I need a bit more clarification before I can proceed.");
    }

    let mut message = String::from(
        "I'd like to understand your request better. Could you help clarify:\n\n",
    );
    for (index, question) in questions.iter().enumerate() {
        // Newlines separate entries; there is no trailing newline.
        if index > 0 {
            message.push('\n');
        }
        message.push_str(&(index + 1).to_string());
        message.push_str(". ");
        message.push_str(question);
    }
    message
}
1048
1049/// Format the final result for display.
1050/// Format execution result for display to the user.
1051fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1052    let mut lines = Vec::new();
1053
1054    if exec.success {
1055        lines.push(format!("✅ '{}'", seed.goal));
1056    } else {
1057        lines.push(format!(
1058            "⚠️ '{}'을(를) 시도했지만 완전히 성공하지 못했습니다.",
1059            seed.goal
1060        ));
1061    }
1062
1063    // Show a truncated preview of the output if present.
1064    if !exec.output.is_empty() {
1065        let preview = if exec.output.len() > 500 {
1066            format!("{}...", &exec.output[..500])
1067        } else {
1068            exec.output.clone()
1069        };
1070        lines.push(String::new());
1071        lines.push(preview);
1072    }
1073
1074    lines.join("\n")
1075}
1076
1077/// Check if a seed should be split into subtasks.
1078///
1079/// Simple heuristic: if the seed has 3 or more acceptance criteria,
1080/// it likely contains distinct concerns that can be parallelized.
1081fn should_split_seed(seed: &Seed) -> bool {
1082    // Only split for genuinely complex tasks with many criteria.
1083    // Simple tasks (even with 3-4 criteria) are better handled by a single agent
1084    // to preserve context coherence.
1085    seed.acceptance_criteria.len() >= 5
1086}
1087
1088/// Split a seed into subtasks based on acceptance criteria.
1089///
1090/// Each acceptance criterion becomes a separate subtask with the
1091/// parent seed's goal as context. Infers required capability from
1092/// the goal text using the same heuristic as `build_agent_card`.
1093fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1094    seed.acceptance_criteria
1095        .iter()
1096        .map(|criterion| {
1097            let desc = format!("{}: {}", seed.goal, criterion);
1098            let desc_lower = desc.to_lowercase();
1099
1100            // Infer capability from subtask description.
1101            let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1102                Some("code-review".to_string())
1103            } else if desc_lower.contains("test") {
1104                Some("testing".to_string())
1105            } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1106                Some("refactoring".to_string())
1107            } else if desc_lower.contains("write")
1108                || desc_lower.contains("create")
1109                || desc_lower.contains("implement")
1110            {
1111                Some("code-generation".to_string())
1112            } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1113                Some("debugging".to_string())
1114            } else {
1115                None
1116            };
1117
1118            let mut subtask = SubTask::new(desc);
1119            subtask.required_capability = cap;
1120            subtask
1121        })
1122        .collect()
1123}
1124
/// Wrap the combined multi-agent output in a user-facing message.
///
/// An empty `combined` string yields a notice that no subtask succeeded;
/// anything else is placed under a completion header.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        body => format!("Multi-agent execution completed:\n\n{}", body),
    }
}
1133
1134/// Execute a subtask via lifecycle manager, returning (output, success).
1135async fn run_via_lifecycle(
1136    lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1137    parent_seed: &Seed,
1138    description: &str,
1139) -> (String, bool) {
1140    let child_seed = Seed {
1141        id: Uuid::new_v4(),
1142        goal: description.to_string(),
1143        constraints: parent_seed.constraints.clone(),
1144        acceptance_criteria: vec!["Task completes successfully".into()],
1145        ontology: parent_seed.ontology.clone(),
1146        created_at: chrono::Utc::now(),
1147        generation: parent_seed.generation + 1,
1148        parent_seed_id: Some(parent_seed.id),
1149        cspace_hint: None,
1150        original_request: parent_seed.original_request.clone(),
1151    };
1152    match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1153        Ok(result) => (result.output, result.success),
1154        Err(e) => (format!("Failed: {e}"), false),
1155    }
1156}