1use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono;
18use oxios_ouroboros::{ExecutionResult, InterviewResult, OuroborosProtocol, Phase, Seed};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use crate::agent_lifecycle::AgentLifecycleManager;
24use crate::event_bus::{EventBus, KernelEvent};
25use crate::git_layer::GitLayer;
26use crate::metrics::get_metrics;
27use crate::scheduler::Priority;
28use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
29use crate::state_store::StateStore;
30use crate::types::AgentId;
31
32#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
34pub enum AgentRole {
35 #[default]
37 Worker,
38 Manager,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SubTask {
45 pub id: Uuid,
47 pub description: String,
49 pub required_capability: Option<String>,
51 pub result: Option<String>,
53 pub success: bool,
55 #[serde(default)]
57 pub role: AgentRole,
58}
59
60impl SubTask {
61 pub fn new(description: impl Into<String>) -> Self {
63 Self {
64 id: Uuid::new_v4(),
65 description: description.into(),
66 required_capability: None,
67 result: None,
68 success: false,
69 role: AgentRole::default(),
70 }
71 }
72
73 pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
75 self.required_capability = Some(cap.into());
76 self
77 }
78}
79
80pub struct Orchestrator {
82 ouroboros: Arc<dyn OuroborosProtocol>,
83 event_bus: EventBus,
84 state_store: Arc<StateStore>,
85 git_layer: Option<Arc<GitLayer>>,
87 sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
89 lifecycle: AgentLifecycleManager,
91 a2a: Option<Arc<crate::a2a::A2AProtocol>>,
93 space_manager: RwLock<Option<Arc<SpaceManager>>>,
95 conversation_buffer: RwLock<ConversationBuffer>,
97 delegation_config: DelegationConfig,
99 a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
101}
102
/// Retry and backoff settings used when delegating subtasks over A2A.
#[derive(Debug, Clone)]
struct DelegationConfig {
    /// Number of failed A2A attempts after which delegation gives up and
    /// falls back to the lifecycle path.
    max_retries: u32,
    /// Base delay for exponential backoff, in milliseconds.
    base_delay_ms: u64,
    /// Upper bound for any single backoff delay, in milliseconds.
    max_delay_ms: u64,
    /// Timeout in milliseconds. Not read anywhere in this file, hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    timeout_ms: u64,
}

impl Default for DelegationConfig {
    /// Defaults: 3 attempts, 100 ms base delay, 5 s delay cap, 5 s timeout.
    fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay_ms: 100,
            max_delay_ms: 5000,
            timeout_ms: 5000,
        }
    }
}

