1use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{
19 EvaluationResult, ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed,
20};
21use parking_lot::RwLock;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::agent_lifecycle::AgentLifecycleManager;
26use crate::event_bus::{EventBus, KernelEvent};
27use crate::git_layer::GitLayer;
28use crate::metrics::get_metrics;
29use crate::mount::{MountId, MountManager};
30use crate::project::{ConversationBuffer, ProjectManager};
31use crate::scheduler::Priority;
32use crate::state_store::StateStore;
33use crate::types::AgentId;
34
35#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
37pub enum AgentRole {
38 #[default]
40 Worker,
41 Manager,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct SubTask {
48 pub id: Uuid,
50 pub description: String,
52 pub required_capability: Option<String>,
54 pub result: Option<String>,
56 pub success: bool,
58 #[serde(default)]
60 pub role: AgentRole,
61}
62
63impl SubTask {
64 pub fn new(description: impl Into<String>) -> Self {
66 Self {
67 id: Uuid::new_v4(),
68 description: description.into(),
69 required_capability: None,
70 result: None,
71 success: false,
72 role: AgentRole::default(),
73 }
74 }
75
76 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
78 self.required_capability = Some(cap.into());
79 self
80 }
81}
82
83pub struct Orchestrator {
85 ouroboros: Arc<dyn OuroborosProtocol>,
86 event_bus: EventBus,
87 state_store: Arc<StateStore>,
88 git_layer: Option<Arc<GitLayer>>,
90 sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
92 lifecycle: AgentLifecycleManager,
94 a2a: Option<Arc<crate::a2a::A2AProtocol>>,
96 project_manager: RwLock<Option<Arc<ProjectManager>>>,
98 mount_manager: RwLock<Option<Arc<MountManager>>>,
100 conversation_buffer: RwLock<ConversationBuffer>,
102 delegation_config: DelegationConfig,
104 a2a_breaker: Arc<crate::a2a::circuit_breaker::A2ACircuitBreaker>,
106 evolution_config: RwLock<EvolutionConfig>,
108}
109
110#[derive(Debug, Clone)]
112struct DelegationConfig {
113 max_retries: u32,
115 base_delay_ms: u64,
117 max_delay_ms: u64,
119 #[allow(dead_code)]
121 timeout_ms: u64,
122}
123
124impl Default for DelegationConfig {
125 fn default() -> Self {
126 Self {
127 max_retries: 3,
128 base_delay_ms: 100,
129 max_delay_ms: 5000,
130 timeout_ms: 5000,
131 }
132 }
133}
134
135impl DelegationConfig {
136 fn backoff_delay(&self, attempt: u32) -> u64 {
138 let delay = self.base_delay_ms * 2_u64.saturating_pow(attempt.min(10));
139 delay.min(self.max_delay_ms)
140 }
141}
142
143#[derive(Debug, Clone)]
145struct EvolutionConfig {
146 max_iterations: u32,
148 score_threshold: f64,
150}
151
152impl From<crate::config::OrchestratorConfig> for EvolutionConfig {
153 fn from(c: crate::config::OrchestratorConfig) -> Self {
154 Self {
155 max_iterations: c.max_evolution_iterations,
156 score_threshold: c.min_evaluation_score,
157 }
158 }
159}
160
161impl Orchestrator {
162 pub fn new(
164 ouroboros: Arc<dyn OuroborosProtocol>,
165 event_bus: EventBus,
166 state_store: Arc<StateStore>,
167 lifecycle: AgentLifecycleManager,
168 ) -> Self {
169 Self::with_config(
170 ouroboros,
171 event_bus,
172 state_store,
173 lifecycle,
174 crate::config::OrchestratorConfig::default(),
175 )
176 }
177
178 pub fn with_config(
180 ouroboros: Arc<dyn OuroborosProtocol>,
181 event_bus: EventBus,
182 state_store: Arc<StateStore>,
183 lifecycle: AgentLifecycleManager,
184 config: crate::config::OrchestratorConfig,
185 ) -> Self {
186 let evolution_config = EvolutionConfig::from(config.clone());
187 Self {
188 ouroboros,
189 event_bus,
190 state_store,
191 git_layer: None,
192 sessions: RwLock::new(std::collections::HashMap::new()),
193 lifecycle,
194 a2a: None,
195 project_manager: RwLock::new(None),
196 mount_manager: RwLock::new(None),
197 conversation_buffer: RwLock::new(ConversationBuffer::default()),
198 delegation_config: DelegationConfig::default(),
199 a2a_breaker: Arc::new(crate::a2a::circuit_breaker::A2ACircuitBreaker::new(5, 30)),
200 evolution_config: RwLock::new(evolution_config),
201 }
202 }
203
204 pub fn set_project_manager(&self, manager: Arc<ProjectManager>) {
206 *self.project_manager.write() = Some(manager);
207 }
208
209 pub fn set_mount_manager(&self, manager: Arc<MountManager>) {
211 *self.mount_manager.write() = Some(manager);
212 }
213
214 pub fn mount_manager(&self) -> Option<Arc<MountManager>> {
216 self.mount_manager.read().as_ref().cloned()
217 }
218
219 pub fn project_manager(&self) -> Option<Arc<ProjectManager>> {
221 self.project_manager.read().as_ref().cloned()
222 }
223
224 pub fn detect_project_tag(&self, message: &str) -> Option<String> {
226 self.project_manager.read().as_ref().and_then(|pm| {
227 let projects = pm.list_projects();
228 let result = crate::project::detect_project(message, &projects);
229 match result {
230 crate::project::DetectionResult::Found(id) => pm.get_project(id).map(|p| p.tag()),
231 crate::project::DetectionResult::NoMatch { .. } => None,
232 }
233 })
234 }
235
236 fn resolve_mount_workspace(
250 &self,
251 mount_ids: Option<&str>,
252 project_ids: Option<&str>,
253 user_message: &str,
254 ) -> (
255 Vec<MountId>,
256 Option<String>,
257 Vec<std::path::PathBuf>,
258 String,
259 ) {
260 use crate::mount::Mount;
261
262 let Some(mm) = self.mount_manager() else {
263 return (Vec::new(), None, Vec::new(), String::new());
264 };
265
266 let mut ids: Vec<MountId> = if let Some(ids_str) = mount_ids {
268 ids_str
269 .split(',')
270 .filter_map(|s| MountId::parse_str(s.trim()).ok())
271 .collect()
272 } else {
273 match mm.detect(user_message) {
274 crate::mount::DetectionResult::Found(id) => vec![id],
275 crate::mount::DetectionResult::NoMatch { .. } => vec![],
276 }
277 };
278 let mut seen = std::collections::HashSet::new();
280 ids.retain(|id| seen.insert(*id));
281
282 let project_for_instructions: Option<crate::project::Project> = if let Some(project_ids_str) =
289 project_ids
290 && let Some(first_id_str) = project_ids_str.split(',').next()
291 && let Some(pm) = self.project_manager()
292 && let Ok(pid) = Uuid::parse_str(first_id_str.trim())
293 {
294 let proj = pm.get_project(pid);
295 if let Some(ref project) = proj {
296 for mid in &project.mount_ids {
297 if !ids.contains(mid) {
298 ids.push(*mid);
299 }
300 }
301 }
302 proj
303 } else {
304 None
305 };
306
307 if ids.is_empty() {
308 return (Vec::new(), None, Vec::new(), String::new());
309 }
310
311 for id in &ids {
314 mm.touch(*id);
315 }
316
317 let mounts: Vec<Mount> = mm.get_mounts_ordered(&ids);
318 if mounts.is_empty() {
319 return (Vec::new(), None, Vec::new(), String::new());
320 }
321
322 let mut paths: Vec<std::path::PathBuf> = Vec::new();
324 for m in &mounts {
325 for p in &m.paths {
326 if !paths.contains(p) {
327 paths.push(p.clone());
328 }
329 }
330 }
331
332 if let Some(project) = &project_for_instructions
337 && project.mount_ids.is_empty()
338 && !project.paths.is_empty()
339 {
340 for p in &project.paths {
341 if !paths.contains(p) {
342 paths.push(p.clone());
343 }
344 }
345 }
346
347 let tag = if mounts.len() == 1 {
349 mounts[0].tag()
350 } else {
351 let names: Vec<&str> = mounts.iter().map(|m| m.name.as_str()).collect();
352 format!("[đ§ {}]", names.join(" + "))
353 };
354
355 let mut context = build_workspace_context_body(&mounts).unwrap_or_default();
356
357 if let Some(project) = project_for_instructions {
363 let instructions = if project.instructions.len() > 2000 {
365 let mut end = 2000;
366 while end > 0 && !project.instructions.is_char_boundary(end) {
367 end -= 1;
368 }
369 format!("{}...", &project.instructions[..end])
370 } else {
371 project.instructions.clone()
372 };
373 if !instructions.is_empty() {
374 context.push_str(&format!(
375 "\n### Project Instructions: {}\n{}\n",
376 project.name, instructions
377 ));
378 }
379 }
380
381 const MAX_CONTEXT_CHARS: usize = 6000;
383 if context.len() > MAX_CONTEXT_CHARS {
384 let mut end = MAX_CONTEXT_CHARS;
385 while end > 0 && !context.is_char_boundary(end) {
386 end -= 1;
387 }
388 context.truncate(end);
389 context.push_str("\n...(context truncated)...\n");
390 }
391
392 let context_opt = if context.is_empty() {
393 None
394 } else {
395 Some(context)
396 };
397 (ids, context_opt, paths, tag)
398 }
399
400 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
402 self.a2a = Some(a2a);
403 }
404
405 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
407 self.git_layer = Some(git_layer);
408 }
409
410 pub fn update_evolution_config(&self, config: crate::config::OrchestratorConfig) {
414 *self.evolution_config.write() = EvolutionConfig::from(config);
415 tracing::info!("Orchestrator evolution config hot-reloaded");
416 }
417
418 pub async fn restore_sessions(&self) {
425 let summaries = match self.state_store.list_sessions().await {
426 Ok(s) => s,
427 Err(e) => {
428 tracing::warn!(error = %e, "Failed to list sessions for restore");
429 return;
430 }
431 };
432
433 let mut restored = 0usize;
434 for summary in &summaries {
435 let Some(ref seed_id_str) = summary.active_seed_id else {
437 continue;
438 };
439
440 let session_id = crate::state_store::SessionId(summary.id.clone());
441 let session = match self.state_store.load_session(&session_id).await {
442 Ok(Some(s)) => s,
443 Ok(None) => continue,
444 Err(e) => {
445 tracing::warn!(
446 session_id = %summary.id,
447 error = %e,
448 "Failed to load session for restore"
449 );
450 continue;
451 }
452 };
453
454 let mut interview = oxios_ouroboros::InterviewResult::new();
458 interview.is_task = true; interview.original_message = session
460 .user_messages
461 .last()
462 .map(|m| m.content.clone())
463 .unwrap_or_default();
464
465 let history: Vec<oxios_ouroboros::interview::Exchange> = session
470 .user_messages
471 .iter()
472 .enumerate()
473 .map(|(i, user)| oxios_ouroboros::interview::Exchange {
474 user: user.content.clone(),
475 agent: session
476 .agent_responses
477 .get(i)
478 .map(|a| a.content.clone())
479 .unwrap_or_default(),
480 })
481 .collect();
482 interview.conversation_history = history;
483
484 let seed_id = seed_id_str.parse::<Uuid>().ok();
485
486 let interview_session = InterviewSession {
487 id: session.id.0.clone(),
488 interview,
489 phase: Phase::Execute,
490 seed_id,
491 agent_id: None,
492 };
493
494 {
495 let mut sessions = self.sessions.write();
496 sessions.insert(session.id.0.clone(), interview_session);
497 }
498
499 restored += 1;
500 }
501
502 if restored > 0 {
503 tracing::info!(restored, total = summaries.len(), "Sessions restored");
504 }
505 }
506
507 fn git_commit(&self, rel_path: &str, message: &str) {
509 if let Some(ref gl) = self.git_layer
510 && gl.is_enabled()
511 {
512 let _ = gl.commit_file(rel_path, message);
513 }
514 }
515
516 pub async fn handle_message(
525 &self,
526 user_id: &str,
527 user_message: &str,
528 session_id: Option<&str>,
529 project_ids: Option<&str>,
530 mount_ids: Option<&str>,
531 request_id: &str,
532 ) -> Result<OrchestrationResult> {
533 tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), request_id = %request_id, "starting");
534 get_metrics().messages.inc();
535 let orch_start = std::time::Instant::now();
536
537 let session_id = session_id
538 .map(String::from)
539 .unwrap_or_else(|| Uuid::new_v4().to_string());
540
541 tracing::info!(session_id = %session_id, user_id = %user_id, request_id = %request_id, content_len = user_message.len(), "Orchestrator handling message");
542
543 let primary_project_id: Option<Uuid> = if let Some(ids_str) = project_ids {
546 ids_str
548 .split(',')
549 .next()
550 .and_then(|s| Uuid::parse_str(s.trim()).ok())
551 } else {
552 self.detect_project_tag(user_message).and_then(|_tag| {
554 self.project_manager().and_then(|pm| {
556 let projects = pm.list_projects();
557 let result = crate::project::detect_project(user_message, &projects);
558 match result {
559 crate::project::DetectionResult::Found(id) => Some(id),
560 crate::project::DetectionResult::NoMatch { .. } => None,
561 }
562 })
563 })
564 };
565
566 let project_tag = primary_project_id
568 .and_then(|id| {
569 self.project_manager()
570 .and_then(|pm| pm.get_project(id).map(|p| p.tag()))
571 })
572 .unwrap_or_default();
573
574 if let Some(pid) = primary_project_id
576 && let Some(pm) = self.project_manager()
577 {
578 pm.touch(pid);
579 }
580
581 let (active_mount_ids, workspace_context, mount_paths, mount_tag) =
587 self.resolve_mount_workspace(mount_ids, project_ids, user_message);
588 let mount_tag_opt = if mount_tag.is_empty() {
589 None
590 } else {
591 Some(mount_tag.clone())
592 };
593
594 let project_tag = if mount_tag_opt.is_some() {
598 String::new()
599 } else {
600 project_tag
601 };
602
603 let _conversation_turns = {
604 let buffer = self.conversation_buffer.read();
605 buffer.turns().iter().cloned().collect::<Vec<_>>()
606 };
607
608 {
610 let mut buffer = self.conversation_buffer.write();
611 buffer.push_user(user_message);
612 }
613
614 self.publish_phase_started(&session_id, Phase::Interview)
616 .await;
617
618 let needs_interview;
620 let existing_history: Option<Vec<_>>;
621 {
622 let sessions = self.sessions.read();
623 needs_interview = !sessions.contains_key(&session_id);
624 existing_history = if !needs_interview {
625 sessions
626 .get(&session_id)
627 .map(|s| s.interview.conversation_history.clone())
628 } else {
629 None
630 };
631 }
633
634 let interview = {
636 tracing::info!(phase = "interview", "Starting interview phase");
637 if needs_interview {
638 self.ouroboros.interview(user_message).await?
639 } else {
640 let multi_turn_context = {
643 let mut context_parts = Vec::new();
644 if let Some(ref history) = existing_history {
645 for exchange in history {
646 context_parts.push(format!(
647 "User: {}\nAgent: {}",
648 exchange.user, exchange.agent
649 ));
650 }
651 }
652 context_parts.push(format!("User: {user_message}"));
653 context_parts.join("\n\n")
654 };
655
656 {
661 let mut sessions = self.sessions.write();
662 if let Some(s) = sessions.get_mut(&session_id) {
663 let all_questions = s.interview.questions.join("\n");
664 s.interview.add_to_history(user_message, &all_questions);
665 }
666 }
667
668 self.ouroboros.interview(&multi_turn_context).await?
670 }
671 };
672
673 if !interview.is_task {
675 tracing::info!(session_id = %session_id, "Chat response (non-task)");
676
677 let response_text = if interview.chat_response.is_empty() {
678 "Hello! How can I help you today?".to_string()
679 } else {
680 interview.chat_response.clone()
681 };
682
683 {
685 let mut buffer = self.conversation_buffer.write();
686 buffer.push_agent(&response_text, None);
687 }
688
689 {
692 let mut sessions = self.sessions.write();
693 if let Some(session) = sessions.get_mut(&session_id) {
694 tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
695 session
696 .interview
697 .add_to_history(user_message, &response_text);
698 } else {
699 let mut interview = InterviewResult::new();
701 interview.is_task = false;
702 interview.chat_response = response_text.clone();
703 interview.add_to_history(user_message, &response_text);
704 sessions.insert(
705 session_id.clone(),
706 InterviewSession {
707 id: session_id.clone(),
708 interview,
709 phase: Phase::Interview,
710 seed_id: None,
711 agent_id: None,
712 },
713 );
714 }
715 }
716
717 self.publish_phase_completed(&session_id, Phase::Interview, "chat")
718 .await;
719
720 return Ok(OrchestrationResult {
721 session_id: Some(session_id.clone()),
722 primary_project_id,
723 project_tag: Some(project_tag.clone()),
724 active_mount_ids: active_mount_ids.clone(),
725 mount_tag: mount_tag_opt.clone(),
726 response: response_text,
727 seed_id: None,
728 agent_id: None,
729 phase_reached: Phase::Interview,
730 evaluation_passed: None,
731 output: None,
732 tool_calls: vec![],
733 interview_questions: None,
734 interview_round: None,
735 interview_ambiguity: None,
736 mode: "ouroboros".to_string(),
737 });
738 }
739
740 if !interview.ready_for_seed {
742 {
744 let mut sessions = self.sessions.write();
745 let session =
746 sessions
747 .entry(session_id.clone())
748 .or_insert_with(|| InterviewSession {
749 id: session_id.clone(),
750 interview: interview.clone(),
751 phase: Phase::Interview,
752 seed_id: None,
753 agent_id: None,
754 });
755
756 let questions_text = interview.questions.join("\n");
757
758 let is_first_round = session.interview.conversation_history.is_empty();
763 if is_first_round {
764 let original = if interview.original_message.is_empty() {
765 user_message.to_string()
766 } else {
767 interview.original_message.clone()
768 };
769 session.interview.add_to_history(&original, &questions_text);
770 } else {
771 let last_answer = session.interview.answers.last().cloned();
773 if let Some(ref ans) = last_answer
774 && !ans.is_empty()
775 {
776 session.interview.add_to_history(ans, &questions_text);
777 }
778 }
779 } let questions = interview
782 .questions
783 .iter()
784 .filter(|q| !q.is_empty())
785 .cloned()
786 .collect::<Vec<_>>();
787
788 tracing::info!(
789 session_id = %session_id,
790 ambiguity = interview.ambiguity.ambiguity(),
791 questions = questions.len(),
792 "Interview needs clarification"
793 );
794
795 self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
796 .await;
797
798 let structured = interview.structured_questions.clone();
804
805 let interview_round = {
810 let sessions = self.sessions.read();
811 sessions
812 .get(&session_id)
813 .map(|s| ((s.interview.conversation_history.len() / 2) as u32).max(1))
814 .unwrap_or(1)
815 };
816
817 return Ok(OrchestrationResult {
818 session_id: Some(session_id.clone()),
819 primary_project_id,
820 project_tag: Some(project_tag.clone()),
821 active_mount_ids: active_mount_ids.clone(),
822 mount_tag: mount_tag_opt.clone(),
823 response: format_questions(&questions),
824 seed_id: None,
825 agent_id: None,
826 phase_reached: Phase::Interview,
827 evaluation_passed: None,
828 output: None,
829 tool_calls: vec![],
830 interview_questions: structured,
831 interview_round: Some(interview_round),
832 interview_ambiguity: Some(interview.ambiguity.ambiguity()),
833 mode: "ouroboros".to_string(),
834 });
835 }
836
837 {
841 let mut buffer = self.conversation_buffer.write();
842 buffer.push_agent("[interview: ready]", None);
843 }
844
845 self.publish_phase_completed(&session_id, Phase::Interview, "ready")
847 .await;
848 self.publish_phase_started(&session_id, Phase::Seed).await;
849
850 let is_simple = interview.complexity == "simple" && interview.ambiguity.ambiguity() <= 0.3;
856
857 let mut seed = if is_simple {
858 tracing::info!(
859 phase = "seed",
860 method = "from_message",
861 "Simple task â ad-hoc seed"
862 );
863 Seed::from_message(&interview.original_message)
864 } else {
865 tracing::info!(
866 phase = "seed",
867 method = "llm",
868 "Complex task â LLM-generated seed"
869 );
870 self.ouroboros.generate_seed(&interview).await?
871 };
872 seed.project_id = primary_project_id;
873 seed.workspace_context = workspace_context.clone();
874 seed.mount_paths = mount_paths.clone();
875
876 self.save_seed(&seed).await?;
878
879 self.event_bus
881 .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
882
883 self.publish_phase_completed(&session_id, Phase::Seed, "generated")
884 .await;
885 self.publish_phase_started(&session_id, Phase::Execute)
886 .await;
887
888 if should_split_seed(&seed) {
892 let subtasks = split_into_subtasks(&seed);
893 if subtasks.len() > 1 {
894 tracing::info!(
895 phase = "delegate",
896 subtasks = subtasks.len(),
897 "Delegating to multi-agent"
898 );
899 let results = self.delegate_subtasks(subtasks, &seed).await?;
900
901 let combined: String = results
903 .iter()
904 .filter(|r| r.success)
905 .filter_map(|r| r.result.as_deref())
906 .collect::<Vec<_>>()
907 .join("\n\n");
908
909 let all_passed = results.iter().all(|r| r.success);
910
911 {
913 let mut sessions = self.sessions.write();
914 sessions.remove(&session_id);
915 }
916
917 let metrics = get_metrics();
920 metrics
921 .orch_duration
922 .observe(orch_start.elapsed().as_secs_f64());
923 if all_passed {
924 metrics.agents_completed.inc();
925 } else {
926 metrics.agents_failed.inc();
927 }
928
929 tracing::info!(
930 session_id = %session_id,
931 subtasks = results.len(),
932 passed = all_passed,
933 "Multi-agent orchestration complete"
934 );
935
936 return Ok(OrchestrationResult {
937 session_id: Some(session_id),
938 primary_project_id,
939 project_tag: Some(project_tag.clone()),
940 active_mount_ids: active_mount_ids.clone(),
941 mount_tag: mount_tag_opt.clone(),
942 response: format_result_combined(&combined),
943 seed_id: Some(seed.id),
944 agent_id: None,
945 phase_reached: Phase::Execute,
946 evaluation_passed: Some(all_passed),
947 output: Some(combined),
948 tool_calls: vec![],
949 interview_questions: None,
950 interview_round: None,
951 interview_ambiguity: None,
952 mode: "ouroboros".to_string(),
953 });
954 }
955 }
956
957 {
959 let mut buffer = self.conversation_buffer.write();
960 buffer.push_agent("[multi-agent: complete]", None);
961 }
962
963 tracing::info!(phase = "execute", "Starting execution phase");
965 let exec_result = self
966 .lifecycle
967 .spawn_and_run(&seed, Priority::Normal)
968 .await?;
969
970 self.lifecycle.reap_zombies();
972
973 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
974 .await;
975
976 let (final_result, final_seed, passed, phase_reached) = if let Some(ref schema) =
983 seed.output_schema
984 {
985 let passed = match oxi_sdk::StructuredOutput::extract(
987 &exec_result.output,
988 &oxi_sdk::OutputMode::ValidatedJson {
989 schema: schema.clone(),
990 },
991 ) {
992 Ok(_) => {
993 tracing::info!(session_id = %session_id, "Structured output validation passed");
994 true
995 }
996 Err(e) => {
997 tracing::warn!(session_id = %session_id, error = %e, "Structured output validation failed");
998 false
999 }
1000 };
1001 (exec_result, seed.clone(), passed, Phase::Execute)
1002 } else if self.should_evaluate(&seed) {
1003 self.publish_phase_started(&session_id, Phase::Evaluate)
1005 .await;
1006
1007 let (result, eval, evolved_seed) = self
1008 .run_evolution_loop(&session_id, &seed, exec_result)
1009 .await?;
1010
1011 let passed = {
1014 let cfg = self.evolution_config.read();
1015 eval.all_passed() && eval.score >= cfg.score_threshold
1016 };
1017
1018 self.publish_phase_completed(
1019 &session_id,
1020 Phase::Evaluate,
1021 &format!("score={:.2}", eval.score),
1022 )
1023 .await;
1024
1025 let reached = if evolved_seed.generation > 0 {
1026 Phase::Evolve
1027 } else {
1028 Phase::Evaluate
1029 };
1030
1031 (result, evolved_seed, passed, reached)
1032 } else {
1033 let passed = exec_result.success;
1035 (exec_result, seed.clone(), passed, Phase::Execute)
1036 };
1037
1038 {
1040 let mut sessions = self.sessions.write();
1041 sessions.remove(&session_id);
1042 }
1043
1044 tracing::info!(
1045 session_id = %session_id,
1046 passed,
1047 phase = %phase_reached,
1048 "Orchestration complete"
1049 );
1050
1051 let metrics = get_metrics();
1053 metrics
1054 .orch_duration
1055 .observe(orch_start.elapsed().as_secs_f64());
1056 if passed {
1057 metrics.agents_completed.inc();
1058 } else {
1059 metrics.agents_failed.inc();
1060 }
1061
1062 {
1064 let mut buffer = self.conversation_buffer.write();
1065 buffer.push_agent(&final_seed.goal, None);
1066 }
1067
1068 Ok(OrchestrationResult {
1069 session_id: Some(session_id),
1070 primary_project_id,
1071 project_tag: Some(project_tag.clone()),
1072 active_mount_ids: active_mount_ids.clone(),
1073 mount_tag: mount_tag_opt.clone(),
1074 response: format_execution_result(&final_seed, &final_result),
1075 seed_id: Some(final_seed.id),
1076 agent_id: None,
1077 phase_reached,
1078 evaluation_passed: Some(passed),
1079 output: Some(final_result.output.clone()),
1080 tool_calls: final_result.tool_calls.clone(),
1081 interview_questions: None,
1082 interview_round: None,
1083 interview_ambiguity: None,
1084 mode: "ouroboros".to_string(),
1085 })
1086 }
1087
1088 fn should_evaluate(&self, seed: &Seed) -> bool {
1093 !seed.acceptance_criteria.is_empty() && seed.output_schema.is_none()
1094 }
1095
1096 pub async fn chat(
1100 &self,
1101 _user_id: &str,
1102 user_message: &str,
1103 session_id: Option<&str>,
1104 project_ids: Option<&str>,
1105 mount_ids: Option<&str>,
1106 request_id: &str,
1107 ) -> Result<OrchestrationResult> {
1108 tracing::info!(name = "orchestrator.chat", session_id = %session_id.unwrap_or("new"), request_id = %request_id, "starting");
1109 let metrics = get_metrics();
1110 metrics.messages.inc();
1111 let orch_start = std::time::Instant::now();
1112
1113 let session_id = session_id
1114 .map(String::from)
1115 .unwrap_or_else(|| Uuid::new_v4().to_string());
1116
1117 let primary_project_id: Option<Uuid> = if let Some(ids_str) = project_ids {
1119 ids_str
1120 .split(',')
1121 .next()
1122 .and_then(|s| Uuid::parse_str(s.trim()).ok())
1123 } else {
1124 self.detect_project_tag(user_message).and_then(|_tag| {
1125 self.project_manager().and_then(|pm| {
1126 let projects = pm.list_projects();
1127 let result = crate::project::detect_project(user_message, &projects);
1128 match result {
1129 crate::project::DetectionResult::Found(id) => Some(id),
1130 crate::project::DetectionResult::NoMatch { .. } => None,
1131 }
1132 })
1133 })
1134 };
1135
1136 let project_tag = primary_project_id
1137 .and_then(|id| {
1138 self.project_manager()
1139 .and_then(|pm| pm.get_project(id).map(|p| p.tag()))
1140 })
1141 .unwrap_or_default();
1142
1143 let (active_mount_ids, workspace_context, mount_paths, mount_tag) =
1145 self.resolve_mount_workspace(mount_ids, project_ids, user_message);
1146 let mount_tag_opt = if mount_tag.is_empty() {
1147 None
1148 } else {
1149 Some(mount_tag.clone())
1150 };
1151
1152 let project_tag = if mount_tag_opt.is_some() {
1154 String::new()
1155 } else {
1156 project_tag
1157 };
1158
1159 let mut seed = Seed::from_message(user_message);
1161 seed.project_id = primary_project_id;
1162 seed.workspace_context = workspace_context;
1163 seed.mount_paths = mount_paths;
1164
1165 tracing::info!(
1167 phase = "execute",
1168 mode = "chat",
1169 "Starting direct execution"
1170 );
1171 let exec_result = self
1172 .lifecycle
1173 .spawn_and_run(&seed, Priority::Normal)
1174 .await?;
1175 self.lifecycle.reap_zombies();
1176
1177 let metrics = get_metrics();
1178 metrics
1179 .orch_duration
1180 .observe(orch_start.elapsed().as_secs_f64());
1181 if exec_result.success {
1182 metrics.agents_completed.inc();
1183 } else {
1184 metrics.agents_failed.inc();
1185 }
1186
1187 Ok(OrchestrationResult {
1188 session_id: Some(session_id),
1189 primary_project_id,
1190 project_tag: Some(project_tag),
1191 active_mount_ids: active_mount_ids.clone(),
1192 mount_tag: mount_tag_opt.clone(),
1193 response: exec_result.output.clone(),
1194 seed_id: Some(seed.id),
1195 agent_id: None,
1196 phase_reached: Phase::Execute,
1197 evaluation_passed: None,
1198 output: Some(exec_result.output),
1199 tool_calls: exec_result.tool_calls,
1200 interview_questions: None,
1201 interview_round: None,
1202 interview_ambiguity: None,
1203 mode: "chat".to_string(),
1204 })
1205 }
1206
1207 async fn execute_seed(&self, seed: &Seed) -> Result<ExecutionResult> {
1209 self.lifecycle.spawn_and_run(seed, Priority::Normal).await
1210 }
1211
1212 async fn run_evolution_loop(
1217 &self,
1218 _session_id: &str,
1219 seed: &Seed,
1220 initial_result: ExecutionResult,
1221 ) -> Result<(ExecutionResult, EvaluationResult, Seed)> {
1222 let (max_iterations, threshold) = {
1226 let cfg = self.evolution_config.read();
1227 (cfg.max_iterations, cfg.score_threshold)
1228 };
1229
1230 let mut current_seed = seed.clone();
1231 let mut current_result = initial_result;
1232
1233 let mut best_result = current_result.clone();
1235 let mut best_seed = current_seed.clone();
1236 let mut best_eval: Option<EvaluationResult> = None;
1237
1238 for iteration in 0..=max_iterations {
1239 let evaluation = self
1241 .ouroboros
1242 .evaluate(¤t_seed, ¤t_result)
1243 .await?;
1244
1245 tracing::info!(
1246 iteration,
1247 seed_id = %current_seed.id,
1248 score = evaluation.score,
1249 passed = evaluation.all_passed(),
1250 "Evaluation complete"
1251 );
1252
1253 let _ = self.event_bus.publish(KernelEvent::EvaluationComplete {
1254 seed_id: current_seed.id,
1255 passed: evaluation.all_passed(),
1256 });
1257 if best_eval
1262 .as_ref()
1263 .is_none_or(|b| evaluation.score > b.score)
1264 {
1265 best_result = current_result.clone();
1266 best_seed = current_seed.clone();
1267 best_eval = Some(evaluation.clone());
1268 }
1269
1270 if evaluation.score >= threshold || iteration == max_iterations {
1272 if iteration == max_iterations && max_iterations > 0 {
1273 let _ = self.event_bus.publish(KernelEvent::EvolutionMaxReached {
1274 seed_id: current_seed.id,
1275 final_score: evaluation.score,
1276 iterations: iteration,
1277 });
1278 }
1279 return Ok((
1280 best_result,
1281 best_eval.ok_or_else(|| {
1282 anyhow::anyhow!(
1283 "Evolve loop exited with threshold met but no evaluation was produced"
1284 )
1285 })?,
1286 best_seed,
1287 ));
1288 }
1289
1290 if max_iterations == 0 {
1292 return Ok((
1293 best_result,
1294 best_eval.ok_or_else(|| {
1295 anyhow::anyhow!("No iterations configured and no evaluation was produced")
1296 })?,
1297 best_seed,
1298 ));
1299 }
1300
1301 let evolved = self.ouroboros.evolve(¤t_seed, &evaluation).await?;
1303 match evolved {
1304 Some(new_seed) => {
1305 tracing::info!(
1306 old_seed_id = %current_seed.id,
1307 new_seed_id = %new_seed.id,
1308 iteration,
1309 "Seed evolved, re-executing"
1310 );
1311
1312 let _ = self.event_bus.publish(KernelEvent::EvolutionStarted {
1313 seed_id: current_seed.id,
1314 new_seed_id: new_seed.id,
1315 iteration,
1316 });
1317
1318 self.save_seed(&new_seed).await?;
1320
1321 current_seed = new_seed;
1322 current_result = self.execute_seed(¤t_seed).await?;
1323 }
1324 None => {
1325 tracing::info!(
1326 seed_id = %current_seed.id,
1327 "Evolve returned None, stopping loop"
1328 );
1329 return Ok((
1330 best_result,
1331 best_eval.ok_or_else(|| {
1332 anyhow::anyhow!(
1333 "Evolve returned no seed and no evaluation was produced"
1334 )
1335 })?,
1336 best_seed,
1337 ));
1338 }
1339 }
1340 }
1341
1342 unreachable!()
1344 }
1345
1346 async fn save_seed(&self, seed: &Seed) -> Result<()> {
1348 let key = seed.id.to_string();
1349
1350 self.state_store
1351 .save_json("seeds", &key, seed)
1352 .await
1353 .context("failed to save seed to state store")?;
1354
1355 self.git_commit(&format!("seeds/{key}.json"), "ourobors: save seed");
1356
1357 Ok(())
1358 }
1359
1360 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
1363 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
1364 session_id: session_id.to_owned(),
1365 phase,
1366 });
1367 }
1368
1369 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
1371 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
1372 session_id: session_id.to_owned(),
1373 phase,
1374 result_summary: result.to_owned(),
1375 });
1376 }
1377
1378 pub async fn delegate_subtasks(
1386 &self,
1387 subtasks: Vec<SubTask>,
1388 parent_seed: &Seed,
1389 ) -> Result<Vec<SubTask>> {
1390 if subtasks.len() == 1 {
1392 return self.execute_single_subtask(subtasks, parent_seed).await;
1393 }
1394
1395 if let Some(ref a2a) = self.a2a {
1397 if !self.a2a_breaker.is_allowed() {
1399 tracing::warn!(
1400 state = ?self.a2a_breaker.state(),
1401 "A2A circuit breaker open, using lifecycle fallback"
1402 );
1403 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
1404 }
1405
1406 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
1408 }
1409
1410 self.delegate_via_lifecycle(subtasks, parent_seed).await
1412 }
1413
1414 async fn delegate_with_retry(
1416 &self,
1417 subtasks: Vec<SubTask>,
1418 parent_seed: &Seed,
1419 a2a: &Arc<crate::a2a::A2AProtocol>,
1420 ) -> Result<Vec<SubTask>> {
1421 let mut attempt = 0;
1422 let max_retries = self.delegation_config.max_retries;
1423
1424 loop {
1425 match self
1426 .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
1427 .await
1428 {
1429 Ok(results) => {
1430 self.a2a_breaker.record_success();
1431 return Ok(results);
1432 }
1433 Err(e) => {
1434 self.a2a_breaker.record_failure();
1435 attempt += 1;
1436
1437 if attempt >= max_retries {
1438 tracing::error!(
1439 attempts = attempt,
1440 error = %e,
1441 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
1442 attempt
1443 );
1444 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
1445 }
1446
1447 let delay = self.delegation_config.backoff_delay(attempt);
1449 tracing::warn!(
1450 attempt,
1451 delay_ms = delay,
1452 error = %e,
1453 "A2A delegation failed, retrying with backoff"
1454 );
1455 tokio::time::sleep(Duration::from_millis(delay)).await;
1456 }
1457 }
1458 }
1459 }
1460
1461 async fn execute_single_subtask(
1463 &self,
1464 subtasks: Vec<SubTask>,
1465 parent_seed: &Seed,
1466 ) -> Result<Vec<SubTask>> {
1467 let mut task = subtasks.into_iter().next().ok_or_else(|| {
1468 anyhow::anyhow!("execute_single_subtask called with an empty subtask list")
1469 })?;
1470 let child_seed = Seed {
1471 id: Uuid::new_v4(),
1472 goal: task.description.clone(),
1473 constraints: parent_seed.constraints.clone(),
1474 acceptance_criteria: vec!["Task completes successfully".into()],
1475 ontology: parent_seed.ontology.clone(),
1476 created_at: chrono::Utc::now(),
1477 generation: parent_seed.generation + 1,
1478 parent_seed_id: Some(parent_seed.id),
1479 cspace_hint: None,
1480 original_request: parent_seed.original_request.clone(),
1481 output_schema: None,
1482 project_id: parent_seed.project_id,
1483 workspace_context: parent_seed.workspace_context.clone(),
1484 mount_paths: parent_seed.mount_paths.clone(),
1485 };
1486 match self
1487 .lifecycle
1488 .spawn_and_run(&child_seed, Priority::Normal)
1489 .await
1490 {
1491 Ok(result) => {
1492 task.result = Some(result.output.clone());
1493 }
1494 Err(e) => {
1495 task.result = Some(format!("Failed: {e}"));
1496 task.success = false;
1497 }
1498 }
1499 Ok(vec![task])
1500 }
1501
1502 async fn delegate_via_a2a(
1512 &self,
1513 subtasks: Vec<SubTask>,
1514 parent_seed: &Seed,
1515 a2a: &Arc<crate::a2a::A2AProtocol>,
1516 ) -> Result<Vec<SubTask>> {
1517 use crate::a2a::TaskPriority;
1518 use tokio::task::JoinSet;
1519
1520 tracing::info!(
1521 subtasks = subtasks.len(),
1522 "Delegating subtasks via A2A protocol"
1523 );
1524
1525 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
1526 let subtask_count = subtasks.len();
1527 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
1528
1529 for (idx, subtask) in subtasks.into_iter().enumerate() {
1530 let capability = subtask.required_capability.clone();
1531 let description = subtask.description.clone();
1532 let subtask_id = subtask.id;
1533 let role = subtask.role.clone();
1534 let a2a = Arc::clone(a2a);
1535 let parent_seed = parent_seed.clone();
1536 let lifecycle = self.lifecycle.clone();
1537
1538 join_set.spawn(async move {
1539 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
1541 a2a.query_capabilities(cap).await.ok()
1542 .and_then(|agents| agents.into_iter().next())
1543 } else {
1544 None
1545 };
1546
1547 let (output, success) = if let Some(ref target_card) = target {
1548 let target_id = target_card.agent_id;
1549 tracing::info!(
1550 subtask_index = idx,
1551 target = %target_card.name,
1552 target_id = %target_id,
1553 "A2A dispatching subtask"
1554 );
1555
1556 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
1557 "parent_seed": parent_seed.id.to_string(),
1558 "goal": description,
1559 }))
1560 .with_priority(TaskPriority::Normal);
1561
1562 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
1564
1565 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
1567 Some(Ok(result)) => {
1568 let out = result.get("output")
1569 .and_then(|v| v.as_str())
1570 .unwrap_or("")
1571 .to_string();
1572 let ok = result.get("success")
1573 .and_then(|v| v.as_bool())
1574 .unwrap_or(false);
1575 (out, ok)
1576 }
1577 Some(Err(e)) => {
1578 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
1579 (format!("Failed: {e}"), false)
1580 }
1581 None => {
1582 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
1584 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
1585 }
1586 }
1587 } else {
1588 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
1589 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
1590 };
1591
1592 (idx, SubTask {
1593 id: subtask_id,
1594 description,
1595 required_capability: capability,
1596 result: Some(output),
1597 success,
1598 role,
1599 })
1600 });
1601 }
1602
1603 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
1604 while let Some(join_result) = join_set.join_next().await {
1605 match join_result {
1606 Ok((idx, subtask)) => {
1607 results[idx] = Some(subtask);
1608 }
1609 Err(e) => {
1610 tracing::error!(error = %e, "A2A task panicked");
1611 }
1612 }
1613 }
1614
1615 let completed: Vec<SubTask> = results
1619 .into_iter()
1620 .enumerate()
1621 .map(|(idx, opt)| {
1622 opt.unwrap_or_else(|| SubTask {
1623 id: Uuid::new_v4(),
1624 description: format!("subtask {idx} (failed)"),
1625 required_capability: None,
1626 result: Some("Task panicked or did not complete".into()),
1627 success: false,
1628 role: AgentRole::default(),
1629 })
1630 })
1631 .collect();
1632 tracing::info!(
1633 completed = completed.len(),
1634 succeeded = completed.iter().filter(|r| r.success).count(),
1635 "A2A delegation complete"
1636 );
1637 Ok(completed)
1638 }
1639
1640 async fn delegate_via_lifecycle(
1641 &self,
1642 subtasks: Vec<SubTask>,
1643 parent_seed: &Seed,
1644 ) -> Result<Vec<SubTask>> {
1645 use crate::agent_group::OxiosAgentGroup;
1646 use tokio::task::JoinSet;
1647
1648 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
1649 let group = OxiosAgentGroup::new(parent_seed, descriptions);
1650 let group_id = group.id;
1651
1652 self.event_bus.publish(KernelEvent::AgentGroupCreated {
1653 group_id,
1654 agent_count: group.agents.len(),
1655 })?;
1656
1657 tracing::info!(
1658 group_id = %group_id,
1659 agent_count = group.agents.len(),
1660 "Starting parallel multi-agent execution"
1661 );
1662
1663 let mut join_set: JoinSet<(
1664 usize,
1665 crate::types::AgentId,
1666 Result<oxios_ouroboros::ExecutionResult>,
1667 )> = JoinSet::new();
1668
1669 for (idx, agent_entry) in group.agents.iter().enumerate() {
1670 let child_seed = agent_entry.seed.clone();
1671 let agent_id = agent_entry.id;
1672 let lifecycle = self.lifecycle.clone();
1673
1674 join_set.spawn(async move {
1675 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
1676 (idx, agent_id, result)
1677 });
1678 }
1679
1680 let subtask_count = subtasks.len();
1681 let mut completed = vec![None; subtask_count];
1682 while let Some(join_result) = join_set.join_next().await {
1683 match join_result {
1684 Ok((idx, agent_id, Ok(exec_result))) => {
1685 let _ = self
1686 .event_bus
1687 .publish(KernelEvent::AgentGroupMemberCompleted {
1688 group_id,
1689 agent_id,
1690 success: exec_result.success,
1691 });
1692 completed[idx] = Some(SubTask {
1693 id: subtasks[idx].id,
1694 description: subtasks[idx].description.clone(),
1695 required_capability: subtasks[idx].required_capability.clone(),
1696 result: Some(exec_result.output.clone()),
1697 success: exec_result.success,
1698 role: subtasks[idx].role.clone(),
1699 });
1700 }
1701 Ok((idx, agent_id, Err(e))) => {
1702 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
1703 let _ = self
1704 .event_bus
1705 .publish(KernelEvent::AgentGroupMemberCompleted {
1706 group_id,
1707 agent_id,
1708 success: false,
1709 });
1710 completed[idx] = Some(SubTask {
1711 id: subtasks[idx].id,
1712 description: subtasks[idx].description.clone(),
1713 required_capability: subtasks[idx].required_capability.clone(),
1714 result: Some(format!("Failed: {e}")),
1715 success: false,
1716 role: subtasks[idx].role.clone(),
1717 });
1718 }
1719 Err(e) => {
1720 tracing::error!(error = %e, "JoinSet task panicked");
1721 }
1722 }
1723 }
1724
1725 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
1726 let succeeded = completed.iter().filter(|r| r.success).count();
1727 let total = completed.len();
1728
1729 tracing::info!(
1730 group_id = %group_id,
1731 succeeded,
1732 total,
1733 "Parallel multi-agent execution complete"
1734 );
1735
1736 let _ = self
1738 .state_store
1739 .save_json("agent_groups", &group_id.to_string(), &group)
1740 .await;
1741 self.git_commit(
1742 &format!("agent_groups/{group_id}.json"),
1743 "orchestrator: save group",
1744 );
1745
1746 Ok(completed)
1747 }
1748}
1749
1750#[derive(Debug, Clone)]
1752#[allow(unused)]
1753struct InterviewSession {
1754 id: String,
1755 interview: InterviewResult,
1756 phase: Phase,
1757 seed_id: Option<Uuid>,
1758 agent_id: Option<AgentId>,
1759}
1760
1761fn default_chat_mode() -> String {
1762 "chat".into()
1763}
1764
1765#[derive(Debug, Clone, Serialize, Deserialize)]
1767pub struct OrchestrationResult {
1768 #[serde(skip_serializing_if = "Option::is_none")]
1770 pub session_id: Option<String>,
1771 #[serde(skip_serializing_if = "Option::is_none")]
1773 pub primary_project_id: Option<Uuid>,
1774 #[serde(skip_serializing_if = "Option::is_none")]
1776 pub project_tag: Option<String>,
1777 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1779 pub active_mount_ids: Vec<MountId>,
1780 #[serde(skip_serializing_if = "Option::is_none")]
1782 pub mount_tag: Option<String>,
1783 pub response: String,
1785 #[serde(skip_serializing_if = "Option::is_none")]
1787 pub seed_id: Option<Uuid>,
1788 #[serde(skip_serializing_if = "Option::is_none")]
1790 pub agent_id: Option<AgentId>,
1791 pub phase_reached: Phase,
1793 pub evaluation_passed: Option<bool>,
1799 #[serde(skip_serializing_if = "Option::is_none")]
1801 pub output: Option<String>,
1802 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1804 pub tool_calls: Vec<oxios_ouroboros::ToolCallRecord>,
1805 #[serde(default, skip_serializing_if = "Option::is_none")]
1813 pub interview_questions:
1814 Option<Vec<oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput>>,
1815 #[serde(default, skip_serializing_if = "Option::is_none")]
1818 pub interview_round: Option<u32>,
1819 #[serde(default, skip_serializing_if = "Option::is_none")]
1822 pub interview_ambiguity: Option<f64>,
1823 #[serde(default = "default_chat_mode")]
1825 pub mode: String,
1826}
1827
1828fn format_questions(questions: &[String]) -> String {
1830 if questions.is_empty() {
1831 "I need a bit more clarification before I can proceed.".to_string()
1832 } else {
1833 format!(
1834 "I'd like to understand your request better. Could you help clarify:\n\n{}",
1835 questions
1836 .iter()
1837 .enumerate()
1838 .map(|(i, q)| format!("{}. {}", i + 1, q))
1839 .collect::<Vec<_>>()
1840 .join("\n")
1841 )
1842 }
1843}
1844
1845fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1848 let mut lines = Vec::new();
1849
1850 if exec.success {
1851 lines.push(format!("â
'{}'", seed.goal));
1852 } else {
1853 lines.push(format!(
1854 "â ī¸ '{}'ė(ëĨŧ) ėëíė§ë§ ėė í ėąęŗĩíė§ ëĒģíėĩëë¤.",
1855 seed.goal
1856 ));
1857 }
1858
1859 if !exec.output.is_empty() {
1861 let preview = if exec.output.len() > 500 {
1862 let mut end = 500;
1865 while end > 0 && !exec.output.is_char_boundary(end) {
1866 end -= 1;
1867 }
1868 format!("{}...", &exec.output[..end])
1869 } else {
1870 exec.output.clone()
1871 };
1872 lines.push(String::new());
1873 lines.push(preview);
1874 }
1875
1876 lines.join("\n")
1877}
1878
1879fn should_split_seed(seed: &Seed) -> bool {
1884 seed.acceptance_criteria.len() >= 5
1888}
1889
1890fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1896 seed.acceptance_criteria
1897 .iter()
1898 .map(|criterion| {
1899 let desc = format!("{}: {}", seed.goal, criterion);
1900 let desc_lower = desc.to_lowercase();
1901
1902 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1904 Some("code-review".to_string())
1905 } else if desc_lower.contains("test") {
1906 Some("testing".to_string())
1907 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1908 Some("refactoring".to_string())
1909 } else if desc_lower.contains("write")
1910 || desc_lower.contains("create")
1911 || desc_lower.contains("implement")
1912 {
1913 Some("code-generation".to_string())
1914 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1915 Some("debugging".to_string())
1916 } else {
1917 None
1918 };
1919
1920 let mut subtask = SubTask::new(desc);
1921 subtask.required_capability = cap;
1922 subtask
1923 })
1924 .collect()
1925}
1926
1927fn format_result_combined(combined: &str) -> String {
1929 if combined.is_empty() {
1930 "No subtasks completed successfully.".to_string()
1931 } else {
1932 format!("Multi-agent execution completed:\n\n{combined}")
1933 }
1934}
1935
1936async fn run_via_lifecycle(
1938 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1939 parent_seed: &Seed,
1940 description: &str,
1941) -> (String, bool) {
1942 let child_seed = Seed {
1943 id: Uuid::new_v4(),
1944 goal: description.to_string(),
1945 constraints: parent_seed.constraints.clone(),
1946 acceptance_criteria: vec!["Task completes successfully".into()],
1947 ontology: parent_seed.ontology.clone(),
1948 created_at: chrono::Utc::now(),
1949 generation: parent_seed.generation + 1,
1950 parent_seed_id: Some(parent_seed.id),
1951 cspace_hint: None,
1952 original_request: parent_seed.original_request.clone(),
1953 output_schema: None,
1954 project_id: parent_seed.project_id,
1955 workspace_context: parent_seed.workspace_context.clone(),
1956 mount_paths: parent_seed.mount_paths.clone(),
1957 };
1958 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1959 Ok(result) => (result.output, result.success),
1960 Err(e) => (format!("Failed: {e}"), false),
1961 }
1962}
1963
1964fn build_workspace_context_body(mounts: &[crate::mount::Mount]) -> Option<String> {
1973 if mounts.is_empty() {
1974 return None;
1975 }
1976 let mut out = String::new();
1977 out.push_str("### Active Mounts\n");
1978
1979 for (i, m) in mounts.iter().enumerate() {
1980 let primary = i == 0;
1981 let path = m
1982 .primary_path()
1983 .map(|p| p.to_string_lossy().to_string())
1984 .unwrap_or_else(|| "(no path)".to_string());
1985
1986 if primary {
1987 out.push_str(&format!("- **{}** â {}\n", m.name, path));
1988 if !m.auto_description.is_empty() {
1989 let desc: String = m
1991 .auto_description
1992 .lines()
1993 .take(3)
1994 .collect::<Vec<_>>()
1995 .join("\n ");
1996 out.push_str(&format!(" {}\n", desc));
1997 }
1998 let summary = m.summary_line();
1999 if !summary.is_empty() {
2000 out.push_str(&format!(" _{}_\n", summary));
2001 }
2002 if m.enrichment_pending {
2003 out.push_str(" _(content changed â consider re-scanning this Mount)_\n");
2004 }
2005 } else {
2006 let summary = m.summary_line();
2008 let suffix = if summary.is_empty() {
2009 String::new()
2010 } else {
2011 format!(" â {}", summary)
2012 };
2013 out.push_str(&format!("- **{}** â {}{}\n", m.name, path, suffix));
2014 }
2015 }
2016
2017 Some(out)
2018}
2019
2020#[cfg(test)]
2021mod mount_workspace_tests {
2022 use super::*;
2023 use crate::mount::{Mount, MountSource};
2024 use std::path::PathBuf;
2025
2026 #[test]
2027 fn test_workspace_context_primary_full_secondary_terse() {
2028 let mut oxios =
2029 Mount::from_name_and_path("oxios", PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
2030 oxios.auto_description = "Agent OS.\nRust + tokio.".to_string();
2031 oxios.auto_meta.summary = "Rust agent OS".to_string();
2032
2033 let mut oxi = Mount::from_name_and_path("oxi", PathBuf::from("/oxi"));
2034 oxi.auto_meta.summary = "SDK".to_string();
2035
2036 let body = build_workspace_context_body(&[oxios, oxi]).unwrap();
2037 assert!(body.contains("### Active Mounts"));
2038 assert!(body.contains("Agent OS."));
2040 assert!(body.contains("_Rust agent OS_"));
2041 assert!(body.contains("**oxi** â /oxi â SDK"));
2043 }
2044
2045 #[test]
2046 fn test_workspace_context_empty_is_none() {
2047 assert!(build_workspace_context_body(&[]).is_none());
2048 }
2049
2050 #[test]
2054 fn test_resolve_mount_workspace_detects_and_collects_paths() {
2055 use crate::mount::MountManager;
2056 use oxios_memory::memory::sqlite::MemoryDatabase;
2057 use std::sync::Arc;
2058
2059 let db = Arc::new(MemoryDatabase::open_in_memory(64).unwrap());
2060 let mm = Arc::new(MountManager::new(db, None).unwrap());
2061
2062 let oxios = mm
2064 .create_mount(
2065 "oxios".to_string(),
2066 vec![PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios")],
2067 MountSource::Manual,
2068 )
2069 .unwrap();
2070 let oxi_sdk = mm
2071 .create_mount(
2072 "oxi-sdk".to_string(),
2073 vec![PathBuf::from("/Users/me/oxi")],
2074 MountSource::Manual,
2075 )
2076 .unwrap();
2077 mm.update_enrichment(oxios.id, Some("Agent OS in Rust.".to_string()), None)
2078 .unwrap();
2079
2080 let mounts = mm.get_mounts_ordered(&[oxios.id, oxi_sdk.id]);
2084 assert_eq!(mounts.len(), 2);
2085
2086 let body = build_workspace_context_body(&mounts).unwrap();
2087 assert!(body.contains("oxios"));
2088 assert!(body.contains("Agent OS in Rust."));
2089 assert!(body.contains("oxi-sdk"));
2090
2091 let mut paths = Vec::new();
2093 for m in &mounts {
2094 for p in &m.paths {
2095 if !paths.contains(p) {
2096 paths.push(p.clone());
2097 }
2098 }
2099 }
2100 assert_eq!(paths.len(), 2);
2101 assert_eq!(paths[0], PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
2102 assert_eq!(paths[1], PathBuf::from("/Users/me/oxi"));
2103 }
2104
2105 #[test]
2108 fn test_detection_seeds_primary_on_name_mention() {
2109 use crate::mount::{DetectionResult, detect_mounts};
2110
2111 let oxios =
2112 Mount::from_name_and_path("oxios", PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
2113 let result = detect_mounts("oxios ėŊëëĻŦ롰í´ė¤", std::slice::from_ref(&oxios));
2114 assert!(matches!(result, DetectionResult::Found(id) if id == oxios.id));
2115 }
2116}