use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use chrono;
use oxios_ouroboros::{EvaluationResult, InterviewResult, OuroborosProtocol, Phase, Seed};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::agent_lifecycle::AgentLifecycleManager;
use crate::event_bus::{EventBus, KernelEvent};
use crate::git_layer::GitLayer;
use crate::metrics::get_metrics;
use crate::scheduler::Priority;
use crate::space::{ConversationBuffer, SpaceId, SpaceManager};
use crate::state_store::StateStore;
use crate::types::AgentId;
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub enum AgentRole {
#[default]
Worker,
Manager,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubTask {
pub id: Uuid,
pub description: String,
pub required_capability: Option<String>,
pub result: Option<String>,
pub success: bool,
#[serde(default)]
pub role: AgentRole,
}
impl SubTask {
pub fn new(description: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
description: description.into(),
required_capability: None,
result: None,
success: false,
role: AgentRole::default(),
}
}
pub fn with_capability(mut self, cap: impl Into<String>) -> Self {
self.required_capability = Some(cap.into());
self
}
}
pub struct Orchestrator {
ouroboros: Arc<dyn OuroborosProtocol>,
event_bus: EventBus,
state_store: Arc<StateStore>,
git_layer: Option<Arc<GitLayer>>,
sessions: RwLock<std::collections::HashMap<String, InterviewSession>>,
lifecycle: AgentLifecycleManager,
a2a: Option<Arc<crate::a2a::A2AProtocol>>,
space_manager: RwLock<Option<Arc<SpaceManager>>>,
conversation_buffer: RwLock<ConversationBuffer>,
config: crate::config::OrchestratorConfig,
delegation_config: DelegationConfig,
a2a_breaker: Arc<crate::a2a_circuit_breaker::A2ACircuitBreaker>,
}
#[derive(Debug, Clone)]
struct DelegationConfig {
max_retries: u32,
base_delay_ms: u64,
max_delay_ms: u64,
#[allow(dead_code)]
timeout_ms: u64,
}
impl Default for DelegationConfig {
fn default() -> Self {
Self {
max_retries: 3,
base_delay_ms: 100,
max_delay_ms: 5000,
timeout_ms: 5000,
}
}
}
impl DelegationConfig {
fn backoff_delay(&self, attempt: u32) -> u64 {
let delay = self.base_delay_ms * 2_u64.saturating_pow(attempt.min(10));
delay.min(self.max_delay_ms)
}
}
impl Orchestrator {
pub fn new(
ouroboros: Arc<dyn OuroborosProtocol>,
event_bus: EventBus,
state_store: Arc<StateStore>,
lifecycle: AgentLifecycleManager,
) -> Self {
Self::with_config(
ouroboros,
event_bus,
state_store,
lifecycle,
crate::config::OrchestratorConfig::default(),
)
}
pub fn with_config(
ouroboros: Arc<dyn OuroborosProtocol>,
event_bus: EventBus,
state_store: Arc<StateStore>,
lifecycle: AgentLifecycleManager,
config: crate::config::OrchestratorConfig,
) -> Self {
Self {
ouroboros,
event_bus,
state_store,
git_layer: None,
sessions: RwLock::new(std::collections::HashMap::new()),
lifecycle,
a2a: None,
space_manager: RwLock::new(None),
conversation_buffer: RwLock::new(ConversationBuffer::default()),
config,
delegation_config: DelegationConfig::default(),
a2a_breaker: Arc::new(crate::a2a_circuit_breaker::A2ACircuitBreaker::new(5, 30)),
}
}
pub fn set_space_manager(&self, manager: Arc<SpaceManager>) {
*self.space_manager.write() = Some(manager);
}
pub fn current_space_id(&self) -> Option<SpaceId> {
self.space_manager
.read()
.as_ref()
.map(|m| m.current_space_id())
}
pub fn current_space_tag(&self) -> String {
self.space_manager
.read()
.as_ref()
.and_then(|m| {
m.current_space().map(|s| {
if s.is_default() {
String::new()
} else {
format!("[{} {}]", s.emoji(), s.name)
}
})
})
.unwrap_or_default()
}
pub fn set_a2a(&mut self, a2a: Arc<crate::a2a::A2AProtocol>) {
self.a2a = Some(a2a);
}
pub fn set_git_layer(&mut self, git_layer: Arc<GitLayer>) {
self.git_layer = Some(git_layer);
}
fn git_commit(&self, rel_path: &str, message: &str) {
if let Some(ref gl) = self.git_layer {
if gl.is_enabled() {
let _ = gl.commit_file(rel_path, message);
}
}
}
#[allow(clippy::await_holding_lock)]
pub async fn handle_message(
&self,
user_id: &str,
user_message: &str,
session_id: Option<&str>,
) -> Result<OrchestrationResult> {
tracing::info!(name = "orchestrator.handle_message", session_id = %session_id.unwrap_or("new"), "starting");
get_metrics().messages.inc();
let orch_start = std::time::Instant::now();
let session_id = session_id
.map(String::from)
.unwrap_or_else(|| Uuid::new_v4().to_string());
tracing::info!(session_id = %session_id, user_id = %user_id, content_len = user_message.len(), "Orchestrator handling message");
let space_tag = self.current_space_tag();
let (turns, sm_arc) = {
let buffer = self.conversation_buffer.read();
let sm_guard = self.space_manager.read();
let turns: Vec<_> = buffer.turns().iter().cloned().collect();
let sm_arc = sm_guard.as_ref().cloned();
(turns, sm_arc)
};
let space_id = if let Some(ref sm) = sm_arc {
match sm.detect_or_create(user_message, &turns).await {
Ok(id) => {
tracing::info!(space_id = %id, "Space detected/created for message");
id
}
Err(e) => {
tracing::warn!(error = %e, "Space detection failed, using default");
sm.default_space_id()
}
}
} else {
uuid::Uuid::nil()
};
{
let mut buffer = self.conversation_buffer.write();
buffer.push_user(user_message);
}
self.publish_phase_started(&session_id, Phase::Interview)
.await;
let needs_interview;
let existing_history: Option<Vec<_>>;
{
let sessions = self.sessions.read();
needs_interview = !sessions.contains_key(&session_id);
existing_history = if !needs_interview {
sessions
.get(&session_id)
.map(|s| s.interview.conversation_history.clone())
} else {
None
};
}
let interview = {
tracing::info!(phase = "interview", "Starting interview phase");
if needs_interview {
self.ouroboros.interview(user_message).await?
} else {
let multi_turn_context = {
let mut context_parts = Vec::new();
if let Some(ref history) = existing_history {
for exchange in history {
context_parts.push(format!(
"User: {}\nAgent: {}",
exchange.user, exchange.agent
));
}
}
context_parts.push(format!("User: {}", user_message));
context_parts.join("\n\n")
};
{
let mut sessions = self.sessions.write();
if let Some(s) = sessions.get_mut(&session_id) {
let last_q = s.interview.questions.last().cloned().unwrap_or_default();
s.interview.add_exchange(&last_q, user_message);
}
}
self.ouroboros.interview(&multi_turn_context).await?
}
};
if !interview.is_task {
tracing::info!(session_id = %session_id, "Chat response (non-task)");
let response_text = if interview.chat_response.is_empty() {
"Hello! How can I help you today?".to_string()
} else {
interview.chat_response.clone()
};
{
let mut buffer = self.conversation_buffer.write();
buffer.push_agent(&response_text, &space_id);
}
{
let mut sessions = self.sessions.write();
if let Some(session) = sessions.get_mut(&session_id) {
tracing::debug!(session_id = %session_id, history_len = session.interview.conversation_history.len(), "Adding to existing session history");
session
.interview
.add_to_history(user_message, &response_text);
} else {
let mut interview = InterviewResult::new();
interview.is_task = false;
interview.chat_response = response_text.clone();
interview.add_to_history(user_message, &response_text);
sessions.insert(
session_id.clone(),
InterviewSession {
id: session_id.clone(),
interview,
phase: Phase::Interview,
seed_id: None,
agent_id: None,
},
);
}
}
self.publish_phase_completed(&session_id, Phase::Interview, "chat")
.await;
return Ok(OrchestrationResult {
session_id: Some(session_id.clone()),
space_id: Some(space_id),
space_tag: Some(space_tag.clone()),
response: response_text,
seed_id: None,
agent_id: None,
phase_reached: Phase::Interview,
evaluation_passed: false,
output: None,
});
}
if !interview.ready_for_seed {
{
let mut sessions = self.sessions.write();
let session =
sessions
.entry(session_id.clone())
.or_insert_with(|| InterviewSession {
id: session_id.clone(),
interview: interview.clone(),
phase: Phase::Interview,
seed_id: None,
agent_id: None,
});
let questions_text = interview.questions.join("\n");
let last_answer = session.interview.answers.last().cloned();
if let Some(ref ans) = last_answer {
if !ans.is_empty() {
session.interview.add_to_history(ans, &questions_text);
}
}
}
let questions = interview
.questions
.iter()
.filter(|q| !q.is_empty())
.cloned()
.collect::<Vec<_>>();
tracing::info!(
session_id = %session_id,
ambiguity = interview.ambiguity.ambiguity(),
questions = questions.len(),
"Interview needs clarification"
);
self.publish_phase_completed(&session_id, Phase::Interview, "needs clarification")
.await;
return Ok(OrchestrationResult {
session_id: Some(session_id.clone()),
space_id: Some(space_id),
space_tag: Some(space_tag.clone()),
response: format_questions(&questions),
seed_id: None,
agent_id: None,
phase_reached: Phase::Interview,
evaluation_passed: false,
output: None,
});
}
{
let mut buffer = self.conversation_buffer.write();
buffer.push_agent("[interview: questions]", &space_id);
}
self.publish_phase_completed(&session_id, Phase::Interview, "ready for seed")
.await;
self.publish_phase_started(&session_id, Phase::Seed).await;
tracing::info!(phase = "seed", "Starting seed generation");
let seed = self.ouroboros.generate_seed(&interview).await?;
self.save_seed(&seed).await?;
self.event_bus
.publish(KernelEvent::SeedCreated { seed_id: seed.id })?;
self.publish_phase_completed(&session_id, Phase::Seed, "generated")
.await;
self.publish_phase_started(&session_id, Phase::Execute)
.await;
if should_split_seed(&seed) {
let subtasks = split_into_subtasks(&seed);
if subtasks.len() > 1 {
tracing::info!(
phase = "delegate",
subtasks = subtasks.len(),
"Delegating to multi-agent"
);
let results = self.delegate_subtasks(subtasks, &seed).await?;
let combined: String = results
.iter()
.filter(|r| r.success)
.filter_map(|r| r.result.as_deref())
.collect::<Vec<_>>()
.join("\n\n");
let all_passed = results.iter().all(|r| r.success);
{
let mut sessions = self.sessions.write();
sessions.remove(&session_id);
}
tracing::info!(
session_id = %session_id,
subtasks = results.len(),
passed = all_passed,
"Multi-agent orchestration complete"
);
return Ok(OrchestrationResult {
session_id: Some(session_id),
space_id: Some(space_id),
space_tag: Some(space_tag.clone()),
response: format_result_combined(&combined),
seed_id: Some(seed.id),
agent_id: None,
phase_reached: Phase::Execute,
evaluation_passed: all_passed,
output: Some(combined),
});
}
}
{
let mut buffer = self.conversation_buffer.write();
buffer.push_agent("[multi-agent: complete]", &space_id);
}
tracing::info!(phase = "execute", "Starting execution phase");
let exec_result = self
.lifecycle
.spawn_and_run(&seed, Priority::Normal)
.await?;
self.lifecycle.reap_zombies();
self.publish_phase_completed(&session_id, Phase::Execute, "completed")
.await;
self.publish_phase_started(&session_id, Phase::Evaluate)
.await;
tracing::info!(phase = "evaluate", "Starting evaluation phase");
let evaluation = self.ouroboros.evaluate(&seed, &exec_result).await?;
self.publish_phase_completed(
&session_id,
Phase::Evaluate,
&format!("score={:.2}", evaluation.score),
)
.await;
self.event_bus.publish(KernelEvent::EvaluationComplete {
seed_id: seed.id,
passed: evaluation.all_passed(),
})?;
self.save_evaluation(&seed, &evaluation).await?;
let mut current_seed = Some(seed);
let mut current_evaluation = evaluation;
let mut iterations = 0;
while !current_evaluation.all_passed()
&& current_evaluation.score < self.config.min_evaluation_score
&& iterations < self.config.max_evolution_iterations
{
iterations += 1;
self.publish_phase_started(&session_id, Phase::Evolve).await;
tracing::info!(
phase = "evolve",
iteration = iterations,
max_iterations = self.config.max_evolution_iterations,
"Starting evolve phase"
);
let evolve_result = self
.ouroboros
.evolve(
current_seed.as_ref().expect("seed exists"),
¤t_evaluation,
)
.await?;
if let Some(evolved) = evolve_result {
current_seed = Some(evolved.clone());
self.save_seed(&evolved).await?;
self.event_bus.publish(KernelEvent::SeedCreated {
seed_id: evolved.id,
})?;
self.publish_phase_completed(&session_id, Phase::Evolve, "evolved")
.await;
self.publish_phase_started(&session_id, Phase::Execute)
.await;
tracing::info!(
phase = "re-execute",
iteration = iterations,
"Re-executing with evolved seed"
);
let new_exec = self
.lifecycle
.spawn_and_run(&evolved, Priority::High)
.await?;
self.publish_phase_completed(&session_id, Phase::Execute, "completed")
.await;
self.publish_phase_started(&session_id, Phase::Evaluate)
.await;
tracing::info!(
phase = "re-evaluate",
iteration = iterations,
"Re-evaluating evolved result"
);
let new_eval = self.ouroboros.evaluate(&evolved, &new_exec).await?;
current_evaluation = new_eval;
self.publish_phase_completed(
&session_id,
Phase::Evaluate,
&format!("score={:.2}", current_evaluation.score),
)
.await;
self.save_evaluation(&evolved, ¤t_evaluation).await?;
} else {
self.publish_phase_completed(&session_id, Phase::Evolve, "no evolution")
.await;
break;
}
}
{
let mut sessions = self.sessions.write();
sessions.remove(&session_id);
}
let final_seed = current_seed.expect("at least one seed exists");
let passed = current_evaluation.all_passed() || current_evaluation.score >= 0.7;
tracing::info!(
session_id = %session_id,
iterations,
score = current_evaluation.score,
passed,
"Orchestration complete"
);
let metrics = get_metrics();
metrics
.orch_duration
.observe(orch_start.elapsed().as_secs_f64());
if passed {
metrics.agents_completed.inc();
} else {
metrics.agents_failed.inc();
}
{
let mut buffer = self.conversation_buffer.write();
buffer.push_agent(&final_seed.goal, &space_id);
}
Ok(OrchestrationResult {
session_id: Some(session_id),
space_id: Some(space_id),
space_tag: Some(space_tag.clone()),
response: format_result(&final_seed, ¤t_evaluation),
seed_id: Some(final_seed.id),
agent_id: None,
phase_reached: Phase::Evaluate,
evaluation_passed: passed,
output: Some(current_evaluation.notes.join("; ")),
})
}
async fn save_seed(&self, seed: &Seed) -> Result<()> {
let key = seed.id.to_string();
self.state_store
.save_json("seeds", &key, seed)
.await
.context("failed to save seed to state store")?;
self.git_commit(&format!("seeds/{}.json", key), "ourobors: save seed");
Ok(())
}
async fn save_evaluation(&self, seed: &Seed, evaluation: &EvaluationResult) -> Result<()> {
let key = format!("{}-eval", seed.id);
self.state_store
.save_json("evals", &key, evaluation)
.await
.context("failed to save evaluation to state store")?;
self.git_commit(&format!("evals/{}.json", key), "ourobors: save eval");
Ok(())
}
async fn publish_phase_started(&self, session_id: &str, phase: Phase) {
let _ = self.event_bus.publish(KernelEvent::PhaseStarted {
session_id: session_id.to_owned(),
phase,
});
}
async fn publish_phase_completed(&self, session_id: &str, phase: Phase, result: &str) {
let _ = self.event_bus.publish(KernelEvent::PhaseCompleted {
session_id: session_id.to_owned(),
phase,
result_summary: result.to_owned(),
});
}
pub async fn delegate_subtasks(
&self,
subtasks: Vec<SubTask>,
parent_seed: &Seed,
) -> Result<Vec<SubTask>> {
if subtasks.len() == 1 {
return self.execute_single_subtask(subtasks, parent_seed).await;
}
if let Some(ref a2a) = self.a2a {
if !self.a2a_breaker.is_allowed() {
tracing::warn!(
state = ?self.a2a_breaker.state(),
"A2A circuit breaker open, using lifecycle fallback"
);
return self.delegate_via_lifecycle(subtasks, parent_seed).await;
}
return self.delegate_with_retry(subtasks, parent_seed, a2a).await;
}
self.delegate_via_lifecycle(subtasks, parent_seed).await
}
async fn delegate_with_retry(
&self,
subtasks: Vec<SubTask>,
parent_seed: &Seed,
a2a: &Arc<crate::a2a::A2AProtocol>,
) -> Result<Vec<SubTask>> {
let mut attempt = 0;
let max_retries = self.delegation_config.max_retries;
loop {
match self
.delegate_via_a2a(subtasks.clone(), parent_seed, a2a)
.await
{
Ok(results) => {
self.a2a_breaker.record_success();
return Ok(results);
}
Err(e) => {
self.a2a_breaker.record_failure();
attempt += 1;
if attempt >= max_retries {
tracing::error!(
attempts = attempt,
error = %e,
"A2A delegation exhausted after {} attempts, using lifecycle fallback",
attempt
);
return self.delegate_via_lifecycle(subtasks, parent_seed).await;
}
let delay = self.delegation_config.backoff_delay(attempt);
tracing::warn!(
attempt,
delay_ms = delay,
error = %e,
"A2A delegation failed, retrying with backoff"
);
tokio::time::sleep(Duration::from_millis(delay)).await;
}
}
}
}
async fn execute_single_subtask(
&self,
subtasks: Vec<SubTask>,
parent_seed: &Seed,
) -> Result<Vec<SubTask>> {
let mut task = subtasks
.into_iter()
.next()
.expect("execute_single_subtask is only called when subtasks is non-empty");
let child_seed = Seed {
id: Uuid::new_v4(),
goal: task.description.clone(),
constraints: parent_seed.constraints.clone(),
acceptance_criteria: vec!["Task completes successfully".into()],
ontology: parent_seed.ontology.clone(),
created_at: chrono::Utc::now(),
generation: parent_seed.generation + 1,
parent_seed_id: Some(parent_seed.id),
cspace_hint: None,
};
match self
.lifecycle
.spawn_and_run(&child_seed, Priority::Normal)
.await
{
Ok(result) => {
task.result = Some(result.output.clone());
}
Err(e) => {
task.result = Some(format!("Failed: {e}"));
task.success = false;
}
}
Ok(vec![task])
}
async fn delegate_via_a2a(
&self,
subtasks: Vec<SubTask>,
parent_seed: &Seed,
a2a: &Arc<crate::a2a::A2AProtocol>,
) -> Result<Vec<SubTask>> {
use crate::a2a::TaskPriority;
use tokio::task::JoinSet;
tracing::info!(
subtasks = subtasks.len(),
"Delegating subtasks via A2A protocol"
);
let orchestrator_id: crate::types::AgentId = uuid::Uuid::nil();
let subtask_count = subtasks.len();
let mut join_set: JoinSet<(usize, SubTask)> = JoinSet::new();
for (idx, subtask) in subtasks.into_iter().enumerate() {
let capability = subtask.required_capability.clone();
let description = subtask.description.clone();
let subtask_id = subtask.id;
let role = subtask.role.clone();
let a2a = Arc::clone(a2a);
let parent_seed = parent_seed.clone();
let lifecycle = self.lifecycle.clone();
join_set.spawn(async move {
let target: Option<crate::a2a::AgentCard> = if let Some(ref cap) = capability {
a2a.query_capabilities(cap).await.ok()
.and_then(|agents| agents.into_iter().next())
} else {
None
};
let (output, success) = if let Some(ref target_card) = target {
let target_id = target_card.agent_id;
tracing::info!(
subtask_index = idx,
target = %target_card.name,
target_id = %target_id,
"A2A dispatching subtask"
);
let task = crate::a2a::TaskSpec::new(&description, serde_json::json!({
"parent_seed": parent_seed.id.to_string(),
"goal": description,
}))
.with_priority(TaskPriority::Normal);
let _ = a2a.delegate_task(orchestrator_id, target_id, task.clone()).await;
match a2a.execute_delegation(orchestrator_id, target_id, task).await {
Some(Ok(result)) => {
let out = result.get("output")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let ok = result.get("success")
.and_then(|v| v.as_bool())
.unwrap_or(false);
(out, ok)
}
Some(Err(e)) => {
tracing::warn!(subtask_index = idx, error = %e, "execute_delegation failed");
(format!("Failed: {e}"), false)
}
None => {
tracing::warn!(subtask_index = idx, "No dispatch handler, lifecycle fallback");
run_via_lifecycle(&lifecycle, &parent_seed, &description).await
}
}
} else {
tracing::info!(subtask_index = idx, "No A2A agent found, lifecycle fallback");
run_via_lifecycle(&lifecycle, &parent_seed, &description).await
};
(idx, SubTask {
id: subtask_id,
description,
required_capability: capability,
result: Some(output),
success,
role,
})
});
}
let mut results: Vec<Option<SubTask>> = vec![None; subtask_count];
while let Some(join_result) = join_set.join_next().await {
match join_result {
Ok((idx, subtask)) => {
results[idx] = Some(subtask);
}
Err(e) => {
tracing::error!(error = %e, "A2A task panicked");
}
}
}
let completed: Vec<SubTask> = results.into_iter().flatten().collect();
tracing::info!(
completed = completed.len(),
succeeded = completed.iter().filter(|r| r.success).count(),
"A2A delegation complete"
);
Ok(completed)
}
async fn delegate_via_lifecycle(
&self,
subtasks: Vec<SubTask>,
parent_seed: &Seed,
) -> Result<Vec<SubTask>> {
use crate::agent_group::OxiosAgentGroup;
use tokio::task::JoinSet;
let descriptions: Vec<String> = subtasks.iter().map(|st| st.description.clone()).collect();
let group = OxiosAgentGroup::new(parent_seed, descriptions);
let group_id = group.id;
self.event_bus.publish(KernelEvent::AgentGroupCreated {
group_id,
agent_count: group.agents.len(),
})?;
tracing::info!(
group_id = %group_id,
agent_count = group.agents.len(),
"Starting parallel multi-agent execution"
);
let mut join_set: JoinSet<(
usize,
crate::types::AgentId,
Result<oxios_ouroboros::ExecutionResult>,
)> = JoinSet::new();
for (idx, agent_entry) in group.agents.iter().enumerate() {
let child_seed = agent_entry.seed.clone();
let agent_id = agent_entry.id;
let lifecycle = self.lifecycle.clone();
join_set.spawn(async move {
let result = lifecycle.spawn_and_run(&child_seed, Priority::Normal).await;
(idx, agent_id, result)
});
}
let subtask_count = subtasks.len();
let mut completed = vec![None; subtask_count];
while let Some(join_result) = join_set.join_next().await {
match join_result {
Ok((idx, agent_id, Ok(exec_result))) => {
let _ = self
.event_bus
.publish(KernelEvent::AgentGroupMemberCompleted {
group_id,
agent_id,
success: exec_result.success,
});
completed[idx] = Some(SubTask {
id: subtasks[idx].id,
description: subtasks[idx].description.clone(),
required_capability: subtasks[idx].required_capability.clone(),
result: Some(exec_result.output.clone()),
success: exec_result.success,
role: subtasks[idx].role.clone(),
});
}
Ok((idx, agent_id, Err(e))) => {
tracing::warn!(subtask_index = idx, error = %e, "Subtask failed");
let _ = self
.event_bus
.publish(KernelEvent::AgentGroupMemberCompleted {
group_id,
agent_id,
success: false,
});
completed[idx] = Some(SubTask {
id: subtasks[idx].id,
description: subtasks[idx].description.clone(),
required_capability: subtasks[idx].required_capability.clone(),
result: Some(format!("Failed: {e}")),
success: false,
role: subtasks[idx].role.clone(),
});
}
Err(e) => {
tracing::error!(error = %e, "JoinSet task panicked");
}
}
}
let completed: Vec<SubTask> = completed.into_iter().flatten().collect();
let succeeded = completed.iter().filter(|r| r.success).count();
let total = completed.len();
tracing::info!(
group_id = %group_id,
succeeded,
total,
"Parallel multi-agent execution complete"
);
let _ = self
.state_store
.save_json("agent_groups", &group_id.to_string(), &group)
.await;
self.git_commit(
&format!("agent_groups/{}.json", group_id),
"orchestrator: save group",
);
Ok(completed)
}
}
#[derive(Debug, Clone)]
#[allow(unused)]
struct InterviewSession {
id: String,
interview: InterviewResult,
phase: Phase,
seed_id: Option<Uuid>,
agent_id: Option<AgentId>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub space_id: Option<Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub space_tag: Option<String>,
pub response: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub seed_id: Option<Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_id: Option<AgentId>,
pub phase_reached: Phase,
pub evaluation_passed: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub output: Option<String>,
}
fn format_questions(questions: &[String]) -> String {
if questions.is_empty() {
"I need a bit more clarification before I can proceed.".to_string()
} else {
format!(
"I'd like to understand your request better. Could you help clarify:\n\n{}",
questions
.iter()
.enumerate()
.map(|(i, q)| format!("{}. {}", i + 1, q))
.collect::<Vec<_>>()
.join("\n")
)
}
}
fn format_result(seed: &Seed, evaluation: &EvaluationResult) -> String {
let passed = evaluation.all_passed();
let mut lines = Vec::new();
lines.push(format!("Task '{}' completed.", seed.goal));
if passed {
lines.push("✅ Evaluation passed.".to_string());
} else {
lines.push(format!(
"⚠️ Evaluation score: {:.0}%",
evaluation.score * 100.0
));
}
if !evaluation.notes.is_empty() {
lines.push("\nNotes:".to_string());
for note in &evaluation.notes {
lines.push(format!("- {}", note));
}
}
lines.join("\n")
}
fn should_split_seed(seed: &Seed) -> bool {
seed.acceptance_criteria.len() >= 5
}
fn split_into_subtasks(seed: &Seed) -> Vec<SubTask> {
seed.acceptance_criteria
.iter()
.map(|criterion| {
let desc = format!("{}: {}", seed.goal, criterion);
let desc_lower = desc.to_lowercase();
let cap = if desc_lower.contains("review") || desc_lower.contains("code") {
Some("code-review".to_string())
} else if desc_lower.contains("test") {
Some("testing".to_string())
} else if desc_lower.contains("refactor") || desc_lower.contains("improve") {
Some("refactoring".to_string())
} else if desc_lower.contains("write")
|| desc_lower.contains("create")
|| desc_lower.contains("implement")
{
Some("code-generation".to_string())
} else if desc_lower.contains("debug") || desc_lower.contains("fix") {
Some("debugging".to_string())
} else {
None
};
let mut subtask = SubTask::new(desc);
subtask.required_capability = cap;
subtask
})
.collect()
}
fn format_result_combined(combined: &str) -> String {
if combined.is_empty() {
"No subtasks completed successfully.".to_string()
} else {
format!("Multi-agent execution completed:\n\n{}", combined)
}
}
async fn run_via_lifecycle(
lifecycle: &crate::agent_lifecycle::AgentLifecycleManager,
parent_seed: &Seed,
description: &str,
) -> (String, bool) {
let child_seed = Seed {
id: Uuid::new_v4(),
goal: description.to_string(),
constraints: parent_seed.constraints.clone(),
acceptance_criteria: vec!["Task completes successfully".into()],
ontology: parent_seed.ontology.clone(),
created_at: chrono::Utc::now(),
generation: parent_seed.generation + 1,
parent_seed_id: Some(parent_seed.id),
cspace_hint: None,
};
match lifecycle.spawn_and_run(&child_seed, Priority::Normal).await {
Ok(result) => (result.output, result.success),
Err(e) => (format!("Failed: {e}"), false),
}
}