1use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use crate::agent_lifecycle::AgentLifecycleManager;
24use crate::event_bus::{EventBus, KernelEvent};
25use crate::git_layer::GitLayer;
26use crate::metrics::get_metrics;
27use crate::scheduler::Priority;
28use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
29use crate::state_store::StateStore;
30use crate::types::AgentId;
31
/// Role attached to a [`SubTask`].
///
/// Serializable via serde. `Worker` is the `Default` variant, which is also
/// what the `#[serde(default)]` on `SubTask::role` falls back to when the
/// field is absent from the input.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub enum AgentRole {
    /// Default role; the one `SubTask::new` assigns.
    #[default]
    Worker,
    /// NOTE(review): presumably a coordinating role — nothing in this file
    /// constructs or matches on it; confirm against its consumers.
    Manager,
}
41
/// One unit of delegated work, together with its outcome once it has run.
///
/// The orchestrator creates subtasks from a seed's acceptance criteria and
/// returns them from the delegation methods with `result`/`success` filled in.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubTask {
    /// Unique identifier, generated with `Uuid::new_v4()` in [`SubTask::new`].
    pub id: Uuid,
    /// Human-readable description; used as the goal of the child seed.
    pub description: String,
    /// Optional capability tag used to look up a matching A2A agent.
    pub required_capability: Option<String>,
    /// Output text (or a `"Failed: ..."` message) once the subtask has run.
    pub result: Option<String>,
    /// Whether the subtask succeeded; `false` until an executor says otherwise.
    pub success: bool,
    /// Role of the executing agent; defaults to `AgentRole::Worker` when the
    /// field is missing during deserialization.
    #[serde(default)]
    pub role: AgentRole,
}
59
60impl SubTask {
61 pub fn new(description: impl Into<String>) -> Self {
63 Self {
64 id: Uuid::new_v4(),
65 description: description.into(),
66 required_capability: None,
67 result: None,
68 success: false,
69 role: AgentRole::default(),
70 }
71 }
72
73 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
75 self.required_capability = Some(cap.into());
76 self
77 }
78}
79
/// Drives a user message through the interview → seed → execute pipeline and
/// fans work out to sub-agents when a seed is split into subtasks.
pub struct Orchestrator {
    /// Protocol implementation used for `interview` and `generate_seed`.
    ouroboros: Arc<dyn OuroborosProtocol>,
    /// Bus on which phase, seed and agent-group events are published.
    event_bus: EventBus,
    /// Store used to persist seeds and agent groups as JSON.
    state_store: Arc<StateStore>,
    /// Optional git layer; when set and enabled, persisted files are committed.
    git_layer: Option<Arc<GitLayer>>,
    /// In-flight interview sessions, keyed by session id.
    sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
    /// Spawns and runs agents for seeds (`spawn_and_run`, `reap_zombies`).
    lifecycle: AgentLifecycleManager,
    /// Optional A2A protocol handle; when absent, delegation uses `lifecycle`.
    a2a: Option<Arc<crate::a2a::A2AProtocol>>,
    /// Optional space manager, installable after construction through `&self`.
    space_manager: RwLock<Option<Arc<SpaceManager>>>,
    /// Recent user/agent turns, passed to space detection.
    conversation_buffer: RwLock<ConversationBuffer>,
    /// Retry/backoff settings for A2A delegation.
    delegation_config: DelegationConfig,
    /// Circuit breaker consulted before, and updated after, A2A delegation.
    a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
}
102
/// Retry and backoff settings for A2A delegation.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Number of failed A2A attempts after which `delegate_with_retry` falls
    /// back to lifecycle delegation.
    max_retries: u32,
    /// Base delay in milliseconds; doubled per attempt by `backoff_delay`.
    base_delay_ms: u64,
    /// Upper bound, in milliseconds, for any single backoff delay.
    max_delay_ms: u64,
    /// Not read anywhere in this file (hence the `dead_code` allow).
    /// NOTE(review): presumably a per-delegation timeout — confirm or remove.
    #[allow(dead_code)]
    timeout_ms: u64,
}
116
117impl Default for DelegationConfig {
118 fn default() -> Self {
119 Self {
120 max_retries: 3,
121 base_delay_ms: 100,
122 max_delay_ms: 5000,
123 timeout_ms: 5000,
124 }
125 }
126}
127
128impl DelegationConfig {
129 fn backoff_delay(&self, attempt: u32) -> u64 {
131 let delay = self.base_delay_ms * 2_u64.saturating_pow(attempt.min(10));
132 delay.min(self.max_delay_ms)
133 }
134}
135
136impl Orchestrator {
137 pub fn new(
139 ouroboros: Arc<dyn OuroborosProtocol>,
140 event_bus: EventBus,
141 state_store: Arc<StateStore>,
142 lifecycle: AgentLifecycleManager,
143 ) -> Self {
144 Self::with_config(
145 ouroboros,
146 event_bus,
147 state_store,
148 lifecycle,
149 crate::config::OrchestratorConfig::default(),
150 )
151 }
152
153 pub fn with_config(
155 ouroboros: Arc<dyn OuroborosProtocol>,
156 event_bus: EventBus,
157 state_store: Arc<StateStore>,
158 lifecycle: AgentLifecycleManager,
159 _config: crate::config::OrchestratorConfig,
160 ) -> Self {
161 Self {
162 ouroboros,
163 event_bus,
164 state_store,
165 git_layer: None,
166 sessions: RwLock::new(std::collections::HashMap::new()),
167 lifecycle,
168 a2a: None,
169 space_manager: RwLock::new(None),
170 conversation_buffer: RwLock::new(ConversationBuffer::default()),
171 delegation_config: DelegationConfig::default(),
172 a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
173 }
174 }
175
176 pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
178 *self.space_manager.write() = Some(manager);
179 }
180
181 pub fn current_space_id(&self) -> Option<SpaceId> {
183 self.space_manager
184 .read()
185 .as_ref()
186 .map(|m| m.current_space_id())
187 }
188
189 pub fn current_space_tag(&self) -> String {
191 self.space_manager
192 .read()
193 .as_ref()
194 .and_then(|m| {
195 m.current_space().map(|s| {
196 if s.is_default() {
197 String::new()
198 } else {
199 format!("[{} {}]", s.emoji(), s.name)
200 }
201 })
202 })
203 .unwrap_or_default()
204 }
205
206 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
208 self.a2a = Some(a2a);
209 }
210
211 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
213 self.git_layer = Some(git_layer);
214 }
215
216 fn git_commit(&self, rel_path: &str, message: &str) {
218 if let Some(ref gl) = self.git_layer {
219 if gl.is_enabled() {
220 let _ = gl.commit_file(rel_path, message);
221 }
222 }
223 }
224
    /// Handles one user message end to end.
    ///
    /// Flow, as implemented below:
    /// 1. Resolve the session id (a fresh UUID string when none is given) and
    ///    the space for this message.
    /// 2. Run the interview, either on the raw message (new session) or on a
    ///    transcript built from the stored history plus this message.
    /// 3. Return early with a chat reply (`!is_task`) or with clarification
    ///    questions (`!ready_for_seed`).
    /// 4. Otherwise create and persist a seed, then either delegate to
    ///    multiple agents (when the seed is split into more than one subtask)
    ///    or run a single agent via the lifecycle manager.
    ///
    /// # Errors
    /// Propagates errors from `interview`, `generate_seed`, `save_seed`, the
    /// `SeedCreated` publish, `delegate_subtasks` and `spawn_and_run`.
    ///
    /// NOTE(review): every lock guard taken in this function is dropped in its
    /// own scope before the next `.await`, so the `await_holding_lock` allow
    /// may no longer be needed — confirm with clippy.
    #[allow(clippy::await_holding_lock)]
    pub async fn handle_message(
        &self,
        user_id: &str,
        user_message: &str,
        session_id: Option<&str>,
    ) -> Result<OrchestrationResult> {
        tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
        get_metrics().messages.inc();
        // Started here, but only observed on the single-agent success path at
        // the bottom; the early returns never record a duration.
        let orch_start = std::time::Instant::now();

        // Continue the caller's session, or start a new one with a random id.
        let session_id = session_id
            .map(String::from)
            .unwrap_or_else(|| Uuid::new_v4().to_string());

        tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");

        // NOTE(review): the tag is computed before `detect_or_create` runs, so
        // it reflects whatever space was current beforehand — confirm intended.
        let space_tag = self.current_space_tag();
        // Snapshot the turns and clone the manager handle inside a short
        // scope so no guard is alive across the await below.
        let (turns, sm_arc) = {
            let buffer = self.conversation_buffer.read();
            let sm_guard = self.space_manager.read();
            let turns: Vec<_> = buffer.turns().iter().cloned().collect();
            let sm_arc = sm_guard.as_ref().cloned();
            (turns, sm_arc)
        };
        // Without a space manager the nil UUID is used as the space id; on a
        // detection error the manager's default space is used.
        let space_id = if let Some(ref sm) = sm_arc {
            match sm.detect_or_create(user_message, &turns).await {
                Ok(id) => {
                    tracing::info!(space_id = %id, "Space detected/created for message");
                    id
                }
                Err(e) => {
                    tracing::warn!(error = %e, "Space detection failed, using default");
                    sm.default_space_id()
                }
            }
        } else {
            uuid::Uuid::nil()
        };

        {
            // The user turn is recorded after space detection, so `turns`
            // above did not contain this message.
            let mut buffer = self.conversation_buffer.write();
            buffer.push_user(user_message);
        }

        self.publish_phase_started(&session_id, Phase::Interview)
            .await;

        // Decide, under one read lock, whether this is the session's first
        // message, and copy out the stored history if it is not.
        let needs_interview;
        let existing_history: Option<Vec<_>>;
        {
            let sessions = self.sessions.read();
            needs_interview = !sessions.contains_key(&session_id);
            existing_history = if !needs_interview {
                sessions
                    .get(&session_id)
                    .map(|s| s.interview.conversation_history.clone())
            } else {
                None
            };
        }

        let interview = {
            tracing::info!(phase = "interview", "Starting interview phase");
            if needs_interview {
                self.ouroboros.interview(user_message).await?
            } else {
                // Flatten the stored exchanges plus the new message into one
                // "User: ... / Agent: ..." transcript.
                let multi_turn_context = {
                    let mut context_parts = Vec::new();
                    if let Some(ref history) = existing_history {
                        for exchange in history {
                            context_parts.push(format!(
                                "User: {}\nAgent: {}",
                                exchange.user, exchange.agent
                            ));
                        }
                    }
                    context_parts.push(format!("User: {}", user_message));
                    context_parts.join("\n\n")
                };

                {
                    // Record (last stored question, this message) on the
                    // session before re-running the interview.
                    let mut sessions = self.sessions.write();
                    if let Some(s) = sessions.get_mut(&session_id) {
                        let last_q = s.interview.questions.last().cloned().unwrap_or_default();
                        s.interview.add_exchange(&last_q, user_message);
                    }
                }

                self.ouroboros.interview(&multi_turn_context).await?
            }
        };

        // ── Early return 1: plain chat, no task to run ────────────────────
        if !interview.is_task {
            tracing::info!(session_id = %session_id, "Chat response (non-task)");

            // Fall back to a canned greeting when there is no reply text.
            let response_text = if interview.chat_response.is_empty() {
                "Hello! How can I help you today?".to_string()
            } else {
                interview.chat_response.clone()
            };

            {
                let mut buffer = self.conversation_buffer.write();
                buffer.push_agent(&response_text, &space_id);
            }

            {
                // Append to the existing session's history, or create a chat
                // session so later messages see this exchange.
                let mut sessions = self.sessions.write();
                if let Some(session) = sessions.get_mut(&session_id) {
                    tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
                    session
                        .interview
                        .add_to_history(user_message, &response_text);
                } else {
                    let mut interview = InterviewResult::new();
                    interview.is_task = false;
                    interview.chat_response = response_text.clone();
                    interview.add_to_history(user_message, &response_text);
                    sessions.insert(
                        session_id.clone(),
                        InterviewSession {
                            id: session_id.clone(),
                            interview,
                            phase: Phase::Interview,
                            seed_id: None,
                            agent_id: None,
                        },
                    );
                }
            }

            self.publish_phase_completed(&session_id, Phase::Interview, "chat")
                .await;

            return Ok(OrchestrationResult {
                session_id: Some(session_id.clone()),
                space_id: Some(space_id),
                space_tag: Some(space_tag.clone()),
                response: response_text,
                seed_id: None,
                agent_id: None,
                phase_reached: Phase::Interview,
                evaluation_passed: false,
                output: None,
            });
        }

        // ── Early return 2: task needs clarification ──────────────────────
        if !interview.ready_for_seed {
            {
                // Create the session from this interview result if it does
                // not exist yet. An existing session keeps its stored
                // interview; only its history may be extended below.
                let mut sessions = self.sessions.write();
                let session =
                    sessions
                        .entry(session_id.clone())
                        .or_insert_with(|| InterviewSession {
                            id: session_id.clone(),
                            interview: interview.clone(),
                            phase: Phase::Interview,
                            seed_id: None,
                            agent_id: None,
                        });
                let questions_text = interview.questions.join("\n");
                // Pair the session's last stored answer (if non-empty) with
                // the new questions in the history.
                let last_answer = session.interview.answers.last().cloned();
                if let Some(ref ans) = last_answer {
                    if !ans.is_empty() {
                        session.interview.add_to_history(ans, &questions_text);
                    }
                }
            }
            // Only non-empty questions are shown to the user.
            let questions = interview
                .questions
                .iter()
                .filter(|q| !q.is_empty())
                .cloned()
                .collect::<Vec<_>>();

            tracing::info!(
                session_id = %session_id,
                ambiguity = interview.ambiguity.ambiguity(),
                questions = questions.len(),
                "Interview needs clarification"
            );

            self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
                .await;

            // NOTE(review): unlike the chat branch, the questions sent back
            // are not pushed to `conversation_buffer` — confirm intended.
            return Ok(OrchestrationResult {
                session_id: Some(session_id.clone()),
                space_id: Some(space_id),
                space_tag: Some(space_tag.clone()),
                response: format_questions(&questions),
                seed_id: None,
                agent_id: None,
                phase_reached: Phase::Interview,
                evaluation_passed: false,
                output: None,
            });
        }

        {
            // Marker turn: the interview finished and a seed will be built.
            let mut buffer = self.conversation_buffer.write();
            buffer.push_agent("[interview: ready]", &space_id);
        }

        self.publish_phase_completed(&session_id, Phase::Interview, "ready")
            .await;
        self.publish_phase_started(&session_id, Phase::Seed).await;

        // "Simple" tasks skip LLM seed generation: complexity must be exactly
        // "simple" and the ambiguity score at most 0.3.
        let is_simple = interview.complexity == "simple" && interview.ambiguity.ambiguity() <= 0.3;

        let seed = if is_simple {
            tracing::info!(
                phase = "seed",
                method = "from_message",
                "Simple task — ad-hoc seed"
            );
            Seed::from_message(&interview.original_message)
        } else {
            tracing::info!(
                phase = "seed",
                method = "llm",
                "Complex task — LLM-generated seed"
            );
            self.ouroboros.generate_seed(&interview).await?
        };

        self.save_seed(&seed).await?;

        // Unlike the phase events, a failed `SeedCreated` publish aborts the
        // request.
        self.event_bus
            .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;

        self.publish_phase_completed(&session_id, Phase::Seed, "generated")
            .await;
        self.publish_phase_started(&session_id, Phase::Execute)
            .await;

        // ── Multi-agent path: seed splits into more than one subtask ──────
        if should_split_seed(&seed) {
            let subtasks = split_into_subtasks(&seed);
            if subtasks.len() > 1 {
                tracing::info!(
                    phase = "delegate",
                    subtasks = subtasks.len(),
                    "Delegating to multi-agent"
                );
                let results = self.delegate_subtasks(subtasks, &seed).await?;

                // Only the outputs of successful subtasks are combined.
                let combined: String = results
                    .iter()
                    .filter(|r| r.success)
                    .filter_map(|r| r.result.as_deref())
                    .collect::<Vec<_>>()
                    .join("\n\n");

                // Judged over the returned results only: a subtask missing
                // from `results` cannot make this false.
                let all_passed = results.iter().all(|r| r.success);

                {
                    // The session is finished; drop its interview state.
                    let mut sessions = self.sessions.write();
                    sessions.remove(&session_id);
                }

                tracing::info!(
                    session_id = %session_id,
                    subtasks = results.len(),
                    passed = all_passed,
                    "Multi-agent orchestration complete"
                );

                // NOTE(review): this return skips the Execute `PhaseCompleted`
                // event, the `orch_duration`/agent metrics and any
                // conversation-buffer entry that the single-agent path below
                // produces — confirm whether that is intended.
                return Ok(OrchestrationResult {
                    session_id: Some(session_id),
                    space_id: Some(space_id),
                    space_tag: Some(space_tag.clone()),
                    response: format_result_combined(&combined),
                    seed_id: Some(seed.id),
                    agent_id: None,
                    phase_reached: Phase::Execute,
                    evaluation_passed: all_passed,
                    output: Some(combined),
                });
            }
        }

        {
            // NOTE(review): this point is reached only when the multi-agent
            // branch above did NOT return, yet the marker says
            // "[multi-agent: complete]". It looks misplaced — verify.
            let mut buffer = self.conversation_buffer.write();
            buffer.push_agent("[multi-agent: complete]", &space_id);
        }

        // ── Single-agent path ─────────────────────────────────────────────
        tracing::info!(phase = "execute", "Starting execution phase");
        let exec_result = self
            .lifecycle
            .spawn_and_run(&seed, Priority::Normal)
            .await?;

        self.lifecycle.reap_zombies();

        self.publish_phase_completed(&session_id, Phase::Execute, "completed")
            .await;

        {
            // The session is finished; drop its interview state.
            let mut sessions = self.sessions.write();
            sessions.remove(&session_id);
        }

        let passed = exec_result.success;

        tracing::info!(
            session_id = %session_id,
            passed,
            "Orchestration complete"
        );

        let metrics = get_metrics();
        metrics
            .orch_duration
            .observe(orch_start.elapsed().as_secs_f64());
        if passed {
            metrics.agents_completed.inc();
        } else {
            metrics.agents_failed.inc();
        }

        {
            // The buffer records the seed's goal, not the execution output.
            let mut buffer = self.conversation_buffer.write();
            buffer.push_agent(&seed.goal, &space_id);
        }

        Ok(OrchestrationResult {
            session_id: Some(session_id),
            space_id: Some(space_id),
            space_tag: Some(space_tag.clone()),
            response: format_execution_result(&seed, &exec_result),
            seed_id: Some(seed.id),
            agent_id: None,
            phase_reached: Phase::Execute,
            evaluation_passed: passed,
            output: Some(exec_result.output.clone()),
        })
    }
