1use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::{Context, Result};
18use chrono;
19use oxios_ouroboros::{EvaluationResult, InterviewResult, OuroborosProtocol, Phase, Seed};
20use parking_lot::RwLock;
21use serde::{Deserialize, Serialize};
22use uuid::Uuid;
23
24use crate::agent_lifecycle::AgentLifecycleManager;
25use crate::event_bus::{EventBus, KernelEvent};
26use crate::git_layer::GitLayer;
27use crate::metrics::get_metrics;
28use crate::scheduler::Priority;
29use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
30use crate::state_store::StateStore;
31use crate::types::AgentId;
32
33#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
35pub enum AgentRole {
36 #[default]
38 Worker,
39 Manager,
41}
42
/// One unit of delegated work produced by splitting a seed, together with the
/// outcome once it has been executed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubTask {
    /// Unique identifier, generated when the subtask is created.
    pub id: Uuid,
    /// Text handed to the executing agent as its goal.
    pub description: String,
    /// Capability name used to look up a matching A2A agent; `None` means no
    /// capability lookup is attempted.
    pub required_capability: Option<String>,
    /// Output (or a `"Failed: …"` message) once the subtask has run.
    pub result: Option<String>,
    /// Whether execution reported success; starts as `false`.
    pub success: bool,
    /// Role tag; defaults to [`AgentRole::Worker`] when absent from
    /// serialized input.
    #[serde(default)]
    pub role: AgentRole,
}
60
61impl SubTask {
62 pub fn new(description: impl Into<String>) -> Self {
64 Self {
65 id: Uuid::new_v4(),
66 description: description.into(),
67 required_capability: None,
68 result: None,
69 success: false,
70 role: AgentRole::default(),
71 }
72 }
73
74 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
76 self.required_capability = Some(cap.into());
77 self
78 }
79}
80
/// Coordinates the interview → seed → execute → evaluate → evolve pipeline
/// for incoming user messages and delegates split work to multiple agents.
pub struct Orchestrator {
    /// Protocol implementation providing `interview`, `generate_seed`,
    /// `evaluate` and `evolve`.
    ouroboros: Arc<dyn OuroborosProtocol>,
    /// Bus on which phase, seed, evaluation and agent-group events are published.
    event_bus: EventBus,
    /// Persistent store for seeds, evaluations and agent groups.
    state_store: Arc<StateStore>,
    /// Optional git layer; when present and enabled, saved files are committed.
    git_layer: Option<Arc<GitLayer>>,
    /// In-flight interview sessions keyed by session id.
    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
    /// Spawns and runs agents for seeds.
    lifecycle: AgentLifecycleManager,
    /// Optional A2A protocol handle used for capability-based delegation.
    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
    /// Optional space manager, settable after construction through `&self`.
    space_manager: RwLock<Option<Arc<SpaceManager>>>,
    /// Rolling buffer of user and agent turns.
    conversation_buffer: RwLock<ConversationBuffer>,
    /// Orchestration settings (evaluation score threshold, evolution
    /// iteration cap).
    config: crate::config::OrchestratorConfig,
    /// Retry and backoff settings for A2A delegation.
    delegation_config: DelegationConfig,
    /// Circuit breaker consulted before delegating over A2A.
    a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
}
105
/// Retry and backoff settings for A2A delegation.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Number of failed attempts after which delegation falls back.
    max_retries: u32,
    /// Base delay, in milliseconds, that the backoff doubles from.
    base_delay_ms: u64,
    /// Upper bound, in milliseconds, for any single backoff delay.
    max_delay_ms: u64,
    /// Timeout in milliseconds; not read by the code in this file.
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Largest exponent applied to the base delay; keeps the multiplier at
    /// 2^10 no matter how many attempts were made.
    const MAX_BACKOFF_EXPONENT: u32 = 10;

    /// Returns the delay in milliseconds before retry number `attempt`:
    /// `base_delay_ms * 2^min(attempt, 10)`, capped at `max_delay_ms`.
    ///
    /// The multiplication saturates, so a very large `base_delay_ms` yields
    /// `max_delay_ms` instead of overflowing.
    fn backoff_delay(&self, attempt: u32) -> u64 {
        let multiplier = 2_u64.saturating_pow(attempt.min(Self::MAX_BACKOFF_EXPONENT));
        self.base_delay_ms
            .saturating_mul(multiplier)
            .min(self.max_delay_ms)
    }
}
137
138impl Orchestrator {
139 pub fn new(
141 ouroboros: Arc<dyn OuroborosProtocol>,
142 event_bus: EventBus,
143 state_store: Arc<StateStore>,
144 lifecycle: AgentLifecycleManager,
145 ) -> Self {
146 Self::with_config(
147 ouroboros,
148 event_bus,
149 state_store,
150 lifecycle,
151 crate::config::OrchestratorConfig::default(),
152 )
153 }
154
155 pub fn with_config(
157 ouroboros: Arc<dyn OuroborosProtocol>,
158 event_bus: EventBus,
159 state_store: Arc<StateStore>,
160 lifecycle: AgentLifecycleManager,
161 config: crate::config::OrchestratorConfig,
162 ) -> Self {
163 Self {
164 ouroboros,
165 event_bus,
166 state_store,
167 git_layer: None,
168 sessions: RwLock::new(std::collections::HashMap::new()),
169 lifecycle,
170 a2a: None,
171 space_manager: RwLock::new(None),
172 conversation_buffer: RwLock::new(ConversationBuffer::default()),
173 config,
174 delegation_config: DelegationConfig::default(),
175 a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
176 }
177 }
178
179 pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
181 *self.space_manager.write() = Some(manager);
182 }
183
184 pub fn current_space_id(&self) -> Option<SpaceId> {
186 self.space_manager
187 .read()
188 .as_ref()
189 .map(|m| m.current_space_id())
190 }
191
192 pub fn current_space_tag(&self) -> String {
194 self.space_manager
195 .read()
196 .as_ref()
197 .and_then(|m| {
198 m.current_space().map(|s| {
199 if s.is_default() {
200 String::new()
201 } else {
202 format!("[{} {}]", s.emoji(), s.name)
203 }
204 })
205 })
206 .unwrap_or_default()
207 }
208
209 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
211 self.a2a = Some(a2a);
212 }
213
214 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
216 self.git_layer = Some(git_layer);
217 }
218
219 fn git_commit(&self, rel_path: &str, message: &str) {
221 if let Some(ref gl) = self.git_layer {
222 if gl.is_enabled() {
223 let _ = gl.commit_file(rel_path, message);
224 }
225 }
226 }
227
228 #[allow(clippy::await_holding_lock)]
237 pub async fn handle_message(
238 &self,
239 user_id: &str,
240 user_message: &str,
241 session_id: Option<&str>,
242 ) -> Result<OrchestrationResult> {
243 tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
244 get_metrics().messages.inc();
245 let orch_start = std::time::Instant::now();
246
247 let session_id = session_id
248 .map(String::from)
249 .unwrap_or_else(|| Uuid::new_v4().to_string());
250
251 tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
252
253 let space_tag = self.current_space_tag();
255 let (turns, sm_arc) = {
256 let buffer = self.conversation_buffer.read();
257 let sm_guard = self.space_manager.read();
258 let turns: Vec<_> = buffer.turns().iter().cloned().collect();
259 let sm_arc = sm_guard.as_ref().cloned();
261 (turns, sm_arc)
262 };
263 let space_id = if let Some(ref sm) = sm_arc {
264 match sm.detect_or_create(user_message, &turns).await {
265 Ok(id) => {
266 tracing::info!(space_id = %id, "Space detected/created for message");
267 id
268 }
269 Err(e) => {
270 tracing::warn!(error = %e, "Space detection failed, using default");
271 sm.default_space_id()
272 }
273 }
274 } else {
275 uuid::Uuid::nil()
276 };
277
278 {
280 let mut buffer = self.conversation_buffer.write();
281 buffer.push_user(user_message);
282 }
283
284 self.publish_phase_started(&session_id, Phase::Interview)
286 .await;
287
288 let needs_interview;
290 let existing_history: Option<Vec<_>>;
291 {
292 let sessions = self.sessions.read();
293 needs_interview = !sessions.contains_key(&session_id);
294 existing_history = if !needs_interview {
295 sessions.get(&session_id).map(|s| s.interview.conversation_history.clone())
296 } else {
297 None
298 };
299 }
301
302 let interview = {
304 tracing::info!(phase = "interview", "Starting interview phase");
305 if needs_interview {
306 self.ouroboros.interview(user_message).await?
307 } else {
308 let multi_turn_context = {
311 let mut context_parts = Vec::new();
312 if let Some(ref history) = existing_history {
313 for exchange in history {
314 context_parts.push(format!("User: {}\nAgent: {}", exchange.user, exchange.agent));
315 }
316 }
317 context_parts.push(format!("User: {}", user_message));
318 context_parts.join("\n\n")
319 };
320
321 {
323 let mut sessions = self.sessions.write();
324 if let Some(s) = sessions.get_mut(&session_id) {
325 let last_q = s.interview.questions.last().cloned().unwrap_or_default();
326 s.interview.add_exchange(&last_q, user_message);
327 }
328 }
329
330 self.ouroboros.interview(&multi_turn_context).await?
332 }
333 };
334
335 if !interview.is_task {
337 tracing::info!(session_id = %session_id, "Chat response (non-task)");
338
339 let response_text = if interview.chat_response.is_empty() {
340 "Hello! How can I help you today?".to_string()
341 } else {
342 interview.chat_response.clone()
343 };
344
345 {
347 let mut buffer = self.conversation_buffer.write();
348 buffer.push_agent(&response_text, &space_id);
349 }
350
351 {
354 let mut sessions = self.sessions.write();
355 if let Some(session) = sessions.get_mut(&session_id) {
356 tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
357 session.interview.add_to_history(user_message, &response_text);
358 } else {
359 let mut interview = InterviewResult::new();
361 interview.is_task = false;
362 interview.chat_response = response_text.clone();
363 interview.add_to_history(user_message, &response_text);
364 sessions.insert(
365 session_id.clone(),
366 InterviewSession {
367 id: session_id.clone(),
368 interview,
369 phase: Phase::Interview,
370 seed_id: None,
371 agent_id: None,
372 },
373 );
374 }
375 }
376
377 self.publish_phase_completed(&session_id, Phase::Interview, "chat")
378 .await;
379
380 return Ok(OrchestrationResult {
381 session_id: Some(session_id.clone()),
382 space_id: Some(space_id),
383 space_tag: Some(space_tag.clone()),
384 response: response_text,
385 seed_id: None,
386 agent_id: None,
387 phase_reached: Phase::Interview,
388 evaluation_passed: false,
389 output: None,
390 });
391 }
392
393 if !interview.ready_for_seed {
395 {
397 let mut sessions = self.sessions.write();
398 let session = sessions.entry(session_id.clone()).or_insert_with(|| {
399 InterviewSession {
400 id: session_id.clone(),
401 interview: interview.clone(),
402 phase: Phase::Interview,
403 seed_id: None,
404 agent_id: None,
405 }
406 });
407 let questions_text = interview.questions.join("\n");
410 let last_answer = session.interview.answers.last().cloned();
411 if let Some(ref ans) = last_answer {
412 if !ans.is_empty() {
413 session.interview.add_to_history(ans, &questions_text);
414 }
415 }
416 } let questions = interview
419 .questions
420 .iter()
421 .filter(|q| !q.is_empty())
422 .cloned()
423 .collect::<Vec<_>>();
424
425 tracing::info!(
426 session_id = %session_id,
427 ambiguity = interview.ambiguity.ambiguity(),
428 questions = questions.len(),
429 "Interview needs clarification"
430 );
431
432 self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
433 .await;
434
435 return Ok(OrchestrationResult {
436 session_id: Some(session_id.clone()),
437 space_id: Some(space_id),
438 space_tag: Some(space_tag.clone()),
439 response: format_questions(&questions),
440 seed_id: None,
441 agent_id: None,
442 phase_reached: Phase::Interview,
443 evaluation_passed: false,
444 output: None,
445 });
446 }
447
448 {
452 let mut buffer = self.conversation_buffer.write();
453 buffer.push_agent("[interview: questions]", &space_id);
454 }
455
456 self.publish_phase_completed(&session_id, Phase::Interview, "ready for seed")
458 .await;
459 self.publish_phase_started(&session_id, Phase::Seed).await;
460
461 tracing::info!(phase = "seed", "Starting seed generation");
463 let seed = self.ouroboros.generate_seed(&interview).await?;
464
465 self.save_seed(&seed).await?;
467
468 self.event_bus
470 .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
471
472 self.publish_phase_completed(&session_id, Phase::Seed, "generated")
473 .await;
474 self.publish_phase_started(&session_id, Phase::Execute)
475 .await;
476
477 if should_split_seed(&seed) {
481 let subtasks = split_into_subtasks(&seed);
482 if subtasks.len() > 1 {
483 tracing::info!(
484 phase = "delegate",
485 subtasks = subtasks.len(),
486 "Delegating to multi-agent"
487 );
488 let results = self.delegate_subtasks(subtasks, &seed).await?;
489
490 let combined: String = results
492 .iter()
493 .filter(|r| r.success)
494 .filter_map(|r| r.result.as_deref())
495 .collect::<Vec<_>>()
496 .join("\n\n");
497
498 let all_passed = results.iter().all(|r| r.success);
499
500 {
502 let mut sessions = self.sessions.write();
503 sessions.remove(&session_id);
504 }
505
506 tracing::info!(
507 session_id = %session_id,
508 subtasks = results.len(),
509 passed = all_passed,
510 "Multi-agent orchestration complete"
511 );
512
513 return Ok(OrchestrationResult {
514 session_id: Some(session_id),
515 space_id: Some(space_id),
516 space_tag: Some(space_tag.clone()),
517 response: format_result_combined(&combined),
518 seed_id: Some(seed.id),
519 agent_id: None,
520 phase_reached: Phase::Execute,
521 evaluation_passed: all_passed,
522 output: Some(combined),
523 });
524 }
525 }
526
527 {
529 let mut buffer = self.conversation_buffer.write();
530 buffer.push_agent("[multi-agent: complete]", &space_id);
531 }
532
533 tracing::info!(phase = "execute", "Starting execution phase");
535 let exec_result = self
536 .lifecycle
537 .spawn_and_run(&seed, Priority::Normal)
538 .await?;
539
540 self.lifecycle.reap_zombies();
542
543 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
545 .await;
546 self.publish_phase_started(&session_id, Phase::Evaluate)
547 .await;
548
549 tracing::info!(phase = "evaluate", "Starting evaluation phase");
550 let evaluation = self.ouroboros.evaluate(&seed, &exec_result).await?;
551
552 self.publish_phase_completed(
553 &session_id,
554 Phase::Evaluate,
555 &format!("score={:.2}", evaluation.score),
556 )
557 .await;
558
559 self.event_bus.publish(KernelEvent::EvaluationComplete {
560 seed_id: seed.id,
561 passed: evaluation.all_passed(),
562 })?;
563
564 self.save_evaluation(&seed, &evaluation).await?;
566
567 let mut current_seed = Some(seed);
569 let mut current_evaluation = evaluation;
570 let mut iterations = 0;
571
572 while !current_evaluation.all_passed()
573 && current_evaluation.score < self.config.min_evaluation_score
574 && iterations < self.config.max_evolution_iterations
575 {
576 iterations += 1;
577 self.publish_phase_started(&session_id, Phase::Evolve).await;
578
579 tracing::info!(
580 phase = "evolve",
581 iteration = iterations,
582 max_iterations = self.config.max_evolution_iterations,
583 "Starting evolve phase"
584 );
585 let evolve_result = self
586 .ouroboros
587 .evolve(
588 current_seed.as_ref().expect("seed exists"),
589 ¤t_evaluation,
590 )
591 .await?;
592
593 if let Some(evolved) = evolve_result {
594 current_seed = Some(evolved.clone());
595
596 self.save_seed(&evolved).await?;
598 self.event_bus.publish(KernelEvent::SeedCreated {
599 seed_id: evolved.id,
600 })?;
601
602 self.publish_phase_completed(&session_id, Phase::Evolve, "evolved")
603 .await;
604 self.publish_phase_started(&session_id, Phase::Execute)
605 .await;
606
607 tracing::info!(
608 phase = "re-execute",
609 iteration = iterations,
610 "Re-executing with evolved seed"
611 );
612 let new_exec = self
613 .lifecycle
614 .spawn_and_run(&evolved, Priority::High)
615 .await?;
616
617 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
618 .await;
619 self.publish_phase_started(&session_id, Phase::Evaluate)
620 .await;
621
622 tracing::info!(
623 phase = "re-evaluate",
624 iteration = iterations,
625 "Re-evaluating evolved result"
626 );
627 let new_eval = self.ouroboros.evaluate(&evolved, &new_exec).await?;
628 current_evaluation = new_eval;
629
630 self.publish_phase_completed(
631 &session_id,
632 Phase::Evaluate,
633 &format!("score={:.2}", current_evaluation.score),
634 )
635 .await;
636 self.save_evaluation(&evolved, ¤t_evaluation).await?;
638 } else {
639 self.publish_phase_completed(&session_id, Phase::Evolve, "no evolution")
641 .await;
642 break;
643 }
644 }
645
646 {
648 let mut sessions = self.sessions.write();
649 sessions.remove(&session_id);
650 }
651
652 let final_seed = current_seed.expect("at least one seed exists");
653 let passed = current_evaluation.all_passed() || current_evaluation.score >= 0.7;
654
655 tracing::info!(
656 session_id = %session_id,
657 iterations,
658 score = current_evaluation.score,
659 passed,
660 "Orchestration complete"
661 );
662
663 let metrics = get_metrics();
665 metrics
666 .orch_duration
667 .observe(orch_start.elapsed().as_secs_f64());
668 if passed {
669 metrics.agents_completed.inc();
670 } else {
671 metrics.agents_failed.inc();
672 }
673
674 {
676 let mut buffer = self.conversation_buffer.write();
677 buffer.push_agent(&final_seed.goal, &space_id);
678 }
679
680 Ok(OrchestrationResult {
681 session_id: Some(session_id),
682 space_id: Some(space_id),
683 space_tag: Some(space_tag.clone()),
684 response: format_result(&final_seed, ¤t_evaluation),
685 seed_id: Some(final_seed.id),
686 agent_id: None,
687 phase_reached: Phase::Evaluate,
688 evaluation_passed: passed,
689 output: Some(current_evaluation.notes.join("; ")),
690 })
691 }
692
693 async fn save_seed(&self, seed: &Seed) -> Result<()> {
695 let key = seed.id.to_string();
696
697 self.state_store
698 .save_json("seeds", &key, seed)
699 .await
700 .context("failed to save seed to state store")?;
701
702 self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
703
704 Ok(())
705 }
706
707 async fn save_evaluation(&self, seed: &Seed, evaluation: &EvaluationResult) -> Result<()> {
709 let key = format!("{}-eval", seed.id);
710
711 self.state_store
712 .save_json("evals", &key, evaluation)
713 .await
714 .context("failed to save evaluation to state store")?;
715
716 self.git_commit(&format!("evals/{}.json", key), "ourobors: save eval");
717
718 Ok(())
719 }
720
721 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
723 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
724 session_id: session_id.to_owned(),
725 phase,
726 });
727 }
728
729 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
731 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
732 session_id: session_id.to_owned(),
733 phase,
734 result_summary: result.to_owned(),
735 });
736 }
737
738 pub async fn delegate_subtasks(
746 &self,
747 subtasks: Vec<SubTask>,
748 parent_seed: &Seed,
749 ) -> Result<Vec<SubTask>> {
750 if subtasks.len() == 1 {
752 return self.execute_single_subtask(subtasks, parent_seed).await;
753 }
754
755 if let Some(ref a2a) = self.a2a {
757 if !self.a2a_breaker.is_allowed() {
759 tracing::warn!(
760 state = ?self.a2a_breaker.state(),
761 "A2A circuit breaker open, using lifecycle fallback"
762 );
763 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
764 }
765
766 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
768 }
769
770 self.delegate_via_lifecycle(subtasks, parent_seed).await
772 }
773
774 async fn delegate_with_retry(
776 &self,
777 subtasks: Vec<SubTask>,
778 parent_seed: &Seed,
779 a2a: &Arc<crate::a2a::A2AProtocol>,
780 ) -> Result<Vec<SubTask>> {
781 let mut attempt = 0;
782 let max_retries = self.delegation_config.max_retries;
783
784 loop {
785 match self.delegate_via_a2a(subtasks.clone(), parent_seed, a2a).await {
786 Ok(results) => {
787 self.a2a_breaker.record_success();
788 return Ok(results);
789 }
790 Err(e) => {
791 self.a2a_breaker.record_failure();
792 attempt += 1;
793
794 if attempt >= max_retries {
795 tracing::error!(
796 attempts = attempt,
797 error = %e,
798 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
799 attempt
800 );
801 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
802 }
803
804 let delay = self.delegation_config.backoff_delay(attempt);
806 tracing::warn!(
807 attempt,
808 delay_ms = delay,
809 error = %e,
810 "A2A delegation failed, retrying with backoff"
811 );
812 tokio::time::sleep(Duration::from_millis(delay)).await;
813 }
814 }
815 }
816 }
817
818 async fn execute_single_subtask(
820 &self,
821 subtasks: Vec<SubTask>,
822 parent_seed: &Seed,
823 ) -> Result<Vec<SubTask>> {
824 let mut task = subtasks.into_iter().next().unwrap();
825 let child_seed = Seed {
826 id: Uuid::new_v4(),
827 goal: task.description.clone(),
828 constraints: parent_seed.constraints.clone(),
829 acceptance_criteria: vec!["Task completes successfully".into()],
830 ontology: parent_seed.ontology.clone(),
831 created_at: chrono::Utc::now(),
832 generation: parent_seed.generation + 1,
833 parent_seed_id: Some(parent_seed.id),
834 cspace_hint: None,
835 };
836 match self
837 .lifecycle
838 .spawn_and_run(&child_seed, Priority::Normal)
839 .await
840 {
841 Ok(result) => {
842 task.result = Some(result.output.clone());
843 }
844 Err(e) => {
845 task.result = Some(format!("Failed: {e}"));
846 task.success = false;
847 }
848 }
849 Ok(vec![task])
850 }
851
852 async fn delegate_via_a2a(
862 &self,
863 subtasks: Vec<SubTask>,
864 parent_seed: &Seed,
865 a2a: &Arc<crate::a2a::A2AProtocol>,
866 ) -> Result<Vec<SubTask>> {
867 use crate::a2a::TaskPriority;
868 use tokio::task::JoinSet;
869
870 tracing::info!(
871 subtasks = subtasks.len(),
872 "Delegating subtasks via A2A protocol"
873 );
874
875 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
876 let subtask_count = subtasks.len();
877 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
878
879 for (idx, subtask) in subtasks.into_iter().enumerate() {
880 let capability = subtask.required_capability.clone();
881 let description = subtask.description.clone();
882 let subtask_id = subtask.id;
883 let role = subtask.role.clone();
884 let a2a = Arc::clone(a2a);
885 let parent_seed = parent_seed.clone();
886 let lifecycle = self.lifecycle.clone();
887
888 join_set.spawn(async move {
889 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
891 a2a.query_capabilities(cap).await.ok()
892 .and_then(|agents| agents.into_iter().next())
893 } else {
894 None
895 };
896
897 let (output, success) = if let Some(ref target_card) = target {
898 let target_id = target_card.agent_id;
899 tracing::info!(
900 subtask_index = idx,
901 target = %target_card.name,
902 target_id = %target_id,
903 "A2A dispatching subtask"
904 );
905
906 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
907 "parent_seed": parent_seed.id.to_string(),
908 "goal": description,
909 }))
910 .with_priority(TaskPriority::Normal);
911
912 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
914
915 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
917 Some(Ok(result)) => {
918 let out = result.get("output")
919 .and_then(|v| v.as_str())
920 .unwrap_or("")
921 .to_string();
922 let ok = result.get("success")
923 .and_then(|v| v.as_bool())
924 .unwrap_or(false);
925 (out, ok)
926 }
927 Some(Err(e)) => {
928 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
929 (format!("Failed: {e}"), false)
930 }
931 None => {
932 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
934 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
935 }
936 }
937 } else {
938 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
939 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
940 };
941
942 (idx, SubTask {
943 id: subtask_id,
944 description,
945 required_capability: capability,
946 result: Some(output),
947 success,
948 role,
949 })
950 });
951 }
952
953 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
954 while let Some(join_result) = join_set.join_next().await {
955 match join_result {
956 Ok((idx, subtask)) => {
957 results[idx] = Some(subtask);
958 }
959 Err(e) => {
960 tracing::error!(error = %e, "A2A task panicked");
961 }
962 }
963 }
964
965 let completed: Vec<SubTask> = results.into_iter().flatten().collect();
966 tracing::info!(
967 completed = completed.len(),
968 succeeded = completed.iter().filter(|r| r.success).count(),
969 "A2A delegation complete"
970 );
971 Ok(completed)
972 }
973
974 async fn delegate_via_lifecycle(
975 &self,
976 subtasks: Vec<SubTask>,
977 parent_seed: &Seed,
978 ) -> Result<Vec<SubTask>> {
979 use crate::agent_group::OxiosAgentGroup;
980 use tokio::task::JoinSet;
981
982 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
983 let group = OxiosAgentGroup::new(parent_seed, descriptions);
984 let group_id = group.id;
985
986 self.event_bus.publish(KernelEvent::AgentGroupCreated {
987 group_id,
988 agent_count: group.agents.len(),
989 })?;
990
991 tracing::info!(
992 group_id = %group_id,
993 agent_count = group.agents.len(),
994 "Starting parallel multi-agent execution"
995 );
996
997 let mut join_set: JoinSet<(
998 usize,
999 crate::types::AgentId,
1000 Result<oxios_ouroboros::ExecutionResult>,
1001 )> = JoinSet::new();
1002
1003 for (idx, agent_entry) in group.agents.iter().enumerate() {
1004 let child_seed = agent_entry.seed.clone();
1005 let agent_id = agent_entry.id;
1006 let lifecycle = self.lifecycle.clone();
1007
1008 join_set.spawn(async move {
1009 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
1010 (idx, agent_id, result)
1011 });
1012 }
1013
1014 let subtask_count = subtasks.len();
1015 let mut completed = vec![None; subtask_count];
1016 while let Some(join_result) = join_set.join_next().await {
1017 match join_result {
1018 Ok((idx, agent_id, Ok(exec_result))) => {
1019 let _ = self
1020 .event_bus
1021 .publish(KernelEvent::AgentGroupMemberCompleted {
1022 group_id,
1023 agent_id,
1024 success: exec_result.success,
1025 });
1026 completed[idx] = Some(SubTask {
1027 id: subtasks[idx].id,
1028 description: subtasks[idx].description.clone(),
1029 required_capability: subtasks[idx].required_capability.clone(),
1030 result: Some(exec_result.output.clone()),
1031 success: exec_result.success,
1032 role: subtasks[idx].role.clone(),
1033 });
1034 }
1035 Ok((idx, agent_id, Err(e))) => {
1036 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
1037 let _ = self
1038 .event_bus
1039 .publish(KernelEvent::AgentGroupMemberCompleted {
1040 group_id,
1041 agent_id,
1042 success: false,
1043 });
1044 completed[idx] = Some(SubTask {
1045 id: subtasks[idx].id,
1046 description: subtasks[idx].description.clone(),
1047 required_capability: subtasks[idx].required_capability.clone(),
1048 result: Some(format!("Failed: {e}")),
1049 success: false,
1050 role: subtasks[idx].role.clone(),
1051 });
1052 }
1053 Err(e) => {
1054 tracing::error!(error = %e, "JoinSet task panicked");
1055 }
1056 }
1057 }
1058
1059 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
1060 let succeeded = completed.iter().filter(|r| r.success).count();
1061 let total = completed.len();
1062
1063 tracing::info!(
1064 group_id = %group_id,
1065 succeeded,
1066 total,
1067 "Parallel multi-agent execution complete"
1068 );
1069
1070 let _ = self
1072 .state_store
1073 .save_json("agent_groups", &group_id.to_string(), &group)
1074 .await;
1075 self.git_commit(
1076 &format!("agent_groups/{}.json", group_id),
1077 "orchestrator: save group",
1078 );
1079
1080 Ok(completed)
1081 }
1082}
1083
/// State kept for a conversation between calls to
/// `Orchestrator::handle_message`, stored in the orchestrator's session map.
#[derive(Debug, Clone)]
// Only `interview` is read back by the code in this file; the other fields are
// written when a session is created but never read here, hence the allow.
#[allow(unused)]
struct InterviewSession {
    /// Session id; the same string is used as the key in the session map.
    id: String,
    /// Accumulated interview state (questions, answers, conversation history).
    interview: InterviewResult,
    /// Phase recorded at creation; always `Phase::Interview` in this file.
    phase: Phase,
    /// Always `None` in this file.
    seed_id: Option<Uuid>,
    /// Always `None` in this file.
    agent_id: Option<AgentId>,
}
1094
/// Outcome of `Orchestrator::handle_message`. Optional fields are omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
    /// Session id to pass back on the next message to continue the conversation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Space the message was assigned to (the nil UUID when no space manager
    /// is installed).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub space_id: Option<Uuid>,
    /// Display tag of the current space; empty for the default space.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub space_tag: Option<String>,
    /// User-facing reply text.
    pub response: String,
    /// Id of the final seed, when one was generated.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seed_id: Option<Uuid>,
    /// Always `None` in the results built in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// Last pipeline phase reached before returning.
    pub phase_reached: Phase,
    /// Whether the work was judged successful; `false` for chat and
    /// clarification replies.
    pub evaluation_passed: bool,
    /// Combined subtask output, or the evaluation notes joined with `"; "`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<String>,
}
1123
/// Renders clarification questions as a numbered list (starting at 1) under a
/// short introduction, or returns a generic request for clarification when
/// `questions` is empty.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return String::from("I need a bit more clarification before I can proceed.");
    }

    let mut numbered = Vec::with_capacity(questions.len());
    for (position, question) in (1..).zip(questions) {
        numbered.push(format!("{}. {}", position, question));
    }

    let mut message = String::from(
        "I'd like to understand your request better. Could you help clarify:\n\n",
    );
    message.push_str(&numbered.join("\n"));
    message
}
1140
1141fn format_result(seed: &Seed, evaluation: &EvaluationResult) -> String {
1143 let passed = evaluation.all_passed();
1144
1145 let mut lines = Vec::new();
1146 lines.push(format!("Task '{}' completed.", seed.goal));
1147
1148 if passed {
1149 lines.push("✅ Evaluation passed.".to_string());
1150 } else {
1151 lines.push(format!(
1152 "⚠️ Evaluation score: {:.0}%",
1153 evaluation.score * 100.0
1154 ));
1155 }
1156
1157 if !evaluation.notes.is_empty() {
1158 lines.push("\nNotes:".to_string());
1159 for note in &evaluation.notes {
1160 lines.push(format!("- {}", note));
1161 }
1162 }
1163
1164 lines.join("\n")
1165}
1166
1167fn should_split_seed(seed: &Seed) -> bool {
1172 seed.acceptance_criteria.len() >= 5
1176}
1177
1178fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1184 seed.acceptance_criteria
1185 .iter()
1186 .map(|criterion| {
1187 let desc = format!("{}: {}", seed.goal, criterion);
1188 let desc_lower = desc.to_lowercase();
1189
1190 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1192 Some("code-review".to_string())
1193 } else if desc_lower.contains("test") {
1194 Some("testing".to_string())
1195 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1196 Some("refactoring".to_string())
1197 } else if desc_lower.contains("write")
1198 || desc_lower.contains("create")
1199 || desc_lower.contains("implement")
1200 {
1201 Some("code-generation".to_string())
1202 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1203 Some("debugging".to_string())
1204 } else {
1205 None
1206 };
1207
1208 let mut subtask = SubTask::new(desc);
1209 subtask.required_capability = cap;
1210 subtask
1211 })
1212 .collect()
1213}
1214
/// Wraps the combined multi-agent output in a short header, or returns a
/// fixed message when `combined` is empty.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        body => {
            let mut message = String::from("Multi-agent execution completed:\n\n");
            message.push_str(body);
            message
        }
    }
}
1223
1224async fn run_via_lifecycle(
1226 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1227 parent_seed: &Seed,
1228 description: &str,
1229) -> (String, bool) {
1230 let child_seed = Seed {
1231 id: Uuid::new_v4(),
1232 goal: description.to_string(),
1233 constraints: parent_seed.constraints.clone(),
1234 acceptance_criteria: vec!["Task completes successfully".into()],
1235 ontology: parent_seed.ontology.clone(),
1236 created_at: chrono::Utc::now(),
1237 generation: parent_seed.generation + 1,
1238 parent_seed_id: Some(parent_seed.id),
1239 cspace_hint: None,
1240 };
1241 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1242 Ok(result) => (result.output, result.success),
1243 Err(e) => (format!("Failed: {e}"), false),
1244 }
1245}