1use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{
19 EvaluationResult, ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed,
20};
21use parking_lot::RwLock;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::agent_lifecycle::AgentLifecycleManager;
26use crate::event_bus::{EventBus, KernelEvent};
27use crate::git_layer::GitLayer;
28use crate::metrics::get_metrics;
29use crate::project::{ConversationBuffer, ProjectManager};
30use crate::scheduler::Priority;
31use crate::state_store::StateStore;
32use crate::types::AgentId;
33
34#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
36pub enum AgentRole {
37 #[default]
39 Worker,
40 Manager,
42}
43
/// A unit of delegated work together with its recorded outcome.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubTask {
    /// Unique identifier of this subtask.
    pub id: Uuid,
    /// Text describing the work; used as the goal when the subtask is executed.
    pub description: String,
    /// Capability name used to look up a matching A2A agent, if one is required.
    pub required_capability: Option<String>,
    /// Output text (or a `Failed: ...` message) recorded after execution.
    pub result: Option<String>,
    /// Whether execution of this subtask succeeded.
    pub success: bool,
    /// Role of the executing agent; takes `AgentRole::default()` when missing
    /// from serialized input.
    #[serde(default)]
    pub role: AgentRole,
}
61
62impl SubTask {
63 pub fn new(description: impl Into<String>) -> Self {
65 Self {
66 id: Uuid::new_v4(),
67 description: description.into(),
68 required_capability: None,
69 result: None,
70 success: false,
71 role: AgentRole::default(),
72 }
73 }
74
75 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
77 self.required_capability = Some(cap.into());
78 self
79 }
80}
81
/// Drives a user message through interview, seed generation, execution,
/// evaluation and (optionally) evolution or multi-agent delegation.
pub struct Orchestrator {
    /// Protocol implementation used for interview, seed generation, evaluation and evolution.
    ouroboros: Arc<dyn OuroborosProtocol>,
    /// Bus on which kernel events (phases, seeds, evaluations) are published.
    event_bus: EventBus,
    /// Persistent store used for seeds and stored sessions.
    state_store: Arc<StateStore>,
    /// Optional git layer; when present and enabled, saved files are committed.
    git_layer: Option<Arc<GitLayer>>,
    /// In-memory interview sessions keyed by session id.
    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
    /// Spawns and runs agents for seeds.
    lifecycle: AgentLifecycleManager,
    /// Optional A2A protocol used to delegate subtasks to other agents.
    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
    /// Optional project manager; can be set after construction.
    project_manager: RwLock<Option<Arc<ProjectManager>>>,
    /// Buffer of user and agent conversation turns.
    conversation_buffer: RwLock<ConversationBuffer>,
    /// Retry/backoff settings for A2A delegation.
    delegation_config: DelegationConfig,
    /// Circuit breaker consulted before delegating via A2A.
    a2a_breaker: Arc<crate::a2a::circuit_breaker::A2ACircuitBreaker>,
    /// Evolution-loop settings; replaceable at runtime via `update_evolution_config`.
    evolution_config: RwLock<EvolutionConfig>,
}
106
/// Retry and backoff settings for A2A delegation.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Number of failed attempts after which delegation stops retrying.
    max_retries: u32,
    /// Base delay in milliseconds; doubled per attempt.
    base_delay_ms: u64,
    /// Upper bound, in milliseconds, on any single backoff delay.
    max_delay_ms: u64,
    /// Timeout in milliseconds. Not read by the code visible here, hence the allow.
    #[allow(dead_code)]
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    /// Defaults: 3 retries, 100 ms base delay, 5 s delay cap, 5 s timeout.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Returns the backoff delay in milliseconds for `attempt`.
    ///
    /// The delay is `base_delay_ms * 2^min(attempt, 10)`, capped at
    /// `max_delay_ms`. The multiplication saturates instead of overflowing,
    /// so a very large `base_delay_ms` yields the cap rather than a panic
    /// (debug builds) or a wrapped, too-small delay (release builds).
    fn backoff_delay(&self, attempt: u32) -> u64 {
        // The exponent is clamped to 10, so the factor is at most 1024.
        let factor = 2_u64.saturating_pow(attempt.min(10));
        self.base_delay_ms
            .saturating_mul(factor)
            .min(self.max_delay_ms)
    }
}
139
/// Settings that govern the evaluate/evolve loop.
#[derive(Debug, Clone)]
struct EvolutionConfig {
    /// Upper bound of the evolution loop's iteration range (`0..=max_iterations`).
    max_iterations: u32,
    /// Evaluation score at or above which the loop stops and a run counts as passing.
    score_threshold: f64,
    /// Copied from the orchestrator config; not read by the code visible here.
    #[allow(dead_code)]
    eval_cache_enabled: bool,
}
151
152impl From<crate::config::OrchestratorConfig> for EvolutionConfig {
153 fn from(c: crate::config::OrchestratorConfig) -> Self {
154 Self {
155 max_iterations: c.max_evolution_iterations,
156 score_threshold: c.min_evaluation_score,
157 eval_cache_enabled: c.eval_cache_enabled,
158 }
159 }
160}
161
162impl Orchestrator {
163 pub fn new(
165 ouroboros: Arc<dyn OuroborosProtocol>,
166 event_bus: EventBus,
167 state_store: Arc<StateStore>,
168 lifecycle: AgentLifecycleManager,
169 ) -> Self {
170 Self::with_config(
171 ouroboros,
172 event_bus,
173 state_store,
174 lifecycle,
175 crate::config::OrchestratorConfig::default(),
176 )
177 }
178
179 pub fn with_config(
181 ouroboros: Arc<dyn OuroborosProtocol>,
182 event_bus: EventBus,
183 state_store: Arc<StateStore>,
184 lifecycle: AgentLifecycleManager,
185 config: crate::config::OrchestratorConfig,
186 ) -> Self {
187 let evolution_config = EvolutionConfig::from(config.clone());
188 Self {
189 ouroboros,
190 event_bus,
191 state_store,
192 git_layer: None,
193 sessions: RwLock::new(std::collections::HashMap::new()),
194 lifecycle,
195 a2a: None,
196 project_manager: RwLock::new(None),
197 conversation_buffer: RwLock::new(ConversationBuffer::default()),
198 delegation_config: DelegationConfig::default(),
199 a2a_breaker: Arc::new(crate::a2a::circuit_breaker::A2ACircuitBreaker::new(5, 30)),
200 evolution_config: RwLock::new(evolution_config),
201 }
202 }
203
204 pub fn set_project_manager(&self, manager: Arc<ProjectManager>) {
206 *self.project_manager.write() = Some(manager);
207 }
208
209 pub fn project_manager(&self) -> Option<Arc<ProjectManager>> {
211 self.project_manager.read().as_ref().cloned()
212 }
213
214 pub fn detect_project_tag(&self, message: &str) -> Option<String> {
216 self.project_manager.read().as_ref().and_then(|pm| {
217 let projects = pm.list_projects();
218 let result = crate::project::detect_project(message, &projects);
219 match result {
220 crate::project::DetectionResult::Found(id) => pm.get_project(id).map(|p| p.tag()),
221 crate::project::DetectionResult::NoMatch { .. } => None,
222 }
223 })
224 }
225
226 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
228 self.a2a = Some(a2a);
229 }
230
231 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
233 self.git_layer = Some(git_layer);
234 }
235
236 pub fn update_evolution_config(&self, config: crate::config::OrchestratorConfig) {
240 *self.evolution_config.write() = EvolutionConfig::from(config);
241 tracing::info!("Orchestrator evolution config hot-reloaded");
242 }
243
244 pub async fn restore_sessions(&self) {
251 let summaries = match self.state_store.list_sessions().await {
252 Ok(s) => s,
253 Err(e) => {
254 tracing::warn!(error = %e, "Failed to list sessions for restore");
255 return;
256 }
257 };
258
259 let mut restored = 0usize;
260 for summary in &summaries {
261 let Some(ref seed_id_str) = summary.active_seed_id else {
263 continue;
264 };
265
266 let session_id = crate::state_store::SessionId(summary.id.clone());
267 let session = match self.state_store.load_session(&session_id).await {
268 Ok(Some(s)) => s,
269 Ok(None) => continue,
270 Err(e) => {
271 tracing::warn!(
272 session_id = %summary.id,
273 error = %e,
274 "Failed to load session for restore"
275 );
276 continue;
277 }
278 };
279
280 let mut interview = oxios_ouroboros::InterviewResult::new();
284 interview.is_task = true; interview.original_message = session
286 .user_messages
287 .last()
288 .map(|m| m.content.clone())
289 .unwrap_or_default();
290
291 let history: Vec<oxios_ouroboros::interview::Exchange> = session
293 .user_messages
294 .iter()
295 .zip(session.agent_responses.iter())
296 .map(|(user, agent)| oxios_ouroboros::interview::Exchange {
297 user: user.content.clone(),
298 agent: agent.content.clone(),
299 })
300 .collect();
301 interview.conversation_history = history;
302
303 let seed_id = seed_id_str.parse::<Uuid>().ok();
304
305 let interview_session = InterviewSession {
306 id: session.id.0.clone(),
307 interview,
308 phase: Phase::Execute,
309 seed_id,
310 agent_id: None,
311 };
312
313 {
314 let mut sessions = self.sessions.write();
315 sessions.insert(session.id.0.clone(), interview_session);
316 }
317
318 restored += 1;
319 }
320
321 if restored > 0 {
322 tracing::info!(restored, total = summaries.len(), "Sessions restored");
323 }
324 }
325
326 fn git_commit(&self, rel_path: &str, message: &str) {
328 if let Some(ref gl) = self.git_layer
329 && gl.is_enabled()
330 {
331 let _ = gl.commit_file(rel_path, message);
332 }
333 }
334
335 pub async fn handle_message(
344 &self,
345 user_id: &str,
346 user_message: &str,
347 session_id: Option<&str>,
348 project_ids: Option<&str>,
349 request_id: &str,
350 ) -> Result<OrchestrationResult> {
351 tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), request_id = %request_id, "starting");
352 get_metrics().messages.inc();
353 let orch_start = std::time::Instant::now();
354
355 let session_id = session_id
356 .map(String::from)
357 .unwrap_or_else(|| Uuid::new_v4().to_string());
358
359 tracing::info!(session_id = %session_id, user_id = %user_id, request_id = %request_id, content_len = user_message.len(), "Orchestrator handling message");
360
361 let primary_project_id: Option<Uuid> = if let Some(ids_str) = project_ids {
364 ids_str
366 .split(',')
367 .next()
368 .and_then(|s| Uuid::parse_str(s.trim()).ok())
369 } else {
370 self.detect_project_tag(user_message).and_then(|_tag| {
372 self.project_manager().and_then(|pm| {
374 let projects = pm.list_projects();
375 let result = crate::project::detect_project(user_message, &projects);
376 match result {
377 crate::project::DetectionResult::Found(id) => Some(id),
378 crate::project::DetectionResult::NoMatch { .. } => None,
379 }
380 })
381 })
382 };
383
384 let project_tag = primary_project_id
386 .and_then(|id| {
387 self.project_manager()
388 .and_then(|pm| pm.get_project(id).map(|p| p.tag()))
389 })
390 .unwrap_or_default();
391
392 if let Some(pid) = primary_project_id
394 && let Some(pm) = self.project_manager()
395 {
396 pm.touch(pid);
397 }
398
399 let _conversation_turns = {
400 let buffer = self.conversation_buffer.read();
401 buffer.turns().iter().cloned().collect::<Vec<_>>()
402 };
403
404 {
406 let mut buffer = self.conversation_buffer.write();
407 buffer.push_user(user_message);
408 }
409
410 self.publish_phase_started(&session_id, Phase::Interview)
412 .await;
413
414 let needs_interview;
416 let existing_history: Option<Vec<_>>;
417 {
418 let sessions = self.sessions.read();
419 needs_interview = !sessions.contains_key(&session_id);
420 existing_history = if !needs_interview {
421 sessions
422 .get(&session_id)
423 .map(|s| s.interview.conversation_history.clone())
424 } else {
425 None
426 };
427 }
429
430 let interview = {
432 tracing::info!(phase = "interview", "Starting interview phase");
433 if needs_interview {
434 self.ouroboros.interview(user_message).await?
435 } else {
436 let multi_turn_context = {
439 let mut context_parts = Vec::new();
440 if let Some(ref history) = existing_history {
441 for exchange in history {
442 context_parts.push(format!(
443 "User: {}\nAgent: {}",
444 exchange.user, exchange.agent
445 ));
446 }
447 }
448 context_parts.push(format!("User: {user_message}"));
449 context_parts.join("\n\n")
450 };
451
452 {
457 let mut sessions = self.sessions.write();
458 if let Some(s) = sessions.get_mut(&session_id) {
459 let all_questions = s.interview.questions.join("\n");
460 s.interview.add_to_history(user_message, &all_questions);
461 }
462 }
463
464 self.ouroboros.interview(&multi_turn_context).await?
466 }
467 };
468
469 if !interview.is_task {
471 tracing::info!(session_id = %session_id, "Chat response (non-task)");
472
473 let response_text = if interview.chat_response.is_empty() {
474 "Hello! How can I help you today?".to_string()
475 } else {
476 interview.chat_response.clone()
477 };
478
479 {
481 let mut buffer = self.conversation_buffer.write();
482 buffer.push_agent(&response_text, None);
483 }
484
485 {
488 let mut sessions = self.sessions.write();
489 if let Some(session) = sessions.get_mut(&session_id) {
490 tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
491 session
492 .interview
493 .add_to_history(user_message, &response_text);
494 } else {
495 let mut interview = InterviewResult::new();
497 interview.is_task = false;
498 interview.chat_response = response_text.clone();
499 interview.add_to_history(user_message, &response_text);
500 sessions.insert(
501 session_id.clone(),
502 InterviewSession {
503 id: session_id.clone(),
504 interview,
505 phase: Phase::Interview,
506 seed_id: None,
507 agent_id: None,
508 },
509 );
510 }
511 }
512
513 self.publish_phase_completed(&session_id, Phase::Interview, "chat")
514 .await;
515
516 return Ok(OrchestrationResult {
517 session_id: Some(session_id.clone()),
518 primary_project_id,
519 project_tag: Some(project_tag.clone()),
520 response: response_text,
521 seed_id: None,
522 agent_id: None,
523 phase_reached: Phase::Interview,
524 evaluation_passed: None,
525 output: None,
526 tool_calls: vec![],
527 interview_questions: None,
528 interview_round: None,
529 interview_ambiguity: None,
530 mode: "ouroboros".to_string(),
531 });
532 }
533
534 if !interview.ready_for_seed {
536 {
538 let mut sessions = self.sessions.write();
539 let session =
540 sessions
541 .entry(session_id.clone())
542 .or_insert_with(|| InterviewSession {
543 id: session_id.clone(),
544 interview: interview.clone(),
545 phase: Phase::Interview,
546 seed_id: None,
547 agent_id: None,
548 });
549
550 let questions_text = interview.questions.join("\n");
551
552 let is_first_round = session.interview.conversation_history.is_empty();
557 if is_first_round {
558 let original = if interview.original_message.is_empty() {
559 user_message.to_string()
560 } else {
561 interview.original_message.clone()
562 };
563 session.interview.add_to_history(&original, &questions_text);
564 } else {
565 let last_answer = session.interview.answers.last().cloned();
567 if let Some(ref ans) = last_answer
568 && !ans.is_empty()
569 {
570 session.interview.add_to_history(ans, &questions_text);
571 }
572 }
573 } let questions = interview
576 .questions
577 .iter()
578 .filter(|q| !q.is_empty())
579 .cloned()
580 .collect::<Vec<_>>();
581
582 tracing::info!(
583 session_id = %session_id,
584 ambiguity = interview.ambiguity.ambiguity(),
585 questions = questions.len(),
586 "Interview needs clarification"
587 );
588
589 self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
590 .await;
591
592 let structured_input = if existing_history.is_some() {
602 let mut context_parts = Vec::new();
605 if let Some(ref history) = existing_history {
606 for exchange in history {
607 context_parts.push(format!(
608 "User: {}\nAgent: {}",
609 exchange.user, exchange.agent
610 ));
611 }
612 }
613 context_parts.push(format!("User: {user_message}"));
614 context_parts.join("\n\n")
615 } else {
616 user_message.to_string()
617 };
618
619 let mut structured = match self.ouroboros.interview_structured(&structured_input).await
620 {
621 Ok(Some(s)) if !s.is_empty() => Some(s),
622 Ok(_) => None,
623 Err(e) => {
624 tracing::warn!(error = %e, "interview_structured failed; falling back to markdown");
625 None
626 }
627 };
628
629 if structured.is_none() && !questions.is_empty() {
635 structured = Some(
636 questions
637 .iter()
638 .enumerate()
639 .map(
640 |(i, q)| oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput {
641 id: format!("q{}", i + 1),
642 text: q.clone(),
643 kind: "free_text".to_string(),
644 options: vec![],
645 },
646 )
647 .collect(),
648 );
649 }
650
651 let interview_round = {
654 let sessions = self.sessions.read();
655 sessions
656 .get(&session_id)
657 .map(|s| (s.interview.answers.len().saturating_sub(1)).max(1) as u32)
658 .unwrap_or(1)
659 };
660
661 return Ok(OrchestrationResult {
662 session_id: Some(session_id.clone()),
663 primary_project_id,
664 project_tag: Some(project_tag.clone()),
665 response: format_questions(&questions),
666 seed_id: None,
667 agent_id: None,
668 phase_reached: Phase::Interview,
669 evaluation_passed: None,
670 output: None,
671 tool_calls: vec![],
672 interview_questions: structured,
673 interview_round: Some(interview_round),
674 interview_ambiguity: Some(interview.ambiguity.ambiguity()),
675 mode: "ouroboros".to_string(),
676 });
677 }
678
679 {
683 let mut buffer = self.conversation_buffer.write();
684 buffer.push_agent("[interview: ready]", None);
685 }
686
687 self.publish_phase_completed(&session_id, Phase::Interview, "ready")
689 .await;
690 self.publish_phase_started(&session_id, Phase::Seed).await;
691
692 let is_simple = interview.complexity == "simple" && interview.ambiguity.ambiguity() <= 0.3;
698
699 let mut seed = if is_simple {
700 tracing::info!(
701 phase = "seed",
702 method = "from_message",
703 "Simple task â ad-hoc seed"
704 );
705 Seed::from_message(&interview.original_message)
706 } else {
707 tracing::info!(
708 phase = "seed",
709 method = "llm",
710 "Complex task â LLM-generated seed"
711 );
712 self.ouroboros.generate_seed(&interview).await?
713 };
714 seed.project_id = primary_project_id;
715
716 self.save_seed(&seed).await?;
718
719 self.event_bus
721 .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
722
723 self.publish_phase_completed(&session_id, Phase::Seed, "generated")
724 .await;
725 self.publish_phase_started(&session_id, Phase::Execute)
726 .await;
727
728 if should_split_seed(&seed) {
732 let subtasks = split_into_subtasks(&seed);
733 if subtasks.len() > 1 {
734 tracing::info!(
735 phase = "delegate",
736 subtasks = subtasks.len(),
737 "Delegating to multi-agent"
738 );
739 let results = self.delegate_subtasks(subtasks, &seed).await?;
740
741 let combined: String = results
743 .iter()
744 .filter(|r| r.success)
745 .filter_map(|r| r.result.as_deref())
746 .collect::<Vec<_>>()
747 .join("\n\n");
748
749 let all_passed = results.iter().all(|r| r.success);
750
751 {
753 let mut sessions = self.sessions.write();
754 sessions.remove(&session_id);
755 }
756
757 tracing::info!(
758 session_id = %session_id,
759 subtasks = results.len(),
760 passed = all_passed,
761 "Multi-agent orchestration complete"
762 );
763
764 return Ok(OrchestrationResult {
765 session_id: Some(session_id),
766 primary_project_id,
767 project_tag: Some(project_tag.clone()),
768 response: format_result_combined(&combined),
769 seed_id: Some(seed.id),
770 agent_id: None,
771 phase_reached: Phase::Execute,
772 evaluation_passed: Some(all_passed),
773 output: Some(combined),
774 tool_calls: vec![],
775 interview_questions: None,
776 interview_round: None,
777 interview_ambiguity: None,
778 mode: "ouroboros".to_string(),
779 });
780 }
781 }
782
783 {
785 let mut buffer = self.conversation_buffer.write();
786 buffer.push_agent("[multi-agent: complete]", None);
787 }
788
789 tracing::info!(phase = "execute", "Starting execution phase");
791 let exec_result = self
792 .lifecycle
793 .spawn_and_run(&seed, Priority::Normal)
794 .await?;
795
796 self.lifecycle.reap_zombies();
798
799 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
800 .await;
801
802 let (final_result, final_seed, passed, phase_reached) = if let Some(ref schema) =
809 seed.output_schema
810 {
811 let passed = match oxi_sdk::StructuredOutput::extract(
813 &exec_result.output,
814 &oxi_sdk::OutputMode::ValidatedJson {
815 schema: schema.clone(),
816 },
817 ) {
818 Ok(_) => {
819 tracing::info!(session_id = %session_id, "Structured output validation passed");
820 true
821 }
822 Err(e) => {
823 tracing::warn!(session_id = %session_id, error = %e, "Structured output validation failed");
824 false
825 }
826 };
827 (exec_result, seed.clone(), passed, Phase::Execute)
828 } else if self.should_evaluate(&seed) {
829 self.publish_phase_started(&session_id, Phase::Evaluate)
831 .await;
832
833 let (result, eval, evolved_seed) = self
834 .run_evolution_loop(&session_id, &seed, exec_result)
835 .await?;
836
837 let passed =
838 eval.all_passed() && eval.score >= self.evolution_config.read().score_threshold;
839
840 self.publish_phase_completed(
841 &session_id,
842 Phase::Evaluate,
843 &format!("score={:.2}", eval.score),
844 )
845 .await;
846
847 let reached = if evolved_seed.generation > 0 {
848 Phase::Evolve
849 } else {
850 Phase::Evaluate
851 };
852
853 (result, evolved_seed, passed, reached)
854 } else {
855 let passed = exec_result.success;
857 (exec_result, seed.clone(), passed, Phase::Execute)
858 };
859
860 {
862 let mut sessions = self.sessions.write();
863 sessions.remove(&session_id);
864 }
865
866 tracing::info!(
867 session_id = %session_id,
868 passed,
869 phase = %phase_reached,
870 "Orchestration complete"
871 );
872
873 let metrics = get_metrics();
875 metrics
876 .orch_duration
877 .observe(orch_start.elapsed().as_secs_f64());
878 if passed {
879 metrics.agents_completed.inc();
880 } else {
881 metrics.agents_failed.inc();
882 }
883
884 {
886 let mut buffer = self.conversation_buffer.write();
887 buffer.push_agent(&final_seed.goal, None);
888 }
889
890 Ok(OrchestrationResult {
891 session_id: Some(session_id),
892 primary_project_id,
893 project_tag: Some(project_tag.clone()),
894 response: format_execution_result(&final_seed, &final_result),
895 seed_id: Some(final_seed.id),
896 agent_id: None,
897 phase_reached,
898 evaluation_passed: Some(passed),
899 output: Some(final_result.output.clone()),
900 tool_calls: final_result.tool_calls.clone(),
901 interview_questions: None,
902 interview_round: None,
903 interview_ambiguity: None,
904 mode: "ouroboros".to_string(),
905 })
906 }
907
908 fn should_evaluate(&self, seed: &Seed) -> bool {
913 !seed.acceptance_criteria.is_empty() && seed.output_schema.is_none()
914 }
915
916 pub async fn chat(
920 &self,
921 _user_id: &str,
922 user_message: &str,
923 session_id: Option<&str>,
924 project_ids: Option<&str>,
925 request_id: &str,
926 ) -> Result<OrchestrationResult> {
927 tracing::info!(name = "orchestrator.chat", session_id = %session_id.unwrap_or("new"), request_id = %request_id, "starting");
928 let metrics = get_metrics();
929 metrics.messages.inc();
930 let orch_start = std::time::Instant::now();
931
932 let session_id = session_id
933 .map(String::from)
934 .unwrap_or_else(|| Uuid::new_v4().to_string());
935
936 let primary_project_id: Option<Uuid> = if let Some(ids_str) = project_ids {
938 ids_str
939 .split(',')
940 .next()
941 .and_then(|s| Uuid::parse_str(s.trim()).ok())
942 } else {
943 self.detect_project_tag(user_message).and_then(|_tag| {
944 self.project_manager().and_then(|pm| {
945 let projects = pm.list_projects();
946 let result = crate::project::detect_project(user_message, &projects);
947 match result {
948 crate::project::DetectionResult::Found(id) => Some(id),
949 crate::project::DetectionResult::NoMatch { .. } => None,
950 }
951 })
952 })
953 };
954
955 let project_tag = primary_project_id
956 .and_then(|id| {
957 self.project_manager()
958 .and_then(|pm| pm.get_project(id).map(|p| p.tag()))
959 })
960 .unwrap_or_default();
961
962 let mut seed = Seed::from_message(user_message);
964 seed.project_id = primary_project_id;
965
966 tracing::info!(
968 phase = "execute",
969 mode = "chat",
970 "Starting direct execution"
971 );
972 let exec_result = self
973 .lifecycle
974 .spawn_and_run(&seed, Priority::Normal)
975 .await?;
976 self.lifecycle.reap_zombies();
977
978 let metrics = get_metrics();
979 metrics
980 .orch_duration
981 .observe(orch_start.elapsed().as_secs_f64());
982 if exec_result.success {
983 metrics.agents_completed.inc();
984 } else {
985 metrics.agents_failed.inc();
986 }
987
988 Ok(OrchestrationResult {
989 session_id: Some(session_id),
990 primary_project_id,
991 project_tag: Some(project_tag),
992 response: exec_result.output.clone(),
993 seed_id: Some(seed.id),
994 agent_id: None,
995 phase_reached: Phase::Execute,
996 evaluation_passed: None,
997 output: Some(exec_result.output),
998 tool_calls: exec_result.tool_calls,
999 interview_questions: None,
1000 interview_round: None,
1001 interview_ambiguity: None,
1002 mode: "chat".to_string(),
1003 })
1004 }
1005
1006 async fn execute_seed(&self, seed: &Seed) -> Result<ExecutionResult> {
1008 self.lifecycle.spawn_and_run(seed, Priority::Normal).await
1009 }
1010
1011 async fn run_evolution_loop(
1016 &self,
1017 _session_id: &str,
1018 seed: &Seed,
1019 initial_result: ExecutionResult,
1020 ) -> Result<(ExecutionResult, EvaluationResult, Seed)> {
1021 let max_iterations = self.evolution_config.read().max_iterations;
1022 let threshold = self.evolution_config.read().score_threshold;
1023
1024 let mut current_seed = seed.clone();
1025 let mut current_result = initial_result;
1026
1027 let mut best_result = current_result.clone();
1029 let mut best_seed = current_seed.clone();
1030 let mut best_eval: Option<EvaluationResult> = None;
1031
1032 for iteration in 0..=max_iterations {
1033 let evaluation = self
1035 .ouroboros
1036 .evaluate(¤t_seed, ¤t_result)
1037 .await?;
1038
1039 tracing::info!(
1040 iteration,
1041 seed_id = %current_seed.id,
1042 score = evaluation.score,
1043 passed = evaluation.all_passed(),
1044 "Evaluation complete"
1045 );
1046
1047 let _ = self.event_bus.publish(KernelEvent::EvaluationComplete {
1048 seed_id: current_seed.id,
1049 passed: evaluation.all_passed(),
1050 });
1051
1052 if best_eval
1054 .as_ref()
1055 .is_none_or(|b| evaluation.score >= b.score)
1056 {
1057 best_result = current_result.clone();
1058 best_seed = current_seed.clone();
1059 best_eval = Some(evaluation.clone());
1060 }
1061
1062 if evaluation.score >= threshold || iteration == max_iterations {
1064 if iteration == max_iterations && max_iterations > 0 {
1065 let _ = self.event_bus.publish(KernelEvent::EvolutionMaxReached {
1066 seed_id: current_seed.id,
1067 final_score: evaluation.score,
1068 iterations: iteration,
1069 });
1070 }
1071 return Ok((
1072 best_result,
1073 best_eval.ok_or_else(|| {
1074 anyhow::anyhow!(
1075 "Evolve loop exited with threshold met but no evaluation was produced"
1076 )
1077 })?,
1078 best_seed,
1079 ));
1080 }
1081
1082 if max_iterations == 0 {
1084 return Ok((
1085 best_result,
1086 best_eval.ok_or_else(|| {
1087 anyhow::anyhow!("No iterations configured and no evaluation was produced")
1088 })?,
1089 best_seed,
1090 ));
1091 }
1092
1093 let evolved = self.ouroboros.evolve(¤t_seed, &evaluation).await?;
1095 match evolved {
1096 Some(new_seed) => {
1097 tracing::info!(
1098 old_seed_id = %current_seed.id,
1099 new_seed_id = %new_seed.id,
1100 iteration,
1101 "Seed evolved, re-executing"
1102 );
1103
1104 let _ = self.event_bus.publish(KernelEvent::EvolutionStarted {
1105 seed_id: current_seed.id,
1106 new_seed_id: new_seed.id,
1107 iteration,
1108 });
1109
1110 self.save_seed(&new_seed).await?;
1112
1113 current_seed = new_seed;
1114 current_result = self.execute_seed(¤t_seed).await?;
1115 }
1116 None => {
1117 tracing::info!(
1118 seed_id = %current_seed.id,
1119 "Evolve returned None, stopping loop"
1120 );
1121 return Ok((
1122 best_result,
1123 best_eval.ok_or_else(|| {
1124 anyhow::anyhow!(
1125 "Evolve returned no seed and no evaluation was produced"
1126 )
1127 })?,
1128 best_seed,
1129 ));
1130 }
1131 }
1132 }
1133
1134 unreachable!()
1136 }
1137
1138 async fn save_seed(&self, seed: &Seed) -> Result<()> {
1140 let key = seed.id.to_string();
1141
1142 self.state_store
1143 .save_json("seeds", &key, seed)
1144 .await
1145 .context("failed to save seed to state store")?;
1146
1147 self.git_commit(&format!("seeds/{key}.json"), "ourobors: save seed");
1148
1149 Ok(())
1150 }
1151
1152 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
1155 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
1156 session_id: session_id.to_owned(),
1157 phase,
1158 });
1159 }
1160
1161 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
1163 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
1164 session_id: session_id.to_owned(),
1165 phase,
1166 result_summary: result.to_owned(),
1167 });
1168 }
1169
1170 pub async fn delegate_subtasks(
1178 &self,
1179 subtasks: Vec<SubTask>,
1180 parent_seed: &Seed,
1181 ) -> Result<Vec<SubTask>> {
1182 if subtasks.len() == 1 {
1184 return self.execute_single_subtask(subtasks, parent_seed).await;
1185 }
1186
1187 if let Some(ref a2a) = self.a2a {
1189 if !self.a2a_breaker.is_allowed() {
1191 tracing::warn!(
1192 state = ?self.a2a_breaker.state(),
1193 "A2A circuit breaker open, using lifecycle fallback"
1194 );
1195 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
1196 }
1197
1198 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
1200 }
1201
1202 self.delegate_via_lifecycle(subtasks, parent_seed).await
1204 }
1205
1206 async fn delegate_with_retry(
1208 &self,
1209 subtasks: Vec<SubTask>,
1210 parent_seed: &Seed,
1211 a2a: &Arc<crate::a2a::A2AProtocol>,
1212 ) -> Result<Vec<SubTask>> {
1213 let mut attempt = 0;
1214 let max_retries = self.delegation_config.max_retries;
1215
1216 loop {
1217 match self
1218 .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
1219 .await
1220 {
1221 Ok(results) => {
1222 self.a2a_breaker.record_success();
1223 return Ok(results);
1224 }
1225 Err(e) => {
1226 self.a2a_breaker.record_failure();
1227 attempt += 1;
1228
1229 if attempt >= max_retries {
1230 tracing::error!(
1231 attempts = attempt,
1232 error = %e,
1233 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
1234 attempt
1235 );
1236 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
1237 }
1238
1239 let delay = self.delegation_config.backoff_delay(attempt);
1241 tracing::warn!(
1242 attempt,
1243 delay_ms = delay,
1244 error = %e,
1245 "A2A delegation failed, retrying with backoff"
1246 );
1247 tokio::time::sleep(Duration::from_millis(delay)).await;
1248 }
1249 }
1250 }
1251 }
1252
1253 async fn execute_single_subtask(
1255 &self,
1256 subtasks: Vec<SubTask>,
1257 parent_seed: &Seed,
1258 ) -> Result<Vec<SubTask>> {
1259 let mut task = subtasks
1260 .into_iter()
1261 .next()
1262 .expect("execute_single_subtask is only called when subtasks is non-empty");
1263 let child_seed = Seed {
1264 id: Uuid::new_v4(),
1265 goal: task.description.clone(),
1266 constraints: parent_seed.constraints.clone(),
1267 acceptance_criteria: vec!["Task completes successfully".into()],
1268 ontology: parent_seed.ontology.clone(),
1269 created_at: chrono::Utc::now(),
1270 generation: parent_seed.generation + 1,
1271 parent_seed_id: Some(parent_seed.id),
1272 cspace_hint: None,
1273 original_request: parent_seed.original_request.clone(),
1274 output_schema: None,
1275 project_id: None,
1276 };
1277 match self
1278 .lifecycle
1279 .spawn_and_run(&child_seed, Priority::Normal)
1280 .await
1281 {
1282 Ok(result) => {
1283 task.result = Some(result.output.clone());
1284 }
1285 Err(e) => {
1286 task.result = Some(format!("Failed: {e}"));
1287 task.success = false;
1288 }
1289 }
1290 Ok(vec![task])
1291 }
1292
1293 async fn delegate_via_a2a(
1303 &self,
1304 subtasks: Vec<SubTask>,
1305 parent_seed: &Seed,
1306 a2a: &Arc<crate::a2a::A2AProtocol>,
1307 ) -> Result<Vec<SubTask>> {
1308 use crate::a2a::TaskPriority;
1309 use tokio::task::JoinSet;
1310
1311 tracing::info!(
1312 subtasks = subtasks.len(),
1313 "Delegating subtasks via A2A protocol"
1314 );
1315
1316 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
1317 let subtask_count = subtasks.len();
1318 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
1319
1320 for (idx, subtask) in subtasks.into_iter().enumerate() {
1321 let capability = subtask.required_capability.clone();
1322 let description = subtask.description.clone();
1323 let subtask_id = subtask.id;
1324 let role = subtask.role.clone();
1325 let a2a = Arc::clone(a2a);
1326 let parent_seed = parent_seed.clone();
1327 let lifecycle = self.lifecycle.clone();
1328
1329 join_set.spawn(async move {
1330 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
1332 a2a.query_capabilities(cap).await.ok()
1333 .and_then(|agents| agents.into_iter().next())
1334 } else {
1335 None
1336 };
1337
1338 let (output, success) = if let Some(ref target_card) = target {
1339 let target_id = target_card.agent_id;
1340 tracing::info!(
1341 subtask_index = idx,
1342 target = %target_card.name,
1343 target_id = %target_id,
1344 "A2A dispatching subtask"
1345 );
1346
1347 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
1348 "parent_seed": parent_seed.id.to_string(),
1349 "goal": description,
1350 }))
1351 .with_priority(TaskPriority::Normal);
1352
1353 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
1355
1356 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
1358 Some(Ok(result)) => {
1359 let out = result.get("output")
1360 .and_then(|v| v.as_str())
1361 .unwrap_or("")
1362 .to_string();
1363 let ok = result.get("success")
1364 .and_then(|v| v.as_bool())
1365 .unwrap_or(false);
1366 (out, ok)
1367 }
1368 Some(Err(e)) => {
1369 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
1370 (format!("Failed: {e}"), false)
1371 }
1372 None => {
1373 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
1375 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
1376 }
1377 }
1378 } else {
1379 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
1380 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
1381 };
1382
1383 (idx, SubTask {
1384 id: subtask_id,
1385 description,
1386 required_capability: capability,
1387 result: Some(output),
1388 success,
1389 role,
1390 })
1391 });
1392 }
1393
1394 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
1395 while let Some(join_result) = join_set.join_next().await {
1396 match join_result {
1397 Ok((idx, subtask)) => {
1398 results[idx] = Some(subtask);
1399 }
1400 Err(e) => {
1401 tracing::error!(error = %e, "A2A task panicked");
1402 }
1403 }
1404 }
1405
1406 let completed: Vec<SubTask> = results.into_iter().flatten().collect();
1407 tracing::info!(
1408 completed = completed.len(),
1409 succeeded = completed.iter().filter(|r| r.success).count(),
1410 "A2A delegation complete"
1411 );
1412 Ok(completed)
1413 }
1414
1415 async fn delegate_via_lifecycle(
1416 &self,
1417 subtasks: Vec<SubTask>,
1418 parent_seed: &Seed,
1419 ) -> Result<Vec<SubTask>> {
1420 use crate::agent_group::OxiosAgentGroup;
1421 use tokio::task::JoinSet;
1422
1423 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
1424 let group = OxiosAgentGroup::new(parent_seed, descriptions);
1425 let group_id = group.id;
1426
1427 self.event_bus.publish(KernelEvent::AgentGroupCreated {
1428 group_id,
1429 agent_count: group.agents.len(),
1430 })?;
1431
1432 tracing::info!(
1433 group_id = %group_id,
1434 agent_count = group.agents.len(),
1435 "Starting parallel multi-agent execution"
1436 );
1437
1438 let mut join_set: JoinSet<(
1439 usize,
1440 crate::types::AgentId,
1441 Result<oxios_ouroboros::ExecutionResult>,
1442 )> = JoinSet::new();
1443
1444 for (idx, agent_entry) in group.agents.iter().enumerate() {
1445 let child_seed = agent_entry.seed.clone();
1446 let agent_id = agent_entry.id;
1447 let lifecycle = self.lifecycle.clone();
1448
1449 join_set.spawn(async move {
1450 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
1451 (idx, agent_id, result)
1452 });
1453 }
1454
1455 let subtask_count = subtasks.len();
1456 let mut completed = vec![None; subtask_count];
1457 while let Some(join_result) = join_set.join_next().await {
1458 match join_result {
1459 Ok((idx, agent_id, Ok(exec_result))) => {
1460 let _ = self
1461 .event_bus
1462 .publish(KernelEvent::AgentGroupMemberCompleted {
1463 group_id,
1464 agent_id,
1465 success: exec_result.success,
1466 });
1467 completed[idx] = Some(SubTask {
1468 id: subtasks[idx].id,
1469 description: subtasks[idx].description.clone(),
1470 required_capability: subtasks[idx].required_capability.clone(),
1471 result: Some(exec_result.output.clone()),
1472 success: exec_result.success,
1473 role: subtasks[idx].role.clone(),
1474 });
1475 }
1476 Ok((idx, agent_id, Err(e))) => {
1477 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
1478 let _ = self
1479 .event_bus
1480 .publish(KernelEvent::AgentGroupMemberCompleted {
1481 group_id,
1482 agent_id,
1483 success: false,
1484 });
1485 completed[idx] = Some(SubTask {
1486 id: subtasks[idx].id,
1487 description: subtasks[idx].description.clone(),
1488 required_capability: subtasks[idx].required_capability.clone(),
1489 result: Some(format!("Failed: {e}")),
1490 success: false,
1491 role: subtasks[idx].role.clone(),
1492 });
1493 }
1494 Err(e) => {
1495 tracing::error!(error = %e, "JoinSet task panicked");
1496 }
1497 }
1498 }
1499
1500 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
1501 let succeeded = completed.iter().filter(|r| r.success).count();
1502 let total = completed.len();
1503
1504 tracing::info!(
1505 group_id = %group_id,
1506 succeeded,
1507 total,
1508 "Parallel multi-agent execution complete"
1509 );
1510
1511 let _ = self
1513 .state_store
1514 .save_json("agent_groups", &group_id.to_string(), &group)
1515 .await;
1516 self.git_commit(
1517 &format!("agent_groups/{group_id}.json"),
1518 "orchestrator: save group",
1519 );
1520
1521 Ok(completed)
1522 }
1523}
1524
/// Per-session interview state held in memory by the orchestrator.
///
/// Values of this type are stored in the `sessions` map of `Orchestrator`,
/// which is keyed by a `String`.
#[derive(Debug, Clone)]
// NOTE(review): presumably some fields are stored but never read yet, hence
// the blanket allow — confirm and narrow it to the fields that need it.
#[allow(unused)]
struct InterviewSession {
    /// Identifier of this session (presumably the same string used as the
    /// key in the sessions map — confirm against the insert site).
    id: String,
    /// Interview result recorded for this session.
    interview: InterviewResult,
    /// Protocol phase recorded for this session.
    phase: Phase,
    /// Identifier of the seed associated with the session, if any.
    seed_id: Option<Uuid>,
    /// Identifier of the agent associated with the session, if any.
    agent_id: Option<AgentId>,
}
1535
/// Serde default for `OrchestrationResult::mode`: the string `"chat"`.
fn default_chat_mode() -> String {
    String::from("chat")
}
1539
/// Serializable result record produced by orchestration.
///
/// Fields carrying `skip_serializing_if` are left out of the serialized form
/// when they are `None` (or, for `tool_calls`, empty). Fields carrying
/// `default` fall back to their default value when missing on deserialization.
///
/// NOTE(review): the per-field meanings below are read off the field names
/// and types only — confirm against the code that fills this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
    /// Session identifier, when one applies; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Identifier of the primary project, if any; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub primary_project_id: Option<Uuid>,
    /// Project tag, if any; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project_tag: Option<String>,
    /// Response text; always serialized.
    pub response: String,
    /// Identifier of the seed involved, if any; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seed_id: Option<Uuid>,
    /// Identifier of the agent involved, if any; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// Protocol phase that was reached; always serialized.
    pub phase_reached: Phase,
    /// Evaluation verdict, if an evaluation ran. Unlike the other optional
    /// fields this one has no `skip_serializing_if`, so `None` is serialized.
    pub evaluation_passed: Option<bool>,
    /// Execution output, if any; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<String>,
    /// Tool call records; defaults to empty on input and is omitted from
    /// output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tool_calls: Vec<oxios_ouroboros::ToolCallRecord>,
    /// Interview questions, if any; defaults to `None` on input and is
    /// omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_questions:
        Option<Vec<oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput>>,
    /// Interview round number, if any; defaults to `None` on input and is
    /// omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_round: Option<u32>,
    /// Interview ambiguity value, if any (scale not visible here — TODO
    /// confirm); defaults to `None` on input and is omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_ambiguity: Option<f64>,
    /// Mode string; when missing on input it is filled by
    /// `default_chat_mode`, which returns `"chat"`.
    #[serde(default = "default_chat_mode")]
    pub mode: String,
}
1596
/// Turns clarification questions into a single user-facing message.
///
/// With no questions a fixed one-line request for clarification is returned.
/// Otherwise the questions are listed one per line, numbered from 1, beneath
/// a short introduction.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return "I need a bit more clarification before I can proceed.".to_string();
    }

    let mut numbered = Vec::with_capacity(questions.len());
    for (position, question) in questions.iter().enumerate() {
        numbered.push(format!("{}. {}", position + 1, question));
    }
    let listing = numbered.join("\n");

    format!("I'd like to understand your request better. Could you help clarify:\n\n{listing}")
}
1613
1614fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1617 let mut lines = Vec::new();
1618
1619 if exec.success {
1620 lines.push(format!("â
'{}'", seed.goal));
1621 } else {
1622 lines.push(format!(
1623 "â ī¸ '{}'ė(ëĨŧ) ėëíė§ë§ ėė í ėąęŗĩíė§ ëĒģíėĩëë¤.",
1624 seed.goal
1625 ));
1626 }
1627
1628 if !exec.output.is_empty() {
1630 let preview = if exec.output.len() > 500 {
1631 format!("{}...", &exec.output[..500])
1632 } else {
1633 exec.output.clone()
1634 };
1635 lines.push(String::new());
1636 lines.push(preview);
1637 }
1638
1639 lines.join("\n")
1640}
1641
1642fn should_split_seed(seed: &Seed) -> bool {
1647 seed.acceptance_criteria.len() >= 5
1651}
1652
1653fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1659 seed.acceptance_criteria
1660 .iter()
1661 .map(|criterion| {
1662 let desc = format!("{}: {}", seed.goal, criterion);
1663 let desc_lower = desc.to_lowercase();
1664
1665 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1667 Some("code-review".to_string())
1668 } else if desc_lower.contains("test") {
1669 Some("testing".to_string())
1670 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1671 Some("refactoring".to_string())
1672 } else if desc_lower.contains("write")
1673 || desc_lower.contains("create")
1674 || desc_lower.contains("implement")
1675 {
1676 Some("code-generation".to_string())
1677 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1678 Some("debugging".to_string())
1679 } else {
1680 None
1681 };
1682
1683 let mut subtask = SubTask::new(desc);
1684 subtask.required_capability = cap;
1685 subtask
1686 })
1687 .collect()
1688}
1689
/// Wraps the combined subtask output in a user-facing message.
///
/// An empty `combined` string produces a fixed "nothing succeeded" notice;
/// anything else is placed under a "Multi-agent execution completed" heading.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        body => format!("Multi-agent execution completed:\n\n{body}"),
    }
}
1698
1699async fn run_via_lifecycle(
1701 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1702 parent_seed: &Seed,
1703 description: &str,
1704) -> (String, bool) {
1705 let child_seed = Seed {
1706 id: Uuid::new_v4(),
1707 goal: description.to_string(),
1708 constraints: parent_seed.constraints.clone(),
1709 acceptance_criteria: vec!["Task completes successfully".into()],
1710 ontology: parent_seed.ontology.clone(),
1711 created_at: chrono::Utc::now(),
1712 generation: parent_seed.generation + 1,
1713 parent_seed_id: Some(parent_seed.id),
1714 cspace_hint: None,
1715 original_request: parent_seed.original_request.clone(),
1716 output_schema: None,
1717 project_id: None,
1718 };
1719 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1720 Ok(result) => (result.output, result.success),
1721 Err(e) => (format!("Failed: {e}"), false),
1722 }
1723}