use anyhow::{anyhow, Result};
use std::sync::Arc;
use symbi_runtime::reasoning::conversation::{
Conversation, ConversationMessage, MessageRole, ToolCall,
};
use symbi_runtime::reasoning::executor::ActionExecutor;
use symbi_runtime::reasoning::inference::InferenceProvider;
use symbi_runtime::reasoning::loop_types::{
BufferedJournal, JournalWriter, LoopConfig, TerminationReason,
};
use symbi_runtime::reasoning::policy_bridge::DefaultPolicyGate;
use symbi_runtime::reasoning::reasoning_loop::ReasoningLoopRunner;
use symbi_runtime::types::AgentId;
/// Token budget for the conversation context. `Orchestrator::send` truncates the
/// conversation to this size before each run and also passes it to the loop as
/// `LoopConfig::context_token_budget`; `Orchestrator::compact` defaults to half of it.
const CONTEXT_TOKEN_BUDGET: usize = 80_000;
/// Value used for `LoopConfig::max_total_tokens` on every reasoning-loop run.
const LOOP_TOKEN_BUDGET: u32 = 200_000;
/// Value used for `LoopConfig::max_iterations` on every reasoning-loop run.
const LOOP_MAX_ITERATIONS: u32 = 20;
/// Base system prompt for the orchestrator conversation.
///
/// `Orchestrator::new` uses this text (optionally followed by `AUTO_APPROVE_ADDENDUM`)
/// as the conversation's system message, so any edit here changes what the model is told.
const SYSTEM_PROMPT: &str = r#"You are the Symbiont orchestrator, an AI agent that helps users create, manage, and operate AI agents.
You have tools available:
- validate_dsl: Validate Symbiont DSL code against project constraints
- validate_cedar: Validate Cedar policies against project constraints
- validate_toolclad: Validate ToolClad TOML manifests against project constraints
- save_artifact: Save a validated artifact to disk
- list_agents: List all running agents
When the user asks you to create an artifact (agent, policy, tool manifest):
1. Generate the appropriate DSL/Cedar/TOML.
2. Call the matching validate_* tool; if it reports errors, fix the artifact and re-validate until it passes.
3. Decide whether to save immediately or present for review:
- If the user's request was specific (named a profile, described a concrete need, or told you to scaffold/init/create/save), call save_artifact for each validated file without asking again — their request IS the approval.
- If the request was exploratory ("what would a policy for X look like?", "sketch something"), present for review and wait for an explicit approve-or-edit reply before calling save_artifact.
4. After saving, briefly tell the user what was saved and suggest the next command to run (for example /spawn <agent-name>).
5. If validation keeps failing after two attempts, stop, show the user the latest artifact plus the validator's errors, and ask how they want to proceed.
Keep responses concise and actionable. You are running inside symbi shell — a terminal-based orchestration interface."#;
/// Stateful front-end to the reasoning loop: it owns the running conversation and the
/// `ReasoningLoopRunner` that every `send` call drives.
pub struct Orchestrator {
/// Runner built in `new` from the provider, executor, policy gate and journal.
runner: ReasoningLoopRunner,
/// Conversation history; replaced by the loop's returned conversation after each `send`.
conversation: Conversation,
/// The provider's default model name, captured once at construction.
model_name: String,
/// Agent identity generated in `new` and passed to every loop run.
agent_id: AgentId,
// NOTE(review): this field is read by `journal()`, so the `allow` may be unnecessary — confirm.
/// Journal given to the runner; a shared handle is kept here and exposed via `journal()`.
#[allow(dead_code)] journal: Arc<BufferedJournal>,
/// Full system prompt (including any auto-approve addendum), kept so `clear` can rebuild
/// a fresh conversation.
system_prompt: String,
}
/// Extra system-prompt text that `Orchestrator::new` appends to `SYSTEM_PROMPT` when
/// `auto_approve` is true. It instructs the model to save validated artifacts without a
/// review step. The string begins with a newline so it starts on its own line.
const AUTO_APPROVE_ADDENDUM: &str = r#"
The user launched symbi-shell with --yes (auto-approve mode): every save/create/scaffold request is pre-approved. Do not present artifacts for review — call save_artifact immediately for every artifact that passes validation. Skip the "present for review" branch entirely."#;
impl Orchestrator {
    /// Builds an orchestrator around `provider` and `executor`.
    ///
    /// The conversation starts with the system prompt; when `auto_approve` is set, the
    /// auto-approve addendum is appended to it. The runner is wired with a
    /// `DefaultPolicyGate` and a `BufferedJournal` constructed with an argument of 1000.
    pub fn new(
        provider: Arc<dyn InferenceProvider>,
        executor: Arc<dyn ActionExecutor>,
        auto_approve: bool,
    ) -> Self {
        let mut system_prompt = String::from(SYSTEM_PROMPT);
        if auto_approve {
            system_prompt.push_str(AUTO_APPROVE_ADDENDUM);
        }

        let model_name = provider.default_model().to_string();
        let journal = Arc::new(BufferedJournal::new(1000));
        let journal_writer = Arc::clone(&journal) as Arc<dyn JournalWriter>;
        let runner = ReasoningLoopRunner::builder()
            .provider(provider)
            .executor(executor)
            .policy_gate(Arc::new(DefaultPolicyGate::new()))
            .journal(journal_writer)
            .build();

        Self {
            runner,
            conversation: Conversation::with_system(&system_prompt),
            model_name,
            agent_id: AgentId::new(),
            journal,
            system_prompt,
        }
    }

