1#![allow(dead_code)]
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::{Context, Result};
21use chrono;
22use oxios_ouroboros::{ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed};
23use parking_lot::RwLock;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use crate::agent_lifecycle::AgentLifecycleManager;
28use crate::event_bus::{EventBus, KernelEvent};
29use crate::git_layer::GitLayer;
30use crate::metrics::get_metrics;
31use crate::mount::{MountId, MountManager};
32use crate::project::{ConversationBuffer, ProjectManager};
33use crate::scheduler::Priority;
34use crate::state_store::StateStore;
35use crate::types::AgentId;
36
37#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
39pub enum AgentRole {
40 #[default]
42 Worker,
43 Manager,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct SubTask {
50 pub id: Uuid,
52 pub description: String,
54 pub required_capability: Option<String>,
56 pub result: Option<String>,
58 pub success: bool,
60 #[serde(default)]
62 pub role: AgentRole,
63}
64
65impl SubTask {
66 pub fn new(description: impl Into<String>) -> Self {
68 Self {
69 id: Uuid::new_v4(),
70 description: description.into(),
71 required_capability: None,
72 result: None,
73 success: false,
74 role: AgentRole::default(),
75 }
76 }
77
78 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
80 self.required_capability = Some(cap.into());
81 self
82 }
83}
84
/// Central coordinator that turns user messages into agent executions.
///
/// It owns the handles needed to assess a message, resolve its workspace
/// (mounts / projects), run agents through the lifecycle manager, optionally
/// delegate sub-tasks over A2A, and persist state.
pub struct Orchestrator {
    /// Protocol handle stored at construction; not read by the code visible in
    /// this part of the file (hence the `dead_code` allowance).
    #[allow(dead_code)]
    ouroboros: Arc<dyn OuroborosProtocol>,
    /// Intent engine used by `handle_unified`; `None` until
    /// `set_intent_engine` is called.
    intent_engine: RwLock<Option<Arc<dyn oxios_ouroboros::IntentEngineOps>>>,
    /// Bus on which kernel events (phases, agent groups) are published.
    event_bus: EventBus,
    /// Persistent store for sessions, seeds and agent groups.
    state_store: Arc<StateStore>,
    /// Optional git layer; when present and enabled, persisted files are committed.
    git_layer: Option<Arc<GitLayer>>,
    /// In-memory interview sessions keyed by session id.
    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
    /// Spawns and runs agents for seeds.
    lifecycle: AgentLifecycleManager,
    /// Optional A2A protocol used for sub-task delegation.
    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
    /// Project manager; `None` until `set_project_manager` is called.
    project_manager: RwLock<Option<Arc<ProjectManager>>>,
    /// Mount manager; `None` until `set_mount_manager` is called.
    mount_manager: RwLock<Option<Arc<MountManager>>>,
    /// Conversation buffer, default-initialised by the constructor.
    // NOTE(review): not read in the visible part of the file — confirm usage.
    conversation_buffer: RwLock<ConversationBuffer>,
    /// Retry / back-off settings for A2A delegation.
    delegation_config: DelegationConfig,
    /// Circuit breaker consulted before A2A delegation is attempted.
    a2a_breaker: Arc<crate::a2a::circuit_breaker::A2ACircuitBreaker>,
    /// Intent configuration; `enable_retry` is read when a review fails.
    intent_config: RwLock<crate::config::IntentConfig>,
}
115
/// Retry and back-off settings used when delegating sub-tasks over A2A.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Number of failed attempts after which A2A delegation gives up.
    max_retries: u32,
    /// Base delay of the exponential back-off, in milliseconds.
    base_delay_ms: u64,
    /// Upper bound for any single back-off delay, in milliseconds.
    max_delay_ms: u64,
    /// Timeout in milliseconds; currently unused by the visible code.
    #[allow(dead_code)]
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    /// Three attempts, 100 ms base delay, 5 s delay cap, 5 s timeout.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Returns the back-off delay in milliseconds for the given attempt:
    /// `base_delay_ms * 2^attempt`, capped at `max_delay_ms`.
    ///
    /// The exponent is clamped to 10 and the multiplication saturates, so the
    /// computation cannot overflow even for very large `base_delay_ms`.
    fn backoff_delay(&self, attempt: u32) -> u64 {
        let factor = 2_u64.saturating_pow(attempt.min(10));
        self.base_delay_ms
            .saturating_mul(factor)
            .min(self.max_delay_ms)
    }
}
148
149impl Orchestrator {
150 pub fn new(
152 ouroboros: Arc<dyn OuroborosProtocol>,
153 event_bus: EventBus,
154 state_store: Arc<StateStore>,
155 lifecycle: AgentLifecycleManager,
156 ) -> Self {
157 Self::with_config(
158 ouroboros,
159 event_bus,
160 state_store,
161 lifecycle,
162 crate::config::OrchestratorConfig::default(),
163 )
164 }
165
166 pub fn with_config(
168 ouroboros: Arc<dyn OuroborosProtocol>,
169 event_bus: EventBus,
170 state_store: Arc<StateStore>,
171 lifecycle: AgentLifecycleManager,
172 _config: crate::config::OrchestratorConfig,
173 ) -> Self {
174 Self {
175 ouroboros,
176 intent_engine: RwLock::new(None),
177 event_bus,
178 state_store,
179 git_layer: None,
180 sessions: RwLock::new(std::collections::HashMap::new()),
181 lifecycle,
182 a2a: None,
183 project_manager: RwLock::new(None),
184 mount_manager: RwLock::new(None),
185 conversation_buffer: RwLock::new(ConversationBuffer::default()),
186 delegation_config: DelegationConfig::default(),
187 intent_config: RwLock::new(crate::config::IntentConfig::default()),
188 a2a_breaker: Arc::new(crate::a2a::circuit_breaker::A2ACircuitBreaker::new(5, 30)),
189 }
190 }
191
192 pub fn set_intent_engine(&self, engine: Arc<dyn oxios_ouroboros::IntentEngineOps>) {
195 *self.intent_engine.write() = Some(engine);
196 }
197
198 pub fn has_intent_engine(&self) -> bool {
200 self.intent_engine.read().is_some()
201 }
202
203 pub fn set_project_manager(&self, manager: Arc<ProjectManager>) {
205 *self.project_manager.write() = Some(manager);
206 }
207
208 pub fn set_mount_manager(&self, manager: Arc<MountManager>) {
210 *self.mount_manager.write() = Some(manager);
211 }
212
213 pub fn mount_manager(&self) -> Option<Arc<MountManager>> {
215 self.mount_manager.read().as_ref().cloned()
216 }
217
218 pub fn project_manager(&self) -> Option<Arc<ProjectManager>> {
220 self.project_manager.read().as_ref().cloned()
221 }
222
223 pub fn detect_project_tag(&self, message: &str) -> Option<String> {
225 self.project_manager.read().as_ref().and_then(|pm| {
226 let projects = pm.list_projects();
227 let result = crate::project::detect_project(message, &projects);
228 match result {
229 crate::project::DetectionResult::Found(id) => pm.get_project(id).map(|p| p.tag()),
230 crate::project::DetectionResult::NoMatch { .. } => None,
231 }
232 })
233 }
234
235 fn resolve_mount_workspace(
249 &self,
250 mount_ids: Option<&str>,
251 project_ids: Option<&str>,
252 user_message: &str,
253 ) -> (
254 Vec<MountId>,
255 Option<String>,
256 Vec<std::path::PathBuf>,
257 String,
258 ) {
259 use crate::mount::Mount;
260
261 let Some(mm) = self.mount_manager() else {
262 return (Vec::new(), None, Vec::new(), String::new());
263 };
264
265 let mut ids: Vec<MountId> = if let Some(ids_str) = mount_ids {
267 ids_str
268 .split(',')
269 .filter_map(|s| MountId::parse_str(s.trim()).ok())
270 .collect()
271 } else {
272 match mm.detect(user_message) {
273 crate::mount::DetectionResult::Found(id) => vec![id],
274 crate::mount::DetectionResult::NoMatch { .. } => vec![],
275 }
276 };
277 let mut seen = std::collections::HashSet::new();
279 ids.retain(|id| seen.insert(*id));
280
281 let project_for_instructions: Option<crate::project::Project> = if let Some(project_ids_str) =
288 project_ids
289 && let Some(first_id_str) = project_ids_str.split(',').next()
290 && let Some(pm) = self.project_manager()
291 && let Ok(pid) = Uuid::parse_str(first_id_str.trim())
292 {
293 let proj = pm.get_project(pid);
294 if let Some(ref project) = proj {
295 for mid in &project.mount_ids {
296 if !ids.contains(mid) {
297 ids.push(*mid);
298 }
299 }
300 }
301 proj
302 } else {
303 None
304 };
305
306 if ids.is_empty() {
307 return (Vec::new(), None, Vec::new(), String::new());
308 }
309
310 for id in &ids {
313 mm.touch(*id);
314 }
315
316 let mounts: Vec<Mount> = mm.get_mounts_ordered(&ids);
317 if mounts.is_empty() {
318 return (Vec::new(), None, Vec::new(), String::new());
319 }
320
321 let mut paths: Vec<std::path::PathBuf> = Vec::new();
323 for m in &mounts {
324 for p in &m.paths {
325 if !paths.contains(p) {
326 paths.push(p.clone());
327 }
328 }
329 }
330
331 if let Some(project) = &project_for_instructions
336 && project.mount_ids.is_empty()
337 && !project.paths.is_empty()
338 {
339 for p in &project.paths {
340 if !paths.contains(p) {
341 paths.push(p.clone());
342 }
343 }
344 }
345
346 let tag = if mounts.len() == 1 {
348 mounts[0].tag()
349 } else {
350 let names: Vec<&str> = mounts.iter().map(|m| m.name.as_str()).collect();
351 format!("[๐ง {}]", names.join(" + "))
352 };
353
354 let mut context = build_workspace_context_body(&mounts).unwrap_or_default();
355
356 if let Some(project) = project_for_instructions {
362 let instructions = if project.instructions.len() > 2000 {
364 let mut end = 2000;
365 while end > 0 && !project.instructions.is_char_boundary(end) {
366 end -= 1;
367 }
368 format!("{}...", &project.instructions[..end])
369 } else {
370 project.instructions.clone()
371 };
372 if !instructions.is_empty() {
373 context.push_str(&format!(
374 "\n### Project Instructions: {}\n{}\n",
375 project.name, instructions
376 ));
377 }
378 }
379
380 const MAX_CONTEXT_CHARS: usize = 6000;
382 if context.len() > MAX_CONTEXT_CHARS {
383 let mut end = MAX_CONTEXT_CHARS;
384 while end > 0 && !context.is_char_boundary(end) {
385 end -= 1;
386 }
387 context.truncate(end);
388 context.push_str("\n...(context truncated)...\n");
389 }
390
391 let context_opt = if context.is_empty() {
392 None
393 } else {
394 Some(context)
395 };
396 (ids, context_opt, paths, tag)
397 }
398
399 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
401 self.a2a = Some(a2a);
402 }
403
404 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
406 self.git_layer = Some(git_layer);
407 }
408
409 pub async fn restore_sessions(&self) {
416 let summaries = match self.state_store.list_sessions().await {
417 Ok(s) => s,
418 Err(e) => {
419 tracing::warn!(error = %e, "Failed to list sessions for restore");
420 return;
421 }
422 };
423
424 let mut restored = 0usize;
425 for summary in &summaries {
426 let Some(ref seed_id_str) = summary.active_seed_id else {
428 continue;
429 };
430
431 let session_id = crate::state_store::SessionId(summary.id.clone());
432 let session = match self.state_store.load_session(&session_id).await {
433 Ok(Some(s)) => s,
434 Ok(None) => continue,
435 Err(e) => {
436 tracing::warn!(
437 session_id = %summary.id,
438 error = %e,
439 "Failed to load session for restore"
440 );
441 continue;
442 }
443 };
444
445 let mut interview = oxios_ouroboros::InterviewResult::new();
449 interview.is_task = true; interview.original_message = session
451 .user_messages
452 .last()
453 .map(|m| m.content.clone())
454 .unwrap_or_default();
455
456 let history: Vec<oxios_ouroboros::interview::Exchange> = session
461 .user_messages
462 .iter()
463 .enumerate()
464 .map(|(i, user)| oxios_ouroboros::interview::Exchange {
465 user: user.content.clone(),
466 agent: session
467 .agent_responses
468 .get(i)
469 .map(|a| a.content.clone())
470 .unwrap_or_default(),
471 })
472 .collect();
473 interview.conversation_history = history;
474
475 let seed_id = seed_id_str.parse::<Uuid>().ok();
476
477 let interview_session = InterviewSession {
478 id: session.id.0.clone(),
479 interview,
480 phase: Phase::Execute,
481 seed_id,
482 agent_id: None,
483 };
484
485 {
486 let mut sessions = self.sessions.write();
487 sessions.insert(session.id.0.clone(), interview_session);
488 }
489
490 restored += 1;
491 }
492
493 if restored > 0 {
494 tracing::info!(restored, total = summaries.len(), "Sessions restored");
495 }
496 }
497
498 fn git_commit(&self, rel_path: &str, message: &str) {
500 if let Some(ref gl) = self.git_layer
501 && gl.is_enabled()
502 {
503 let _ = gl.commit_file(rel_path, message);
504 }
505 }
506
507 async fn save_seed(&self, seed: &Seed) -> Result<()> {
509 let key = seed.id.to_string();
510
511 self.state_store
512 .save_json("seeds", &key, seed)
513 .await
514 .context("failed to save seed to state store")?;
515
516 self.git_commit(&format!("seeds/{key}.json"), "ourobors: save seed");
517
518 Ok(())
519 }
520
521 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
524 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
525 session_id: session_id.to_owned(),
526 phase,
527 });
528 }
529
530 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
532 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
533 session_id: session_id.to_owned(),
534 phase,
535 result_summary: result.to_owned(),
536 });
537 }
538
539 pub async fn delegate_subtasks(
547 &self,
548 subtasks: Vec<SubTask>,
549 parent_seed: &Seed,
550 ) -> Result<Vec<SubTask>> {
551 if subtasks.len() == 1 {
553 return self.execute_single_subtask(subtasks, parent_seed).await;
554 }
555
556 if let Some(ref a2a) = self.a2a {
558 if !self.a2a_breaker.is_allowed() {
560 tracing::warn!(
561 state = ?self.a2a_breaker.state(),
562 "A2A circuit breaker open, using lifecycle fallback"
563 );
564 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
565 }
566
567 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
569 }
570
571 self.delegate_via_lifecycle(subtasks, parent_seed).await
573 }
574
575 async fn delegate_with_retry(
577 &self,
578 subtasks: Vec<SubTask>,
579 parent_seed: &Seed,
580 a2a: &Arc<crate::a2a::A2AProtocol>,
581 ) -> Result<Vec<SubTask>> {
582 let mut attempt = 0;
583 let max_retries = self.delegation_config.max_retries;
584
585 loop {
586 match self
587 .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
588 .await
589 {
590 Ok(results) => {
591 self.a2a_breaker.record_success();
592 return Ok(results);
593 }
594 Err(e) => {
595 self.a2a_breaker.record_failure();
596 attempt += 1;
597
598 if attempt >= max_retries {
599 tracing::error!(
600 attempts = attempt,
601 error = %e,
602 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
603 attempt
604 );
605 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
606 }
607
608 let delay = self.delegation_config.backoff_delay(attempt);
610 tracing::warn!(
611 attempt,
612 delay_ms = delay,
613 error = %e,
614 "A2A delegation failed, retrying with backoff"
615 );
616 tokio::time::sleep(Duration::from_millis(delay)).await;
617 }
618 }
619 }
620 }
621
622 async fn execute_single_subtask(
624 &self,
625 subtasks: Vec<SubTask>,
626 parent_seed: &Seed,
627 ) -> Result<Vec<SubTask>> {
628 let mut task = subtasks.into_iter().next().ok_or_else(|| {
629 anyhow::anyhow!("execute_single_subtask called with an empty subtask list")
630 })?;
631 let child_seed = Seed {
632 id: Uuid::new_v4(),
633 goal: task.description.clone(),
634 constraints: parent_seed.constraints.clone(),
635 acceptance_criteria: vec!["Task completes successfully".into()],
636 ontology: parent_seed.ontology.clone(),
637 created_at: chrono::Utc::now(),
638 generation: parent_seed.generation + 1,
639 parent_seed_id: Some(parent_seed.id),
640 cspace_hint: None,
641 original_request: parent_seed.original_request.clone(),
642 output_schema: None,
643 project_id: parent_seed.project_id,
644 workspace_context: parent_seed.workspace_context.clone(),
645 mount_paths: parent_seed.mount_paths.clone(),
646 };
647 match self
648 .lifecycle
649 .spawn_and_run(&child_seed, Priority::Normal)
650 .await
651 {
652 Ok(result) => {
653 task.result = Some(result.output.clone());
654 }
655 Err(e) => {
656 task.result = Some(format!("Failed: {e}"));
657 task.success = false;
658 }
659 }
660 Ok(vec![task])
661 }
662
663 async fn delegate_via_a2a(
673 &self,
674 subtasks: Vec<SubTask>,
675 parent_seed: &Seed,
676 a2a: &Arc<crate::a2a::A2AProtocol>,
677 ) -> Result<Vec<SubTask>> {
678 use crate::a2a::TaskPriority;
679 use tokio::task::JoinSet;
680
681 tracing::info!(
682 subtasks = subtasks.len(),
683 "Delegating subtasks via A2A protocol"
684 );
685
686 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
687 let subtask_count = subtasks.len();
688 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
689
690 for (idx, subtask) in subtasks.into_iter().enumerate() {
691 let capability = subtask.required_capability.clone();
692 let description = subtask.description.clone();
693 let subtask_id = subtask.id;
694 let role = subtask.role.clone();
695 let a2a = Arc::clone(a2a);
696 let parent_seed = parent_seed.clone();
697 let lifecycle = self.lifecycle.clone();
698
699 join_set.spawn(async move {
700 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
702 a2a.query_capabilities(cap).await.ok()
703 .and_then(|agents| agents.into_iter().next())
704 } else {
705 None
706 };
707
708 let (output, success) = if let Some(ref target_card) = target {
709 let target_id = target_card.agent_id;
710 tracing::info!(
711 subtask_index = idx,
712 target = %target_card.name,
713 target_id = %target_id,
714 "A2A dispatching subtask"
715 );
716
717 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
718 "parent_seed": parent_seed.id.to_string(),
719 "goal": description,
720 }))
721 .with_priority(TaskPriority::Normal);
722
723 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
725
726 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
728 Some(Ok(result)) => {
729 let out = result.get("output")
730 .and_then(|v| v.as_str())
731 .unwrap_or("")
732 .to_string();
733 let ok = result.get("success")
734 .and_then(|v| v.as_bool())
735 .unwrap_or(false);
736 (out, ok)
737 }
738 Some(Err(e)) => {
739 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
740 (format!("Failed: {e}"), false)
741 }
742 None => {
743 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
745 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
746 }
747 }
748 } else {
749 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
750 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
751 };
752
753 (idx, SubTask {
754 id: subtask_id,
755 description,
756 required_capability: capability,
757 result: Some(output),
758 success,
759 role,
760 })
761 });
762 }
763
764 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
765 while let Some(join_result) = join_set.join_next().await {
766 match join_result {
767 Ok((idx, subtask)) => {
768 results[idx] = Some(subtask);
769 }
770 Err(e) => {
771 tracing::error!(error = %e, "A2A task panicked");
772 }
773 }
774 }
775
776 let completed: Vec<SubTask> = results
780 .into_iter()
781 .enumerate()
782 .map(|(idx, opt)| {
783 opt.unwrap_or_else(|| SubTask {
784 id: Uuid::new_v4(),
785 description: format!("subtask {idx} (failed)"),
786 required_capability: None,
787 result: Some("Task panicked or did not complete".into()),
788 success: false,
789 role: AgentRole::default(),
790 })
791 })
792 .collect();
793 tracing::info!(
794 completed = completed.len(),
795 succeeded = completed.iter().filter(|r| r.success).count(),
796 "A2A delegation complete"
797 );
798 Ok(completed)
799 }
800
801 async fn delegate_via_lifecycle(
802 &self,
803 subtasks: Vec<SubTask>,
804 parent_seed: &Seed,
805 ) -> Result<Vec<SubTask>> {
806 use crate::agent_group::OxiosAgentGroup;
807 use tokio::task::JoinSet;
808
809 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
810 let group = OxiosAgentGroup::new(parent_seed, descriptions);
811 let group_id = group.id;
812
813 self.event_bus.publish(KernelEvent::AgentGroupCreated {
814 group_id,
815 agent_count: group.agents.len(),
816 })?;
817
818 tracing::info!(
819 group_id = %group_id,
820 agent_count = group.agents.len(),
821 "Starting parallel multi-agent execution"
822 );
823
824 let mut join_set: JoinSet<(
825 usize,
826 crate::types::AgentId,
827 Result<oxios_ouroboros::ExecutionResult>,
828 )> = JoinSet::new();
829
830 for (idx, agent_entry) in group.agents.iter().enumerate() {
831 let child_seed = agent_entry.seed.clone();
832 let agent_id = agent_entry.id;
833 let lifecycle = self.lifecycle.clone();
834
835 join_set.spawn(async move {
836 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
837 (idx, agent_id, result)
838 });
839 }
840
841 let subtask_count = subtasks.len();
842 let mut completed = vec![None; subtask_count];
843 while let Some(join_result) = join_set.join_next().await {
844 match join_result {
845 Ok((idx, agent_id, Ok(exec_result))) => {
846 let _ = self
847 .event_bus
848 .publish(KernelEvent::AgentGroupMemberCompleted {
849 group_id,
850 agent_id,
851 success: exec_result.success,
852 });
853 completed[idx] = Some(SubTask {
854 id: subtasks[idx].id,
855 description: subtasks[idx].description.clone(),
856 required_capability: subtasks[idx].required_capability.clone(),
857 result: Some(exec_result.output.clone()),
858 success: exec_result.success,
859 role: subtasks[idx].role.clone(),
860 });
861 }
862 Ok((idx, agent_id, Err(e))) => {
863 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
864 let _ = self
865 .event_bus
866 .publish(KernelEvent::AgentGroupMemberCompleted {
867 group_id,
868 agent_id,
869 success: false,
870 });
871 completed[idx] = Some(SubTask {
872 id: subtasks[idx].id,
873 description: subtasks[idx].description.clone(),
874 required_capability: subtasks[idx].required_capability.clone(),
875 result: Some(format!("Failed: {e}")),
876 success: false,
877 role: subtasks[idx].role.clone(),
878 });
879 }
880 Err(e) => {
881 tracing::error!(error = %e, "JoinSet task panicked");
882 }
883 }
884 }
885
886 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
887 let succeeded = completed.iter().filter(|r| r.success).count();
888 let total = completed.len();
889
890 tracing::info!(
891 group_id = %group_id,
892 succeeded,
893 total,
894 "Parallel multi-agent execution complete"
895 );
896
897 let _ = self
899 .state_store
900 .save_json("agent_groups", &group_id.to_string(), &group)
901 .await;
902 self.git_commit(
903 &format!("agent_groups/{group_id}.json"),
904 "orchestrator: save group",
905 );
906
907 Ok(completed)
908 }
909
910 pub async fn handle(
950 &self,
951 engine: &dyn oxios_ouroboros::IntentEngineOps,
952 msg: &str,
953 ctx: &oxios_ouroboros::MsgCtx,
954 ) -> Result<HandleResponse> {
955 let assessment = engine.assess(msg, ctx).await?;
957
958 match assessment {
959 oxios_ouroboros::Assessment::Conversation(reply) => Ok(HandleResponse::Reply(reply)),
960
961 oxios_ouroboros::Assessment::Clarify { questions } => {
962 Ok(HandleResponse::Clarify(questions))
963 }
964
965 oxios_ouroboros::Assessment::Task(scope) => {
966 let mut directive = match scope {
968 oxios_ouroboros::Scope::Trivial => {
969 oxios_ouroboros::Directive::from_message(msg)
970 }
971 oxios_ouroboros::Scope::Substantial => engine.crystallize(msg, ctx).await?,
972 };
973
974 let env = self.resolve_exec_env(ctx, msg);
976
977 let mut result = self.execute_directive(&directive, &env).await?;
980
981 let (verdict, evaluation_passed) = match scope {
983 oxios_ouroboros::Scope::Trivial => (None, None),
984 oxios_ouroboros::Scope::Substantial => {
985 let (r, v) = self
986 .verify_or_retry(engine, &mut directive, &env, result, msg, ctx)
987 .await?;
988 result = r;
989 let passed = v.all_passed();
990 (Some(v), Some(passed))
991 }
992 };
993
994 self.lifecycle.reap_zombies();
995
996 Ok(HandleResponse::Task {
997 scope,
998 directive: Box::new(directive),
999 env,
1000 result: Box::new(result),
1001 verdict,
1002 evaluation_passed,
1003 })
1004 }
1005 }
1006 }
1007
1008 pub async fn handle_unified(
1015 &self,
1016 user_id: &str,
1017 msg: &str,
1018 session_id: Option<&str>,
1019 project_ids: Option<&str>,
1020 mount_ids: Option<&str>,
1021 request_id: &str,
1022 ) -> Result<OrchestrationResult> {
1023 let engine = self
1025 .intent_engine
1026 .read()
1027 .clone()
1028 .expect("IntentEngine not wired โ kernel assembler bug");
1029
1030 let sid = session_id.unwrap_or(request_id).to_string();
1032 let history = self.load_session_history(&sid).await;
1033 let ctx = oxios_ouroboros::MsgCtx {
1034 session_id: sid.clone(),
1035 history,
1036 project_ids: project_ids.map(String::from),
1037 mount_ids: mount_ids.map(String::from),
1038 user_id: user_id.to_string(),
1039 };
1040
1041 let start = std::time::Instant::now();
1043 let response = self.handle(engine.as_ref(), msg, &ctx).await?;
1044 let duration_ms = start.elapsed().as_millis() as u64;
1045
1046 Ok(self.handle_response_to_orchestration_result(response, &ctx, duration_ms))
1047 }
1048
1049 async fn load_session_history(&self, session_id: &str) -> Vec<oxios_ouroboros::Exchange> {
1051 let sid = crate::state_store::SessionId(session_id.to_string());
1052 match self.state_store.load_session(&sid).await {
1053 Ok(Some(session)) => session
1054 .user_messages
1055 .iter()
1056 .zip(session.agent_responses.iter())
1057 .map(|(u, a)| oxios_ouroboros::Exchange {
1058 user: u.content.clone(),
1059 agent: a.content.clone(),
1060 })
1061 .collect(),
1062 _ => Vec::new(),
1063 }
1064 }
1065
1066 fn handle_response_to_orchestration_result(
1067 &self,
1068 response: HandleResponse,
1069 ctx: &oxios_ouroboros::MsgCtx,
1070 duration_ms: u64,
1071 ) -> OrchestrationResult {
1072 let metrics = get_metrics();
1073 metrics.orch_duration.observe(duration_ms as f64 / 1000.0);
1074
1075 match response {
1076 HandleResponse::Reply(reply) => OrchestrationResult {
1077 session_id: Some(ctx.session_id.clone()),
1078 primary_project_id: None,
1079 project_tag: None,
1080 active_mount_ids: Vec::new(),
1081 mount_tag: None,
1082 response: reply,
1083 seed_id: None,
1084 agent_id: None,
1085 phase_reached: oxios_ouroboros::Phase::Interview,
1086 evaluation_passed: None,
1087 output: None,
1088 tool_calls: Vec::new(),
1089 interview_questions: None,
1090 interview_round: None,
1091 interview_ambiguity: None,
1092 mode: "unified".to_string(),
1093 },
1094 HandleResponse::Clarify(questions) => {
1095 let questions_text = questions
1096 .iter()
1097 .map(|q| q.text.clone())
1098 .collect::<Vec<_>>()
1099 .join("\n");
1100 let structured = Some(
1101 questions
1102 .iter()
1103 .map(
1104 |q| oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput {
1105 id: q.id.clone(),
1106 text: q.text.clone(),
1107 kind: format!("{:?}", q.kind).to_lowercase(),
1108 options: q
1109 .options
1110 .iter()
1111 .map(|o| {
1112 oxios_ouroboros::ouroboros_engine::InterviewOptionOutput {
1113 value: o.value.clone(),
1114 label: o.label.clone(),
1115 description: String::new(),
1116 }
1117 })
1118 .collect(),
1119 },
1120 )
1121 .collect(),
1122 );
1123 OrchestrationResult {
1124 session_id: Some(ctx.session_id.clone()),
1125 primary_project_id: None,
1126 project_tag: None,
1127 active_mount_ids: Vec::new(),
1128 mount_tag: None,
1129 response: questions_text,
1130 seed_id: None,
1131 agent_id: None,
1132 phase_reached: oxios_ouroboros::Phase::Interview,
1133 evaluation_passed: None,
1134 output: None,
1135 tool_calls: Vec::new(),
1136 interview_questions: structured,
1137 interview_round: Some(((ctx.history.len() / 2) as u32).max(1)),
1138 interview_ambiguity: None,
1139 mode: "unified".to_string(),
1140 }
1141 }
1142 HandleResponse::Task {
1143 scope: _,
1144 directive,
1145 env,
1146 result,
1147 verdict,
1148 evaluation_passed,
1149 } => {
1150 let response_text = if directive.acceptance_criteria.is_empty() {
1151 result.output.clone()
1152 } else {
1153 match &verdict {
1154 Some(v) if v.all_passed() => result.output.clone(),
1155 Some(v) => format!(
1156 "{}\n\nโ Review notes:\n{}",
1157 result.output,
1158 v.notes.join("\n")
1159 ),
1160 None => result.output.clone(),
1161 }
1162 };
1163 if evaluation_passed.unwrap_or(false) {
1164 metrics.agents_completed.inc();
1165 } else {
1166 metrics.agents_failed.inc();
1167 }
1168 OrchestrationResult {
1169 session_id: Some(ctx.session_id.clone()),
1170 primary_project_id: env.project_id,
1171 project_tag: None,
1172 active_mount_ids: Vec::new(),
1173 mount_tag: None,
1174 response: response_text,
1175 seed_id: None,
1176 agent_id: None,
1177 phase_reached: oxios_ouroboros::Phase::Execute,
1178 evaluation_passed,
1179 output: Some(result.output.clone()),
1180 tool_calls: result.tool_calls.clone(),
1181 interview_questions: None,
1182 interview_round: None,
1183 interview_ambiguity: None,
1184 mode: "unified".to_string(),
1185 }
1186 }
1187 }
1188 }
1189
1190 fn resolve_exec_env(
1197 &self,
1198 ctx: &oxios_ouroboros::MsgCtx,
1199 msg: &str,
1200 ) -> oxios_ouroboros::ExecEnv {
1201 let (active_mount_ids, workspace_context, mount_paths, _mount_tag) =
1202 self.resolve_mount_workspace(ctx.mount_ids.as_deref(), ctx.project_ids.as_deref(), msg);
1203 let _ = active_mount_ids;
1207
1208 let project_id = ctx
1211 .project_ids
1212 .as_deref()
1213 .and_then(|ids| {
1214 ids.split(',')
1215 .next()
1216 .and_then(|s| Uuid::parse_str(s.trim()).ok())
1217 })
1218 .or_else(|| {
1219 self.detect_project_tag(msg).and_then(|_tag| {
1220 self.project_manager().and_then(|pm| {
1221 let projects = pm.list_projects();
1222 match crate::project::detect_project(msg, &projects) {
1223 crate::project::DetectionResult::Found(id) => Some(id),
1224 crate::project::DetectionResult::NoMatch { .. } => None,
1225 }
1226 })
1227 })
1228 });
1229
1230 if let Some(pid) = project_id
1232 && let Some(pm) = self.project_manager()
1233 {
1234 pm.touch(pid);
1235 }
1236
1237 oxios_ouroboros::ExecEnv {
1238 workspace_context,
1239 mount_paths,
1240 project_id,
1241 cspace_hint: None,
1242 }
1243 }
1244
1245 async fn execute_directive(
1255 &self,
1256 directive: &oxios_ouroboros::Directive,
1257 env: &oxios_ouroboros::ExecEnv,
1258 ) -> Result<ExecutionResult> {
1259 let seed = self.directive_to_seed(directive, env);
1260 self.lifecycle.spawn_and_run(&seed, Priority::Normal).await
1261 }
1262
1263 fn directive_to_seed(
1270 &self,
1271 directive: &oxios_ouroboros::Directive,
1272 env: &oxios_ouroboros::ExecEnv,
1273 ) -> Seed {
1274 let mut seed = Seed::new(directive.goal.clone());
1275 seed.original_request = directive.original_request.clone();
1276 seed.constraints = directive.constraints.clone();
1277 seed.acceptance_criteria = directive.acceptance_criteria.clone();
1278 seed.output_schema = directive.output_schema.clone();
1279 seed.project_id = env.project_id;
1280 seed.workspace_context = env.workspace_context.clone();
1281 seed.mount_paths = env.mount_paths.clone();
1282 seed.cspace_hint = env.cspace_hint.clone();
1283 seed
1284 }
1285
1286 async fn verify_or_retry(
1293 &self,
1294 engine: &dyn oxios_ouroboros::IntentEngineOps,
1295 directive: &mut oxios_ouroboros::Directive,
1296 env: &oxios_ouroboros::ExecEnv,
1297 initial_result: ExecutionResult,
1298 _msg: &str,
1299 _ctx: &oxios_ouroboros::MsgCtx,
1300 ) -> Result<(ExecutionResult, oxios_ouroboros::Verdict)> {
1301 let verdict = engine.review(directive, &initial_result).await?;
1302
1303 if verdict.all_passed() || verdict.gaps.is_empty() {
1304 return Ok((initial_result, verdict));
1305 }
1306
1307 let enable_retry = self.intent_config.read().enable_retry;
1310 if !enable_retry {
1311 tracing::info!("Review failed but retry disabled (enable_retry=false)");
1312 return Ok((initial_result, verdict));
1313 }
1314
1315 let metrics = get_metrics();
1316 metrics.retry_attempted.inc();
1317
1318 tracing::info!(
1319 gaps = verdict.gaps.len(),
1320 "Review failed โ retrying with feedback"
1321 );
1322
1323 let retry_result = self
1325 .lifecycle
1326 .execute_with_feedback(
1327 directive,
1328 env,
1329 &initial_result,
1330 &verdict.gaps,
1331 Priority::Normal,
1332 )
1333 .await?;
1334
1335 let retry_verdict = engine.review(directive, &retry_result).await?;
1337
1338 if retry_verdict.score > verdict.score {
1340 metrics.retry_improved.inc();
1341 } else if retry_verdict.score < verdict.score {
1342 metrics.retry_degraded.inc();
1343 } else {
1344 metrics.retry_unchanged.inc();
1345 }
1346
1347 let chosen_result = if retry_verdict.score >= verdict.score {
1349 retry_result
1350 } else {
1351 initial_result
1352 };
1353
1354 Ok((chosen_result, retry_verdict))
1355 }
1356}
1357
/// Outcome of [`Orchestrator::handle`] for a single message.
#[derive(Debug, Clone)]
pub enum HandleResponse {
    /// The message was assessed as conversation; carries the reply text.
    Reply(String),
    /// The message needs clarification; carries the questions to ask.
    Clarify(Vec<oxios_ouroboros::Question>),
    /// The message was assessed as a task and has been executed.
    Task {
        /// Scope the task was assessed as.
        scope: oxios_ouroboros::Scope,
        /// Directive that was executed.
        directive: Box<oxios_ouroboros::Directive>,
        /// Execution environment the directive ran in.
        env: oxios_ouroboros::ExecEnv,
        /// Result of the execution.
        result: Box<ExecutionResult>,
        /// Review verdict; `None` for trivial tasks, which are not reviewed.
        verdict: Option<oxios_ouroboros::Verdict>,
        /// Whether the review fully passed; `None` when no review was run.
        evaluation_passed: Option<bool>,
    },
}
1391
/// In-memory state of one interview session, as kept in
/// `Orchestrator::sessions`.
#[derive(Debug, Clone)]
#[allow(unused)]
struct InterviewSession {
    /// Session identifier (also the key in the sessions map).
    id: String,
    /// Interview state, including the conversation history.
    interview: InterviewResult,
    /// Phase the session is currently in.
    phase: Phase,
    /// Seed associated with the session, when known.
    seed_id: Option<Uuid>,
    /// Agent associated with the session, when known.
    agent_id: Option<AgentId>,
}
1402
/// Serde default for `OrchestrationResult::mode`: the string `"chat"`.
fn default_chat_mode() -> String {
    String::from("chat")
}
1406
/// Serializable result of handling one user message.
///
/// Optional fields are omitted from the serialized form when unset; empty
/// lists are omitted as well.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
    /// Session the message was handled in.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Project the request was resolved to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub primary_project_id: Option<Uuid>,
    /// Display tag of the project, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project_tag: Option<String>,
    /// Mounts active for the request.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub active_mount_ids: Vec<MountId>,
    /// Display tag of the active mount(s), if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mount_tag: Option<String>,
    /// Text to show to the user.
    pub response: String,
    /// Seed created for the request, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seed_id: Option<Uuid>,
    /// Agent that handled the request, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// Last phase reached while handling the message.
    pub phase_reached: Phase,
    /// Whether the review passed; `None` when no review was run.
    pub evaluation_passed: Option<bool>,
    /// Raw execution output, present for executed tasks.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<String>,
    /// Tool calls recorded during execution.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tool_calls: Vec<oxios_ouroboros::ToolCallRecord>,
    /// Structured clarification questions, when clarification is requested.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_questions:
        Option<Vec<oxios_ouroboros::ouroboros_engine::InterviewQuestionOutput>>,
    /// Interview round number, when clarification is requested.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_round: Option<u32>,
    /// Ambiguity score of the interview, when available.
    // NOTE(review): scale / range not visible here — confirm with the producer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interview_ambiguity: Option<f64>,
    /// Handling mode; defaults to `"chat"` when absent from serialized data.
    #[serde(default = "default_chat_mode")]
    pub mode: String,
}
1469
/// Turns a list of clarifying questions into a single message.
///
/// An empty list yields a generic "need more clarification" sentence.
/// Otherwise the questions are numbered from 1, one per line, beneath an
/// introductory sentence.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return String::from("I need a bit more clarification before I can proceed.");
    }

    // Build the numbered list by hand; lines are separated by a single
    // newline with no trailing newline.
    let mut numbered = String::new();
    for (index, question) in questions.iter().enumerate() {
        if index > 0 {
            numbered.push('\n');
        }
        numbered.push_str(&format!("{}. {}", index + 1, question));
    }

    format!(
        "I'd like to understand your request better. Could you help clarify:\n\n{}",
        numbered
    )
}
1486
1487fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1490 let mut lines = Vec::new();
1491
1492 if exec.success {
1493 lines.push(format!("โ
'{}'", seed.goal));
1494 } else {
1495 lines.push(format!(
1496 "โ ๏ธ '{}'์(๋ฅผ) ์๋ํ์ง๋ง ์์ ํ ์ฑ๊ณตํ์ง ๋ชปํ์ต๋๋ค.",
1497 seed.goal
1498 ));
1499 }
1500
1501 if !exec.output.is_empty() {
1503 let preview = if exec.output.len() > 500 {
1504 let mut end = 500;
1507 while end > 0 && !exec.output.is_char_boundary(end) {
1508 end -= 1;
1509 }
1510 format!("{}...", &exec.output[..end])
1511 } else {
1512 exec.output.clone()
1513 };
1514 lines.push(String::new());
1515 lines.push(preview);
1516 }
1517
1518 lines.join("\n")
1519}
1520
1521fn should_split_seed(seed: &Seed) -> bool {
1526 seed.acceptance_criteria.len() >= 5
1530}
1531
1532fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1538 seed.acceptance_criteria
1539 .iter()
1540 .map(|criterion| {
1541 let desc = format!("{}: {}", seed.goal, criterion);
1542 let desc_lower = desc.to_lowercase();
1543
1544 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1546 Some("code-review".to_string())
1547 } else if desc_lower.contains("test") {
1548 Some("testing".to_string())
1549 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1550 Some("refactoring".to_string())
1551 } else if desc_lower.contains("write")
1552 || desc_lower.contains("create")
1553 || desc_lower.contains("implement")
1554 {
1555 Some("code-generation".to_string())
1556 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1557 Some("debugging".to_string())
1558 } else {
1559 None
1560 };
1561
1562 let mut subtask = SubTask::new(desc);
1563 subtask.required_capability = cap;
1564 subtask
1565 })
1566 .collect()
1567}
1568
/// Wraps the combined subtask output in a multi-agent summary message.
///
/// An empty `combined` string yields a fixed "no subtasks completed" notice;
/// anything else is placed beneath a "Multi-agent execution completed:"
/// header, separated by a blank line.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        body => ["Multi-agent execution completed:\n\n", body].concat(),
    }
}
1577
1578async fn run_via_lifecycle(
1580 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1581 parent_seed: &Seed,
1582 description: &str,
1583) -> (String, bool) {
1584 let child_seed = Seed {
1585 id: Uuid::new_v4(),
1586 goal: description.to_string(),
1587 constraints: parent_seed.constraints.clone(),
1588 acceptance_criteria: vec!["Task completes successfully".into()],
1589 ontology: parent_seed.ontology.clone(),
1590 created_at: chrono::Utc::now(),
1591 generation: parent_seed.generation + 1,
1592 parent_seed_id: Some(parent_seed.id),
1593 cspace_hint: None,
1594 original_request: parent_seed.original_request.clone(),
1595 output_schema: None,
1596 project_id: parent_seed.project_id,
1597 workspace_context: parent_seed.workspace_context.clone(),
1598 mount_paths: parent_seed.mount_paths.clone(),
1599 };
1600 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1601 Ok(result) => (result.output, result.success),
1602 Err(e) => (format!("Failed: {e}"), false),
1603 }
1604}
1605
1606fn build_workspace_context_body(mounts: &[crate::mount::Mount]) -> Option<String> {
1615 if mounts.is_empty() {
1616 return None;
1617 }
1618 let mut out = String::new();
1619 out.push_str("### Active Mounts\n");
1620
1621 for (i, m) in mounts.iter().enumerate() {
1622 let primary = i == 0;
1623 let path = m
1624 .primary_path()
1625 .map(|p| p.to_string_lossy().to_string())
1626 .unwrap_or_else(|| "(no path)".to_string());
1627
1628 if primary {
1629 out.push_str(&format!("- **{}** โ {}\n", m.name, path));
1630 if !m.auto_description.is_empty() {
1631 let desc: String = m
1633 .auto_description
1634 .lines()
1635 .take(3)
1636 .collect::<Vec<_>>()
1637 .join("\n ");
1638 out.push_str(&format!(" {}\n", desc));
1639 }
1640 let summary = m.summary_line();
1641 if !summary.is_empty() {
1642 out.push_str(&format!(" _{}_\n", summary));
1643 }
1644 if m.enrichment_pending {
1645 out.push_str(" _(content changed โ consider re-scanning this Mount)_\n");
1646 }
1647 } else {
1648 let summary = m.summary_line();
1650 let suffix = if summary.is_empty() {
1651 String::new()
1652 } else {
1653 format!(" โ {}", summary)
1654 };
1655 out.push_str(&format!("- **{}** โ {}{}\n", m.name, path, suffix));
1656 }
1657 }
1658
1659 Some(out)
1660}
1661
1662#[cfg(test)]
1663mod mount_workspace_tests {
1664 use super::*;
1665 use crate::mount::{Mount, MountSource};
1666 use std::path::PathBuf;
1667
1668 #[test]
1669 fn test_workspace_context_primary_full_secondary_terse() {
1670 let mut oxios =
1671 Mount::from_name_and_path("oxios", PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
1672 oxios.auto_description = "Agent OS.\nRust + tokio.".to_string();
1673 oxios.auto_meta.summary = "Rust agent OS".to_string();
1674
1675 let mut oxi = Mount::from_name_and_path("oxi", PathBuf::from("/oxi"));
1676 oxi.auto_meta.summary = "SDK".to_string();
1677
1678 let body = build_workspace_context_body(&[oxios, oxi]).unwrap();
1679 assert!(body.contains("### Active Mounts"));
1680 assert!(body.contains("Agent OS."));
1682 assert!(body.contains("_Rust agent OS_"));
1683 assert!(body.contains("**oxi** โ /oxi โ SDK"));
1685 }
1686
1687 #[test]
1688 fn test_workspace_context_empty_is_none() {
1689 assert!(build_workspace_context_body(&[]).is_none());
1690 }
1691
1692 #[test]
1696 fn test_resolve_mount_workspace_detects_and_collects_paths() {
1697 use crate::mount::MountManager;
1698 use oxios_memory::memory::sqlite::MemoryDatabase;
1699 use std::sync::Arc;
1700
1701 let db = Arc::new(MemoryDatabase::open_in_memory(64).unwrap());
1702 let mm = Arc::new(MountManager::new(db, None).unwrap());
1703
1704 let oxios = mm
1706 .create_mount(
1707 "oxios".to_string(),
1708 vec![PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios")],
1709 MountSource::Manual,
1710 )
1711 .unwrap();
1712 let oxi_sdk = mm
1713 .create_mount(
1714 "oxi-sdk".to_string(),
1715 vec![PathBuf::from("/Users/me/oxi")],
1716 MountSource::Manual,
1717 )
1718 .unwrap();
1719 mm.update_enrichment(oxios.id, Some("Agent OS in Rust.".to_string()), None)
1720 .unwrap();
1721
1722 let mounts = mm.get_mounts_ordered(&[oxios.id, oxi_sdk.id]);
1726 assert_eq!(mounts.len(), 2);
1727
1728 let body = build_workspace_context_body(&mounts).unwrap();
1729 assert!(body.contains("oxios"));
1730 assert!(body.contains("Agent OS in Rust."));
1731 assert!(body.contains("oxi-sdk"));
1732
1733 let mut paths = Vec::new();
1735 for m in &mounts {
1736 for p in &m.paths {
1737 if !paths.contains(p) {
1738 paths.push(p.clone());
1739 }
1740 }
1741 }
1742 assert_eq!(paths.len(), 2);
1743 assert_eq!(paths[0], PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
1744 assert_eq!(paths[1], PathBuf::from("/Users/me/oxi"));
1745 }
1746
1747 #[test]
1750 fn test_detection_seeds_primary_on_name_mention() {
1751 use crate::mount::{DetectionResult, detect_mounts};
1752
1753 let oxios =
1754 Mount::from_name_and_path("oxios", PathBuf::from("/Volumes/MERCURY/PROJECTS/oxios"));
1755 let result = detect_mounts("oxios ์ฝ๋๋ฆฌ๋ทฐํด์ค", std::slice::from_ref(&oxios));
1756 assert!(matches!(result, DetectionResult::Found(id) if id == oxios.id));
1757 }
1758}