use anyhow::Result;
use clap::Parser;
use clawgarden_agent::bus_client::BusClient;
use clawgarden_agent::main_helpers::{
self, count_consecutive_agent_turns, format_speaker, get_history, loop_policy, record_history,
settings,
};
use clawgarden_agent::memory::load_memory;
use clawgarden_agent::role::load_role;
use clawgarden_agent::skill_loader;
use clawgarden_agent::loop_guard::LoopGuard;
use clawgarden_proto::{
generate_event_id, generate_trace_id, Envelope, EventType, MessagePayload,
Payload,
};
use std::time::Duration;
use tokio::time::interval;
/// Process-wide holder for the formatted skills text.
///
/// Lazily initialised to an empty string on first access. It is written by
/// `store_skills` and read (as a cloned snapshot) by `get_skills`.
static SKILLS: once_cell::sync::Lazy<std::sync::Mutex<String>> =
once_cell::sync::Lazy::new(|| std::sync::Mutex::new(String::new()));
/// Replaces the globally cached skills text with `skills`.
///
/// A poisoned mutex is tolerated: the inner guard is recovered and overwritten,
/// so a panic in another holder never prevents the cache from being updated.
fn store_skills(skills: &str) {
    let mut cached = match SKILLS.lock() {
        Ok(held) => held,
        Err(poisoned) => poisoned.into_inner(),
    };
    cached.clear();
    cached.push_str(skills);
}
/// Returns an owned snapshot of the globally cached skills text.
///
/// A poisoned mutex is tolerated: the inner value is still read and cloned.
fn get_skills() -> String {
    match SKILLS.lock() {
        Ok(current) => current.clone(),
        Err(poisoned) => poisoned.into_inner().clone(),
    }
}
/// Period, in seconds, of the ticker that drives the background heartbeat task.
const HEARTBEAT_INTERVAL_SECS: u64 = 5;
/// Period, in seconds, of the ticker that drives the stale-checkpoint GC task.
const CHECKPOINT_GC_INTERVAL_SECS: u64 = 600;
// Command-line options for the agent binary.
//
// NOTE: plain `//` comments are used on purpose. With clap's derive API, `///`
// doc comments become part of the generated `--help` text, which would change
// the program's output.
#[derive(Parser, Debug)]
#[command(name = "clawgarden-agent")]
struct Opts {
// Agent name (`--agent-name`). Used to load the role and memory, to build the
// `agent:{name}` envelope source, and to name the loop guard.
#[arg(long)]
agent_name: String,
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opts = Opts::parse();
let name = opts.agent_name;
log::info!("Starting agent: {}", name);
let role = load_role(&name).await?;
let memory = load_memory(&name).await?;
let skills_raw = skill_loader::load_skills().await?;
if role.is_empty() {
log::warn!("No role for {}", name);
}
let skills = skill_loader::format_skills(&skills_raw);
if !skills.is_empty() {
log::info!("Skills loaded for {}: {} chars", name, skills.len());
}
let mut guard = LoopGuard::new(name.clone());
let mut bus = BusClient::new();
loop {
match bus.connect().await {
Ok(_) => break,
Err(e) => {
log::error!("Bus connect failed: {}, retry 5s", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
log::info!("Connected to bus");
bus.send(&Envelope {
id: generate_event_id(),
schema_version: "1.0".into(),
event_type: EventType::SystemNotice,
conversation_id: format!("subscribe:{}", name),
correlation_id: format!("sub_{}", uuid::Uuid::new_v4()),
reply_to: None,
trace_id: generate_trace_id(),
source: format!("agent:{}", name),
target: "bus".into(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::SystemNotice {
notice_type: "subscribe".into(),
message: format!("main from {}", name),
},
})
.await?;
let hb_name = name.clone();
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
let mut hb = BusClient::new();
loop {
ticker.tick().await;
if !hb.is_connected() {
if let Err(e) = hb.connect().await {
log::warn!("Heartbeat connect failed: {}, retrying in 5s", e);
continue;
}
}
let result = hb
.send(&Envelope {
id: generate_event_id(),
schema_version: "1.0".into(),
event_type: EventType::SystemNotice,
conversation_id: format!("heartbeat:{}", hb_name),
correlation_id: format!("hb_{}", uuid::Uuid::new_v4()),
reply_to: None,
trace_id: generate_trace_id(),
source: format!("agent:{}", hb_name),
target: "bus".into(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::SystemNotice {
notice_type: "heartbeat".into(),
message: format!("hb from {}", hb_name),
},
})
.await;
if result.is_err() {
hb.disconnect();
}
}
});
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(CHECKPOINT_GC_INTERVAL_SECS));
loop {
ticker.tick().await;
match clawgarden_agent::loop_checkpoint::gc_stale_checkpoints().await {
Ok(removed) if removed > 0 => {
log::info!("Checkpoint GC removed {} stale files", removed);
}
Ok(_) => {}
Err(e) => {
log::warn!("Checkpoint GC failed: {}", e);
}
}
}
});
loop {
if !bus.is_connected() {
log::warn!("Bus disconnected, reconnecting...");
match bus.connect().await {
Ok(_) => {
log::info!("Reconnected");
let _ = bus
.send(&Envelope {
id: generate_event_id(),
schema_version: "1.0".into(),
event_type: EventType::SystemNotice,
conversation_id: format!("subscribe:{}", name),
correlation_id: format!("sub_{}", uuid::Uuid::new_v4()),
reply_to: None,
trace_id: generate_trace_id(),
source: format!("agent:{}", name),
target: "bus".into(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::SystemNotice {
notice_type: "subscribe".into(),
message: format!("reconnect from {}", name),
},
})
.await;
}
Err(e) => {
log::error!("Reconnect failed: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
let env = match tokio::time::timeout(Duration::from_secs(1), bus.recv()).await {
Ok(Ok(env)) => env,
Ok(Err(e)) => {
log::error!("Recv failed: {}", e);
bus.disconnect();
continue;
}
Err(_) => continue,
};
if let Err(e) = handle(&mut bus, &env, &name, &role, &memory, &mut guard).await {
log::error!("Handle error: {}", e);
}
}
}
/// Routes one incoming bus envelope to the behavior for its event type.
///
/// Envelopes whose source is this agent (`agent:{name}`) are dropped before any
/// other processing, so they are neither recorded in history nor answered.
///
/// * `bus` - connection used to send replies and forwarded notices.
/// * `env` - the received envelope.
/// * `name`, `role`, `memory` - this agent's identity and prompt material.
/// * `guard` - loop guard consulted before the agent loop is run.
///
/// # Errors
///
/// Propagates errors from `respond_to` and from sending on the bus.
async fn handle(
    bus: &mut BusClient,
    env: &Envelope,
    name: &str,
    role: &str,
    memory: &str,
    guard: &mut LoopGuard,
) -> Result<()> {
    // Never react to our own traffic. Because this returns first, the per-arm
    // self-source checks that used to follow were unreachable and are gone.
    if env.source == format!("agent:{}", name) {
        return Ok(());
    }
    match &env.event_type {
        EventType::UserMessage => {
            // User messages are only recorded; a reply is produced on an
            // explicit request event, not here.
            let msg = attributed_message(env);
            log::info!("UserMsg: {}", preview(&msg, 60));
            record_history(&env.conversation_id, &msg);
        }
        EventType::AgentMessage => {
            let msg = attributed_message(env);
            log::info!("AgentMsg: {}", preview(&msg, 60));
            record_history(&env.conversation_id, &msg);
            reply_to_peer(bus, env, name, role, memory, guard, &msg).await?;
        }
        EventType::AgentWhisper => {
            // Whispers addressed to someone else are ignored entirely.
            if env.target != name && env.target != "broadcast" {
                return Ok(());
            }
            let msg = attributed_message(env);
            log::info!("Whisper: {}", preview(&msg, 60));
            record_history(&env.conversation_id, &msg);
            reply_to_peer(bus, env, name, role, memory, guard, &msg).await?;
        }
        EventType::ForceRespond => {
            respond_to_directive(bus, env, name, role, memory, guard, "ForceRespond", false)
                .await?;
        }
        EventType::DecisionOnly => {
            let msg = attributed_message(env);
            let history = get_history(&env.conversation_id);
            // `force = false`: the agent loop is not forced to respond.
            broadcast_reply(bus, env, name, role, memory, guard, &msg, &history, false).await?;
        }
        EventType::TaskCompleted => {
            let fwd = main_helpers::make_envelope(
                env,
                name,
                EventType::AgentMessage,
                "broadcast",
                MessagePayload {
                    content: format!("Task done: {}", env.payload.content()),
                    context: vec![],
                },
            );
            bus.send(&fwd).await?;
        }
        EventType::SystemNotice => {
            if let Payload::SystemNotice {
                ref notice_type,
                ref message,
            } = env.payload
            {
                if notice_type == "conversation_update" && !message.is_empty() {
                    log::info!("ConversationUpdate: {}", preview(message, 80));
                    // Each non-blank line becomes its own history entry.
                    for line in message.lines() {
                        if !line.trim().is_empty() {
                            record_history(&env.conversation_id, line);
                        }
                    }
                    return Ok(());
                }
            }
            log::debug!("SysNotice: {}", preview(env.payload.content(), 60));
        }
        EventType::ScheduleTriggered => {}
        EventType::SkillInvoke | EventType::SkillResult | EventType::SkillFailed => {
            log::debug!("Skill event: {:?}", &env.event_type);
        }
        EventType::SkillCreate => {
            log::debug!("SkillCreate event from another agent");
        }
        EventType::SkillAdded { skill_name } => {
            log::info!("New skill added: {}", skill_name);
            // Reload is best-effort: on failure the previously cached skills stay in place.
            if let Ok(skills) = skill_loader::load_skills().await {
                let formatted = skill_loader::format_skills(&skills);
                store_skills(&formatted);
                log::info!(
                    "Reloaded skills after SkillAdded: {} skills loaded",
                    skills.len()
                );
            }
        }
        EventType::RegistryUpdated => {
            log::debug!("Registry updated: {}", preview(env.payload.content(), 60));
        }
        EventType::IntentResponse => {
            log::debug!("IntentResponse from {}", env.source);
        }
        EventType::TurnAssign => {
            respond_to_directive(bus, env, name, role, memory, guard, "TurnAssign", true).await?;
        }
        EventType::IntentRequest => {
            // Borrow the payload; nothing below needs an owned copy.
            let payload = match &env.payload {
                Payload::IntentRequest(p) => p,
                _ => {
                    log::warn!("IntentRequest with wrong payload type");
                    return Ok(());
                }
            };
            log::info!(
                "IntentRequest: turn={} last_speaker={:?}",
                payload.turn_number,
                payload.last_speaker
            );
            let history = get_history(&env.conversation_id);
            let intent = clawgarden_agent::intent_generator::generate_intent(
                name,
                role,
                &payload.message,
                &history,
                payload.turn_number,
                payload.last_speaker.as_deref(),
            )
            .await;
            log::info!(
                "Intent: want={} priority={} reason={}",
                intent.want,
                intent.priority,
                preview(&intent.reason, 50)
            );
            let response = Envelope::new_intent_response(
                env.conversation_id.clone(),
                env.correlation_id.clone(),
                env.trace_id.clone(),
                name.to_string(),
                intent,
            );
            bus.send(&response).await?;
        }
        EventType::RespondRequest => {
            respond_to_directive(bus, env, name, role, memory, guard, "RespondRequest", true)
                .await?;
        }
    }
    Ok(())
}

/// Returns at most the first `limit` characters of `text`, for log previews.
fn preview(text: impl AsRef<str>, limit: usize) -> String {
    text.as_ref().chars().take(limit).collect()
}

/// Formats the envelope content as `[speaker]: content`, with the speaker derived from its source.
fn attributed_message(env: &Envelope) -> String {
    format!(
        "[{}]: {}",
        format_speaker(&env.source),
        env.payload.content()
    )
}

/// Runs the agent loop for `msg` and publishes the outcome as a broadcast `AgentMessage`.
async fn broadcast_reply(
    bus: &mut BusClient,
    env: &Envelope,
    name: &str,
    role: &str,
    memory: &str,
    guard: &mut LoopGuard,
    msg: &str,
    history: &[String],
    force: bool,
) -> Result<()> {
    respond_to(
        bus,
        env,
        name,
        role,
        memory,
        guard,
        msg,
        history,
        force,
        EventType::AgentMessage,
        "broadcast",
    )
    .await
}

/// Answers another agent's message unless the consecutive agent-turn limit has been reached.
async fn reply_to_peer(
    bus: &mut BusClient,
    env: &Envelope,
    name: &str,
    role: &str,
    memory: &str,
    guard: &mut LoopGuard,
    msg: &str,
) -> Result<()> {
    let depth = count_consecutive_agent_turns(&env.conversation_id);
    if depth >= settings().max_agent_turns {
        log::info!(
            "Agent turn depth {} >= {}, staying silent",
            depth,
            settings().max_agent_turns
        );
        return Ok(());
    }
    let history = get_history(&env.conversation_id);
    broadcast_reply(bus, env, name, role, memory, guard, msg, &history, true).await
}

/// Handles an explicit "respond now" event (`ForceRespond`, `TurnAssign`, `RespondRequest`).
///
/// Content that does not already start with `[` is attributed to `[User]`. When
/// `merge_context` is set, the payload's context entries are appended to the
/// stored history before the agent loop runs.
async fn respond_to_directive(
    bus: &mut BusClient,
    env: &Envelope,
    name: &str,
    role: &str,
    memory: &str,
    guard: &mut LoopGuard,
    label: &str,
    merge_context: bool,
) -> Result<()> {
    let content = env.payload.content();
    let msg = if content.starts_with('[') {
        content.to_string()
    } else {
        format!("[User]: {}", content)
    };
    log::info!("{}: {}", label, preview(&msg, 60));
    let mut history = get_history(&env.conversation_id);
    if merge_context {
        for resp in env.payload.context() {
            history.push(resp.clone());
        }
    }
    broadcast_reply(bus, env, name, role, memory, guard, &msg, &history, true).await
}
/// Runs the agent loop for `msg` under an overall timeout and reports how it ended.
///
/// Nothing happens when the loop guard blocks the `(correlation_id, msg)` pair.
/// Otherwise a history window is selected from `context`, the agent loop runs
/// with the cached skills and current loop policy, and an
/// `agent_loop_termination` notice is emitted for every outcome (completion,
/// loop error, timeout). On completion, every 20th recorded run also logs and
/// emits aggregated loop metrics.
///
/// # Errors
///
/// Always returns `Ok(())`: loop errors and timeouts are logged and reported on
/// the bus rather than propagated.
async fn respond_to(
    bus: &mut BusClient,
    env: &Envelope,
    name: &str,
    role: &str,
    memory: &str,
    guard: &mut LoopGuard,
    msg: &str,
    context: &[String],
    force: bool,
    reply_event_type: EventType,
    reply_target: &str,
) -> Result<()> {
    if guard.should_block(&env.correlation_id, msg) {
        return Ok(());
    }

    let budgeter = clawgarden_agent::context::ContextManager::with_default_budget();
    let history_window = budgeter.select_history(context, msg);

    // Inputs are gathered in the same order the call expression evaluated them.
    let overall_budget = Duration::from_millis(settings().agent_loop_timeout_ms);
    let skills = get_skills();
    let response_timeout = settings().response_timeout_ms;
    let policy = loop_policy();

    let run = clawgarden_agent::agent_loop::agent_loop(
        name,
        role,
        memory,
        &skills,
        msg,
        &history_window,
        force,
        bus,
        env,
        reply_event_type,
        reply_target,
        guard,
        response_timeout,
        &policy,
    );
    let outcome = tokio::time::timeout(overall_budget, run).await;

    match outcome {
        // The overall timeout elapsed before the loop finished.
        Err(_) => {
            log::error!(
                "Agent loop timeout ({}ms) (conv={} corr={} trace={})",
                settings().agent_loop_timeout_ms,
                env.conversation_id,
                env.correlation_id,
                env.trace_id,
            );
            let report = serde_json::json!({
                "termination": "Timeout",
                "timeout_ms": settings().agent_loop_timeout_ms,
                "conversation_id": env.conversation_id,
                "correlation_id": env.correlation_id,
                "trace_id": env.trace_id,
            })
            .to_string();
            emit_loop_termination_notice(bus, name, env, &report).await;
        }
        // The loop finished in time but returned an error.
        Ok(Err(failure)) => {
            log::error!(
                "Agent loop error: {} (conv={} corr={} trace={})",
                failure,
                env.conversation_id,
                env.correlation_id,
                env.trace_id,
            );
            let report = serde_json::json!({
                "termination": "Error",
                "error": failure.to_string(),
                "conversation_id": env.conversation_id,
                "correlation_id": env.correlation_id,
                "trace_id": env.trace_id,
            })
            .to_string();
            emit_loop_termination_notice(bus, name, env, &report).await;
        }
        // The loop completed; record metrics and report the termination reason.
        Ok(Ok(done)) => {
            let tool_call_count = done.tool_calls.len();
            let stats = clawgarden_agent::loop_metrics::record(
                &done.termination,
                done.steps,
                tool_call_count,
            );
            log::info!(
                "Agent loop completed: termination={:?} steps={} tool_calls={} conv={} corr={} trace={}",
                done.termination,
                done.steps,
                tool_call_count,
                env.conversation_id,
                env.correlation_id,
                env.trace_id,
            );
            let report = serde_json::json!({
                "termination": format!("{:?}", done.termination),
                "steps": done.steps,
                "tool_calls": tool_call_count,
                "conversation_id": env.conversation_id,
                "correlation_id": env.correlation_id,
                "trace_id": env.trace_id,
            })
            .to_string();
            emit_loop_termination_notice(bus, name, env, &report).await;

            // Aggregated metrics are published once every 20 recorded runs.
            let metrics_due = stats.total_runs % 20 == 0;
            if metrics_due {
                log::info!(
                    "Agent loop metrics: runs={} avg_steps={:.2} avg_tools={:.2} terminations={:?}",
                    stats.total_runs,
                    stats.avg_steps,
                    stats.avg_tool_calls,
                    stats.terminations,
                );
                if let Ok(metrics_json) = clawgarden_agent::loop_metrics::export_json() {
                    emit_loop_metrics_notice(bus, name, env, &metrics_json).await;
                }
            }
        }
    }
    Ok(())
}
async fn emit_loop_termination_notice(
bus: &mut BusClient,
agent_name: &str,
env: &Envelope,
message: &str,
) {
let notice = Envelope {
id: generate_event_id(),
schema_version: "1.0".into(),
event_type: EventType::SystemNotice,
conversation_id: env.conversation_id.clone(),
correlation_id: env.correlation_id.clone(),
reply_to: Some(env.id.clone()),
trace_id: env.trace_id.clone(),
source: format!("agent:{}", agent_name),
target: "bus".into(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::SystemNotice {
notice_type: "agent_loop_termination".into(),
message: message.to_string(),
},
};
let _ = bus.send(¬ice).await;
}
async fn emit_loop_metrics_notice(
bus: &mut BusClient,
agent_name: &str,
env: &Envelope,
message: &str,
) {
let notice = Envelope {
id: generate_event_id(),
schema_version: "1.0".into(),
event_type: EventType::SystemNotice,
conversation_id: env.conversation_id.clone(),
correlation_id: env.correlation_id.clone(),
reply_to: Some(env.id.clone()),
trace_id: env.trace_id.clone(),
source: format!("agent:{}", agent_name),
target: "bus".into(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::SystemNotice {
notice_type: "agent_loop_metrics".into(),
message: message.to_string(),
},
};
let _ = bus.send(¬ice).await;
}