    /// Appends `user_message` to the conversation, runs the reasoning loop and returns
    /// the reply for this turn.
    ///
    /// Before the run the conversation is truncated to `CONTEXT_TOKEN_BUDGET`. If the
    /// first run ends with an error that `is_transient_error` recognises, the loop is
    /// run once more after a fixed back-off, starting again from the pre-run
    /// conversation. The stored conversation is then replaced by the one the loop
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns an error when the loop produced no output and the conversation contains
    /// no assistant message to fall back on.
    pub async fn send(&mut self, user_message: &str) -> Result<OrchestratorResponse> {
        const RETRY_BACKOFF: std::time::Duration = std::time::Duration::from_millis(1500);

        self.conversation
            .push(ConversationMessage::user(user_message));

        // Keep the context within budget and log when that actually dropped tokens.
        let size_before = self.conversation.estimate_tokens();
        self.conversation.truncate_to_budget(CONTEXT_TOKEN_BUDGET);
        let size_after = self.conversation.estimate_tokens();
        if size_after < size_before {
            tracing::info!(
                "Auto-compacted orchestrator context: {} -> {} tokens",
                size_before,
                size_after
            );
        }

        // Index of the first message the loop will add; used to collect this turn's tool calls.
        let turn_start = self.conversation.messages().len();
        let config = LoopConfig {
            max_iterations: LOOP_MAX_ITERATIONS,
            max_total_tokens: LOOP_TOKEN_BUDGET,
            context_token_budget: CONTEXT_TOKEN_BUDGET,
            temperature: 0.3,
            ..LoopConfig::default()
        };

        let first_attempt = self
            .runner
            .run(self.agent_id, self.conversation.clone(), config.clone())
            .await;
        let result = if is_transient_error(&first_attempt.termination_reason) {
            tracing::warn!(
                "orchestrator: transient error ({}), retrying after {:?}",
                describe_termination(&first_attempt.termination_reason),
                RETRY_BACKOFF,
            );
            tokio::time::sleep(RETRY_BACKOFF).await;
            self.runner
                .run(self.agent_id, self.conversation.clone(), config)
                .await
        } else {
            first_attempt
        };

        self.conversation = result.conversation;

        let content = if result.output.is_empty() {
            // NOTE(review): `last_assistant_message` is not limited to this turn here, so it
            // may return a reply from an earlier turn — confirm that is intended.
            let Some(last_reply) = self.conversation.last_assistant_message() else {
                return Err(anyhow!(
                    "Orchestrator produced no output ({}). Try /compact, /clear, or rephrasing.",
                    describe_termination(&result.termination_reason),
                ));
            };
            if last_reply.content.trim().is_empty() {
                format!(
                    "(no textual response before loop stopped — {})",
                    describe_termination(&result.termination_reason)
                )
            } else {
                let mut text = last_reply.content.clone();
                text.push_str(&format!(
                    "\n\n_(loop stopped: {}; /compact to free context or rephrase to continue)_",
                    describe_termination(&result.termination_reason),
                ));
                text
            }
        } else {
            result.output
        };

        Ok(OrchestratorResponse {
            content,
            tokens_used: result.total_usage.total_tokens as u64,
            iterations: result.iterations,
            duration_ms: result.duration.as_millis() as u64,
            tool_calls: extract_tool_calls(&self.conversation, turn_start),
        })
    }