impl DelegationConfig {
    /// Returns the backoff delay in milliseconds for the given attempt:
    /// `base_delay_ms * 2^attempt`, capped at `max_delay_ms`.
    ///
    /// The exponent is clamped to 10 and the multiplication saturates, so the
    /// computation can never overflow regardless of the configured base delay.
    fn backoff_delay(&self, attempt: u32) -> u64 {
        let factor = 2_u64.saturating_pow(attempt.min(10));
        self.base_delay_ms
            .saturating_mul(factor)
            .min(self.max_delay_ms)
    }
}
135
136impl Orchestrator {
137 pub fn new(
139 ouroboros: Arc<dyn OuroborosProtocol>,
140 event_bus: EventBus,
141 state_store: Arc<StateStore>,
142 lifecycle: AgentLifecycleManager,
143 ) -> Self {
144 Self::with_config(
145 ouroboros,
146 event_bus,
147 state_store,
148 lifecycle,
149 crate::config::OrchestratorConfig::default(),
150 )
151 }
152
153 pub fn with_config(
155 ouroboros: Arc<dyn OuroborosProtocol>,
156 event_bus: EventBus,
157 state_store: Arc<StateStore>,
158 lifecycle: AgentLifecycleManager,
159 _config: crate::config::OrchestratorConfig,
160 ) -> Self {
161 Self {
162 ouroboros,
163 event_bus,
164 state_store,
165 git_layer: None,
166 sessions: RwLock::new(std::collections::HashMap::new()),
167 lifecycle,
168 a2a: None,
169 space_manager: RwLock::new(None),
170 conversation_buffer: RwLock::new(ConversationBuffer::default()),
171 delegation_config: DelegationConfig::default(),
172 a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
173 }
174 }
175
176 pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
178 *self.space_manager.write() = Some(manager);
179 }
180
181 pub fn current_space_id(&self) -> Option<SpaceId> {
183 self.space_manager
184 .read()
185 .as_ref()
186 .map(|m| m.current_space_id())
187 }
188
189 pub fn current_space_tag(&self) -> String {
191 self.space_manager
192 .read()
193 .as_ref()
194 .and_then(|m| {
195 m.current_space().map(|s| {
196 if s.is_default() {
197 String::new()
198 } else {
199 format!("[{} {}]", s.emoji(), s.name)
200 }
201 })
202 })
203 .unwrap_or_default()
204 }
205
206 pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
208 self.a2a = Some(a2a);
209 }
210
211 pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
213 self.git_layer = Some(git_layer);
214 }
215
216 fn git_commit(&self, rel_path: &str, message: &str) {
218 if let Some(ref gl) = self.git_layer {
219 if gl.is_enabled() {
220 let _ = gl.commit_file(rel_path, message);
221 }
222 }
223 }
224
225 #[allow(clippy::await_holding_lock)]
234 pub async fn handle_message(
235 &self,
236 user_id: &str,
237 user_message: &str,
238 session_id: Option<&str>,
239 ) -> Result<OrchestrationResult> {
240 tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
241 get_metrics().messages.inc();
242 let orch_start = std::time::Instant::now();
243
244 let session_id = session_id
245 .map(String::from)
246 .unwrap_or_else(|| Uuid::new_v4().to_string());
247
248 tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
249
250 let space_tag = self.current_space_tag();
252 let (turns, sm_arc) = {
253 let buffer = self.conversation_buffer.read();
254 let sm_guard = self.space_manager.read();
255 let turns: Vec<_> = buffer.turns().iter().cloned().collect();
256 let sm_arc = sm_guard.as_ref().cloned();
258 (turns, sm_arc)
259 };
260 let space_id = if let Some(ref sm) = sm_arc {
261 match sm.detect_or_create(user_message, &turns).await {
262 Ok(id) => {
263 tracing::info!(space_id = %id, "Space detected/created for message");
264 id
265 }
266 Err(e) => {
267 tracing::warn!(error = %e, "Space detection failed, using default");
268 sm.default_space_id()
269 }
270 }
271 } else {
272 uuid::Uuid::nil()
273 };
274
275 {
277 let mut buffer = self.conversation_buffer.write();
278 buffer.push_user(user_message);
279 }
280
281 self.publish_phase_started(&session_id, Phase::Interview)
283 .await;
284
285 let needs_interview;
287 let existing_history: Option<Vec<_>>;
288 {
289 let sessions = self.sessions.read();
290 needs_interview = !sessions.contains_key(&session_id);
291 existing_history = if !needs_interview {
292 sessions
293 .get(&session_id)
294 .map(|s| s.interview.conversation_history.clone())
295 } else {
296 None
297 };
298 }
300
301 let interview = {
303 tracing::info!(phase = "interview", "Starting interview phase");
304 if needs_interview {
305 self.ouroboros.interview(user_message).await?
306 } else {
307 let multi_turn_context = {
310 let mut context_parts = Vec::new();
311 if let Some(ref history) = existing_history {
312 for exchange in history {
313 context_parts.push(format!(
314 "User: {}\nAgent: {}",
315 exchange.user, exchange.agent
316 ));
317 }
318 }
319 context_parts.push(format!("User: {}", user_message));
320 context_parts.join("\n\n")
321 };
322
323 {
325 let mut sessions = self.sessions.write();
326 if let Some(s) = sessions.get_mut(&session_id) {
327 let last_q = s.interview.questions.last().cloned().unwrap_or_default();
328 s.interview.add_exchange(&last_q, user_message);
329 }
330 }
331
332 self.ouroboros.interview(&multi_turn_context).await?
334 }
335 };
336
337 if !interview.is_task {
339 tracing::info!(session_id = %session_id, "Chat response (non-task)");
340
341 let response_text = if interview.chat_response.is_empty() {
342 "Hello! How can I help you today?".to_string()
343 } else {
344 interview.chat_response.clone()
345 };
346
347 {
349 let mut buffer = self.conversation_buffer.write();
350 buffer.push_agent(&response_text, &space_id);
351 }
352
353 {
356 let mut sessions = self.sessions.write();
357 if let Some(session) = sessions.get_mut(&session_id) {
358 tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
359 session
360 .interview
361 .add_to_history(user_message, &response_text);
362 } else {
363 let mut interview = InterviewResult::new();
365 interview.is_task = false;
366 interview.chat_response = response_text.clone();
367 interview.add_to_history(user_message, &response_text);
368 sessions.insert(
369 session_id.clone(),
370 InterviewSession {
371 id: session_id.clone(),
372 interview,
373 phase: Phase::Interview,
374 seed_id: None,
375 agent_id: None,
376 },
377 );
378 }
379 }
380
381 self.publish_phase_completed(&session_id, Phase::Interview, "chat")
382 .await;
383
384 return Ok(OrchestrationResult {
385 session_id: Some(session_id.clone()),
386 space_id: Some(space_id),
387 space_tag: Some(space_tag.clone()),
388 response: response_text,
389 seed_id: None,
390 agent_id: None,
391 phase_reached: Phase::Interview,
392 evaluation_passed: false,
393 output: None,
394 });
395 }
396
397 if !interview.ready_for_seed {
399 {
401 let mut sessions = self.sessions.write();
402 let session =
403 sessions
404 .entry(session_id.clone())
405 .or_insert_with(|| InterviewSession {
406 id: session_id.clone(),
407 interview: interview.clone(),
408 phase: Phase::Interview,
409 seed_id: None,
410 agent_id: None,
411 });
412 let questions_text = interview.questions.join("\n");
415 let last_answer = session.interview.answers.last().cloned();
416 if let Some(ref ans) = last_answer {
417 if !ans.is_empty() {
418 session.interview.add_to_history(ans, &questions_text);
419 }
420 }
421 } let questions = interview
424 .questions
425 .iter()
426 .filter(|q| !q.is_empty())
427 .cloned()
428 .collect::<Vec<_>>();
429
430 tracing::info!(
431 session_id = %session_id,
432 ambiguity = interview.ambiguity.ambiguity(),
433 questions = questions.len(),
434 "Interview needs clarification"
435 );
436
437 self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
438 .await;
439
440 return Ok(OrchestrationResult {
441 session_id: Some(session_id.clone()),
442 space_id: Some(space_id),
443 space_tag: Some(space_tag.clone()),
444 response: format_questions(&questions),
445 seed_id: None,
446 agent_id: None,
447 phase_reached: Phase::Interview,
448 evaluation_passed: false,
449 output: None,
450 });
451 }
452
453 {
457 let mut buffer = self.conversation_buffer.write();
458 buffer.push_agent("[interview: ready]", &space_id);
459 }
460
461 self.publish_phase_completed(&session_id, Phase::Interview, "ready")
463 .await;
464 self.publish_phase_started(&session_id, Phase::Seed).await;
465
466 let is_simple = interview.complexity == "simple"
472 && interview.ambiguity.ambiguity() <= 0.3;
473
474 let seed = if is_simple {
475 tracing::info!(phase = "seed", method = "from_message", "Simple task — ad-hoc seed");
476 Seed::from_message(&interview.original_message)
477 } else {
478 tracing::info!(phase = "seed", method = "llm", "Complex task — LLM-generated seed");
479 self.ouroboros.generate_seed(&interview).await?
480 };
481
482 self.save_seed(&seed).await?;
484
485 self.event_bus
487 .publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
488
489 self.publish_phase_completed(&session_id, Phase::Seed, "generated")
490 .await;
491 self.publish_phase_started(&session_id, Phase::Execute)
492 .await;
493
494 if should_split_seed(&seed) {
498 let subtasks = split_into_subtasks(&seed);
499 if subtasks.len() > 1 {
500 tracing::info!(
501 phase = "delegate",
502 subtasks = subtasks.len(),
503 "Delegating to multi-agent"
504 );
505 let results = self.delegate_subtasks(subtasks, &seed).await?;
506
507 let combined: String = results
509 .iter()
510 .filter(|r| r.success)
511 .filter_map(|r| r.result.as_deref())
512 .collect::<Vec<_>>()
513 .join("\n\n");
514
515 let all_passed = results.iter().all(|r| r.success);
516
517 {
519 let mut sessions = self.sessions.write();
520 sessions.remove(&session_id);
521 }
522
523 tracing::info!(
524 session_id = %session_id,
525 subtasks = results.len(),
526 passed = all_passed,
527 "Multi-agent orchestration complete"
528 );
529
530 return Ok(OrchestrationResult {
531 session_id: Some(session_id),
532 space_id: Some(space_id),
533 space_tag: Some(space_tag.clone()),
534 response: format_result_combined(&combined),
535 seed_id: Some(seed.id),
536 agent_id: None,
537 phase_reached: Phase::Execute,
538 evaluation_passed: all_passed,
539 output: Some(combined),
540 });
541 }
542 }
543
544 {
546 let mut buffer = self.conversation_buffer.write();
547 buffer.push_agent("[multi-agent: complete]", &space_id);
548 }
549
550 tracing::info!(phase = "execute", "Starting execution phase");
552 let exec_result = self
553 .lifecycle
554 .spawn_and_run(&seed, Priority::Normal)
555 .await?;
556
557 self.lifecycle.reap_zombies();
559
560 self.publish_phase_completed(&session_id, Phase::Execute, "completed")
561 .await;
562
563 {
565 let mut sessions = self.sessions.write();
566 sessions.remove(&session_id);
567 }
568
569 let passed = exec_result.success;
570
571 tracing::info!(
572 session_id = %session_id,
573 passed,
574 "Orchestration complete"
575 );
576
577 let metrics = get_metrics();
579 metrics
580 .orch_duration
581 .observe(orch_start.elapsed().as_secs_f64());
582 if passed {
583 metrics.agents_completed.inc();
584 } else {
585 metrics.agents_failed.inc();
586 }
587
588 {
590 let mut buffer = self.conversation_buffer.write();
591 buffer.push_agent(&seed.goal, &space_id);
592 }
593
594 Ok(OrchestrationResult {
595 session_id: Some(session_id),
596 space_id: Some(space_id),
597 space_tag: Some(space_tag.clone()),
598 response: format_execution_result(&seed, &exec_result),
599 seed_id: Some(seed.id),
600 agent_id: None,
601 phase_reached: Phase::Execute,
602 evaluation_passed: passed,
603 output: Some(exec_result.output.clone()),
604 })
605 }
606
607 async fn save_seed(&self, seed: &Seed) -> Result<()> {
609 let key = seed.id.to_string();
610
611 self.state_store
612 .save_json("seeds", &key, seed)
613 .await
614 .context("failed to save seed to state store")?;
615
616 self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
617
618 Ok(())
619 }
620
621 async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
624 let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
625 session_id: session_id.to_owned(),
626 phase,
627 });
628 }
629
630 async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
632 let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
633 session_id: session_id.to_owned(),
634 phase,
635 result_summary: result.to_owned(),
636 });
637 }
638
639 pub async fn delegate_subtasks(
647 &self,
648 subtasks: Vec<SubTask>,
649 parent_seed: &Seed,
650 ) -> Result<Vec<SubTask>> {
651 if subtasks.len() == 1 {
653 return self.execute_single_subtask(subtasks, parent_seed).await;
654 }
655
656 if let Some(ref a2a) = self.a2a {
658 if !self.a2a_breaker.is_allowed() {
660 tracing::warn!(
661 state = ?self.a2a_breaker.state(),
662 "A2A circuit breaker open, using lifecycle fallback"
663 );
664 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
665 }
666
667 return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
669 }
670
671 self.delegate_via_lifecycle(subtasks, parent_seed).await
673 }
674
675 async fn delegate_with_retry(
677 &self,
678 subtasks: Vec<SubTask>,
679 parent_seed: &Seed,
680 a2a: &Arc<crate::a2a::A2AProtocol>,
681 ) -> Result<Vec<SubTask>> {
682 let mut attempt = 0;
683 let max_retries = self.delegation_config.max_retries;
684
685 loop {
686 match self
687 .delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
688 .await
689 {
690 Ok(results) => {
691 self.a2a_breaker.record_success();
692 return Ok(results);
693 }
694 Err(e) => {
695 self.a2a_breaker.record_failure();
696 attempt += 1;
697
698 if attempt >= max_retries {
699 tracing::error!(
700 attempts = attempt,
701 error = %e,
702 "A2A delegation exhausted after {} attempts, using lifecycle fallback",
703 attempt
704 );
705 return self.delegate_via_lifecycle(subtasks, parent_seed).await;
706 }
707
708 let delay = self.delegation_config.backoff_delay(attempt);
710 tracing::warn!(
711 attempt,
712 delay_ms = delay,
713 error = %e,
714 "A2A delegation failed, retrying with backoff"
715 );
716 tokio::time::sleep(Duration::from_millis(delay)).await;
717 }
718 }
719 }
720 }
721
722 async fn execute_single_subtask(
724 &self,
725 subtasks: Vec<SubTask>,
726 parent_seed: &Seed,
727 ) -> Result<Vec<SubTask>> {
728 let mut task = subtasks
729 .into_iter()
730 .next()
731 .expect("execute_single_subtask is only called when subtasks is non-empty");
732 let child_seed = Seed {
733 id: Uuid::new_v4(),
734 goal: task.description.clone(),
735 constraints: parent_seed.constraints.clone(),
736 acceptance_criteria: vec!["Task completes successfully".into()],
737 ontology: parent_seed.ontology.clone(),
738 created_at: chrono::Utc::now(),
739 generation: parent_seed.generation + 1,
740 parent_seed_id: Some(parent_seed.id),
741 cspace_hint: None,
742 original_request: parent_seed.original_request.clone(),
743 };
744 match self
745 .lifecycle
746 .spawn_and_run(&child_seed, Priority::Normal)
747 .await
748 {
749 Ok(result) => {
750 task.result = Some(result.output.clone());
751 }
752 Err(e) => {
753 task.result = Some(format!("Failed: {e}"));
754 task.success = false;
755 }
756 }
757 Ok(vec![task])
758 }
759
760 async fn delegate_via_a2a(
770 &self,
771 subtasks: Vec<SubTask>,
772 parent_seed: &Seed,
773 a2a: &Arc<crate::a2a::A2AProtocol>,
774 ) -> Result<Vec<SubTask>> {
775 use crate::a2a::TaskPriority;
776 use tokio::task::JoinSet;
777
778 tracing::info!(
779 subtasks = subtasks.len(),
780 "Delegating subtasks via A2A protocol"
781 );
782
783 let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
784 let subtask_count = subtasks.len();
785 let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
786
787 for (idx, subtask) in subtasks.into_iter().enumerate() {
788 let capability = subtask.required_capability.clone();
789 let description = subtask.description.clone();
790 let subtask_id = subtask.id;
791 let role = subtask.role.clone();
792 let a2a = Arc::clone(a2a);
793 let parent_seed = parent_seed.clone();
794 let lifecycle = self.lifecycle.clone();
795
796 join_set.spawn(async move {
797 let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
799 a2a.query_capabilities(cap).await.ok()
800 .and_then(|agents| agents.into_iter().next())
801 } else {
802 None
803 };
804
805 let (output, success) = if let Some(ref target_card) = target {
806 let target_id = target_card.agent_id;
807 tracing::info!(
808 subtask_index = idx,
809 target = %target_card.name,
810 target_id = %target_id,
811 "A2A dispatching subtask"
812 );
813
814 let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
815 "parent_seed": parent_seed.id.to_string(),
816 "goal": description,
817 }))
818 .with_priority(TaskPriority::Normal);
819
820 let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
822
823 match a2a.execute_delegation(orchestrator_id, target_id, task).await {
825 Some(Ok(result)) => {
826 let out = result.get("output")
827 .and_then(|v| v.as_str())
828 .unwrap_or("")
829 .to_string();
830 let ok = result.get("success")
831 .and_then(|v| v.as_bool())
832 .unwrap_or(false);
833 (out, ok)
834 }
835 Some(Err(e)) => {
836 tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
837 (format!("Failed: {e}"), false)
838 }
839 None => {
840 tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
842 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
843 }
844 }
845 } else {
846 tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
847 run_via_lifecycle(&lifecycle, &parent_seed, &description).await
848 };
849
850 (idx, SubTask {
851 id: subtask_id,
852 description,
853 required_capability: capability,
854 result: Some(output),
855 success,
856 role,
857 })
858 });
859 }
860
861 let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
862 while let Some(join_result) = join_set.join_next().await {
863 match join_result {
864 Ok((idx, subtask)) => {
865 results[idx] = Some(subtask);
866 }
867 Err(e) => {
868 tracing::error!(error = %e, "A2A task panicked");
869 }
870 }
871 }
872
873 let completed: Vec<SubTask> = results.into_iter().flatten().collect();
874 tracing::info!(
875 completed = completed.len(),
876 succeeded = completed.iter().filter(|r| r.success).count(),
877 "A2A delegation complete"
878 );
879 Ok(completed)
880 }
881
882 async fn delegate_via_lifecycle(
883 &self,
884 subtasks: Vec<SubTask>,
885 parent_seed: &Seed,
886 ) -> Result<Vec<SubTask>> {
887 use crate::agent_group::OxiosAgentGroup;
888 use tokio::task::JoinSet;
889
890 let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
891 let group = OxiosAgentGroup::new(parent_seed, descriptions);
892 let group_id = group.id;
893
894 self.event_bus.publish(KernelEvent::AgentGroupCreated {
895 group_id,
896 agent_count: group.agents.len(),
897 })?;
898
899 tracing::info!(
900 group_id = %group_id,
901 agent_count = group.agents.len(),
902 "Starting parallel multi-agent execution"
903 );
904
905 let mut join_set: JoinSet<(
906 usize,
907 crate::types::AgentId,
908 Result<oxios_ouroboros::ExecutionResult>,
909 )> = JoinSet::new();
910
911 for (idx, agent_entry) in group.agents.iter().enumerate() {
912 let child_seed = agent_entry.seed.clone();
913 let agent_id = agent_entry.id;
914 let lifecycle = self.lifecycle.clone();
915
916 join_set.spawn(async move {
917 let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
918 (idx, agent_id, result)
919 });
920 }
921
922 let subtask_count = subtasks.len();
923 let mut completed = vec![None; subtask_count];
924 while let Some(join_result) = join_set.join_next().await {
925 match join_result {
926 Ok((idx, agent_id, Ok(exec_result))) => {
927 let _ = self
928 .event_bus
929 .publish(KernelEvent::AgentGroupMemberCompleted {
930 group_id,
931 agent_id,
932 success: exec_result.success,
933 });
934 completed[idx] = Some(SubTask {
935 id: subtasks[idx].id,
936 description: subtasks[idx].description.clone(),
937 required_capability: subtasks[idx].required_capability.clone(),
938 result: Some(exec_result.output.clone()),
939 success: exec_result.success,
940 role: subtasks[idx].role.clone(),
941 });
942 }
943 Ok((idx, agent_id, Err(e))) => {
944 tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
945 let _ = self
946 .event_bus
947 .publish(KernelEvent::AgentGroupMemberCompleted {
948 group_id,
949 agent_id,
950 success: false,
951 });
952 completed[idx] = Some(SubTask {
953 id: subtasks[idx].id,
954 description: subtasks[idx].description.clone(),
955 required_capability: subtasks[idx].required_capability.clone(),
956 result: Some(format!("Failed: {e}")),
957 success: false,
958 role: subtasks[idx].role.clone(),
959 });
960 }
961 Err(e) => {
962 tracing::error!(error = %e, "JoinSet task panicked");
963 }
964 }
965 }
966
967 let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
968 let succeeded = completed.iter().filter(|r| r.success).count();
969 let total = completed.len();
970
971 tracing::info!(
972 group_id = %group_id,
973 succeeded,
974 total,
975 "Parallel multi-agent execution complete"
976 );
977
978 let _ = self
980 .state_store
981 .save_json("agent_groups", &group_id.to_string(), &group)
982 .await;
983 self.git_commit(
984 &format!("agent_groups/{}.json", group_id),
985 "orchestrator: save group",
986 );
987
988 Ok(completed)
989 }
990}
991
992#[derive(Debug, Clone)]
994#[allow(unused)]
995struct InterviewSession {
996 id: String,
997 interview: InterviewResult,
998 phase: Phase,
999 seed_id: Option<Uuid>,
1000 agent_id: Option<AgentId>,
1001}
1002
1003#[derive(Debug, Clone, Serialize, Deserialize)]
1005pub struct OrchestrationResult {
1006 #[serde(skip_serializing_if = "Option::is_none")]
1008 pub session_id: Option<String>,
1009 #[serde(skip_serializing_if = "Option::is_none")]
1011 pub space_id: Option<Uuid>,
1012 #[serde(skip_serializing_if = "Option::is_none")]
1014 pub space_tag: Option<String>,
1015 pub response: String,
1017 #[serde(skip_serializing_if = "Option::is_none")]
1019 pub seed_id: Option<Uuid>,
1020 #[serde(skip_serializing_if = "Option::is_none")]
1022 pub agent_id: Option<AgentId>,
1023 pub phase_reached: Phase,
1025 pub evaluation_passed: bool,
1027 #[serde(skip_serializing_if = "Option::is_none")]
1029 pub output: Option<String>,
1030}
1031
/// Renders clarification questions as a numbered list under a short intro.
///
/// With no questions, a generic request for clarification is returned
/// instead. Numbering starts at 1 and the list has no trailing newline.
fn format_questions(questions: &[String]) -> String {
    if questions.is_empty() {
        return String::from("I need a bit more clarification before I can proceed.");
    }

    let mut rendered = String::from(
        "I'd like to understand your request better. Could you help clarify:\n\n",
    );
    for (position, question) in questions.iter().enumerate() {
        if position > 0 {
            rendered.push('\n');
        }
        rendered.push_str(&format!("{}. {}", position + 1, question));
    }
    rendered
}
1048
1049fn format_execution_result(seed: &Seed, exec: &ExecutionResult) -> String {
1052 let mut lines = Vec::new();
1053
1054 if exec.success {
1055 lines.push(format!("✅ '{}'", seed.goal));
1056 } else {
1057 lines.push(format!(
1058 "⚠️ '{}'을(를) 시도했지만 완전히 성공하지 못했습니다.",
1059 seed.goal
1060 ));
1061 }
1062
1063 if !exec.output.is_empty() {
1065 let preview = if exec.output.len() > 500 {
1066 format!("{}...", &exec.output[..500])
1067 } else {
1068 exec.output.clone()
1069 };
1070 lines.push(String::new());
1071 lines.push(preview);
1072 }
1073
1074 lines.join("\n")
1075}
1076
1077fn should_split_seed(seed: &Seed) -> bool {
1082 seed.acceptance_criteria.len() >= 5
1086}
1087
1088fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
1094 seed.acceptance_criteria
1095 .iter()
1096 .map(|criterion| {
1097 let desc = format!("{}: {}", seed.goal, criterion);
1098 let desc_lower = desc.to_lowercase();
1099
1100 let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
1102 Some("code-review".to_string())
1103 } else if desc_lower.contains("test") {
1104 Some("testing".to_string())
1105 } else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
1106 Some("refactoring".to_string())
1107 } else if desc_lower.contains("write")
1108 || desc_lower.contains("create")
1109 || desc_lower.contains("implement")
1110 {
1111 Some("code-generation".to_string())
1112 } else if desc_lower.contains("debug") || desc_lower.contains("fix") {
1113 Some("debugging".to_string())
1114 } else {
1115 None
1116 };
1117
1118 let mut subtask = SubTask::new(desc);
1119 subtask.required_capability = cap;
1120 subtask
1121 })
1122 .collect()
1123}
1124
/// Wraps the combined output of successful subtasks for display.
///
/// An empty `combined` string means nothing succeeded and yields a fixed
/// notice; otherwise the output is placed under a short heading.
fn format_result_combined(combined: &str) -> String {
    match combined {
        "" => String::from("No subtasks completed successfully."),
        text => format!("Multi-agent execution completed:\n\n{}", text),
    }
}
1133
1134async fn run_via_lifecycle(
1136 lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
1137 parent_seed: &Seed,
1138 description: &str,
1139) -> (String, bool) {
1140 let child_seed = Seed {
1141 id: Uuid::new_v4(),
1142 goal: description.to_string(),
1143 constraints: parent_seed.constraints.clone(),
1144 acceptance_criteria: vec!["Task completes successfully".into()],
1145 ontology: parent_seed.ontology.clone(),
1146 created_at: chrono::Utc::now(),
1147 generation: parent_seed.generation + 1,
1148 parent_seed_id: Some(parent_seed.id),
1149 cspace_hint: None,
1150 original_request: parent_seed.original_request.clone(),
1151 };
1152 match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
1153 Ok(result) => (result.output, result.success),
1154 Err(e) => (format!("Failed: {e}"), false),
1155 }
1156}