1use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{
19 EvaluationResult, ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed,
20};
21use parking_lot::RwLock;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::agent_lifecycle::AgentLifecycleManager;
26use crate::event_bus::{EventBus, KernelEvent};
27use crate::git_layer::GitLayer;
28use crate::metrics::get_metrics;
29use crate::mount::{MountId, MountManager};
30use crate::project::{ConversationBuffer, ProjectManager};
31use crate::scheduler::Priority;
32use crate::state_store::StateStore;
33use crate::types::AgentId;
34
35#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
37pub enum AgentRole {
38 #[default]
40 Worker,
41 Manager,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct SubTask {
48 pub id: Uuid,
50 pub description: String,
52 pub required_capability: Option<String>,
54 pub result: Option<String>,
56 pub success: bool,
58 #[serde(default)]
60 pub role: AgentRole,
61}
62
63impl SubTask {
64 pub fn new(description: impl Into<String>) -> Self {
66 Self {
67 id: Uuid::new_v4(),
68 description: description.into(),
69 required_capability: None,
70 result: None,
71 success: false,
72 role: AgentRole::default(),
73 }
74 }
75
76 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
78 self.required_capability = Some(cap.into());
79 self
80 }
81}
82
83pub struct Orchestrator {
85 ouroboros: Arc<dyn OuroborosProtocol>,
86 event_bus: EventBus,
87 state_store: Arc<StateStore>,
88 git_layer: Option<Arc<GitLayer>>,
90 sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
92 lifecycle: AgentLifecycleManager,
94 a2a: Option<Arc<crate::a2a::A2AProtocol>>,
96 project_manager: RwLock<Option<Arc<ProjectManager>>>,
98 mount_manager: RwLock<Option<Arc<MountManager>>>,
100 conversation_buffer: RwLock<ConversationBuffer>,
102 delegation_config: DelegationConfig,
104 a2a_breaker: Arc<crate::a2a::circuit_breaker::A2ACircuitBreaker>,
106 evolution_config: RwLock<EvolutionConfig>,
108}
109
/// Retry and backoff settings for A2A delegation.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Number of failed attempts after which delegation stops retrying.
    max_retries: u32,
    /// Base delay, in milliseconds, that the exponential backoff scales.
    base_delay_ms: u64,
    /// Upper bound, in milliseconds, for any single backoff delay.
    max_delay_ms: u64,
    /// Timeout in milliseconds. Not read anywhere in this file yet.
    #[allow(dead_code)]
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    /// Three attempts, 100 ms base delay, 5 s delay cap, 5 s timeout.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Largest exponent applied to the base delay; the factor tops out at 2^10.
    const MAX_BACKOFF_EXPONENT: u32 = 10;

    /// Returns the backoff delay in milliseconds for the given attempt number.
    ///
    /// The delay is `base_delay_ms * 2^min(attempt, 10)`, capped at
    /// `max_delay_ms`. The multiplication saturates, so a very large
    /// `base_delay_ms` yields the cap instead of overflowing.
    fn backoff_delay(&self, attempt: u32) -> u64 {
        let factor = 1_u64 << attempt.min(Self::MAX_BACKOFF_EXPONENT);
        self.base_delay_ms
            .saturating_mul(factor)
            .min(self.max_delay_ms)
    }
}
142
/// Settings for the evaluate / evolve loop, derived from
/// `crate::config::OrchestratorConfig`.
#[derive(Clone, Debug)]
struct EvolutionConfig {
    /// Maximum number of evolve-and-re-execute rounds after the first evaluation.
    max_iterations: u32,
    /// Minimum evaluation score at which a result is accepted.
    score_threshold: f64,
    /// Copied from the orchestrator config. Not read anywhere in this file yet.
    #[allow(dead_code)]
    eval_cache_enabled: bool,
}
154
155impl From<crate::config::OrchestratorConfig> for EvolutionConfig {
156 fn from(c: crate::config::OrchestratorConfig) -> Self {
157 Self {
158 max_iterations: c.max_evolution_iterations,
159 score_threshold: c.min_evaluation_score,
160 eval_cache_enabled: c.eval_cache_enabled,
161 }
162 }
163}
164
165impl Orchestrator {
166 pub fn new(
168 ouroboros: Arc<dyn OuroborosProtocol>,
169 event_bus: EventBus,
170 state_store: Arc<StateStore>,
171 lifecycle: AgentLifecycleManager,
172 ) -> Self {
173 Self::with_config(
174 ouroboros,
175 event_bus,
176 state_store,
177 lifecycle,
178 crate::config::OrchestratorConfig::default(),
179 )
180 }
181
182 pub fn with_config(
184 ouroboros: Arc<dyn OuroborosProtocol>,
185 event_bus: EventBus,
186 state_store: Arc<StateStore>,
187 lifecycle: AgentLifecycleManager,
188 config: crate::config::OrchestratorConfig,
189 ) -> Self {
190 let evolution_config = EvolutionConfig::from(config.clone());
191 Self {
192 ouroboros,
193 event_bus,
194 state_store,
195 git_layer: None,
196 sessions: RwLock::new(std::collections::HashMap::new()),
197 lifecycle,
198 a2a: None,
199 project_manager: RwLock::new(None),
200 mount_manager: RwLock::new(None),
201 conversation_buffer: RwLock::new(ConversationBuffer::default()),
202 delegation_config: DelegationConfig::default(),
203 a2a_breaker: Arc::new(crate::a2a::circuit_breaker::A2ACircuitBreaker::new(5, 30)),
204 evolution_config: RwLock::new(evolution_config),
205 }
206 }
207
208 pub fn set_project_manager(&self, manager: Arc<ProjectManager>) {
210 *self.project_manager.write() = Some(manager);
211 }
212
213 pub fn set_mount_manager(&self, manager: Arc<MountManager>) {
215 *self.mount_manager.write() = Some(manager);
216 }
217
218 pub fn mount_manager(&self) -> Option<Arc<MountManager>> {
220 self.mount_manager.read().as_ref().cloned()
221 }
222
223 pub fn project_manager(&self) -> Option<Arc<ProjectManager>> {
225 self.project_manager.read().as_ref().cloned()
226 }
227
228 pub fn detect_project_tag(&self, message: &str) -> Option<String> {
230 self.project_manager.read().as_ref().and_then(|pm| {
231 let projects = pm.list_projects();
232 let result = crate::project::detect_project(message, &projects);
233 match result {
234 crate::project::DetectionResult::Found(id) => pm.get_project(id).map(|p| p.tag()),
235 crate::project::DetectionResult::NoMatch { .. } => None,
236 }
237 })
238 }
239
240 fn resolve_mount_workspace(
254 &self,
255 mount_ids: Option<&str>,
256 project_ids: Option<&str>,
257 user_message: &str,
258 ) -> (
259 Vec<MountId>,
260 Option<String>,
261 Vec<std::path::PathBuf>,
262 String,
263 ) {
264 use crate::mount::Mount;
265
266 let Some(mm) = self.mount_manager() else {
267 return (Vec::new(), None, Vec::new(), String::new());
268 };
269
270 let mut ids: Vec<MountId> = if let Some(ids_str) = mount_ids {
272 ids_str
273 .split(',')
274 .filter_map(|s| MountId::parse_str(s.trim()).ok())
275 .collect()
276 } else {
277 match mm.detect(user_message) {
278 crate::mount::DetectionResult::Found(id) => vec![id],
279 crate::mount::DetectionResult::NoMatch { .. } => vec![],
280 }
281 };
282 let mut seen = std::collections::HashSet::new();
284 ids.retain(|id| seen.insert(*id));
285
286 let project_for_instructions: Option<crate::project::Project> = if let Some(project_ids_str) =
293 project_ids
294 && let Some(first_id_str) = project_ids_str.split(',').next()
295 && let Some(pm) = self.project_manager()
296 && let Ok(pid) = Uuid::parse_str(first_id_str.trim())
297 {
298 let proj = pm.get_project(pid);
299 if let Some(ref project) = proj {
300 for mid in &project.mount_ids {
301 if !ids.contains(mid) {
302 ids.push(*mid);
303 }
304 }
305 }
306 proj
307 } else {
308 None
309 };
310
311 if ids.is_empty() {
312 return (Vec::new(), None, Vec::new(), String::new());
313 }
314
315 for id in &ids {
318 mm.touch(*id);
319 }
320
321 let mounts: Vec<Mount> = mm.get_mounts_ordered(&ids);
322 if mounts.is_empty() {
323 return (Vec::new(), None, Vec::new(), String::new());
324 }
325
326 let mut paths: Vec<std::path::PathBuf> = Vec::new();
328 for m in &mounts {
329 for p in &m.paths {
330 if !paths.contains(p) {
331 paths.push(p.clone());
332 }
333 }
334 }
335
336 if let Some(project) = &project_for_instructions
341 && project.mount_ids.is_empty()
342 && !project.paths.is_empty()
343 {
344 for p in &project.paths {
345 if !paths.contains(p) {
346 paths.push(p.clone());
347 }
348 }
349 }
350
351 let tag = if mounts.len() == 1 {
353 mounts[0].tag()
354 } else {
355 let names: Vec<&str> = mounts.iter().map(|m| m.name.as_str()).collect();
356 format!("[đ§ {}]", names.join(" + "))
357 };
358
359 let mut context = build_workspace_context_body(&mounts).unwrap_or_default();
360
361 if let Some(project) = project_for_instructions {
367 let instructions = if project.instructions.len() > 2000 {
369 let mut end = 2000;
370 while end > 0 && !project.instructions.is_char_boundary(end) {
371 end -= 1;
372 }
373 format!("{}...", &project.instructions[..end])
374 } else {
375 project.instructions.clone()
376 };
377 if !instructions.is_empty() {
378 context.push_str(&format!(
379 "\n### Project Instructions: {}\n{}\n",
380 project.name, instructions
381 ));
382 }
383 }
384
385 const MAX_CONTEXT_CHARS: usize = 6000;
387 if context.len() > MAX_CONTEXT_CHARS {
388 let mut end = MAX_CONTEXT_CHARS;
389 while end > 0 && !context.is_char_boundary(end) {
390 end -= 1;
391 }
392 context.truncate(end);
393 context.push_str("\n...(context truncated)...\n");
394 }
395
396 let context_opt = if context.is_empty() {
397 None
398 } else {
399 Some(context)
400 };
401 (ids, context_opt, paths, tag)
402 }
403
404 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
406 self.a2a = Some(a2a);
407 }
408
409 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
411 self.git_layer = Some(git_layer);
412 }
413
414 pub fn update_evolution_config(&self, config: crate::config::OrchestratorConfig) {
418 *self.evolution_config.write() = EvolutionConfig::from(config);
419 tracing::info!("Orchestrator evolution config hot-reloaded");
420 }
421
422 pub async fn restore_sessions(&self) {
429 let summaries = match self.state_store.list_sessions().await {
430 Ok(s) => s,
431 Err(e) => {
432 tracing::warn!(error = %e, "Failed to list sessions for restore");
433 return;
434 }
435 };
436
437 let mut restored = 0usize;
438 for summary in &summaries {
439 let Some(ref seed_id_str) = summary.active_seed_id else {
441 continue;
442 };
443
444 let session_id = crate::state_store::SessionId(summary.id.clone());
445 let session = match self.state_store.load_session(&session_id).await {
446 Ok(Some(s)) => s,
447 Ok(None) => continue,
448 Err(e) => {
449 tracing::warn!(
450 session_id = %summary.id,
451 error = %e,
452 "Failed to load session for restore"
453 );
454 continue;
455 }
456 };
457
458 let mut interview = oxios_ouroboros::InterviewResult::new();
462 interview.is_task = true; interview.original_message = session
464 .user_messages
465 .last()
466 .map(|m| m.content.clone())
467 .unwrap_or_default();
468
469 let history: Vec<oxios_ouroboros::interview::Exchange> = session
471 .user_messages
472 .iter()
473 .zip(session.agent_responses.iter())
474 .map(|(user, agent)| oxios_ouroboros::interview::Exchange {
475 user: user.content.clone(),
476 agent: agent.content.clone(),
477 })
478 .collect();
479 interview.conversation_history = history;
480
481 let seed_id = seed_id_str.parse::<Uuid>().ok();
482
483 let interview_session = InterviewSession {
484 id: session.id.0.clone(),
485 interview,
486 phase: Phase::Execute,
487 seed_id,
488 agent_id: None,
489 };
490
491 {
492 let mut sessions = self.sessions.write();
493 sessions.insert(session.id.0.clone(), interview_session);
494 }
495
496 restored += 1;
497 }
498
499 if restored > 0 {
500 tracing::info!(restored, total = summaries.len(), "Sessions restored");
501 }
502 }
503
504 fn git_commit(&self, rel_path: &str, message: &str) {
506 if let Some(ref gl) = self.git_layer
507 && gl.is_enabled()
508 {
509 let _ = gl.commit_file(rel_path, message);
510 }
511 }
512
513 pub async fn handle_message(
522 &self,
523 user_id: &str,
524 user_message: &str,
525 session_id: Option<&str>,
526 project_ids: Option<&str>,
527 mount_ids: Option<&str>,
528 request_id: &str,
529 ) -> Result<OrchestrationResult> {
530 tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), request_id = %request_id, "starting");
531 get_metrics().messages.inc();
532 let orch_start = std::time::Instant::now();
533
534 let session_id = session_id
535 .map(String::from)
536 .unwrap_or_else(|| Uuid::new_v4().to_string());
537
538 tracing::info!(session_id = %session_id, user_id = %user_id, request_id = %request_id, content_len = user_message.len(), "Orchestrator handling message");
539
540 let primary_project_id: Option<Uuid> = if let Some(ids_str) = project_ids {
543 ids_str
545 .split(',')
546 .next()
547 .and_then(|s| Uuid::parse_str(s.trim()).ok())
548 } else {
549 self.detect_project_tag(user_message).and_then(|_tag| {
551 self.project_manager().and_then(|pm| {
553 let projects = pm.list_projects();
554 let result = crate::project::detect_project(user_message, &projects);
555 match result {
556 crate::project::DetectionResult::Found(id) => Some(id),
557 crate::project::DetectionResult::NoMatch { .. } => None,
558 }
559 })
560 })
561 };
562
563 let project_tag = primary_project_id
565 .and_then(|id| {
566 self.project_manager()
567 .and_then(|pm| pm.get_project(id).map(|p| p.tag()))
568 })
569 .unwrap_or_default();
570
571 if let Some(pid) = primary_project_id
573 && let Some(pm) = self.project_manager()
574 {
575 pm.touch(pid);
576 }
577
578 let (active_mount_ids, workspace_context, mount_paths, mount_tag) =
584 self.resolve_mount_workspace(mount_ids, project_ids, user_message);
585 let mount_tag_opt = if mount_tag.is_empty() {
586 None
587 } else {
588 Some(mount_tag.clone())
589 };
590
591 let project_tag = if mount_tag_opt.is_some() {
595 String::new()
596 } else {
597 project_tag
598 };
599
600 let _conversation_turns = {
601 let buffer = self.conversation_buffer.read();
602 buffer.turns().iter().cloned().collect::<Vec<_>>()
603 };
604
605 {
607 let mut buffer = self.conversation_buffer.write();
608 buffer.push_user(user_message);
609 }
610
611 self.publish_phase_started(&session_id, Phase::Interview)
613 .await;
614
615 let needs_interview;
617 let existing_history: Option<Vec<_>>;
618 {
619 let sessions = self.sessions.read();
620 needs_interview = !sessions.contains_key(&session_id);
621 existing_history = if !needs_interview {
622 sessions
623 .get(&session_id)
624 .map(|s| s.interview.conversation_history.clone())
625 } else {
626 None
627 };
628 }
630
631 let interview = {
633 tracing::info!(phase = "interview", "Starting interview phase");
634 if needs_interview {
635 self.ouroboros.interview(user_message).await?
636 } else {
637 let multi_turn_context = {
640 let mut context_parts = Vec::new();
641 if let Some(ref history) = existing_history {
642 for exchange in history {
643 context_parts.push(format!(
644 "User: {}\nAgent: {}",
645 exchange.user, exchange.agent
646 ));
647 }
648 }
649 context_parts.push(format!("User: {user_message}"));
650 context_parts.join("\n\n")
651 };
652
653 {
658 let mut sessions = self.sessions.write();
659 if let Some(s) = sessions.get_mut(&session_id) {
660 let all_questions = s.interview.questions.join("\n");
661 s.interview.add_to_history(user_message, &all_questions);
662 }
663 }
664
665 self.ouroboros.interview(&multi_turn_context).await?
667 }
668 };
669
670 if !interview.is_task {
672 tracing::info!(session_id = %session_id, "Chat response (non-task)");
673
674 let response_text = if interview.chat_response.is_empty() {
675 "Hello! How can I help you today?".to_string()
676 } else {
677 interview.chat_response.clone()
678 };
679
680 {
682 let mut buffer = self.conversation_buffer.write();
683 buffer.push_agent(&response_text, None);
684 }
685
686 {
689 let mut sessions = self.sessions.write();
690 if let Some(session) = sessions.get_mut(&session_id) {
691 tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
692 session
693 .interview
694 .add_to_history(user_message, &response_text);
695 } else {
696 let mut interview = InterviewResult::new();
698 interview.is_task = false;
699 interview.chat_response = response_text.clone();
700 interview.add_to_history(user_message, &response_text);
701 sessions.insert(
702 session_id.clone(),
703 InterviewSession {
704 id: session_id.clone(),
705 interview,
706 phase: Phase::Interview,
707 seed_id: None,
708 agent_id: None,
709 },
710 );
711 }
712 }
713
714 self.publish_phase_completed(&session_id, Phase::Interview, "chat")
715 .await;
716
717 return Ok(OrchestrationResult {
718 session_id: Some(session_id.clone()),
719 primary_project_id,
720 project_tag: Some(project_tag.clone()),
721 active_mount_ids: active_mount_ids.clone(),
722 mount_tag: mount_tag_opt.clone(),
723 response: response_text,
724 seed_id: None,
725 agent_id: None,
726 phase_reached: Phase::Interview,
727 evaluation_passed: None,
728 output: None,
729 tool_calls: vec![],
730 interview_questions: None,
731 interview_round: None,
732 interview_ambiguity: None,
733 mode: "ouroboros".to_string(),
734 });
735 }
736
737 if !interview.ready_for_seed {
739 {
741 let mut sessions = self.sessions.write();
742 let session =
743 sessions
744 .entry(session_id.clone())
745 .or_insert_with(|| InterviewSession {
746 id: session_id.clone(),
747 interview: interview.clone(),
748 phase: Phase::Interview,
749 seed_id: None,
750 agent_id: None,
751 });
752
753 let questions_text = interview.questions.join("\n");
754
755 let is_first_round = session.interview.conversation_history.is_empty();
760 if is_first_round {
761 let original = if interview.original_message.is_empty() {
762 user_message.to_string()
763 } else {
764 interview.original_message.clone()
765 };
766 session.interview.add_to_history(&original, &questions_text);
767 } else {
768 let last_answer = session.interview.answers.last().cloned();
770 if let Some(ref ans) = last_answer
771 && !ans.is_empty()
772 {
773 session.interview.add_to_history(ans, &questions_text);
774 }
775 }
776 } let questions = interview
779 .questions
780 .iter()
781 .filter(|q| !q.is_empty())
782 .cloned()
783 .collect::<Vec<_>>();
784
785 tracing::info!(
786 session_id = %session_id,
787 ambiguity = interview.ambiguity.ambiguity(),
788 questions = questions.len(),
789 "Interview needs clarification"
790 );
791
792 self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
793 .await;
794
795 let structured_input = if existing_history.is_some() {
805 let mut context_parts = Vec::new();
808 if let Some(ref history) = existing_history {
809 for exchange in history {
810 context_parts.push(format!(
811 "User: {}\nAgent: {}",
812 exchange.user, exchange.agent
813 ));
814 }
815 }
816 context_parts.push(format!("User: {user_message}"));
817 context_parts.join("\n\n")
818 } else {
819 user_message.to_string()
820 };
821
822 let mut structured = match self.ouroboros.interview_structured(&structured_input).await
823 {
824 Ok(Some(s)) if !s.is_empty() => Some(s),
825 Ok(_) => None,
826 Err(e) => {
827 tracing::warn!(error = %e, "interview_structured failed; falling back to markdown");
828 None
829 }
830 };
831
832 if structured.is_none() && !questions.is_empty() {
838 structured = Some(
839 questions
840 .iter()
841 .enumerate()
842 .map(
843 |(i, q)| oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput {
844 id: format!("q{}", i + 1),
845 text: q.clone(),
846 kind: "free_text".to_string(),
847 options: vec![],
848 },
849 )
850 .collect(),
851 );
852 }
853
854 let interview_round = {
857 let sessions = self.sessions.read();
858 sessions
859 .get(&session_id)
860 .map(|s| (s.interview.answers.len().saturating_sub(1)).max(1) as u32)
861 .unwrap_or(1)
862 };
863
864 return Ok(OrchestrationResult {
865 session_id: Some(session_id.clone()),
866 primary_project_id,
867 project_tag: Some(project_tag.clone()),
868 active_mount_ids: active_mount_ids.clone(),
869 mount_tag: mount_tag_opt.clone(),
870 response: format_questions(&questions),
871 seed_id: None,
872 agent_id: None,
873 phase_reached: Phase::Interview,
874 evaluation_passed: None,
875 output: None,
876 tool_calls: vec![],
877 interview_questions: structured,
878 interview_round: Some(interview_round),
879 interview_ambiguity: Some(interview.ambiguity.ambiguity()),
880 mode: "ouroboros".to_string(),
881 });
882 }
883
884 {
888 let mut buffer = self.conversation_buffer.write();
889 buffer.push_agent("[interview: ready]", None);
890 }
891
892 self.publish_phase_completed(&session_id, Phase::Interview, "ready")
894 .await;
895 self.publish_phase_started(&session_id, Phase::Seed).await;
896
897 let is_simple = interview.complexity == "simple" && interview.ambiguity.ambiguity() <= 0.3;
903
904 let mut seed = if is_simple {
905 tracing::info!(
906 phase = "seed",
907 method = "from_message",
908 "Simple task â ad-hoc seed"
909 );
910 Seed::from_message(&interview.original_message)
911 } else {
912 tracing::info!(
913 phase = "seed",
914 method = "llm",
915 "Complex task â LLM-generated seed"
916 );
917 self.ouroboros.generate_seed(&interview).await?
918 };
919 seed.project_id = primary_project_id;
920 seed.workspace_context = workspace_context.clone();
921 seed.mount_paths = mount_paths.clone();
922
923 self.save_seed(&seed).await?;
925
926 self.event_bus
928 .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
929
930 self.publish_phase_completed(&session_id, Phase::Seed, "generated")
931 .await;
932 self.publish_phase_started(&session_id, Phase::Execute)
933 .await;
934
935 if should_split_seed(&seed) {
939 let subtasks = split_into_subtasks(&seed);
940 if subtasks.len() > 1 {
941 tracing::info!(
942 phase = "delegate",
943 subtasks = subtasks.len(),
944 "Delegating to multi-agent"
945 );
946 let results = self.delegate_subtasks(subtasks, &seed).await?;
947
948 let combined: String = results
950 .iter()
951 .filter(|r| r.success)
952 .filter_map(|r| r.result.as_deref())
953 .collect::<Vec<_>>()
954 .join("\n\n");
955
956 let all_passed = results.iter().all(|r| r.success);
957
958 {
960 let mut sessions = self.sessions.write();
961 sessions.remove(&session_id);
962 }
963
964 tracing::info!(
965 session_id = %session_id,
966 subtasks = results.len(),
967 passed = all_passed,
968 "Multi-agent orchestration complete"
969 );
970
971 return Ok(OrchestrationResult {
972 session_id: Some(session_id),
973 primary_project_id,
974 project_tag: Some(project_tag.clone()),
975 active_mount_ids: active_mount_ids.clone(),
976 mount_tag: mount_tag_opt.clone(),
977 response: format_result_combined(&combined),
978 seed_id: Some(seed.id),
979 agent_id: None,
980 phase_reached: Phase::Execute,
981 evaluation_passed: Some(all_passed),
982 output: Some(combined),
983 tool_calls: vec![],
984 interview_questions: None,
985 interview_round: None,
986 interview_ambiguity: None,
987 mode: "ouroboros".to_string(),
988 });
989 }
990 }
991
992 {
994 let mut buffer = self.conversation_buffer.write();
995 buffer.push_agent("[multi-agent: complete]", None);
996 }
997
998 tracing::info!(phase = "execute", "Starting execution phase");
1000 let exec_result = self
1001 .lifecycle
1002 .spawn_and_run(&seed, Priority::Normal)
1003 .await?;
1004
1005 self.lifecycle.reap_zombies();
1007
1008 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
1009 .await;
1010
1011 let (final_result, final_seed, passed, phase_reached) = if let Some(ref schema) =
1018 seed.output_schema
1019 {
1020 let passed = match oxi_sdk::StructuredOutput::extract(
1022 &exec_result.output,
1023 &oxi_sdk::OutputMode::ValidatedJson {
1024 schema: schema.clone(),
1025 },
1026 ) {
1027 Ok(_) => {
1028 tracing::info!(session_id = %session_id, "Structured output validation passed");
1029 true
1030 }
1031 Err(e) => {
1032 tracing::warn!(session_id = %session_id, error = %e, "Structured output validation failed");
1033 false
1034 }
1035 };
1036 (exec_result, seed.clone(), passed, Phase::Execute)
1037 } else if self.should_evaluate(&seed) {
1038 self.publish_phase_started(&session_id, Phase::Evaluate)
1040 .await;
1041
1042 let (result, eval, evolved_seed) = self
1043 .run_evolution_loop(&session_id, &seed, exec_result)
1044 .await?;
1045
1046 let passed =
1047 eval.all_passed() && eval.score >= self.evolution_config.read().score_threshold;
1048
1049 self.publish_phase_completed(
1050 &session_id,
1051 Phase::Evaluate,
1052 &format!("score={:.2}", eval.score),
1053 )
1054 .await;
1055
1056 let reached = if evolved_seed.generation > 0 {
1057 Phase::Evolve
1058 } else {
1059 Phase::Evaluate
1060 };
1061
1062 (result, evolved_seed, passed, reached)
1063 } else {
1064 let passed = exec_result.success;
1066 (exec_result, seed.clone(), passed, Phase::Execute)
1067 };
1068
1069 {
1071 let mut sessions = self.sessions.write();
1072 sessions.remove(&session_id);
1073 }
1074
1075 tracing::info!(
1076 session_id = %session_id,
1077 passed,
1078 phase = %phase_reached,
1079 "Orchestration complete"
1080 );
1081
1082 let metrics = get_metrics();
1084 metrics
1085 .orch_duration
1086 .observe(orch_start.elapsed().as_secs_f64());
1087 if passed {
1088 metrics.agents_completed.inc();
1089 } else {
1090 metrics.agents_failed.inc();
1091 }
1092
1093 {
1095 let mut buffer = self.conversation_buffer.write();
1096 buffer.push_agent(&final_seed.goal, None);
1097 }
1098
1099 Ok(OrchestrationResult {
1100 session_id: Some(session_id),
1101 primary_project_id,
1102 project_tag: Some(project_tag.clone()),
1103 active_mount_ids: active_mount_ids.clone(),
1104 mount_tag: mount_tag_opt.clone(),
1105 response: format_execution_result(&final_seed, &final_result),
1106 seed_id: Some(final_seed.id),
1107 agent_id: None,
1108 phase_reached,
1109 evaluation_passed: Some(passed),
1110 output: Some(final_result.output.clone()),
1111 tool_calls: final_result.tool_calls.clone(),
1112 interview_questions: None,
1113 interview_round: None,
1114 interview_ambiguity: None,
1115 mode: "ouroboros".to_string(),
1116 })
1117 }
1118
1119 fn should_evaluate(&self, seed: &Seed) -> bool {
1124 !seed.acceptance_criteria.is_empty() && seed.output_schema.is_none()
1125 }
1126
1127 pub async fn chat(
1131 &self,
1132 _user_id: &str,
1133 user_message: &str,
1134 session_id: Option<&str>,
1135 project_ids: Option<&str>,
1136 mount_ids: Option<&str>,
1137 request_id: &str,
1138 ) -> Result<OrchestrationResult> {
1139 tracing::info!(name = "orchestrator.chat", session_id = %session_id.unwrap_or("new"), request_id = %request_id, "starting");
1140 let metrics = get_metrics();
1141 metrics.messages.inc();
1142 let orch_start = std::time::Instant::now();
1143
1144 let session_id = session_id
1145 .map(String::from)
1146 .unwrap_or_else(|| Uuid::new_v4().to_string());
1147
1148 let primary_project_id: Option<Uuid> = if let Some(ids_str) = project_ids {
1150 ids_str
1151 .split(',')
1152 .next()
1153 .and_then(|s| Uuid::parse_str(s.trim()).ok())
1154 } else {
1155 self.detect_project_tag(user_message).and_then(|_tag| {
1156 self.project_manager().and_then(|pm| {
1157 let projects = pm.list_projects();
1158 let result = crate::project::detect_project(user_message, &projects);
1159 match result {
1160 crate::project::DetectionResult::Found(id) => Some(id),
1161 crate::project::DetectionResult::NoMatch { .. } => None,
1162 }
1163 })
1164 })
1165 };
1166
1167 let project_tag = primary_project_id
1168 .and_then(|id| {
1169 self.project_manager()
1170 .and_then(|pm| pm.get_project(id).map(|p| p.tag()))
1171 })
1172 .unwrap_or_default();
1173
1174 let (active_mount_ids, workspace_context, mount_paths, mount_tag) =
1176 self.resolve_mount_workspace(mount_ids, project_ids, user_message);
1177 let mount_tag_opt = if mount_tag.is_empty() {
1178 None
1179 } else {
1180 Some(mount_tag.clone())
1181 };
1182
1183 let project_tag = if mount_tag_opt.is_some() {
1185 String::new()
1186 } else {
1187 project_tag
1188 };
1189
1190 let mut seed = Seed::from_message(user_message);
1192 seed.project_id = primary_project_id;
1193 seed.workspace_context = workspace_context;
1194 seed.mount_paths = mount_paths;
1195
1196 tracing::info!(
1198 phase = "execute",
1199 mode = "chat",
1200 "Starting direct execution"
1201 );
1202 let exec_result = self
1203 .lifecycle
1204 .spawn_and_run(&seed, Priority::Normal)
1205 .await?;
1206 self.lifecycle.reap_zombies();
1207
1208 let metrics = get_metrics();
1209 metrics
1210 .orch_duration
1211 .observe(orch_start.elapsed().as_secs_f64());
1212 if exec_result.success {
1213 metrics.agents_completed.inc();
1214 } else {
1215 metrics.agents_failed.inc();
1216 }
1217
1218 Ok(OrchestrationResult {
1219 session_id: Some(session_id),
1220 primary_project_id,
1221 project_tag: Some(project_tag),
1222 active_mount_ids: active_mount_ids.clone(),
1223 mount_tag: mount_tag_opt.clone(),
1224 response: exec_result.output.clone(),
1225 seed_id: Some(seed.id),
1226 agent_id: None,
1227 phase_reached: Phase::Execute,
1228 evaluation_passed: None,
1229 output: Some(exec_result.output),
1230 tool_calls: exec_result.tool_calls,
1231 interview_questions: None,
1232 interview_round: None,
1233 interview_ambiguity: None,
1234 mode: "chat".to_string(),
1235 })
1236 }
1237
1238 async fn execute_seed(&self, seed: &Seed) -> Result<ExecutionResult> {
1240 self.lifecycle.spawn_and_run(seed, Priority::Normal).await
1241 }
1242
1243 async fn run_evolution_loop(
1248 &self,
1249 _session_id: &str,
1250 seed: &Seed,
1251 initial_result: ExecutionResult,
1252 ) -> Result<(ExecutionResult, EvaluationResult, Seed)> {
1253 let max_iterations = self.evolution_config.read().max_iterations;
1254 let threshold = self.evolution_config.read().score_threshold;
1255
1256 let mut current_seed = seed.clone();
1257 let mut current_result = initial_result;
1258
1259 let mut best_result = current_result.clone();
1261 let mut best_seed = current_seed.clone();
1262 let mut best_eval: Option<EvaluationResult> = None;
1263
1264 for iteration in 0..=max_iterations {
1265 let evaluation = self
1267 .ouroboros
1268 .evaluate(¤t_seed, ¤t_result)
1269 .await?;
1270
1271 tracing::info!(
1272 iteration,
1273 seed_id = %current_seed.id,
1274 score = evaluation.score,
1275 passed = evaluation.all_passed(),
1276 "Evaluation complete"
1277 );
1278
1279 let _ = self.event_bus.publish(KernelEvent::EvaluationComplete {
1280 seed_id: current_seed.id,
1281 passed: evaluation.all_passed(),
1282 });
1283
1284 if best_eval
1286 .as_ref()
1287 .is_none_or(|b| evaluation.score >= b.score)
1288 {
1289 best_result = current_result.clone();
1290 best_seed = current_seed.clone();
1291 best_eval = Some(evaluation.clone());
1292 }
1293
1294 if evaluation.score >= threshold || iteration == max_iterations {
1296 if iteration == max_iterations && max_iterations > 0 {
1297 let _ = self.event_bus.publish(KernelEvent::EvolutionMaxReached {
1298 seed_id: current_seed.id,
1299 final_score: evaluation.score,
1300 iterations: iteration,
1301 });
1302 }
1303 return Ok((
1304 best_result,
1305 best_eval.ok_or_else(|| {
1306 anyhow::anyhow!(
1307 "Evolve loop exited with threshold met but no evaluation was produced"
1308 )
1309 })?,
1310 best_seed,
1311 ));
1312 }
1313
1314 if max_iterations == 0 {
1316 return Ok((
1317 best_result,
1318 best_eval.ok_or_else(|| {
1319 anyhow::anyhow!("No iterations configured and no evaluation was produced")
1320 })?,
1321 best_seed,
1322 ));
1323 }
1324
1325 let evolved = self.ouroboros.evolve(¤t_seed, &evaluation).await?;
1327 match evolved {
1328 Some(new_seed) => {
1329 tracing::info!(
1330 old_seed_id = %current_seed.id,
1331 new_seed_id = %new_seed.id,
1332 iteration,
1333 "Seed evolved, re-executing"
1334 );
1335
1336 let _ = self.event_bus.publish(KernelEvent::EvolutionStarted {
1337 seed_id: current_seed.id,
1338 new_seed_id: new_seed.id,
1339 iteration,
1340 });
1341
1342 self.save_seed(&new_seed).await?;
1344
1345 current_seed = new_seed;
1346 current_result = self.execute_seed(¤t_seed).await?;
1347 }
1348 None => {
1349 tracing::info!(
1350 seed_id = %current_seed.id,
1351 "Evolve returned None, stopping loop"
1352 );
1353 return Ok((
1354 best_result,
1355 best_eval.ok_or_else(|| {
1356 anyhow::anyhow!(
1357 "Evolve returned no seed and no evaluation was produced"
1358 )
1359 })?,
1360 best_seed,
1361 ));
1362 }
1363 }
1364 }
1365
1366 unreachable!()
1368 }
1369
1370 async fn save_seed(&self, seed: &Seed) -> Result<()> {
1372 let key = seed.id.to_string();
1373
1374 self.state_store
1375 .save_json("seeds", &key, seed)
1376 .await
1377 .context("failed to save seed to state store")?;
1378
1379 self.git_commit(&format!("seeds/{key}.json"), "ourobors: save seed");
1380
1381 Ok(())
1382 }
1383
1384 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
1387 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
1388 session_id: session_id.to_owned(),
1389 phase,
1390 });
1391 }
1392
1393 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
1395 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
1396 session_id: session_id.to_owned(),
1397 phase,
1398 result_summary: result.to_owned(),
1399 });
1400 }
1401
1402 pub async fn delegate_subtasks(
1410 &self,
1411 subtasks: Vec<SubTask>,
1412 parent_seed: &Seed,
1413 ) -> Result<Vec<SubTask>> {
1414 if subtasks.len() == 1 {
1416 return self.execute_single_subtask(subtasks, parent_seed).await;
1417 }
1418
1419 if let Some(ref a2a) = self.a2a {
1421 if !self.a2a_breaker.is_allowed() {
1423 tracing::warn!(
1424 state = ?self.a2a_breaker.state(),
1425 "A2A circuit breaker open, using lifecycle fallback"
1426 );
1427 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
1428 }
1429
1430 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
1432 }
1433
1434 self.delegate_via_lifecycle(subtasks, parent_seed).await
1436 }
1437
1438 async fn delegate_with_retry(
1440 &self,
1441 subtasks: Vec<SubTask>,
1442 parent_seed: &Seed,
1443 a2a: &Arc<crate::a2a::A2AProtocol>,
1444 ) -> Result<Vec<SubTask>> {
1445 let mut attempt = 0;
1446 let max_retries = self.delegation_config.max_retries;
1447
1448 loop {
1449 match self
1450 .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
1451 .await
1452 {
1453 Ok(results) => {
1454 self.a2a_breaker.record_success();
1455 return Ok(results);
1456 }
1457 Err(e) => {
1458 self.a2a_breaker.record_failure();
1459 attempt += 1;
1460
1461 if attempt >= max_retries {
1462 tracing::error!(
1463 attempts = attempt,
1464 error = %e,
1465 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
1466 attempt
1467 );
1468 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
1469 }
1470
1471 let delay = self.delegation_config.backoff_delay(attempt);
1473 tracing::warn!(
1474 attempt,
1475 delay_ms = delay,
1476 error = %e,
1477 "A2A delegation failed, retrying with backoff"
1478 );
1479 tokio::time::sleep(Duration::from_millis(delay)).await;
1480 }
1481 }
1482 }
1483 }
1484
1485 async fn execute_single_subtask(
1487 &self,
1488 subtasks: Vec<SubTask>,
1489 parent_seed: &Seed,
1490 ) -> Result<Vec<SubTask>> {
1491 let mut task = subtasks.into_iter().next().ok_or_else(|| {
1492 anyhow::anyhow!("execute_single_subtask called with an empty subtask list")
1493 })?;
1494 let child_seed = Seed {
1495 id: Uuid::new_v4(),
1496 goal: task.description.clone(),
1497 constraints: parent_seed.constraints.clone(),
1498 acceptance_criteria: vec!["Task completes successfully".into()],
1499 ontology: parent_seed.ontology.clone(),
1500 created_at: chrono::Utc::now(),
1501 generation: parent_seed.generation + 1,
1502 parent_seed_id: Some(parent_seed.id),
1503 cspace_hint: None,
1504 original_request: parent_seed.original_request.clone(),
1505 output_schema: None,
1506 project_id: parent_seed.project_id,
1507 workspace_context: parent_seed.workspace_context.clone(),
1508 mount_paths: parent_seed.mount_paths.clone(),
1509 };
1510 match self
1511 .lifecycle
1512 .spawn_and_run(&child_seed, Priority::Normal)
1513 .await
1514 {
1515 Ok(result) => {
1516 task.result = Some(result.output.clone());
1517 }
1518 Err(e) => {
1519 task.result = Some(format!("Failed: {e}"));
1520 task.success = false;
1521 }
1522 }
1523 Ok(vec![task])
1524 }
1525
1526 async fn delegate_via_a2a(
1536 &self,
1537 subtasks: Vec<SubTask>,
1538 parent_seed: &Seed,
1539 a2a: &Arc<crate::a2a::A2AProtocol>,
1540 ) -> Result<Vec<SubTask>> {
1541 use crate::a2a::TaskPriority;
1542 use tokio::task::JoinSet;
1543
1544 tracing::info!(
1545 subtasks = subtasks.len(),
1546 "Delegating subtasks via A2A protocol"
1547 );
1548
1549 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
1550 let subtask_count = subtasks.len();
1551 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
1552
1553 for (idx, subtask) in subtasks.into_iter().enumerate() {
1554 let capability = subtask.required_capability.clone();
1555 let description = subtask.description.clone();
1556 let subtask_id = subtask.id;
1557 let role = subtask.role.clone();
1558 let a2a = Arc::clone(a2a);
1559 let parent_seed = parent_seed.clone();
1560 let lifecycle = self.lifecycle.clone();
1561
1562 join_set.spawn(async move {
1563 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
1565 a2a.query_capabilities(cap).await.ok()
1566 .and_then(|agents| agents.into_iter().next())
1567 } else {
1568 None
1569 };
1570
1571 let (output, success) = if let Some(ref target_card) = target {
1572 let target_id = target_card.agent_id;
1573 tracing::info!(
1574 subtask_index = idx,
1575 target = %target_card.name,
1576 target_id = %target_id,
1577 "A2A dispatching subtask"
1578 );
1579
1580 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
1581 "parent_seed": parent_seed.id.to_string(),
1582 "goal": description,
1583 }))
1584 .with_priority(TaskPriority::Normal);
1585
1586 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
1588
1589 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
1591 Some(Ok(result)) => {
1592 let out = result.get("output")
1593 .and_then(|v| v.as_str())
1594 .unwrap_or("")
1595 .to_string();
1596 let ok = result.get("success")
1597 .and_then(|v| v.as_bool())
1598 .unwrap_or(false);
1599 (out, ok)
1600 }
1601 Some(Err(e)) => {
1602 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
1603 (format!("Failed: {e}"), false)
1604 }
1605 None => {
1606 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
1608 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
1609 }
1610 }
1611 } else {
1612 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
1613 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
1614 };
1615
1616 (idx, SubTask {
1617 id: subtask_id,
1618 description,
1619 required_capability: capability,
1620 result: Some(output),
1621 success,
1622 role,
1623 })
1624 });
1625 }
1626
1627 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
1628 while let Some(join_result) = join_set.join_next().await {
1629 match join_result {
1630 Ok((idx, subtask)) => {
1631 results[idx] = Some(subtask);
1632 }
1633 Err(e) => {
1634 tracing::error!(error = %e, "A2A task panicked");
1635 }
1636 }
1637 }
1638
1639 let completed: Vec<SubTask> = results.into_iter().flatten().collect();
1640 tracing::info!(
1641 completed = completed.len(),
1642 succeeded = completed.iter().filter(|r| r.success).count(),
1643 "A2A delegation complete"
1644 );
1645 Ok(completed)
1646 }
1647
1648 async fn delegate_via_lifecycle(
1649 &self,
1650 subtasks: Vec<SubTask>,
1651 parent_seed: &Seed,
1652 ) -> Result<Vec<SubTask>> {
1653 use crate::agent_group::OxiosAgentGroup;
1654 use tokio::task::JoinSet;
1655
1656 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
1657 let group = OxiosAgentGroup::new(parent_seed, descriptions);
1658 let group_id = group.id;
1659
1660 self.event_bus.publish(KernelEvent::AgentGroupCreated {
1661 group_id,
1662 agent_count: group.agents.len(),
1663 })?;
1664
1665 tracing::info!(
1666 group_id = %group_id,
1667 agent_count = group.agents.len(),
1668 "Starting parallel multi-agent execution"
1669 );
1670
1671 let mut join_set: JoinSet<(
1672 usize,
1673 crate::types::AgentId,
1674 Result<oxios_ouroboros::ExecutionResult>,
1675 )> = JoinSet::new();
1676
1677 for (idx, agent_entry) in group.agents.iter().enumerate() {
1678 let child_seed = agent_entry.seed.clone();
1679 let agent_id = agent_entry.id;
1680 let lifecycle = self.lifecycle.clone();
1681
1682 join_set.spawn(async move {
1683 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
1684 (idx, agent_id, result)
1685 });
1686 }
1687
1688 let subtask_count = subtasks.len();
1689 let mut completed = vec![None; subtask_count];
1690 while let Some(join_result) = join_set.join_next().await {
1691 match join_result {
1692 Ok((idx, agent_id, Ok(exec_result))) => {
1693 let _ = self
1694 .event_bus
1695 .publish(KernelEvent::AgentGroupMemberCompleted {
1696 group_id,
1697 agent_id,
1698 success: exec_result.success,
1699 });
1700 completed[idx] = Some(SubTask {
1701 id: subtasks[idx].id,
1702 description: subtasks[idx].description.clone(),
1703 required_capability: subtasks[idx].required_capability.clone(),
1704 result: Some(exec_result.output.clone()),
1705 success: exec_result.success,
1706 role: subtasks[idx].role.clone(),
1707 });
1708 }
1709 Ok((idx, agent_id, Err(e))) => {
1710 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
1711 let _ = self
1712 .event_bus
1713 .publish(KernelEvent::AgentGroupMemberCompleted {
1714 group_id,
1715 agent_id,
1716 success: false,
1717 });
1718 completed[idx] = Some(SubTask {
1719 id: subtasks[idx].id,
1720 description: subtasks[idx].description.clone(),
1721 required_capability: subtasks[idx].required_capability.clone(),
1722 result: Some(format!("Failed: {e}")),
1723 success: false,
1724 role: subtasks[idx].role.clone(),
1725 });
1726 }
1727 Err(e) => {
1728 tracing::error!(error = %e, "JoinSet task panicked");
1729 }
1730 }
1731 }
1732
1733 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
1734 let succeeded = completed.iter().filter(|r| r.success).count();
1735 let total = completed.len();
1736
1737 tracing::info!(
1738 group_id = %group_id,
1739 succeeded,
1740 total,
1741 "Parallel multi-agent execution complete"
1742 );
1743
1744 let _ = self
1746 .state_store
1747 .save_json("agent_groups", &group_id.to_string(), &group)
1748 .await;
1749 self.git_commit(
1750 &format!("agent_groups/{group_id}.json"),
1751 "orchestrator: save group",
1752 );
1753
1754 Ok(completed)
1755 }
1756}
1757
/// Per-session interview state kept by the orchestrator.
///
/// Values of this type are stored in `Orchestrator::sessions`, a map keyed by
/// `String`.
// NOTE(review): field meanings below are inferred from names and types only;
// confirm against the code that creates and updates sessions.
#[derive(Debug, Clone)]
// NOTE(review): blanket `unused` allowance; presumably some fields are written
// but never read yet. Confirm and narrow if possible.
#[allow(unused)]
struct InterviewSession {
    /// Session identifier (presumably the same string used as the map key; verify).
    id: String,
    /// Interview result associated with this session.
    interview: InterviewResult,
    /// Protocol phase recorded for this session.
    phase: Phase,
    /// Seed linked to the session, if any.
    seed_id: Option<Uuid>,
    /// Agent linked to the session, if any.
    agent_id: Option<AgentId>,
}
1768
/// Serde default for `OrchestrationResult::mode`: the literal `"chat"`.
fn default_chat_mode() -> String {
    String::from("chat")
}
1772
/// Serializable summary of one orchestration run.
///
/// Optional fields marked `skip_serializing_if` are left out of the serialized
/// form when they are `None` / empty.
// NOTE(review): the per-field descriptions are inferred from names and types;
// confirm against the code that builds this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
    /// Session identifier, when the run is tied to a session. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Id of the primary project, if one was resolved. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub primary_project_id: Option<Uuid>,
    /// Project tag string, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project_tag: Option<String>,
    /// Ids of the active mounts. Defaults to empty on input; omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub active_mount_ids: Vec<MountId>,
    /// Mount tag string, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mount_tag: Option<String>,
    /// Response text. Always serialized.
    pub response: String,
    /// Id of the seed involved in the run, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seed_id: Option<Uuid>,
    /// Id of the agent involved in the run, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// Phase recorded as reached by the run. Always serialized.
    pub phase_reached: Phase,
    /// Evaluation verdict, if an evaluation took place. Has no skip attribute,
    /// so `None` is serialized explicitly rather than omitted.
    pub evaluation_passed: Option<bool>,
    /// Execution output text, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<String>,
    /// Tool-call records. Defaults to empty on input; omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tool_calls: Vec<oxios_ouroboros::ToolCallRecord>,
    /// Structured interview questions, if any. Defaults to `None`; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_questions:
        Option<Vec<oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput>>,
    /// Interview round number, if any. Defaults to `None`; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_round: Option<u32>,
    /// Interview ambiguity value, if any. Defaults to `None`; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_ambiguity: Option<f64>,
    /// Mode string; falls back to `default_chat_mode()` (`"chat"`) when absent
    /// from the input.
    #[serde(default = "default_chat_mode")]
    pub mode: String,
}
1835
/// Renders clarification questions as a user-facing message.
///
/// With no questions a generic "need more clarification" sentence is returned;
/// otherwise an introduction followed by a blank line and the questions as a
/// 1-based numbered list, one per line.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return String::from("I need a bit more clarification before I can proceed.");
    }

    let mut numbered = Vec::with_capacity(questions.len());
    for (index, question) in questions.iter().enumerate() {
        numbered.push(format!("{}. {}", index + 1, question));
    }

    let mut message = String::from(
        "I'd like to understand your request better. Could you help clarify:\n\n",
    );
    message.push_str(&numbered.join("\n"));
    message
}
1852
1853fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1856 let mut lines = Vec::new();
1857
1858 if exec.success {
1859 lines.push(format!("â
'{}'", seed.goal));
1860 } else {
1861 lines.push(format!(
1862 "â ī¸ '{}'ė(ëĨŧ) ėëíė§ë§ ėė í ėąęŗĩíė§ ëĒģíėĩëë¤.",
1863 seed.goal
1864 ));
1865 }
1866
1867 if !exec.output.is_empty() {
1869 let preview = if exec.output.len() > 500 {
1870 format!("{}...", &exec.output[..500])
1871 } else {
1872 exec.output.clone()
1873 };
1874 lines.push(String::new());
1875 lines.push(preview);
1876 }
1877
1878 lines.join("\n")
1879}
1880
1881fn should_split_seed(seed: &Seed) -> bool {
1886 seed.acceptance_criteria.len() >= 5
1890}
1891
1892fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1898 seed.acceptance_criteria
1899 .iter()
1900 .map(|criterion| {
1901 let desc = format!("{}: {}", seed.goal, criterion);
1902 let desc_lower = desc.to_lowercase();
1903
1904 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1906 Some("code-review".to_string())
1907 } else if desc_lower.contains("test") {
1908 Some("testing".to_string())
1909 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1910 Some("refactoring".to_string())
1911 } else if desc_lower.contains("write")
1912 || desc_lower.contains("create")
1913 || desc_lower.contains("implement")
1914 {
1915 Some("code-generation".to_string())
1916 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1917 Some("debugging".to_string())
1918 } else {
1919 None
1920 };
1921
1922 let mut subtask = SubTask::new(desc);
1923 subtask.required_capability = cap;
1924 subtask
1925 })
1926 .collect()
1927}
1928
/// Wraps the combined subtask output in a user-facing message.
///
/// An empty `combined` yields a fixed "nothing succeeded" sentence; anything
/// else is prefixed with a header and a blank line.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        body => format!("Multi-agent execution completed:\n\n{body}"),
    }
}
1937
1938async fn run_via_lifecycle(
1940 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1941 parent_seed: &Seed,
1942 description: &str,
1943) -> (String, bool) {
1944 let child_seed = Seed {
1945 id: Uuid::new_v4(),
1946 goal: description.to_string(),
1947 constraints: parent_seed.constraints.clone(),
1948 acceptance_criteria: vec!["Task completes successfully".into()],
1949 ontology: parent_seed.ontology.clone(),
1950 created_at: chrono::Utc::now(),
1951 generation: parent_seed.generation + 1,
1952 parent_seed_id: Some(parent_seed.id),
1953 cspace_hint: None,
1954 original_request: parent_seed.original_request.clone(),
1955 output_schema: None,
1956 project_id: parent_seed.project_id,
1957 workspace_context: parent_seed.workspace_context.clone(),
1958 mount_paths: parent_seed.mount_paths.clone(),
1959 };
1960 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1961 Ok(result) => (result.output, result.success),
1962 Err(e) => (format!("Failed: {e}"), false),
1963 }
1964}
1965
1966fn build_workspace_context_body(mounts: &[crate::mount::Mount]) -> Option<String> {
1975 if mounts.is_empty() {
1976 return None;
1977 }
1978 let mut out = String::new();
1979 out.push_str("### Active Mounts\n");
1980
1981 for (i, m) in mounts.iter().enumerate() {
1982 let primary = i == 0;
1983 let path = m
1984 .primary_path()
1985 .map(|p| p.to_string_lossy().to_string())
1986 .unwrap_or_else(|| "(no path)".to_string());
1987
1988 if primary {
1989 out.push_str(&format!("- **{}** â {}\n", m.name, path));
1990 if !m.auto_description.is_empty() {
1991 let desc: String = m
1993 .auto_description
1994 .lines()
1995 .take(3)
1996 .collect::<Vec<_>>()
1997 .join("\n ");
1998 out.push_str(&format!(" {}\n", desc));
1999 }
2000 let summary = m.summary_line();
2001 if !summary.is_empty() {
2002 out.push_str(&format!(" _{}_\n", summary));
2003 }
2004 if m.enrichment_pending {
2005 out.push_str(" _(content changed â consider re-scanning this Mount)_\n");
2006 }
2007 } else {
2008 let summary = m.summary_line();
2010 let suffix = if summary.is_empty() {
2011 String::new()
2012 } else {
2013 format!(" â {}", summary)
2014 };
2015 out.push_str(&format!("- **{}** â {}{}\n", m.name, path, suffix));
2016 }
2017 }
2018
2019 Some(out)
2020}
2021
2022#[cfg(test)]
2023mod mount_workspace_tests {
2024 use super::*;
2025 use crate::mount::{Mount, MountSource};
2026 use std::path::PathBuf;
2027
2028 #[test]
2029 fn test_workspace_context_primary_full_secondary_terse() {
2030 let mut oxios =
2031 Mount::from_name_and_path("oxios", PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
2032 oxios.auto_description = "Agent OS.\nRust + tokio.".to_string();
2033 oxios.auto_meta.summary = "Rust agent OS".to_string();
2034
2035 let mut oxi = Mount::from_name_and_path("oxi", PathBuf::from("/oxi"));
2036 oxi.auto_meta.summary = "SDK".to_string();
2037
2038 let body = build_workspace_context_body(&[oxios, oxi]).unwrap();
2039 assert!(body.contains("### Active Mounts"));
2040 assert!(body.contains("Agent OS."));
2042 assert!(body.contains("_Rust agent OS_"));
2043 assert!(body.contains("**oxi** â /oxi â SDK"));
2045 }
2046
2047 #[test]
2048 fn test_workspace_context_empty_is_none() {
2049 assert!(build_workspace_context_body(&[]).is_none());
2050 }
2051
2052 #[test]
2056 fn test_resolve_mount_workspace_detects_and_collects_paths() {
2057 use crate::mount::MountManager;
2058 use oxios_memory::memory::sqlite::MemoryDatabase;
2059 use std::sync::Arc;
2060
2061 let db = Arc::new(MemoryDatabase::open_in_memory(64).unwrap());
2062 let mm = Arc::new(MountManager::new(db, None).unwrap());
2063
2064 let oxios = mm
2066 .create_mount(
2067 "oxios".to_string(),
2068 vec![PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios")],
2069 MountSource::Manual,
2070 )
2071 .unwrap();
2072 let oxi_sdk = mm
2073 .create_mount(
2074 "oxi-sdk".to_string(),
2075 vec![PathBuf::from("/Users/me/oxi")],
2076 MountSource::Manual,
2077 )
2078 .unwrap();
2079 mm.update_enrichment(oxios.id, Some("Agent OS in Rust.".to_string()), None)
2080 .unwrap();
2081
2082 let mounts = mm.get_mounts_ordered(&[oxios.id, oxi_sdk.id]);
2086 assert_eq!(mounts.len(), 2);
2087
2088 let body = build_workspace_context_body(&mounts).unwrap();
2089 assert!(body.contains("oxios"));
2090 assert!(body.contains("Agent OS in Rust."));
2091 assert!(body.contains("oxi-sdk"));
2092
2093 let mut paths = Vec::new();
2095 for m in &mounts {
2096 for p in &m.paths {
2097 if !paths.contains(p) {
2098 paths.push(p.clone());
2099 }
2100 }
2101 }
2102 assert_eq!(paths.len(), 2);
2103 assert_eq!(paths[0], PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
2104 assert_eq!(paths[1], PathBuf::from("/Users/me/oxi"));
2105 }
2106
2107 #[test]
2110 fn test_detection_seeds_primary_on_name_mention() {
2111 use crate::mount::{DetectionResult, detect_mounts};
2112
2113 let oxios =
2114 Mount::from_name_and_path("oxios", PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
2115 let result = detect_mounts("oxios ėŊëëĻŦ롰í´ė¤", &[oxios.clone()]);
2116 assert!(matches!(result, DetectionResult::Found(id) if id == oxios.id));
2117 }
2118}