1use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::{Context, Result};
18use chrono;
19use oxios_ouroboros::{EvaluationResult, InterviewResult, OuroborosProtocol, Phase, Seed};
20use parking_lot::RwLock;
21use serde::{Deserialize, Serialize};
22use uuid::Uuid;
23
24use crate::agent_lifecycle::AgentLifecycleManager;
25use crate::event_bus::{EventBus, KernelEvent};
26use crate::git_layer::GitLayer;
27use crate::metrics::get_metrics;
28use crate::scheduler::Priority;
29use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
30use crate::state_store::StateStore;
31use crate::types::AgentId;
32
33#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
35pub enum AgentRole {
36 #[default]
38 Worker,
39 Manager,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct SubTask {
46 pub id: Uuid,
48 pub description: String,
50 pub required_capability: Option<String>,
52 pub result: Option<String>,
54 pub success: bool,
56 #[serde(default)]
58 pub role: AgentRole,
59}
60
61impl SubTask {
62 pub fn new(description: impl Into<String>) -> Self {
64 Self {
65 id: Uuid::new_v4(),
66 description: description.into(),
67 required_capability: None,
68 result: None,
69 success: false,
70 role: AgentRole::default(),
71 }
72 }
73
74 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
76 self.required_capability = Some(cap.into());
77 self
78 }
79}
80
81pub struct Orchestrator {
83 ouroboros: Arc<dyn OuroborosProtocol>,
84 event_bus: EventBus,
85 state_store: Arc<StateStore>,
86 git_layer: Option<Arc<GitLayer>>,
88 sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
90 lifecycle: AgentLifecycleManager,
92 a2a: Option<Arc<crate::a2a::A2AProtocol>>,
94 space_manager: RwLock<Option<Arc<SpaceManager>>>,
96 conversation_buffer: RwLock<ConversationBuffer>,
98 config: crate::config::OrchestratorConfig,
100 delegation_config: DelegationConfig,
102 a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
104}
105
106#[derive(Debug, Clone)]
108struct DelegationConfig {
109 max_retries: u32,
111 base_delay_ms: u64,
113 max_delay_ms: u64,
115 #[allow(dead_code)]
117 timeout_ms: u64,
118}
119
120impl Default for DelegationConfig {
121 fn default() -> Self {
122 Self {
123 max_retries: 3,
124 base_delay_ms: 100,
125 max_delay_ms: 5000,
126 timeout_ms: 5000,
127 }
128 }
129}
130
131impl DelegationConfig {
132 fn backoff_delay(&self, attempt: u32) -> u64 {
134 let delay = self.base_delay_ms * 2_u64.saturating_pow(attempt.min(10));
135 delay.min(self.max_delay_ms)
136 }
137}
138
139impl Orchestrator {
140 pub fn new(
142 ouroboros: Arc<dyn OuroborosProtocol>,
143 event_bus: EventBus,
144 state_store: Arc<StateStore>,
145 lifecycle: AgentLifecycleManager,
146 ) -> Self {
147 Self::with_config(
148 ouroboros,
149 event_bus,
150 state_store,
151 lifecycle,
152 crate::config::OrchestratorConfig::default(),
153 )
154 }
155
156 pub fn with_config(
158 ouroboros: Arc<dyn OuroborosProtocol>,
159 event_bus: EventBus,
160 state_store: Arc<StateStore>,
161 lifecycle: AgentLifecycleManager,
162 config: crate::config::OrchestratorConfig,
163 ) -> Self {
164 Self {
165 ouroboros,
166 event_bus,
167 state_store,
168 git_layer: None,
169 sessions: RwLock::new(std::collections::HashMap::new()),
170 lifecycle,
171 a2a: None,
172 space_manager: RwLock::new(None),
173 conversation_buffer: RwLock::new(ConversationBuffer::default()),
174 config,
175 delegation_config: DelegationConfig::default(),
176 a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
177 }
178 }
179
180 pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
182 *self.space_manager.write() = Some(manager);
183 }
184
185 pub fn current_space_id(&self) -> Option<SpaceId> {
187 self.space_manager
188 .read()
189 .as_ref()
190 .map(|m| m.current_space_id())
191 }
192
193 pub fn current_space_tag(&self) -> String {
195 self.space_manager
196 .read()
197 .as_ref()
198 .and_then(|m| {
199 m.current_space().map(|s| {
200 if s.is_default() {
201 String::new()
202 } else {
203 format!("[{} {}]", s.emoji(), s.name)
204 }
205 })
206 })
207 .unwrap_or_default()
208 }
209
210 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
212 self.a2a = Some(a2a);
213 }
214
215 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
217 self.git_layer = Some(git_layer);
218 }
219
220 fn git_commit(&self, rel_path: &str, message: &str) {
222 if let Some(ref gl) = self.git_layer {
223 if gl.is_enabled() {
224 let _ = gl.commit_file(rel_path, message);
225 }
226 }
227 }
228
229 #[allow(clippy::await_holding_lock)]
238 pub async fn handle_message(
239 &self,
240 user_id: &str,
241 user_message: &str,
242 session_id: Option<&str>,
243 ) -> Result<OrchestrationResult> {
244 tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
245 get_metrics().messages.inc();
246 let orch_start = std::time::Instant::now();
247
248 let session_id = session_id
249 .map(String::from)
250 .unwrap_or_else(|| Uuid::new_v4().to_string());
251
252 tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
253
254 let space_tag = self.current_space_tag();
256 let (turns, sm_arc) = {
257 let buffer = self.conversation_buffer.read();
258 let sm_guard = self.space_manager.read();
259 let turns: Vec<_> = buffer.turns().iter().cloned().collect();
260 let sm_arc = sm_guard.as_ref().cloned();
262 (turns, sm_arc)
263 };
264 let space_id = if let Some(ref sm) = sm_arc {
265 match sm.detect_or_create(user_message, &turns).await {
266 Ok(id) => {
267 tracing::info!(space_id = %id, "Space detected/created for message");
268 id
269 }
270 Err(e) => {
271 tracing::warn!(error = %e, "Space detection failed, using default");
272 sm.default_space_id()
273 }
274 }
275 } else {
276 uuid::Uuid::nil()
277 };
278
279 {
281 let mut buffer = self.conversation_buffer.write();
282 buffer.push_user(user_message);
283 }
284
285 self.publish_phase_started(&session_id, Phase::Interview)
287 .await;
288
289 let needs_interview;
291 let existing_history: Option<Vec<_>>;
292 {
293 let sessions = self.sessions.read();
294 needs_interview = !sessions.contains_key(&session_id);
295 existing_history = if !needs_interview {
296 sessions
297 .get(&session_id)
298 .map(|s| s.interview.conversation_history.clone())
299 } else {
300 None
301 };
302 }
304
305 let interview = {
307 tracing::info!(phase = "interview", "Starting interview phase");
308 if needs_interview {
309 self.ouroboros.interview(user_message).await?
310 } else {
311 let multi_turn_context = {
314 let mut context_parts = Vec::new();
315 if let Some(ref history) = existing_history {
316 for exchange in history {
317 context_parts.push(format!(
318 "User: {}\nAgent: {}",
319 exchange.user, exchange.agent
320 ));
321 }
322 }
323 context_parts.push(format!("User: {}", user_message));
324 context_parts.join("\n\n")
325 };
326
327 {
329 let mut sessions = self.sessions.write();
330 if let Some(s) = sessions.get_mut(&session_id) {
331 let last_q = s.interview.questions.last().cloned().unwrap_or_default();
332 s.interview.add_exchange(&last_q, user_message);
333 }
334 }
335
336 self.ouroboros.interview(&multi_turn_context).await?
338 }
339 };
340
341 if !interview.is_task {
343 tracing::info!(session_id = %session_id, "Chat response (non-task)");
344
345 let response_text = if interview.chat_response.is_empty() {
346 "Hello! How can I help you today?".to_string()
347 } else {
348 interview.chat_response.clone()
349 };
350
351 {
353 let mut buffer = self.conversation_buffer.write();
354 buffer.push_agent(&response_text, &space_id);
355 }
356
357 {
360 let mut sessions = self.sessions.write();
361 if let Some(session) = sessions.get_mut(&session_id) {
362 tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
363 session
364 .interview
365 .add_to_history(user_message, &response_text);
366 } else {
367 let mut interview = InterviewResult::new();
369 interview.is_task = false;
370 interview.chat_response = response_text.clone();
371 interview.add_to_history(user_message, &response_text);
372 sessions.insert(
373 session_id.clone(),
374 InterviewSession {
375 id: session_id.clone(),
376 interview,
377 phase: Phase::Interview,
378 seed_id: None,
379 agent_id: None,
380 },
381 );
382 }
383 }
384
385 self.publish_phase_completed(&session_id, Phase::Interview, "chat")
386 .await;
387
388 return Ok(OrchestrationResult {
389 session_id: Some(session_id.clone()),
390 space_id: Some(space_id),
391 space_tag: Some(space_tag.clone()),
392 response: response_text,
393 seed_id: None,
394 agent_id: None,
395 phase_reached: Phase::Interview,
396 evaluation_passed: false,
397 output: None,
398 });
399 }
400
401 if !interview.ready_for_seed {
403 {
405 let mut sessions = self.sessions.write();
406 let session =
407 sessions
408 .entry(session_id.clone())
409 .or_insert_with(|| InterviewSession {
410 id: session_id.clone(),
411 interview: interview.clone(),
412 phase: Phase::Interview,
413 seed_id: None,
414 agent_id: None,
415 });
416 let questions_text = interview.questions.join("\n");
419 let last_answer = session.interview.answers.last().cloned();
420 if let Some(ref ans) = last_answer {
421 if !ans.is_empty() {
422 session.interview.add_to_history(ans, &questions_text);
423 }
424 }
425 } let questions = interview
428 .questions
429 .iter()
430 .filter(|q| !q.is_empty())
431 .cloned()
432 .collect::<Vec<_>>();
433
434 tracing::info!(
435 session_id = %session_id,
436 ambiguity = interview.ambiguity.ambiguity(),
437 questions = questions.len(),
438 "Interview needs clarification"
439 );
440
441 self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
442 .await;
443
444 return Ok(OrchestrationResult {
445 session_id: Some(session_id.clone()),
446 space_id: Some(space_id),
447 space_tag: Some(space_tag.clone()),
448 response: format_questions(&questions),
449 seed_id: None,
450 agent_id: None,
451 phase_reached: Phase::Interview,
452 evaluation_passed: false,
453 output: None,
454 });
455 }
456
457 {
461 let mut buffer = self.conversation_buffer.write();
462 buffer.push_agent("[interview: questions]", &space_id);
463 }
464
465 self.publish_phase_completed(&session_id, Phase::Interview, "ready for seed")
467 .await;
468 self.publish_phase_started(&session_id, Phase::Seed).await;
469
470 tracing::info!(phase = "seed", "Starting seed generation");
472 let seed = self.ouroboros.generate_seed(&interview).await?;
473
474 self.save_seed(&seed).await?;
476
477 self.event_bus
479 .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
480
481 self.publish_phase_completed(&session_id, Phase::Seed, "generated")
482 .await;
483 self.publish_phase_started(&session_id, Phase::Execute)
484 .await;
485
486 if should_split_seed(&seed) {
490 let subtasks = split_into_subtasks(&seed);
491 if subtasks.len() > 1 {
492 tracing::info!(
493 phase = "delegate",
494 subtasks = subtasks.len(),
495 "Delegating to multi-agent"
496 );
497 let results = self.delegate_subtasks(subtasks, &seed).await?;
498
499 let combined: String = results
501 .iter()
502 .filter(|r| r.success)
503 .filter_map(|r| r.result.as_deref())
504 .collect::<Vec<_>>()
505 .join("\n\n");
506
507 let all_passed = results.iter().all(|r| r.success);
508
509 {
511 let mut sessions = self.sessions.write();
512 sessions.remove(&session_id);
513 }
514
515 tracing::info!(
516 session_id = %session_id,
517 subtasks = results.len(),
518 passed = all_passed,
519 "Multi-agent orchestration complete"
520 );
521
522 return Ok(OrchestrationResult {
523 session_id: Some(session_id),
524 space_id: Some(space_id),
525 space_tag: Some(space_tag.clone()),
526 response: format_result_combined(&combined),
527 seed_id: Some(seed.id),
528 agent_id: None,
529 phase_reached: Phase::Execute,
530 evaluation_passed: all_passed,
531 output: Some(combined),
532 });
533 }
534 }
535
536 {
538 let mut buffer = self.conversation_buffer.write();
539 buffer.push_agent("[multi-agent: complete]", &space_id);
540 }
541
542 tracing::info!(phase = "execute", "Starting execution phase");
544 let exec_result = self
545 .lifecycle
546 .spawn_and_run(&seed, Priority::Normal)
547 .await?;
548
549 self.lifecycle.reap_zombies();
551
552 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
554 .await;
555 self.publish_phase_started(&session_id, Phase::Evaluate)
556 .await;
557
558 tracing::info!(phase = "evaluate", "Starting evaluation phase");
559 let evaluation = self.ouroboros.evaluate(&seed, &exec_result).await?;
560
561 self.publish_phase_completed(
562 &session_id,
563 Phase::Evaluate,
564 &format!("score={:.2}", evaluation.score),
565 )
566 .await;
567
568 self.event_bus.publish(KernelEvent::EvaluationComplete {
569 seed_id: seed.id,
570 passed: evaluation.all_passed(),
571 })?;
572
573 self.save_evaluation(&seed, &evaluation).await?;
575
576 let mut current_seed = Some(seed);
578 let mut current_evaluation = evaluation;
579 let mut iterations = 0;
580
581 while !current_evaluation.all_passed()
582 && current_evaluation.score < self.config.min_evaluation_score
583 && iterations < self.config.max_evolution_iterations
584 {
585 iterations += 1;
586 self.publish_phase_started(&session_id, Phase::Evolve).await;
587
588 tracing::info!(
589 phase = "evolve",
590 iteration = iterations,
591 max_iterations = self.config.max_evolution_iterations,
592 "Starting evolve phase"
593 );
594 let evolve_result = self
595 .ouroboros
596 .evolve(
597 current_seed.as_ref().expect("seed exists"),
598 ¤t_evaluation,
599 )
600 .await?;
601
602 if let Some(evolved) = evolve_result {
603 current_seed = Some(evolved.clone());
604
605 self.save_seed(&evolved).await?;
607 self.event_bus.publish(KernelEvent::SeedCreated {
608 seed_id: evolved.id,
609 })?;
610
611 self.publish_phase_completed(&session_id, Phase::Evolve, "evolved")
612 .await;
613 self.publish_phase_started(&session_id, Phase::Execute)
614 .await;
615
616 tracing::info!(
617 phase = "re-execute",
618 iteration = iterations,
619 "Re-executing with evolved seed"
620 );
621 let new_exec = self
622 .lifecycle
623 .spawn_and_run(&evolved, Priority::High)
624 .await?;
625
626 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
627 .await;
628 self.publish_phase_started(&session_id, Phase::Evaluate)
629 .await;
630
631 tracing::info!(
632 phase = "re-evaluate",
633 iteration = iterations,
634 "Re-evaluating evolved result"
635 );
636 let new_eval = self.ouroboros.evaluate(&evolved, &new_exec).await?;
637 current_evaluation = new_eval;
638
639 self.publish_phase_completed(
640 &session_id,
641 Phase::Evaluate,
642 &format!("score={:.2}", current_evaluation.score),
643 )
644 .await;
645 self.save_evaluation(&evolved, ¤t_evaluation).await?;
647 } else {
648 self.publish_phase_completed(&session_id, Phase::Evolve, "no evolution")
650 .await;
651 break;
652 }
653 }
654
655 {
657 let mut sessions = self.sessions.write();
658 sessions.remove(&session_id);
659 }
660
661 let final_seed = current_seed.expect("at least one seed exists");
662 let passed = current_evaluation.all_passed() || current_evaluation.score >= 0.7;
663
664 tracing::info!(
665 session_id = %session_id,
666 iterations,
667 score = current_evaluation.score,
668 passed,
669 "Orchestration complete"
670 );
671
672 let metrics = get_metrics();
674 metrics
675 .orch_duration
676 .observe(orch_start.elapsed().as_secs_f64());
677 if passed {
678 metrics.agents_completed.inc();
679 } else {
680 metrics.agents_failed.inc();
681 }
682
683 {
685 let mut buffer = self.conversation_buffer.write();
686 buffer.push_agent(&final_seed.goal, &space_id);
687 }
688
689 Ok(OrchestrationResult {
690 session_id: Some(session_id),
691 space_id: Some(space_id),
692 space_tag: Some(space_tag.clone()),
693 response: format_result(&final_seed, ¤t_evaluation),
694 seed_id: Some(final_seed.id),
695 agent_id: None,
696 phase_reached: Phase::Evaluate,
697 evaluation_passed: passed,
698 output: Some(current_evaluation.notes.join("; ")),
699 })
700 }
701
702 async fn save_seed(&self, seed: &Seed) -> Result<()> {
704 let key = seed.id.to_string();
705
706 self.state_store
707 .save_json("seeds", &key, seed)
708 .await
709 .context("failed to save seed to state store")?;
710
711 self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
712
713 Ok(())
714 }
715
716 async fn save_evaluation(&self, seed: &Seed, evaluation: &EvaluationResult) -> Result<()> {
718 let key = format!("{}-eval", seed.id);
719
720 self.state_store
721 .save_json("evals", &key, evaluation)
722 .await
723 .context("failed to save evaluation to state store")?;
724
725 self.git_commit(&format!("evals/{}.json", key), "ourobors: save eval");
726
727 Ok(())
728 }
729
730 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
732 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
733 session_id: session_id.to_owned(),
734 phase,
735 });
736 }
737
738 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
740 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
741 session_id: session_id.to_owned(),
742 phase,
743 result_summary: result.to_owned(),
744 });
745 }
746
747 pub async fn delegate_subtasks(
755 &self,
756 subtasks: Vec<SubTask>,
757 parent_seed: &Seed,
758 ) -> Result<Vec<SubTask>> {
759 if subtasks.len() == 1 {
761 return self.execute_single_subtask(subtasks, parent_seed).await;
762 }
763
764 if let Some(ref a2a) = self.a2a {
766 if !self.a2a_breaker.is_allowed() {
768 tracing::warn!(
769 state = ?self.a2a_breaker.state(),
770 "A2A circuit breaker open, using lifecycle fallback"
771 );
772 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
773 }
774
775 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
777 }
778
779 self.delegate_via_lifecycle(subtasks, parent_seed).await
781 }
782
783 async fn delegate_with_retry(
785 &self,
786 subtasks: Vec<SubTask>,
787 parent_seed: &Seed,
788 a2a: &Arc<crate::a2a::A2AProtocol>,
789 ) -> Result<Vec<SubTask>> {
790 let mut attempt = 0;
791 let max_retries = self.delegation_config.max_retries;
792
793 loop {
794 match self
795 .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
796 .await
797 {
798 Ok(results) => {
799 self.a2a_breaker.record_success();
800 return Ok(results);
801 }
802 Err(e) => {
803 self.a2a_breaker.record_failure();
804 attempt += 1;
805
806 if attempt >= max_retries {
807 tracing::error!(
808 attempts = attempt,
809 error = %e,
810 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
811 attempt
812 );
813 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
814 }
815
816 let delay = self.delegation_config.backoff_delay(attempt);
818 tracing::warn!(
819 attempt,
820 delay_ms = delay,
821 error = %e,
822 "A2A delegation failed, retrying with backoff"
823 );
824 tokio::time::sleep(Duration::from_millis(delay)).await;
825 }
826 }
827 }
828 }
829
830 async fn execute_single_subtask(
832 &self,
833 subtasks: Vec<SubTask>,
834 parent_seed: &Seed,
835 ) -> Result<Vec<SubTask>> {
836 let mut task = subtasks
837 .into_iter()
838 .next()
839 .expect("execute_single_subtask is only called when subtasks is non-empty");
840 let child_seed = Seed {
841 id: Uuid::new_v4(),
842 goal: task.description.clone(),
843 constraints: parent_seed.constraints.clone(),
844 acceptance_criteria: vec!["Task completes successfully".into()],
845 ontology: parent_seed.ontology.clone(),
846 created_at: chrono::Utc::now(),
847 generation: parent_seed.generation + 1,
848 parent_seed_id: Some(parent_seed.id),
849 cspace_hint: None,
850 };
851 match self
852 .lifecycle
853 .spawn_and_run(&child_seed, Priority::Normal)
854 .await
855 {
856 Ok(result) => {
857 task.result = Some(result.output.clone());
858 }
859 Err(e) => {
860 task.result = Some(format!("Failed: {e}"));
861 task.success = false;
862 }
863 }
864 Ok(vec![task])
865 }
866
867 async fn delegate_via_a2a(
877 &self,
878 subtasks: Vec<SubTask>,
879 parent_seed: &Seed,
880 a2a: &Arc<crate::a2a::A2AProtocol>,
881 ) -> Result<Vec<SubTask>> {
882 use crate::a2a::TaskPriority;
883 use tokio::task::JoinSet;
884
885 tracing::info!(
886 subtasks = subtasks.len(),
887 "Delegating subtasks via A2A protocol"
888 );
889
890 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
891 let subtask_count = subtasks.len();
892 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
893
894 for (idx, subtask) in subtasks.into_iter().enumerate() {
895 let capability = subtask.required_capability.clone();
896 let description = subtask.description.clone();
897 let subtask_id = subtask.id;
898 let role = subtask.role.clone();
899 let a2a = Arc::clone(a2a);
900 let parent_seed = parent_seed.clone();
901 let lifecycle = self.lifecycle.clone();
902
903 join_set.spawn(async move {
904 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
906 a2a.query_capabilities(cap).await.ok()
907 .and_then(|agents| agents.into_iter().next())
908 } else {
909 None
910 };
911
912 let (output, success) = if let Some(ref target_card) = target {
913 let target_id = target_card.agent_id;
914 tracing::info!(
915 subtask_index = idx,
916 target = %target_card.name,
917 target_id = %target_id,
918 "A2A dispatching subtask"
919 );
920
921 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
922 "parent_seed": parent_seed.id.to_string(),
923 "goal": description,
924 }))
925 .with_priority(TaskPriority::Normal);
926
927 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
929
930 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
932 Some(Ok(result)) => {
933 let out = result.get("output")
934 .and_then(|v| v.as_str())
935 .unwrap_or("")
936 .to_string();
937 let ok = result.get("success")
938 .and_then(|v| v.as_bool())
939 .unwrap_or(false);
940 (out, ok)
941 }
942 Some(Err(e)) => {
943 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
944 (format!("Failed: {e}"), false)
945 }
946 None => {
947 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
949 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
950 }
951 }
952 } else {
953 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
954 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
955 };
956
957 (idx, SubTask {
958 id: subtask_id,
959 description,
960 required_capability: capability,
961 result: Some(output),
962 success,
963 role,
964 })
965 });
966 }
967
968 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
969 while let Some(join_result) = join_set.join_next().await {
970 match join_result {
971 Ok((idx, subtask)) => {
972 results[idx] = Some(subtask);
973 }
974 Err(e) => {
975 tracing::error!(error = %e, "A2A task panicked");
976 }
977 }
978 }
979
980 let completed: Vec<SubTask> = results.into_iter().flatten().collect();
981 tracing::info!(
982 completed = completed.len(),
983 succeeded = completed.iter().filter(|r| r.success).count(),
984 "A2A delegation complete"
985 );
986 Ok(completed)
987 }
988
989 async fn delegate_via_lifecycle(
990 &self,
991 subtasks: Vec<SubTask>,
992 parent_seed: &Seed,
993 ) -> Result<Vec<SubTask>> {
994 use crate::agent_group::OxiosAgentGroup;
995 use tokio::task::JoinSet;
996
997 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
998 let group = OxiosAgentGroup::new(parent_seed, descriptions);
999 let group_id = group.id;
1000
1001 self.event_bus.publish(KernelEvent::AgentGroupCreated {
1002 group_id,
1003 agent_count: group.agents.len(),
1004 })?;
1005
1006 tracing::info!(
1007 group_id = %group_id,
1008 agent_count = group.agents.len(),
1009 "Starting parallel multi-agent execution"
1010 );
1011
1012 let mut join_set: JoinSet<(
1013 usize,
1014 crate::types::AgentId,
1015 Result<oxios_ouroboros::ExecutionResult>,
1016 )> = JoinSet::new();
1017
1018 for (idx, agent_entry) in group.agents.iter().enumerate() {
1019 let child_seed = agent_entry.seed.clone();
1020 let agent_id = agent_entry.id;
1021 let lifecycle = self.lifecycle.clone();
1022
1023 join_set.spawn(async move {
1024 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
1025 (idx, agent_id, result)
1026 });
1027 }
1028
1029 let subtask_count = subtasks.len();
1030 let mut completed = vec![None; subtask_count];
1031 while let Some(join_result) = join_set.join_next().await {
1032 match join_result {
1033 Ok((idx, agent_id, Ok(exec_result))) => {
1034 let _ = self
1035 .event_bus
1036 .publish(KernelEvent::AgentGroupMemberCompleted {
1037 group_id,
1038 agent_id,
1039 success: exec_result.success,
1040 });
1041 completed[idx] = Some(SubTask {
1042 id: subtasks[idx].id,
1043 description: subtasks[idx].description.clone(),
1044 required_capability: subtasks[idx].required_capability.clone(),
1045 result: Some(exec_result.output.clone()),
1046 success: exec_result.success,
1047 role: subtasks[idx].role.clone(),
1048 });
1049 }
1050 Ok((idx, agent_id, Err(e))) => {
1051 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
1052 let _ = self
1053 .event_bus
1054 .publish(KernelEvent::AgentGroupMemberCompleted {
1055 group_id,
1056 agent_id,
1057 success: false,
1058 });
1059 completed[idx] = Some(SubTask {
1060 id: subtasks[idx].id,
1061 description: subtasks[idx].description.clone(),
1062 required_capability: subtasks[idx].required_capability.clone(),
1063 result: Some(format!("Failed: {e}")),
1064 success: false,
1065 role: subtasks[idx].role.clone(),
1066 });
1067 }
1068 Err(e) => {
1069 tracing::error!(error = %e, "JoinSet task panicked");
1070 }
1071 }
1072 }
1073
1074 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
1075 let succeeded = completed.iter().filter(|r| r.success).count();
1076 let total = completed.len();
1077
1078 tracing::info!(
1079 group_id = %group_id,
1080 succeeded,
1081 total,
1082 "Parallel multi-agent execution complete"
1083 );
1084
1085 let _ = self
1087 .state_store
1088 .save_json("agent_groups", &group_id.to_string(), &group)
1089 .await;
1090 self.git_commit(
1091 &format!("agent_groups/{}.json", group_id),
1092 "orchestrator: save group",
1093 );
1094
1095 Ok(completed)
1096 }
1097}
1098
1099#[derive(Debug, Clone)]
1101#[allow(unused)]
1102struct InterviewSession {
1103 id: String,
1104 interview: InterviewResult,
1105 phase: Phase,
1106 seed_id: Option<Uuid>,
1107 agent_id: Option<AgentId>,
1108}
1109
1110#[derive(Debug, Clone, Serialize, Deserialize)]
1112pub struct OrchestrationResult {
1113 #[serde(skip_serializing_if = "Option::is_none")]
1115 pub session_id: Option<String>,
1116 #[serde(skip_serializing_if = "Option::is_none")]
1118 pub space_id: Option<Uuid>,
1119 #[serde(skip_serializing_if = "Option::is_none")]
1121 pub space_tag: Option<String>,
1122 pub response: String,
1124 #[serde(skip_serializing_if = "Option::is_none")]
1126 pub seed_id: Option<Uuid>,
1127 #[serde(skip_serializing_if = "Option::is_none")]
1129 pub agent_id: Option<AgentId>,
1130 pub phase_reached: Phase,
1132 pub evaluation_passed: bool,
1134 #[serde(skip_serializing_if = "Option::is_none")]
1136 pub output: Option<String>,
1137}
1138
1139fn format_questions(questions: &[String]) -> String {
1141 if questions.is_empty() {
1142 "I need a bit more clarification before I can proceed.".to_string()
1143 } else {
1144 format!(
1145 "I'd like to understand your request better. Could you help clarify:\n\n{}",
1146 questions
1147 .iter()
1148 .enumerate()
1149 .map(|(i, q)| format!("{}. {}", i + 1, q))
1150 .collect::<Vec<_>>()
1151 .join("\n")
1152 )
1153 }
1154}
1155
1156fn format_result(seed: &Seed, evaluation: &EvaluationResult) -> String {
1158 let passed = evaluation.all_passed();
1159
1160 let mut lines = Vec::new();
1161 lines.push(format!("Task '{}' completed.", seed.goal));
1162
1163 if passed {
1164 lines.push("✅ Evaluation passed.".to_string());
1165 } else {
1166 lines.push(format!(
1167 "⚠️ Evaluation score: {:.0}%",
1168 evaluation.score * 100.0
1169 ));
1170 }
1171
1172 if !evaluation.notes.is_empty() {
1173 lines.push("\nNotes:".to_string());
1174 for note in &evaluation.notes {
1175 lines.push(format!("- {}", note));
1176 }
1177 }
1178
1179 lines.join("\n")
1180}
1181
1182fn should_split_seed(seed: &Seed) -> bool {
1187 seed.acceptance_criteria.len() >= 5
1191}
1192
1193fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1199 seed.acceptance_criteria
1200 .iter()
1201 .map(|criterion| {
1202 let desc = format!("{}: {}", seed.goal, criterion);
1203 let desc_lower = desc.to_lowercase();
1204
1205 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1207 Some("code-review".to_string())
1208 } else if desc_lower.contains("test") {
1209 Some("testing".to_string())
1210 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1211 Some("refactoring".to_string())
1212 } else if desc_lower.contains("write")
1213 || desc_lower.contains("create")
1214 || desc_lower.contains("implement")
1215 {
1216 Some("code-generation".to_string())
1217 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1218 Some("debugging".to_string())
1219 } else {
1220 None
1221 };
1222
1223 let mut subtask = SubTask::new(desc);
1224 subtask.required_capability = cap;
1225 subtask
1226 })
1227 .collect()
1228}
1229
1230fn format_result_combined(combined: &str) -> String {
1232 if combined.is_empty() {
1233 "No subtasks completed successfully.".to_string()
1234 } else {
1235 format!("Multi-agent execution completed:\n\n{}", combined)
1236 }
1237}
1238
1239async fn run_via_lifecycle(
1241 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1242 parent_seed: &Seed,
1243 description: &str,
1244) -> (String, bool) {
1245 let child_seed = Seed {
1246 id: Uuid::new_v4(),
1247 goal: description.to_string(),
1248 constraints: parent_seed.constraints.clone(),
1249 acceptance_criteria: vec!["Task completes successfully".into()],
1250 ontology: parent_seed.ontology.clone(),
1251 created_at: chrono::Utc::now(),
1252 generation: parent_seed.generation + 1,
1253 parent_seed_id: Some(parent_seed.id),
1254 cspace_hint: None,
1255 };
1256 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1257 Ok(result) => (result.output, result.success),
1258 Err(e) => (format!("Failed: {e}"), false),
1259 }
1260}