1use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{
19 EvaluationResult, ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed,
20};
21use parking_lot::RwLock;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::agent_lifecycle::AgentLifecycleManager;
26use crate::event_bus::{EventBus, KernelEvent};
27use crate::git_layer::GitLayer;
28use crate::metrics::get_metrics;
29use crate::project::{ConversationBuffer, ProjectManager};
30use crate::scheduler::Priority;
31use crate::state_store::StateStore;
32use crate::types::AgentId;
33
34#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
36pub enum AgentRole {
37 #[default]
39 Worker,
40 Manager,
42}
43
/// A unit of work split off from a parent seed so it can be delegated to an
/// agent on its own.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubTask {
    /// Identifier of this subtask (a random v4 UUID when built with `SubTask::new`).
    pub id: Uuid,
    /// Text describing the work; it is used as the goal when the subtask is executed.
    pub description: String,
    /// Capability name used to look up a matching A2A agent, if any.
    pub required_capability: Option<String>,
    /// Output text of the run, or a `Failed: …` message; `None` until the subtask has run.
    pub result: Option<String>,
    /// Whether the subtask finished successfully.
    pub success: bool,
    /// Role of the agent handling the subtask; defaults when absent from serialized data.
    #[serde(default)]
    pub role: AgentRole,
}
61
62impl SubTask {
63 pub fn new(description: impl Into<String>) -> Self {
65 Self {
66 id: Uuid::new_v4(),
67 description: description.into(),
68 required_capability: None,
69 result: None,
70 success: false,
71 role: AgentRole::default(),
72 }
73 }
74
75 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
77 self.required_capability = Some(cap.into());
78 self
79 }
80}
81
/// Coordinates the handling of user messages: interview, seed creation,
/// execution (single agent or delegated subtasks) and evaluation/evolution.
pub struct Orchestrator {
    /// Protocol implementation used for interviewing, seed generation,
    /// evaluation and evolution.
    ouroboros: Arc<dyn OuroborosProtocol>,
    /// Bus on which `KernelEvent`s (phase changes, seed creation, …) are published.
    event_bus: EventBus,
    /// Persistence for seeds, agent groups and stored sessions.
    state_store: Arc<StateStore>,
    /// Optional git layer; when set and enabled, persisted files are committed.
    git_layer: Option<Arc<GitLayer>>,
    /// In-memory interview sessions keyed by session id.
    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
    /// Spawns and runs agents for a seed.
    lifecycle: AgentLifecycleManager,
    /// Optional A2A protocol handle; when absent, delegation uses the lifecycle manager.
    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
    /// Optional project manager, installed after construction.
    project_manager: RwLock<Option<Arc<ProjectManager>>>,
    /// Buffer of user and agent conversation turns.
    conversation_buffer: RwLock<ConversationBuffer>,
    /// Retry/backoff settings for A2A delegation.
    delegation_config: DelegationConfig,
    /// Circuit breaker consulted before delegating over A2A.
    a2a_breaker: Arc<crate::a2a::circuit_breaker::A2ACircuitBreaker>,
    /// Evolution-loop settings; replaceable at runtime.
    evolution_config: RwLock<EvolutionConfig>,
}
106
/// Retry and backoff settings used when delegating subtasks over A2A.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Number of failed A2A attempts after which delegation falls back to the lifecycle path.
    max_retries: u32,
    /// Base delay, in milliseconds, that is doubled per attempt.
    base_delay_ms: u64,
    /// Upper bound, in milliseconds, on any computed backoff delay.
    max_delay_ms: u64,
    /// Timeout in milliseconds; currently not read anywhere in this file.
    #[allow(dead_code)]
    timeout_ms: u64,
}
120
121impl Default for DelegationConfig {
122 fn default() -> Self {
123 Self {
124 max_retries: 3,
125 base_delay_ms: 100,
126 max_delay_ms: 5000,
127 timeout_ms: 5000,
128 }
129 }
130}
131
132impl DelegationConfig {
133 fn backoff_delay(&self, attempt: u32) -> u64 {
135 let delay = self.base_delay_ms * 2_u64.saturating_pow(attempt.min(10));
136 delay.min(self.max_delay_ms)
137 }
138}
139
/// Settings that control the evaluate/evolve loop.
#[derive(Debug, Clone)]
struct EvolutionConfig {
    /// Upper bound of the evolution loop's iteration range (`0..=max_iterations`).
    max_iterations: u32,
    /// Evaluation score at or above which a result is accepted.
    score_threshold: f64,
    /// Copied from the orchestrator config; currently not read anywhere in this file.
    #[allow(dead_code)]
    eval_cache_enabled: bool,
}
151
152impl From<crate::config::OrchestratorConfig> for EvolutionConfig {
153 fn from(c: crate::config::OrchestratorConfig) -> Self {
154 Self {
155 max_iterations: c.max_evolution_iterations,
156 score_threshold: c.min_evaluation_score,
157 eval_cache_enabled: c.eval_cache_enabled,
158 }
159 }
160}
161
162impl Orchestrator {
163 pub fn new(
165 ouroboros: Arc<dyn OuroborosProtocol>,
166 event_bus: EventBus,
167 state_store: Arc<StateStore>,
168 lifecycle: AgentLifecycleManager,
169 ) -> Self {
170 Self::with_config(
171 ouroboros,
172 event_bus,
173 state_store,
174 lifecycle,
175 crate::config::OrchestratorConfig::default(),
176 )
177 }
178
179 pub fn with_config(
181 ouroboros: Arc<dyn OuroborosProtocol>,
182 event_bus: EventBus,
183 state_store: Arc<StateStore>,
184 lifecycle: AgentLifecycleManager,
185 config: crate::config::OrchestratorConfig,
186 ) -> Self {
187 let evolution_config = EvolutionConfig::from(config.clone());
188 Self {
189 ouroboros,
190 event_bus,
191 state_store,
192 git_layer: None,
193 sessions: RwLock::new(std::collections::HashMap::new()),
194 lifecycle,
195 a2a: None,
196 project_manager: RwLock::new(None),
197 conversation_buffer: RwLock::new(ConversationBuffer::default()),
198 delegation_config: DelegationConfig::default(),
199 a2a_breaker: Arc::new(crate::a2a::circuit_breaker::A2ACircuitBreaker::new(5, 30)),
200 evolution_config: RwLock::new(evolution_config),
201 }
202 }
203
204 pub fn set_project_manager(&self, manager: Arc<ProjectManager>) {
206 *self.project_manager.write() = Some(manager);
207 }
208
209 pub fn project_manager(&self) -> Option<Arc<ProjectManager>> {
211 self.project_manager.read().as_ref().cloned()
212 }
213
214 pub fn detect_project_tag(&self, message: &str) -> Option<String> {
216 self.project_manager.read().as_ref().and_then(|pm| {
217 let projects = pm.list_projects();
218 let result = crate::project::detect_project(message, &projects);
219 match result {
220 crate::project::DetectionResult::Found(id) => pm.get_project(id).map(|p| p.tag()),
221 crate::project::DetectionResult::NoMatch { .. } => None,
222 }
223 })
224 }
225
226 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
228 self.a2a = Some(a2a);
229 }
230
231 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
233 self.git_layer = Some(git_layer);
234 }
235
236 pub fn update_evolution_config(&self, config: crate::config::OrchestratorConfig) {
240 *self.evolution_config.write() = EvolutionConfig::from(config);
241 tracing::info!("Orchestrator evolution config hot-reloaded");
242 }
243
244 pub async fn restore_sessions(&self) {
251 let summaries = match self.state_store.list_sessions().await {
252 Ok(s) => s,
253 Err(e) => {
254 tracing::warn!(error = %e, "Failed to list sessions for restore");
255 return;
256 }
257 };
258
259 let mut restored = 0usize;
260 for summary in &summaries {
261 let Some(ref seed_id_str) = summary.active_seed_id else {
263 continue;
264 };
265
266 let session_id = crate::state_store::SessionId(summary.id.clone());
267 let session = match self.state_store.load_session(&session_id).await {
268 Ok(Some(s)) => s,
269 Ok(None) => continue,
270 Err(e) => {
271 tracing::warn!(
272 session_id = %summary.id,
273 error = %e,
274 "Failed to load session for restore"
275 );
276 continue;
277 }
278 };
279
280 let mut interview = oxios_ouroboros::InterviewResult::new();
284 interview.is_task = true; interview.original_message = session
286 .user_messages
287 .last()
288 .map(|m| m.content.clone())
289 .unwrap_or_default();
290
291 let history: Vec<oxios_ouroboros::interview::Exchange> = session
293 .user_messages
294 .iter()
295 .zip(session.agent_responses.iter())
296 .map(|(user, agent)| oxios_ouroboros::interview::Exchange {
297 user: user.content.clone(),
298 agent: agent.content.clone(),
299 })
300 .collect();
301 interview.conversation_history = history;
302
303 let seed_id = seed_id_str.parse::<Uuid>().ok();
304
305 let interview_session = InterviewSession {
306 id: session.id.0.clone(),
307 interview,
308 phase: Phase::Execute,
309 seed_id,
310 agent_id: None,
311 };
312
313 {
314 let mut sessions = self.sessions.write();
315 sessions.insert(session.id.0.clone(), interview_session);
316 }
317
318 restored += 1;
319 }
320
321 if restored > 0 {
322 tracing::info!(restored, total = summaries.len(), "Sessions restored");
323 }
324 }
325
326 fn git_commit(&self, rel_path: &str, message: &str) {
328 if let Some(ref gl) = self.git_layer {
329 if gl.is_enabled() {
330 let _ = gl.commit_file(rel_path, message);
331 }
332 }
333 }
334
335 pub async fn handle_message(
344 &self,
345 user_id: &str,
346 user_message: &str,
347 session_id: Option<&str>,
348 project_ids: Option<&str>,
349 request_id: &str,
350 ) -> Result<OrchestrationResult> {
351 tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), request_id = %request_id, "starting");
352 get_metrics().messages.inc();
353 let orch_start = std::time::Instant::now();
354
355 let session_id = session_id
356 .map(String::from)
357 .unwrap_or_else(|| Uuid::new_v4().to_string());
358
359 tracing::info!(session_id = %session_id, user_id = %user_id, request_id = %request_id, content_len = user_message.len(), "Orchestrator handling message");
360
361 let primary_project_id: Option<Uuid> = if let Some(ids_str) = project_ids {
364 ids_str
366 .split(',')
367 .next()
368 .and_then(|s| Uuid::parse_str(s.trim()).ok())
369 } else {
370 self.detect_project_tag(user_message).and_then(|_tag| {
372 self.project_manager().and_then(|pm| {
374 let projects = pm.list_projects();
375 let result = crate::project::detect_project(user_message, &projects);
376 match result {
377 crate::project::DetectionResult::Found(id) => Some(id),
378 crate::project::DetectionResult::NoMatch { .. } => None,
379 }
380 })
381 })
382 };
383
384 let project_tag = primary_project_id
386 .and_then(|id| {
387 self.project_manager()
388 .and_then(|pm| pm.get_project(id).map(|p| p.tag()))
389 })
390 .unwrap_or_default();
391
392 if let Some(pid) = primary_project_id {
394 if let Some(pm) = self.project_manager() {
395 pm.touch(pid);
396 }
397 }
398
399 let _conversation_turns = {
400 let buffer = self.conversation_buffer.read();
401 buffer.turns().iter().cloned().collect::<Vec<_>>()
402 };
403
404 {
406 let mut buffer = self.conversation_buffer.write();
407 buffer.push_user(user_message);
408 }
409
410 self.publish_phase_started(&session_id, Phase::Interview)
412 .await;
413
414 let needs_interview;
416 let existing_history: Option<Vec<_>>;
417 {
418 let sessions = self.sessions.read();
419 needs_interview = !sessions.contains_key(&session_id);
420 existing_history = if !needs_interview {
421 sessions
422 .get(&session_id)
423 .map(|s| s.interview.conversation_history.clone())
424 } else {
425 None
426 };
427 }
429
430 let interview = {
432 tracing::info!(phase = "interview", "Starting interview phase");
433 if needs_interview {
434 self.ouroboros.interview(user_message).await?
435 } else {
436 let multi_turn_context = {
439 let mut context_parts = Vec::new();
440 if let Some(ref history) = existing_history {
441 for exchange in history {
442 context_parts.push(format!(
443 "User: {}\nAgent: {}",
444 exchange.user, exchange.agent
445 ));
446 }
447 }
448 context_parts.push(format!("User: {user_message}"));
449 context_parts.join("\n\n")
450 };
451
452 {
454 let mut sessions = self.sessions.write();
455 if let Some(s) = sessions.get_mut(&session_id) {
456 let last_q = s.interview.questions.last().cloned().unwrap_or_default();
457 s.interview.add_exchange(&last_q, user_message);
458 }
459 }
460
461 self.ouroboros.interview(&multi_turn_context).await?
463 }
464 };
465
466 if !interview.is_task {
468 tracing::info!(session_id = %session_id, "Chat response (non-task)");
469
470 let response_text = if interview.chat_response.is_empty() {
471 "Hello! How can I help you today?".to_string()
472 } else {
473 interview.chat_response.clone()
474 };
475
476 {
478 let mut buffer = self.conversation_buffer.write();
479 buffer.push_agent(&response_text, None);
480 }
481
482 {
485 let mut sessions = self.sessions.write();
486 if let Some(session) = sessions.get_mut(&session_id) {
487 tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
488 session
489 .interview
490 .add_to_history(user_message, &response_text);
491 } else {
492 let mut interview = InterviewResult::new();
494 interview.is_task = false;
495 interview.chat_response = response_text.clone();
496 interview.add_to_history(user_message, &response_text);
497 sessions.insert(
498 session_id.clone(),
499 InterviewSession {
500 id: session_id.clone(),
501 interview,
502 phase: Phase::Interview,
503 seed_id: None,
504 agent_id: None,
505 },
506 );
507 }
508 }
509
510 self.publish_phase_completed(&session_id, Phase::Interview, "chat")
511 .await;
512
513 return Ok(OrchestrationResult {
514 session_id: Some(session_id.clone()),
515 primary_project_id,
516 project_tag: Some(project_tag.clone()),
517 response: response_text,
518 seed_id: None,
519 agent_id: None,
520 phase_reached: Phase::Interview,
521 evaluation_passed: false,
522 output: None,
523 tool_calls: vec![],
524 });
525 }
526
527 if !interview.ready_for_seed {
529 {
531 let mut sessions = self.sessions.write();
532 let session =
533 sessions
534 .entry(session_id.clone())
535 .or_insert_with(|| InterviewSession {
536 id: session_id.clone(),
537 interview: interview.clone(),
538 phase: Phase::Interview,
539 seed_id: None,
540 agent_id: None,
541 });
542 let questions_text = interview.questions.join("\n");
545 let last_answer = session.interview.answers.last().cloned();
546 if let Some(ref ans) = last_answer {
547 if !ans.is_empty() {
548 session.interview.add_to_history(ans, &questions_text);
549 }
550 }
551 } let questions = interview
554 .questions
555 .iter()
556 .filter(|q| !q.is_empty())
557 .cloned()
558 .collect::<Vec<_>>();
559
560 tracing::info!(
561 session_id = %session_id,
562 ambiguity = interview.ambiguity.ambiguity(),
563 questions = questions.len(),
564 "Interview needs clarification"
565 );
566
567 self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
568 .await;
569
570 return Ok(OrchestrationResult {
571 session_id: Some(session_id.clone()),
572 primary_project_id,
573 project_tag: Some(project_tag.clone()),
574 response: format_questions(&questions),
575 seed_id: None,
576 agent_id: None,
577 phase_reached: Phase::Interview,
578 evaluation_passed: false,
579 output: None,
580 tool_calls: vec![],
581 });
582 }
583
584 {
588 let mut buffer = self.conversation_buffer.write();
589 buffer.push_agent("[interview: ready]", None);
590 }
591
592 self.publish_phase_completed(&session_id, Phase::Interview, "ready")
594 .await;
595 self.publish_phase_started(&session_id, Phase::Seed).await;
596
597 let is_simple = interview.complexity == "simple" && interview.ambiguity.ambiguity() <= 0.3;
603
604 let seed = if is_simple {
605 tracing::info!(
606 phase = "seed",
607 method = "from_message",
608 "Simple task — ad-hoc seed"
609 );
610 Seed::from_message(&interview.original_message)
611 } else {
612 tracing::info!(
613 phase = "seed",
614 method = "llm",
615 "Complex task — LLM-generated seed"
616 );
617 self.ouroboros.generate_seed(&interview).await?
618 };
619
620 self.save_seed(&seed).await?;
622
623 self.event_bus
625 .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
626
627 self.publish_phase_completed(&session_id, Phase::Seed, "generated")
628 .await;
629 self.publish_phase_started(&session_id, Phase::Execute)
630 .await;
631
632 if should_split_seed(&seed) {
636 let subtasks = split_into_subtasks(&seed);
637 if subtasks.len() > 1 {
638 tracing::info!(
639 phase = "delegate",
640 subtasks = subtasks.len(),
641 "Delegating to multi-agent"
642 );
643 let results = self.delegate_subtasks(subtasks, &seed).await?;
644
645 let combined: String = results
647 .iter()
648 .filter(|r| r.success)
649 .filter_map(|r| r.result.as_deref())
650 .collect::<Vec<_>>()
651 .join("\n\n");
652
653 let all_passed = results.iter().all(|r| r.success);
654
655 {
657 let mut sessions = self.sessions.write();
658 sessions.remove(&session_id);
659 }
660
661 tracing::info!(
662 session_id = %session_id,
663 subtasks = results.len(),
664 passed = all_passed,
665 "Multi-agent orchestration complete"
666 );
667
668 return Ok(OrchestrationResult {
669 session_id: Some(session_id),
670 primary_project_id,
671 project_tag: Some(project_tag.clone()),
672 response: format_result_combined(&combined),
673 seed_id: Some(seed.id),
674 agent_id: None,
675 phase_reached: Phase::Execute,
676 evaluation_passed: all_passed,
677 output: Some(combined),
678 tool_calls: vec![],
679 });
680 }
681 }
682
683 {
685 let mut buffer = self.conversation_buffer.write();
686 buffer.push_agent("[multi-agent: complete]", None);
687 }
688
689 tracing::info!(phase = "execute", "Starting execution phase");
691 let exec_result = self
692 .lifecycle
693 .spawn_and_run(&seed, Priority::Normal)
694 .await?;
695
696 self.lifecycle.reap_zombies();
698
699 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
700 .await;
701
702 let (final_result, final_seed, passed, phase_reached) = if let Some(ref schema) =
709 seed.output_schema
710 {
711 let passed = match oxi_sdk::StructuredOutput::extract(
713 &exec_result.output,
714 &oxi_sdk::OutputMode::ValidatedJson {
715 schema: schema.clone(),
716 },
717 ) {
718 Ok(_) => {
719 tracing::info!(session_id = %session_id, "Structured output validation passed");
720 true
721 }
722 Err(e) => {
723 tracing::warn!(session_id = %session_id, error = %e, "Structured output validation failed");
724 false
725 }
726 };
727 (exec_result, seed.clone(), passed, Phase::Execute)
728 } else if self.should_evaluate(&seed) {
729 self.publish_phase_started(&session_id, Phase::Evaluate)
731 .await;
732
733 let (result, eval, evolved_seed) = self
734 .run_evolution_loop(&session_id, &seed, exec_result)
735 .await?;
736
737 let passed =
738 eval.all_passed() && eval.score >= self.evolution_config.read().score_threshold;
739
740 self.publish_phase_completed(
741 &session_id,
742 Phase::Evaluate,
743 &format!("score={:.2}", eval.score),
744 )
745 .await;
746
747 let reached = if evolved_seed.generation > 0 {
748 Phase::Evolve
749 } else {
750 Phase::Evaluate
751 };
752
753 (result, evolved_seed, passed, reached)
754 } else {
755 let passed = exec_result.success;
757 (exec_result, seed.clone(), passed, Phase::Execute)
758 };
759
760 {
762 let mut sessions = self.sessions.write();
763 sessions.remove(&session_id);
764 }
765
766 tracing::info!(
767 session_id = %session_id,
768 passed,
769 phase = %phase_reached,
770 "Orchestration complete"
771 );
772
773 let metrics = get_metrics();
775 metrics
776 .orch_duration
777 .observe(orch_start.elapsed().as_secs_f64());
778 if passed {
779 metrics.agents_completed.inc();
780 } else {
781 metrics.agents_failed.inc();
782 }
783
784 {
786 let mut buffer = self.conversation_buffer.write();
787 buffer.push_agent(&final_seed.goal, None);
788 }
789
790 Ok(OrchestrationResult {
791 session_id: Some(session_id),
792 primary_project_id,
793 project_tag: Some(project_tag.clone()),
794 response: format_execution_result(&final_seed, &final_result),
795 seed_id: Some(final_seed.id),
796 agent_id: None,
797 phase_reached,
798 evaluation_passed: passed,
799 output: Some(final_result.output.clone()),
800 tool_calls: final_result.tool_calls.clone(),
801 })
802 }
803
804 fn should_evaluate(&self, seed: &Seed) -> bool {
809 !seed.acceptance_criteria.is_empty() && seed.output_schema.is_none()
810 }
811
812 async fn execute_seed(&self, seed: &Seed) -> Result<ExecutionResult> {
814 self.lifecycle.spawn_and_run(seed, Priority::Normal).await
815 }
816
817 async fn run_evolution_loop(
822 &self,
823 _session_id: &str,
824 seed: &Seed,
825 initial_result: ExecutionResult,
826 ) -> Result<(ExecutionResult, EvaluationResult, Seed)> {
827 let max_iterations = self.evolution_config.read().max_iterations;
828 let threshold = self.evolution_config.read().score_threshold;
829
830 let mut current_seed = seed.clone();
831 let mut current_result = initial_result;
832
833 let mut best_result = current_result.clone();
835 let mut best_seed = current_seed.clone();
836 let mut best_eval: Option<EvaluationResult> = None;
837
838 for iteration in 0..=max_iterations {
839 let evaluation = self
841 .ouroboros
842 .evaluate(¤t_seed, ¤t_result)
843 .await?;
844
845 tracing::info!(
846 iteration,
847 seed_id = %current_seed.id,
848 score = evaluation.score,
849 passed = evaluation.all_passed(),
850 "Evaluation complete"
851 );
852
853 let _ = self.event_bus.publish(KernelEvent::EvaluationComplete {
854 seed_id: current_seed.id,
855 passed: evaluation.all_passed(),
856 });
857
858 if best_eval
860 .as_ref()
861 .is_none_or(|b| evaluation.score >= b.score)
862 {
863 best_result = current_result.clone();
864 best_seed = current_seed.clone();
865 best_eval = Some(evaluation.clone());
866 }
867
868 if evaluation.score >= threshold || iteration == max_iterations {
870 if iteration == max_iterations && max_iterations > 0 {
871 let _ = self.event_bus.publish(KernelEvent::EvolutionMaxReached {
872 seed_id: current_seed.id,
873 final_score: evaluation.score,
874 iterations: iteration,
875 });
876 }
877 return Ok((
878 best_result,
879 best_eval.ok_or_else(|| {
880 anyhow::anyhow!(
881 "Evolve loop exited with threshold met but no evaluation was produced"
882 )
883 })?,
884 best_seed,
885 ));
886 }
887
888 if max_iterations == 0 {
890 return Ok((
891 best_result,
892 best_eval.ok_or_else(|| {
893 anyhow::anyhow!("No iterations configured and no evaluation was produced")
894 })?,
895 best_seed,
896 ));
897 }
898
899 let evolved = self.ouroboros.evolve(¤t_seed, &evaluation).await?;
901 match evolved {
902 Some(new_seed) => {
903 tracing::info!(
904 old_seed_id = %current_seed.id,
905 new_seed_id = %new_seed.id,
906 iteration,
907 "Seed evolved, re-executing"
908 );
909
910 let _ = self.event_bus.publish(KernelEvent::EvolutionStarted {
911 seed_id: current_seed.id,
912 new_seed_id: new_seed.id,
913 iteration,
914 });
915
916 self.save_seed(&new_seed).await?;
918
919 current_seed = new_seed;
920 current_result = self.execute_seed(¤t_seed).await?;
921 }
922 None => {
923 tracing::info!(
924 seed_id = %current_seed.id,
925 "Evolve returned None, stopping loop"
926 );
927 return Ok((
928 best_result,
929 best_eval.ok_or_else(|| {
930 anyhow::anyhow!(
931 "Evolve returned no seed and no evaluation was produced"
932 )
933 })?,
934 best_seed,
935 ));
936 }
937 }
938 }
939
940 unreachable!()
942 }
943
944 async fn save_seed(&self, seed: &Seed) -> Result<()> {
946 let key = seed.id.to_string();
947
948 self.state_store
949 .save_json("seeds", &key, seed)
950 .await
951 .context("failed to save seed to state store")?;
952
953 self.git_commit(&format!("seeds/{key}.json"), "ourobors: save seed");
954
955 Ok(())
956 }
957
958 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
961 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
962 session_id: session_id.to_owned(),
963 phase,
964 });
965 }
966
967 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
969 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
970 session_id: session_id.to_owned(),
971 phase,
972 result_summary: result.to_owned(),
973 });
974 }
975
976 pub async fn delegate_subtasks(
984 &self,
985 subtasks: Vec<SubTask>,
986 parent_seed: &Seed,
987 ) -> Result<Vec<SubTask>> {
988 if subtasks.len() == 1 {
990 return self.execute_single_subtask(subtasks, parent_seed).await;
991 }
992
993 if let Some(ref a2a) = self.a2a {
995 if !self.a2a_breaker.is_allowed() {
997 tracing::warn!(
998 state = ?self.a2a_breaker.state(),
999 "A2A circuit breaker open, using lifecycle fallback"
1000 );
1001 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
1002 }
1003
1004 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
1006 }
1007
1008 self.delegate_via_lifecycle(subtasks, parent_seed).await
1010 }
1011
1012 async fn delegate_with_retry(
1014 &self,
1015 subtasks: Vec<SubTask>,
1016 parent_seed: &Seed,
1017 a2a: &Arc<crate::a2a::A2AProtocol>,
1018 ) -> Result<Vec<SubTask>> {
1019 let mut attempt = 0;
1020 let max_retries = self.delegation_config.max_retries;
1021
1022 loop {
1023 match self
1024 .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
1025 .await
1026 {
1027 Ok(results) => {
1028 self.a2a_breaker.record_success();
1029 return Ok(results);
1030 }
1031 Err(e) => {
1032 self.a2a_breaker.record_failure();
1033 attempt += 1;
1034
1035 if attempt >= max_retries {
1036 tracing::error!(
1037 attempts = attempt,
1038 error = %e,
1039 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
1040 attempt
1041 );
1042 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
1043 }
1044
1045 let delay = self.delegation_config.backoff_delay(attempt);
1047 tracing::warn!(
1048 attempt,
1049 delay_ms = delay,
1050 error = %e,
1051 "A2A delegation failed, retrying with backoff"
1052 );
1053 tokio::time::sleep(Duration::from_millis(delay)).await;
1054 }
1055 }
1056 }
1057 }
1058
1059 async fn execute_single_subtask(
1061 &self,
1062 subtasks: Vec<SubTask>,
1063 parent_seed: &Seed,
1064 ) -> Result<Vec<SubTask>> {
1065 let mut task = subtasks
1066 .into_iter()
1067 .next()
1068 .expect("execute_single_subtask is only called when subtasks is non-empty");
1069 let child_seed = Seed {
1070 id: Uuid::new_v4(),
1071 goal: task.description.clone(),
1072 constraints: parent_seed.constraints.clone(),
1073 acceptance_criteria: vec!["Task completes successfully".into()],
1074 ontology: parent_seed.ontology.clone(),
1075 created_at: chrono::Utc::now(),
1076 generation: parent_seed.generation + 1,
1077 parent_seed_id: Some(parent_seed.id),
1078 cspace_hint: None,
1079 original_request: parent_seed.original_request.clone(),
1080 output_schema: None,
1081 };
1082 match self
1083 .lifecycle
1084 .spawn_and_run(&child_seed, Priority::Normal)
1085 .await
1086 {
1087 Ok(result) => {
1088 task.result = Some(result.output.clone());
1089 }
1090 Err(e) => {
1091 task.result = Some(format!("Failed: {e}"));
1092 task.success = false;
1093 }
1094 }
1095 Ok(vec![task])
1096 }
1097
1098 async fn delegate_via_a2a(
1108 &self,
1109 subtasks: Vec<SubTask>,
1110 parent_seed: &Seed,
1111 a2a: &Arc<crate::a2a::A2AProtocol>,
1112 ) -> Result<Vec<SubTask>> {
1113 use crate::a2a::TaskPriority;
1114 use tokio::task::JoinSet;
1115
1116 tracing::info!(
1117 subtasks = subtasks.len(),
1118 "Delegating subtasks via A2A protocol"
1119 );
1120
1121 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
1122 let subtask_count = subtasks.len();
1123 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
1124
1125 for (idx, subtask) in subtasks.into_iter().enumerate() {
1126 let capability = subtask.required_capability.clone();
1127 let description = subtask.description.clone();
1128 let subtask_id = subtask.id;
1129 let role = subtask.role.clone();
1130 let a2a = Arc::clone(a2a);
1131 let parent_seed = parent_seed.clone();
1132 let lifecycle = self.lifecycle.clone();
1133
1134 join_set.spawn(async move {
1135 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
1137 a2a.query_capabilities(cap).await.ok()
1138 .and_then(|agents| agents.into_iter().next())
1139 } else {
1140 None
1141 };
1142
1143 let (output, success) = if let Some(ref target_card) = target {
1144 let target_id = target_card.agent_id;
1145 tracing::info!(
1146 subtask_index = idx,
1147 target = %target_card.name,
1148 target_id = %target_id,
1149 "A2A dispatching subtask"
1150 );
1151
1152 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
1153 "parent_seed": parent_seed.id.to_string(),
1154 "goal": description,
1155 }))
1156 .with_priority(TaskPriority::Normal);
1157
1158 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
1160
1161 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
1163 Some(Ok(result)) => {
1164 let out = result.get("output")
1165 .and_then(|v| v.as_str())
1166 .unwrap_or("")
1167 .to_string();
1168 let ok = result.get("success")
1169 .and_then(|v| v.as_bool())
1170 .unwrap_or(false);
1171 (out, ok)
1172 }
1173 Some(Err(e)) => {
1174 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
1175 (format!("Failed: {e}"), false)
1176 }
1177 None => {
1178 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
1180 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
1181 }
1182 }
1183 } else {
1184 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
1185 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
1186 };
1187
1188 (idx, SubTask {
1189 id: subtask_id,
1190 description,
1191 required_capability: capability,
1192 result: Some(output),
1193 success,
1194 role,
1195 })
1196 });
1197 }
1198
1199 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
1200 while let Some(join_result) = join_set.join_next().await {
1201 match join_result {
1202 Ok((idx, subtask)) => {
1203 results[idx] = Some(subtask);
1204 }
1205 Err(e) => {
1206 tracing::error!(error = %e, "A2A task panicked");
1207 }
1208 }
1209 }
1210
1211 let completed: Vec<SubTask> = results.into_iter().flatten().collect();
1212 tracing::info!(
1213 completed = completed.len(),
1214 succeeded = completed.iter().filter(|r| r.success).count(),
1215 "A2A delegation complete"
1216 );
1217 Ok(completed)
1218 }
1219
1220 async fn delegate_via_lifecycle(
1221 &self,
1222 subtasks: Vec<SubTask>,
1223 parent_seed: &Seed,
1224 ) -> Result<Vec<SubTask>> {
1225 use crate::agent_group::OxiosAgentGroup;
1226 use tokio::task::JoinSet;
1227
1228 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
1229 let group = OxiosAgentGroup::new(parent_seed, descriptions);
1230 let group_id = group.id;
1231
1232 self.event_bus.publish(KernelEvent::AgentGroupCreated {
1233 group_id,
1234 agent_count: group.agents.len(),
1235 })?;
1236
1237 tracing::info!(
1238 group_id = %group_id,
1239 agent_count = group.agents.len(),
1240 "Starting parallel multi-agent execution"
1241 );
1242
1243 let mut join_set: JoinSet<(
1244 usize,
1245 crate::types::AgentId,
1246 Result<oxios_ouroboros::ExecutionResult>,
1247 )> = JoinSet::new();
1248
1249 for (idx, agent_entry) in group.agents.iter().enumerate() {
1250 let child_seed = agent_entry.seed.clone();
1251 let agent_id = agent_entry.id;
1252 let lifecycle = self.lifecycle.clone();
1253
1254 join_set.spawn(async move {
1255 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
1256 (idx, agent_id, result)
1257 });
1258 }
1259
1260 let subtask_count = subtasks.len();
1261 let mut completed = vec![None; subtask_count];
1262 while let Some(join_result) = join_set.join_next().await {
1263 match join_result {
1264 Ok((idx, agent_id, Ok(exec_result))) => {
1265 let _ = self
1266 .event_bus
1267 .publish(KernelEvent::AgentGroupMemberCompleted {
1268 group_id,
1269 agent_id,
1270 success: exec_result.success,
1271 });
1272 completed[idx] = Some(SubTask {
1273 id: subtasks[idx].id,
1274 description: subtasks[idx].description.clone(),
1275 required_capability: subtasks[idx].required_capability.clone(),
1276 result: Some(exec_result.output.clone()),
1277 success: exec_result.success,
1278 role: subtasks[idx].role.clone(),
1279 });
1280 }
1281 Ok((idx, agent_id, Err(e))) => {
1282 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
1283 let _ = self
1284 .event_bus
1285 .publish(KernelEvent::AgentGroupMemberCompleted {
1286 group_id,
1287 agent_id,
1288 success: false,
1289 });
1290 completed[idx] = Some(SubTask {
1291 id: subtasks[idx].id,
1292 description: subtasks[idx].description.clone(),
1293 required_capability: subtasks[idx].required_capability.clone(),
1294 result: Some(format!("Failed: {e}")),
1295 success: false,
1296 role: subtasks[idx].role.clone(),
1297 });
1298 }
1299 Err(e) => {
1300 tracing::error!(error = %e, "JoinSet task panicked");
1301 }
1302 }
1303 }
1304
1305 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
1306 let succeeded = completed.iter().filter(|r| r.success).count();
1307 let total = completed.len();
1308
1309 tracing::info!(
1310 group_id = %group_id,
1311 succeeded,
1312 total,
1313 "Parallel multi-agent execution complete"
1314 );
1315
1316 let _ = self
1318 .state_store
1319 .save_json("agent_groups", &group_id.to_string(), &group)
1320 .await;
1321 self.git_commit(
1322 &format!("agent_groups/{group_id}.json"),
1323 "orchestrator: save group",
1324 );
1325
1326 Ok(completed)
1327 }
1328}
1329
/// In-memory state of one conversation as tracked by the orchestrator.
#[derive(Debug, Clone)]
#[allow(unused)]
struct InterviewSession {
    /// Session identifier; also the key under which the session is stored.
    id: String,
    /// Interview state, including the conversation history.
    interview: InterviewResult,
    /// Phase recorded for the session when it was created or restored.
    phase: Phase,
    /// Seed associated with the session, if any.
    seed_id: Option<Uuid>,
    /// Agent associated with the session; always `None` in this file.
    agent_id: Option<AgentId>,
}
1340
/// Outcome of handling one user message.
///
/// `Option` fields that are `None` and an empty `tool_calls` list are
/// omitted from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
    /// Session the message was handled in.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Project the message was attributed to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub primary_project_id: Option<Uuid>,
    /// Tag of the primary project (empty when no project was resolved).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project_tag: Option<String>,
    /// Text to show to the user.
    pub response: String,
    /// Seed that was executed, when execution was reached.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seed_id: Option<Uuid>,
    /// Agent identifier; always `None` in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// Last pipeline phase that was reached.
    pub phase_reached: Phase,
    /// Whether the run passed validation/evaluation (`false` when none ran).
    pub evaluation_passed: bool,
    /// Raw execution output, when execution was reached.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<String>,
    /// Tool calls recorded during execution.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tool_calls: Vec<oxios_ouroboros::ToolCallRecord>,
}
1372
/// Renders clarification questions as a numbered list under a short intro.
///
/// An empty slice produces a generic request for clarification instead.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return "I need a bit more clarification before I can proceed.".to_string();
    }

    let mut numbered = String::new();
    for (position, question) in questions.iter().enumerate() {
        if position > 0 {
            numbered.push('\n');
        }
        numbered.push_str(&format!("{}. {}", position + 1, question));
    }

    format!("I'd like to understand your request better. Could you help clarify:\n\n{numbered}")
}
1389
1390fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1393 let mut lines = Vec::new();
1394
1395 if exec.success {
1396 lines.push(format!("✅ '{}'", seed.goal));
1397 } else {
1398 lines.push(format!(
1399 "⚠️ '{}'을(를) 시도했지만 완전히 성공하지 못했습니다.",
1400 seed.goal
1401 ));
1402 }
1403
1404 if !exec.output.is_empty() {
1406 let preview = if exec.output.len() > 500 {
1407 format!("{}...", &exec.output[..500])
1408 } else {
1409 exec.output.clone()
1410 };
1411 lines.push(String::new());
1412 lines.push(preview);
1413 }
1414
1415 lines.join("\n")
1416}
1417
1418fn should_split_seed(seed: &Seed) -> bool {
1423 seed.acceptance_criteria.len() >= 5
1427}
1428
1429fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1435 seed.acceptance_criteria
1436 .iter()
1437 .map(|criterion| {
1438 let desc = format!("{}: {}", seed.goal, criterion);
1439 let desc_lower = desc.to_lowercase();
1440
1441 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1443 Some("code-review".to_string())
1444 } else if desc_lower.contains("test") {
1445 Some("testing".to_string())
1446 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1447 Some("refactoring".to_string())
1448 } else if desc_lower.contains("write")
1449 || desc_lower.contains("create")
1450 || desc_lower.contains("implement")
1451 {
1452 Some("code-generation".to_string())
1453 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1454 Some("debugging".to_string())
1455 } else {
1456 None
1457 };
1458
1459 let mut subtask = SubTask::new(desc);
1460 subtask.required_capability = cap;
1461 subtask
1462 })
1463 .collect()
1464}
1465
/// Wraps the combined subtask output in a user-facing message, or reports
/// that nothing succeeded when `combined` is empty.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        body => format!("Multi-agent execution completed:\n\n{body}"),
    }
}
1474
1475async fn run_via_lifecycle(
1477 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1478 parent_seed: &Seed,
1479 description: &str,
1480) -> (String, bool) {
1481 let child_seed = Seed {
1482 id: Uuid::new_v4(),
1483 goal: description.to_string(),
1484 constraints: parent_seed.constraints.clone(),
1485 acceptance_criteria: vec!["Task completes successfully".into()],
1486 ontology: parent_seed.ontology.clone(),
1487 created_at: chrono::Utc::now(),
1488 generation: parent_seed.generation + 1,
1489 parent_seed_id: Some(parent_seed.id),
1490 cspace_hint: None,
1491 original_request: parent_seed.original_request.clone(),
1492 output_schema: None,
1493 };
1494 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1495 Ok(result) => (result.output, result.success),
1496 Err(e) => (format!("Failed: {e}"), false),
1497 }
1498}