    /// Name of the provider's default model, as captured in `new`.
    pub fn model_name(&self) -> &str {
        &self.model_name
    }

    /// Shared handle to the journal that was also given to the runner.
    pub fn journal(&self) -> &Arc<BufferedJournal> {
        &self.journal
    }

    /// Current token estimate of the stored conversation.
    pub fn context_tokens(&self) -> usize {
        self.conversation.estimate_tokens()
    }

    /// The fixed context budget (`CONTEXT_TOKEN_BUDGET`).
    pub fn context_budget(&self) -> usize {
        CONTEXT_TOKEN_BUDGET
    }

    /// Truncates the conversation to `budget` tokens, or to half of
    /// `CONTEXT_TOKEN_BUDGET` when `budget` is `None`.
    ///
    /// Returns the token estimates `(before, after)` the truncation.
    pub fn compact(&mut self, budget: Option<usize>) -> (usize, usize) {
        let target = match budget {
            Some(limit) => limit,
            None => CONTEXT_TOKEN_BUDGET / 2,
        };
        let before = self.conversation.estimate_tokens();
        self.conversation.truncate_to_budget(target);
        (before, self.conversation.estimate_tokens())
    }

    /// Discards the history and starts a new conversation from the stored system prompt.
    pub fn clear(&mut self) {
        self.conversation = Conversation::with_system(&self.system_prompt);
    }

    /// Read-only view of the stored conversation.
    pub fn conversation(&self) -> &Conversation {
        &self.conversation
    }