613
614 async fn save_seed(&self, seed: &Seed) -> Result<()> {
616 let key = seed.id.to_string();
617
618 self.state_store
619 .save_json("seeds", &key, seed)
620 .await
621 .context("failed to save seed to state store")?;
622
623 self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
624
625 Ok(())
626 }
627
628 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
631 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
632 session_id: session_id.to_owned(),
633 phase,
634 });
635 }
636
637 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
639 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
640 session_id: session_id.to_owned(),
641 phase,
642 result_summary: result.to_owned(),
643 });
644 }
645
646 pub async fn delegate_subtasks(
654 &self,
655 subtasks: Vec<SubTask>,
656 parent_seed: &Seed,
657 ) -> Result<Vec<SubTask>> {
658 if subtasks.len() == 1 {
660 return self.execute_single_subtask(subtasks, parent_seed).await;
661 }
662
663 if let Some(ref a2a) = self.a2a {
665 if !self.a2a_breaker.is_allowed() {
667 tracing::warn!(
668 state = ?self.a2a_breaker.state(),
669 "A2A circuit breaker open, using lifecycle fallback"
670 );
671 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
672 }
673
674 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
676 }
677
678 self.delegate_via_lifecycle(subtasks, parent_seed).await
680 }
681
682 async fn delegate_with_retry(
684 &self,
685 subtasks: Vec<SubTask>,
686 parent_seed: &Seed,
687 a2a: &Arc<crate::a2a::A2AProtocol>,
688 ) -> Result<Vec<SubTask>> {
689 let mut attempt = 0;
690 let max_retries = self.delegation_config.max_retries;
691
692 loop {
693 match self
694 .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
695 .await
696 {
697 Ok(results) => {
698 self.a2a_breaker.record_success();
699 return Ok(results);
700 }
701 Err(e) => {
702 self.a2a_breaker.record_failure();
703 attempt += 1;
704
705 if attempt >= max_retries {
706 tracing::error!(
707 attempts = attempt,
708 error = %e,
709 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
710 attempt
711 );
712 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
713 }
714
715 let delay = self.delegation_config.backoff_delay(attempt);
717 tracing::warn!(
718 attempt,
719 delay_ms = delay,
720 error = %e,
721 "A2A delegation failed, retrying with backoff"
722 );
723 tokio::time::sleep(Duration::from_millis(delay)).await;
724 }
725 }
726 }
727 }
728
729 async fn execute_single_subtask(
731 &self,
732 subtasks: Vec<SubTask>,
733 parent_seed: &Seed,
734 ) -> Result<Vec<SubTask>> {
735 let mut task = subtasks
736 .into_iter()
737 .next()
738 .expect("execute_single_subtask is only called when subtasks is non-empty");
739 let child_seed = Seed {
740 id: Uuid::new_v4(),
741 goal: task.description.clone(),
742 constraints: parent_seed.constraints.clone(),
743 acceptance_criteria: vec!["Task completes successfully".into()],
744 ontology: parent_seed.ontology.clone(),
745 created_at: chrono::Utc::now(),
746 generation: parent_seed.generation + 1,
747 parent_seed_id: Some(parent_seed.id),
748 cspace_hint: None,
749 original_request: parent_seed.original_request.clone(),
750 };
751 match self
752 .lifecycle
753 .spawn_and_run(&child_seed, Priority::Normal)
754 .await
755 {
756 Ok(result) => {
757 task.result = Some(result.output.clone());
758 }
759 Err(e) => {
760 task.result = Some(format!("Failed: {e}"));
761 task.success = false;
762 }
763 }
764 Ok(vec![task])
765 }
766
767 async fn delegate_via_a2a(
777 &self,
778 subtasks: Vec<SubTask>,
779 parent_seed: &Seed,
780 a2a: &Arc<crate::a2a::A2AProtocol>,
781 ) -> Result<Vec<SubTask>> {
782 use crate::a2a::TaskPriority;
783 use tokio::task::JoinSet;
784
785 tracing::info!(
786 subtasks = subtasks.len(),
787 "Delegating subtasks via A2A protocol"
788 );
789
790 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
791 let subtask_count = subtasks.len();
792 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
793
794 for (idx, subtask) in subtasks.into_iter().enumerate() {
795 let capability = subtask.required_capability.clone();
796 let description = subtask.description.clone();
797 let subtask_id = subtask.id;
798 let role = subtask.role.clone();
799 let a2a = Arc::clone(a2a);
800 let parent_seed = parent_seed.clone();
801 let lifecycle = self.lifecycle.clone();
802
803 join_set.spawn(async move {
804 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
806 a2a.query_capabilities(cap).await.ok()
807 .and_then(|agents| agents.into_iter().next())
808 } else {
809 None
810 };
811
812 let (output, success) = if let Some(ref target_card) = target {
813 let target_id = target_card.agent_id;
814 tracing::info!(
815 subtask_index = idx,
816 target = %target_card.name,
817 target_id = %target_id,
818 "A2A dispatching subtask"
819 );
820
821 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
822 "parent_seed": parent_seed.id.to_string(),
823 "goal": description,
824 }))
825 .with_priority(TaskPriority::Normal);
826
827 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
829
830 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
832 Some(Ok(result)) => {
833 let out = result.get("output")
834 .and_then(|v| v.as_str())
835 .unwrap_or("")
836 .to_string();
837 let ok = result.get("success")
838 .and_then(|v| v.as_bool())
839 .unwrap_or(false);
840 (out, ok)
841 }
842 Some(Err(e)) => {
843 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
844 (format!("Failed: {e}"), false)
845 }
846 None => {
847 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
849 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
850 }
851 }
852 } else {
853 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
854 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
855 };
856
857 (idx, SubTask {
858 id: subtask_id,
859 description,
860 required_capability: capability,
861 result: Some(output),
862 success,
863 role,
864 })
865 });
866 }
867
868 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
869 while let Some(join_result) = join_set.join_next().await {
870 match join_result {
871 Ok((idx, subtask)) => {
872 results[idx] = Some(subtask);
873 }
874 Err(e) => {
875 tracing::error!(error = %e, "A2A task panicked");
876 }
877 }
878 }
879
880 let completed: Vec<SubTask> = results.into_iter().flatten().collect();
881 tracing::info!(
882 completed = completed.len(),
883 succeeded = completed.iter().filter(|r| r.success).count(),
884 "A2A delegation complete"
885 );
886 Ok(completed)
887 }
888
889 async fn delegate_via_lifecycle(
890 &self,
891 subtasks: Vec<SubTask>,
892 parent_seed: &Seed,
893 ) -> Result<Vec<SubTask>> {
894 use crate::agent_group::OxiosAgentGroup;
895 use tokio::task::JoinSet;
896
897 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
898 let group = OxiosAgentGroup::new(parent_seed, descriptions);
899 let group_id = group.id;
900
901 self.event_bus.publish(KernelEvent::AgentGroupCreated {
902 group_id,
903 agent_count: group.agents.len(),
904 })?;
905
906 tracing::info!(
907 group_id = %group_id,
908 agent_count = group.agents.len(),
909 "Starting parallel multi-agent execution"
910 );
911
912 let mut join_set: JoinSet<(
913 usize,
914 crate::types::AgentId,
915 Result<oxios_ouroboros::ExecutionResult>,
916 )> = JoinSet::new();
917
918 for (idx, agent_entry) in group.agents.iter().enumerate() {
919 let child_seed = agent_entry.seed.clone();
920 let agent_id = agent_entry.id;
921 let lifecycle = self.lifecycle.clone();
922
923 join_set.spawn(async move {
924 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
925 (idx, agent_id, result)
926 });
927 }
928
929 let subtask_count = subtasks.len();
930 let mut completed = vec![None; subtask_count];
931 while let Some(join_result) = join_set.join_next().await {
932 match join_result {
933 Ok((idx, agent_id, Ok(exec_result))) => {
934 let _ = self
935 .event_bus
936 .publish(KernelEvent::AgentGroupMemberCompleted {
937 group_id,
938 agent_id,
939 success: exec_result.success,
940 });
941 completed[idx] = Some(SubTask {
942 id: subtasks[idx].id,
943 description: subtasks[idx].description.clone(),
944 required_capability: subtasks[idx].required_capability.clone(),
945 result: Some(exec_result.output.clone()),
946 success: exec_result.success,
947 role: subtasks[idx].role.clone(),
948 });
949 }
950 Ok((idx, agent_id, Err(e))) => {
951 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
952 let _ = self
953 .event_bus
954 .publish(KernelEvent::AgentGroupMemberCompleted {
955 group_id,
956 agent_id,
957 success: false,
958 });
959 completed[idx] = Some(SubTask {
960 id: subtasks[idx].id,
961 description: subtasks[idx].description.clone(),
962 required_capability: subtasks[idx].required_capability.clone(),
963 result: Some(format!("Failed: {e}")),
964 success: false,
965 role: subtasks[idx].role.clone(),
966 });
967 }
968 Err(e) => {
969 tracing::error!(error = %e, "JoinSet task panicked");
970 }
971 }
972 }
973
974 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
975 let succeeded = completed.iter().filter(|r| r.success).count();
976 let total = completed.len();
977
978 tracing::info!(
979 group_id = %group_id,
980 succeeded,
981 total,
982 "Parallel multi-agent execution complete"
983 );
984
985 let _ = self
987 .state_store
988 .save_json("agent_groups", &group_id.to_string(), &group)
989 .await;
990 self.git_commit(
991 &format!("agent_groups/{}.json", group_id),
992 "orchestrator: save group",
993 );
994
995 Ok(completed)
996 }
997}
998
/// Per-session interview state kept in `Orchestrator::sessions` between
/// messages.
///
/// Only `interview` is read back in this file; the remaining fields are
/// written at construction and otherwise unused (hence `allow(unused)`).
#[derive(Debug, Clone)]
#[allow(unused)]
struct InterviewSession {
    /// Session id; same value as the key in the sessions map.
    id: String,
    /// Accumulated interview state (questions, answers, conversation history).
    interview: InterviewResult,
    /// Always set to `Phase::Interview` where sessions are created in this file.
    phase: Phase,
    /// Always `None` where sessions are created in this file.
    seed_id: Option<Uuid>,
    /// Always `None` where sessions are created in this file.
    agent_id: Option<AgentId>,
}
1009
/// Outcome of `Orchestrator::handle_message`.
///
/// `Option` fields that are `None` are omitted from the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
    /// Session the message was handled in; pass it back to continue it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Space selected for the message (nil UUID when no space manager is set).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub space_id: Option<Uuid>,
    /// Display tag of the current space; may be an empty string.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub space_tag: Option<String>,
    /// User-facing reply text.
    pub response: String,
    /// Id of the seed that was created, when the pipeline got that far.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seed_id: Option<Uuid>,
    /// `handle_message` always sets this to `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<AgentId>,
    /// Furthest pipeline phase reached for this message.
    pub phase_reached: Phase,
    /// Whether execution succeeded (all subtasks, in the multi-agent case);
    /// `false` for chat and clarification replies.
    pub evaluation_passed: bool,
    /// Raw execution output, present only when something was executed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output: Option<String>,
}
1038
/// Renders clarification questions as a numbered list under a short
/// introduction, or a generic request for clarification when there are none.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return String::from("I need a bit more clarification before I can proceed.");
    }

    let mut message = String::from(
        "I'd like to understand your request better. Could you help clarify:\n\n",
    );
    for (position, question) in questions.iter().enumerate() {
        // Newline between entries only, never after the last one.
        if position > 0 {
            message.push('\n');
        }
        message.push_str(&format!("{}. {}", position + 1, question));
    }
    message
}
1055
1056fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1059 let mut lines = Vec::new();
1060
1061 if exec.success {
1062 lines.push(format!("✅ '{}'", seed.goal));
1063 } else {
1064 lines.push(format!(
1065 "⚠️ '{}'을(를) 시도했지만 완전히 성공하지 못했습니다.",
1066 seed.goal
1067 ));
1068 }
1069
1070 if !exec.output.is_empty() {
1072 let preview = if exec.output.len() > 500 {
1073 format!("{}...", &exec.output[..500])
1074 } else {
1075 exec.output.clone()
1076 };
1077 lines.push(String::new());
1078 lines.push(preview);
1079 }
1080
1081 lines.join("\n")
1082}
1083
1084fn should_split_seed(seed: &Seed) -> bool {
1089 seed.acceptance_criteria.len() >= 5
1093}
1094
1095fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1101 seed.acceptance_criteria
1102 .iter()
1103 .map(|criterion| {
1104 let desc = format!("{}: {}", seed.goal, criterion);
1105 let desc_lower = desc.to_lowercase();
1106
1107 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1109 Some("code-review".to_string())
1110 } else if desc_lower.contains("test") {
1111 Some("testing".to_string())
1112 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1113 Some("refactoring".to_string())
1114 } else if desc_lower.contains("write")
1115 || desc_lower.contains("create")
1116 || desc_lower.contains("implement")
1117 {
1118 Some("code-generation".to_string())
1119 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1120 Some("debugging".to_string())
1121 } else {
1122 None
1123 };
1124
1125 let mut subtask = SubTask::new(desc);
1126 subtask.required_capability = cap;
1127 subtask
1128 })
1129 .collect()
1130}
1131
/// Wraps the combined output of successful subtasks in a short header, or
/// reports that nothing succeeded when `combined` is empty.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        body => format!("Multi-agent execution completed:\n\n{}", body),
    }
}
1140
1141async fn run_via_lifecycle(
1143 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1144 parent_seed: &Seed,
1145 description: &str,
1146) -> (String, bool) {
1147 let child_seed = Seed {
1148 id: Uuid::new_v4(),
1149 goal: description.to_string(),
1150 constraints: parent_seed.constraints.clone(),
1151 acceptance_criteria: vec!["Task completes successfully".into()],
1152 ontology: parent_seed.ontology.clone(),
1153 created_at: chrono::Utc::now(),
1154 generation: parent_seed.generation + 1,
1155 parent_seed_id: Some(parent_seed.id),
1156 cspace_hint: None,
1157 original_request: parent_seed.original_request.clone(),
1158 };
1159 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1160 Ok(result) => (result.output, result.success),
1161 Err(e) => (format!("Failed: {e}"), false),
1162 }
1163}