    /// Replaces the stored conversation wholesale; the stored system prompt is untouched.
    pub fn set_conversation(&mut self, conversation: Conversation) {
        self.conversation = conversation;
    }
}
/// Outcome of one `Orchestrator::send` call.
///
/// Derives `Debug` and `Clone` so callers can log and copy responses, matching the
/// traits already derived on `ToolCallRecord`.
#[derive(Debug, Clone)]
pub struct OrchestratorResponse {
    /// Text to show the user: the loop's output, or a fallback built from the last
    /// assistant message plus a note on why the loop stopped.
    pub content: String,
    /// Total tokens the loop reported for this run.
    pub tokens_used: u64,
    /// Number of loop iterations the run reported.
    pub iterations: u32,
    /// Run duration reported by the loop, in milliseconds.
    pub duration_ms: u64,
    /// Tool calls found in the conversation from the start of this turn onwards.
    pub tool_calls: Vec<ToolCallRecord>,
}
/// One tool call made by the assistant, paired with the observation that answered it.
///
/// Records are built by `extract_tool_calls`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ToolCallRecord {
/// Identifier of the call, copied from the assistant's tool call.
pub call_id: String,
/// Name of the tool that was invoked.
pub name: String,
/// Raw argument string copied from the tool call.
pub args: String,
/// One-line summary of the arguments, produced by `summarise_tool_args`.
pub args_summary: String,
/// Content of the matching tool observation; empty when none was found.
pub output: String,
/// Whether `output` starts with an error marker (prefix heuristic in `looks_like_error`).
pub is_error: bool,
/// Whether the call looks like an edit (heuristic in `looks_like_edit_tool`).
pub is_edit: bool,
}
/// Collects every assistant tool call found at or after message index `start`.
///
/// Each call is paired with the first later `Tool` message carrying the same call id;
/// when none exists the record's output is empty and `is_error` is false. A `start`
/// beyond the end of the conversation yields an empty list.
pub fn extract_tool_calls(conversation: &Conversation, start: usize) -> Vec<ToolCallRecord> {
    let all_messages = conversation.messages();
    let Some(turn) = all_messages.get(start..) else {
        return Vec::new();
    };

    turn.iter()
        .enumerate()
        .filter(|(_, message)| matches!(message.role, MessageRole::Assistant))
        .flat_map(|(index, message)| {
            // Observations can only appear after the message that issued the call.
            let later = &turn[index + 1..];
            message.tool_calls.iter().map(move |call| {
                let (output, is_error) = match find_tool_observation(later, &call.id) {
                    Some(reply) => (reply.content.clone(), looks_like_error(&reply.content)),
                    None => (String::new(), false),
                };
                ToolCallRecord {
                    call_id: call.id.clone(),
                    name: call.name.clone(),
                    args: call.arguments.clone(),
                    args_summary: summarise_args(call),
                    output,
                    is_error,
                    is_edit: is_edit_tool(call),
                }
            })
        })
        .collect()
}
/// Returns the first message in `after` that has the `Tool` role and whose
/// `tool_call_id` equals `call_id`, or `None` when there is no such message.
fn find_tool_observation<'a>(
    after: &'a [ConversationMessage],
    call_id: &str,
) -> Option<&'a ConversationMessage> {
    for candidate in after {
        let is_tool_reply = matches!(candidate.role, MessageRole::Tool);
        if is_tool_reply && candidate.tool_call_id.as_deref() == Some(call_id) {
            return Some(candidate);
        }
    }
    None
}
/// Heuristic error check: true when `content`, ignoring leading whitespace, starts with
/// `Error`, `error:` or `ERROR`.
fn looks_like_error(content: &str) -> bool {
    const ERROR_PREFIXES: [&str; 3] = ["Error", "error:", "ERROR"];
    let body = content.trim_start();
    ERROR_PREFIXES.iter().any(|&prefix| body.starts_with(prefix))
}
/// Reports whether `call` looks like an edit by passing its name and raw arguments to
/// `looks_like_edit_tool`.
fn is_edit_tool(call: &ToolCall) -> bool {
looks_like_edit_tool(&call.name, &call.arguments)
}
/// Heuristic edit check: true when `name` is one of the known write/edit tools, or when
/// the raw `args` text contains the quoted key `"old_string"`.
pub fn looks_like_edit_tool(name: &str, args: &str) -> bool {
    const EDIT_TOOL_NAMES: [&str; 4] = ["save_artifact", "write_file", "edit_file", "apply_edit"];
    if EDIT_TOOL_NAMES.contains(&name) {
        return true;
    }
    args.contains("\"old_string\"")
}
/// Produces the one-line argument summary for `call` by passing its name and raw
/// arguments to `summarise_tool_args`.
fn summarise_args(call: &ToolCall) -> String {
summarise_tool_args(&call.name, &call.arguments)
}
/// Builds a short one-line summary (at most 80 characters) of a tool call's arguments.
///
/// * Arguments that are not a JSON object are shown as the raw text, truncated.
/// * Otherwise the first string-valued key among `command`, `path`, `file_path`,
///   `filename`, `artifact_path` (in that order) is shown on its own.
/// * Failing that, the first three entries are rendered as `key=value`, with string
///   values cut to 30 characters.
///
/// The tool name is currently unused.
pub fn summarise_tool_args(_name: &str, arguments: &str) -> String {
    const MAX_LEN: usize = 80;
    const PRIMARY_KEYS: [&str; 5] = ["command", "path", "file_path", "filename", "artifact_path"];

    let parsed = serde_json::from_str::<serde_json::Value>(arguments).ok();
    let Some(fields) = parsed.as_ref().and_then(|value| value.as_object()) else {
        return truncate_one_line(arguments, MAX_LEN);
    };

    let primary = PRIMARY_KEYS
        .iter()
        .find_map(|key| fields.get(*key).and_then(|value| value.as_str()));
    if let Some(text) = primary {
        return truncate_one_line(text, MAX_LEN);
    }

    let preview = fields
        .iter()
        .take(3)
        .map(|(key, value)| {
            let shown = match value.as_str() {
                Some(text) => truncate_one_line(text, 30),
                None => value.to_string(),
            };
            format!("{}={}", key, shown)
        })
        .collect::<Vec<String>>()
        .join(", ");
    truncate_one_line(&preview, MAX_LEN)
}
/// Flattens `s` onto one line and limits it to `max_len` characters.
///
/// Every control character (newline, carriage return, tab, escape, ...) is replaced by a
/// single space so the result cannot break or re-style a terminal line. When the
/// flattened text is longer than `max_len` characters, the first `max_len - 1` are kept
/// and `…` is appended. A `max_len` of zero yields an empty string.
///
/// Only the first `max_len + 1` characters of `s` are examined, so very large inputs are
/// not copied in full.
fn truncate_one_line(s: &str, max_len: usize) -> String {
    if max_len == 0 {
        return String::new();
    }
    let mut flattened = s.chars().map(|c| if c.is_control() { ' ' } else { c });
    let mut line: String = flattened.by_ref().take(max_len).collect();
    if flattened.next().is_some() {
        // Over the limit: give up the last kept character to make room for the ellipsis.
        line.pop();
        line.push('…');
    }
    line
}
fn describe_termination(reason: &TerminationReason) -> String {
match reason {
TerminationReason::Completed => "completed normally".to_string(),
TerminationReason::MaxIterations => {
"reached the iteration limit — too many tool calls this turn".to_string()
}
TerminationReason::MaxTokens => {
"hit the per-turn token budget — context is too large".to_string()
}
TerminationReason::Timeout => "timed out".to_string(),
TerminationReason::PolicyDenial { reason } => format!("policy denied: {}", reason),
TerminationReason::Error { message } => format!("error: {}", message),
}
}
fn is_transient_error(reason: &TerminationReason) -> bool {
let TerminationReason::Error { message } = reason else {
return false;
};
let lower = message.to_ascii_lowercase();
lower.contains("429")
|| lower.contains("rate limit")
|| lower.contains("rate_limit")
|| lower.contains("503")
|| lower.contains("502")
|| lower.contains("504")
|| lower.contains("timeout")
|| lower.contains("timed out")
|| lower.contains("connection reset")
|| lower.contains("connection refused")
|| lower.contains("overloaded")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Error` termination carrying `message`.
    fn error_with(message: &str) -> TerminationReason {
        TerminationReason::Error {
            message: message.to_string(),
        }
    }

    #[test]
    fn is_transient_detects_429() {
        assert!(is_transient_error(&error_with("HTTP 429 Too Many Requests")));
    }

    #[test]
    fn is_transient_detects_503_overloaded() {
        assert!(is_transient_error(&error_with(
            "Upstream provider 503 overloaded"
        )));
    }

    #[test]
    fn is_transient_false_for_completed_and_policy() {
        // Only the `Error` variant may ever count as transient.
        let non_errors = [
            TerminationReason::Completed,
            TerminationReason::MaxTokens,
            TerminationReason::PolicyDenial {
                reason: "denied".to_string(),
            },
        ];
        for reason in &non_errors {
            assert!(!is_transient_error(reason));
        }
    }

    #[test]
    fn is_transient_false_for_non_transient_error() {
        assert!(!is_transient_error(&error_with(
            "JSON parse failed: unexpected token"
        )));
    }
}