mod bus_client;
mod judge;
mod loop_guard;
mod memory;
mod persona;
mod pi_rpc;
use anyhow::Result;
use bus_client::BusClient;
use clap::Parser;
use clawgarden_proto::{
generate_event_id, generate_trace_id, Envelope, EventType, MessagePayload, Payload,
};
use judge::{judge, JudgeInput};
use loop_guard::LoopGuard;
use memory::load_memory;
use persona::load_persona;
use pi_rpc::{call_pi_rpc_safe, PiRpcRequest};
use std::time::Duration;
use tokio::time::interval;
/// Period, in seconds, of the ticker that drives the background heartbeat task in `main`.
const HEARTBEAT_INTERVAL_SECS: u64 = 5;
/// Upper bound, in milliseconds, applied to each `call_pi_rpc_safe` call via
/// `tokio::time::timeout` before the handler gives up on a reply.
const RESPONSE_TIMEOUT_MS: u64 = 1600;
// Command-line options for the agent binary, parsed with clap's derive API.
// NOTE: plain `//` comments are used here on purpose: clap turns `///` doc comments on a
// `Parser` struct and its fields into help text, which would change the program's output.
#[derive(Parser, Debug)]
#[command(name = "clawgarden-agent")]
struct Opts {
// Name of this agent. `main` uses it to load the persona and memory, to seed the loop
// guard, and to build the `agent:<name>` source of every envelope it sends.
#[arg(long)]
agent_name: String,
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opts = Opts::parse();
let agent_name = opts.agent_name;
log::info!("Starting ClawGarden agent: {}", agent_name);
let persona = load_persona(&agent_name).await?;
let memory = load_memory(&agent_name).await?;
if persona.is_empty() {
log::warn!("Agent {} has no persona loaded", agent_name);
}
let mut loop_guard = LoopGuard::new(agent_name.clone());
let mut bus = BusClient::new();
loop {
match bus.connect().await {
Ok(_) => break,
Err(e) => {
log::error!("Failed to connect to bus: {}, retrying in 5s", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
log::info!("Connected to bus, entering event loop");
let heartbeat_name = agent_name.clone();
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
loop {
ticker.tick().await;
let mut hb_bus = BusClient::new();
match hb_bus.connect().await {
Ok(()) => {
let heartbeat_env = Envelope {
id: clawgarden_proto::generate_event_id(),
schema_version: "1.0".to_string(),
event_type: EventType::SystemNotice,
conversation_id: format!("heartbeat:{}", heartbeat_name),
correlation_id: format!("hb_{}", uuid::Uuid::new_v4()),
reply_to: None,
trace_id: clawgarden_proto::generate_trace_id(),
source: format!("agent:{}", heartbeat_name),
target: "bus".to_string(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::SystemNotice {
notice_type: "heartbeat".to_string(),
message: format!("heartbeat from {}", heartbeat_name),
},
};
if let Err(e) = hb_bus.send(&heartbeat_env).await {
log::debug!("Heartbeat send failed: {}", e);
}
}
Err(e) => {
log::debug!("Heartbeat connect failed: {}", e);
}
}
}
});
loop {
if !bus.is_connected() {
log::warn!("Bus disconnected, reconnecting...");
match bus.connect().await {
Ok(_) => log::info!("Reconnected to bus"),
Err(e) => {
log::error!("Reconnection failed: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
let envelope = match tokio::time::timeout(Duration::from_millis(1000), bus.recv()).await {
Ok(Ok(env)) => env,
Ok(Err(e)) => {
log::error!("Failed to receive envelope: {}", e);
bus.disconnect();
continue;
}
Err(_) => {
continue;
}
};
if let Err(e) = process_envelope(
&mut bus,
&envelope,
&agent_name,
&persona,
&memory,
&mut loop_guard,
)
.await
{
log::error!("Error processing envelope: {}", e);
}
}
}
/// Routes one incoming envelope to the handler for its event type.
///
/// Envelopes whose `source` is this agent (`agent:<agent_name>`) are skipped so the agent
/// never reacts to its own output. `SystemNotice` and `ScheduleTriggered` are only logged.
/// `AgentWhisper` envelopes are handled like agent messages, but only when they are
/// addressed to this agent (see [`whisper_is_for`]).
///
/// # Errors
///
/// Propagates any error returned by the selected handler.
async fn process_envelope(
    bus: &mut BusClient,
    envelope: &Envelope,
    agent_name: &str,
    persona: &str,
    memory: &str,
    loop_guard: &mut LoopGuard,
) -> Result<()> {
    // Same comparison as `source == format!("agent:{}", agent_name)`, without allocating.
    if envelope.source.strip_prefix("agent:") == Some(agent_name) {
        log::debug!("Skipping own envelope: {}", envelope.id);
        return Ok(());
    }

    match envelope.event_type {
        EventType::UserMessage => {
            // Only the first 50 characters of the message are logged.
            log::info!(
                "Received UserMessage in conversation {}: {}",
                envelope.conversation_id,
                envelope
                    .payload
                    .content()
                    .chars()
                    .take(50)
                    .collect::<String>()
            );
            handle_user_message(bus, envelope, agent_name, persona, memory, loop_guard).await?;
        }
        EventType::AgentMessage => {
            log::info!(
                "Received AgentMessage from {} in conversation {}",
                envelope.source,
                envelope.conversation_id
            );
            handle_agent_message(bus, envelope, agent_name, persona, memory, loop_guard).await?;
        }
        EventType::TaskCompleted => {
            log::info!(
                "Received TaskCompleted in conversation {}",
                envelope.conversation_id
            );
            handle_task_completed(bus, envelope, agent_name).await?;
        }
        EventType::SystemNotice => {
            log::info!("Received SystemNotice: {}", envelope.payload.content());
        }
        EventType::DecisionOnly => {
            handle_decision_request(bus, envelope, agent_name, persona, memory, loop_guard).await?;
        }
        EventType::ForceRespond => {
            handle_force_respond(bus, envelope, agent_name, persona, memory).await?;
        }
        EventType::AgentWhisper => {
            // Whispers addressed to someone else are dropped silently.
            if whisper_is_for(&envelope.target, agent_name) {
                log::info!(
                    "Received whisper from {} in conversation {}",
                    envelope.source,
                    envelope.conversation_id
                );
                handle_agent_message(bus, envelope, agent_name, persona, memory, loop_guard)
                    .await?;
            }
        }
        EventType::ScheduleTriggered => {
            log::debug!("Received ScheduleTriggered");
        }
    }
    Ok(())
}

/// Returns `true` when a whisper `target` addresses `agent_name`.
///
/// Both the bare name and the prefixed `agent:<name>` form are accepted. The prefixed form
/// is required because whisper replies sent by this agent copy the sender's `source`
/// (always `agent:<name>`) into `target`; matching the bare name only would make every
/// such reply invisible to its recipient.
fn whisper_is_for(target: &str, agent_name: &str) -> bool {
    target == agent_name || target.strip_prefix("agent:") == Some(agent_name)
}
/// Handles a `UserMessage` envelope: decides whether to reply and, if so, broadcasts the
/// reply.
///
/// Steps, in order:
/// 1. Run `judge` on the message; return without sending anything when it says not to
///    speak.
/// 2. Ask the loop guard for a block notice; when it returns one, send it as a system
///    notice (`loop_guard`) and stop.
/// 3. Call the pi RPC with a `RESPONSE_TIMEOUT_MS` deadline. On success the reply is
///    recorded with the loop guard and broadcast as an `AgentMessage`. On failure or
///    timeout a system notice (`pi_rpc_error` / `pi_rpc_timeout`) is sent instead.
///
/// # Errors
///
/// Returns an error only when sending on the bus fails.
async fn handle_user_message(
    bus: &mut BusClient,
    envelope: &Envelope,
    agent_name: &str,
    persona: &str,
    memory: &str,
    loop_guard: &mut LoopGuard,
) -> Result<()> {
    let judge_input = JudgeInput {
        conversation_id: envelope.conversation_id.clone(),
        correlation_id: envelope.correlation_id.clone(),
        content: envelope.payload.content().to_string(),
        persona: persona.to_string(),
        memory: memory.to_string(),
        recent_messages: envelope.payload.context().to_vec(),
    };
    let output = judge(judge_input).await;
    log::info!(
        "Judge decision: speak={}, confidence={:.2}",
        output.speak,
        output.confidence
    );
    if !output.speak {
        log::debug!("Judge said don't speak");
        return Ok(());
    }

    // The loop guard is consulted only after the judge has agreed to speak.
    if let Some(notice) =
        loop_guard.get_block_notice(&envelope.correlation_id, envelope.payload.content())
    {
        log::warn!("{}", notice);
        let notice_env = Envelope::new_system_notice(
            envelope.conversation_id.clone(),
            envelope.correlation_id.clone(),
            "loop_guard".to_string(),
            notice,
        );
        bus.send(&notice_env).await?;
        return Ok(());
    }

    let rpc_request = PiRpcRequest {
        agent_name: agent_name.to_string(),
        persona: persona.to_string(),
        memory: memory.to_string(),
        conversation_id: envelope.conversation_id.clone(),
        correlation_id: envelope.correlation_id.clone(),
        content: envelope.payload.content().to_string(),
        recent_messages: envelope.payload.context().to_vec(),
    };
    // Outer `Result` is the timeout; inner `Result` is the RPC outcome.
    let rpc_result = tokio::time::timeout(
        Duration::from_millis(RESPONSE_TIMEOUT_MS),
        call_pi_rpc_safe(rpc_request),
    )
    .await;

    match rpc_result {
        Ok(Ok(msg_payload)) => {
            // Record the reply before sending it.
            loop_guard.record(&envelope.correlation_id, &msg_payload.content);
            let response_envelope = Envelope {
                id: generate_event_id(),
                schema_version: "1.0".to_string(),
                event_type: EventType::AgentMessage,
                conversation_id: envelope.conversation_id.clone(),
                correlation_id: envelope.correlation_id.clone(),
                reply_to: Some(envelope.id.clone()),
                trace_id: generate_trace_id(),
                source: format!("agent:{}", agent_name),
                target: "broadcast".to_string(),
                created_at: chrono::Utc::now().timestamp(),
                deadline_ms: 0,
                payload: Payload::Message(msg_payload),
            };
            bus.send(&response_envelope).await?;
            log::info!("Sent AgentMessage response");
        }
        Ok(Err(e)) => {
            log::error!("pi RPC failed: {}", e);
            let notice_env = Envelope::new_system_notice(
                envelope.conversation_id.clone(),
                envelope.correlation_id.clone(),
                "pi_rpc_error".to_string(),
                format!("pi RPC failed: {}", e),
            );
            bus.send(&notice_env).await?;
        }
        Err(_) => {
            log::error!("pi RPC timed out after {}ms", RESPONSE_TIMEOUT_MS);
            let notice_env = Envelope::new_system_notice(
                envelope.conversation_id.clone(),
                envelope.correlation_id.clone(),
                "pi_rpc_timeout".to_string(),
                "pi RPC timed out".to_string(),
            );
            bus.send(&notice_env).await?;
        }
    }
    Ok(())
}
async fn handle_agent_message(
bus: &mut BusClient,
envelope: &Envelope,
agent_name: &str,
persona: &str,
memory: &str,
loop_guard: &mut LoopGuard,
) -> Result<()> {
let mut recent = envelope.payload.context().to_vec();
recent.insert(0, envelope.payload.content().to_string());
let judge_input = JudgeInput {
conversation_id: envelope.conversation_id.clone(),
correlation_id: envelope.correlation_id.clone(),
content: envelope.payload.content().to_string(),
persona: persona.to_string(),
memory: memory.to_string(),
recent_messages: recent.clone(),
};
let output = judge(judge_input).await;
if !output.speak {
return Ok(());
}
if loop_guard.should_block(&envelope.correlation_id, envelope.payload.content()) {
log::debug!("Would respond but blocked by loop guard");
return Ok(());
}
let rpc_request = PiRpcRequest {
agent_name: agent_name.to_string(),
persona: persona.to_string(),
memory: memory.to_string(),
conversation_id: envelope.conversation_id.clone(),
correlation_id: envelope.correlation_id.clone(),
content: format!("Replying to: {}", envelope.payload.content()),
recent_messages: recent,
};
let rpc_result = tokio::time::timeout(
Duration::from_millis(RESPONSE_TIMEOUT_MS),
call_pi_rpc_safe(rpc_request),
)
.await;
if let Ok(Ok(msg_payload)) = rpc_result {
loop_guard.record(&envelope.correlation_id, &msg_payload.content);
let whisper_envelope = Envelope {
id: generate_event_id(),
schema_version: "1.0".to_string(),
event_type: EventType::AgentWhisper,
conversation_id: envelope.conversation_id.clone(),
correlation_id: envelope.correlation_id.clone(),
reply_to: Some(envelope.id.clone()),
trace_id: generate_trace_id(),
source: format!("agent:{}", agent_name),
target: envelope.source.clone(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::Message(msg_payload),
};
bus.send(&whisper_envelope).await?;
log::info!("Sent AgentWhisper response to {}", envelope.source);
}
Ok(())
}
/// Re-publishes a `TaskCompleted` envelope as a broadcast `AgentMessage` from this agent.
///
/// The forwarded message keeps the conversation and correlation ids of the original,
/// points `reply_to` at it, and carries the text `Task completed: <original content>` with
/// an empty context.
///
/// # Errors
///
/// Returns an error when sending on the bus fails.
async fn handle_task_completed(
    bus: &mut BusClient,
    envelope: &Envelope,
    agent_name: &str,
) -> Result<()> {
    let summary = MessagePayload {
        content: format!("Task completed: {}", envelope.payload.content()),
        context: Vec::new(),
    };

    let outgoing = Envelope {
        id: generate_event_id(),
        schema_version: String::from("1.0"),
        event_type: EventType::AgentMessage,
        conversation_id: envelope.conversation_id.clone(),
        correlation_id: envelope.correlation_id.clone(),
        reply_to: Some(envelope.id.clone()),
        trace_id: generate_trace_id(),
        source: format!("agent:{}", agent_name),
        target: String::from("broadcast"),
        created_at: chrono::Utc::now().timestamp(),
        deadline_ms: 0,
        payload: Payload::Message(summary),
    };

    bus.send(&outgoing).await?;
    log::info!("Forwarded TaskCompleted");
    Ok(())
}
async fn handle_decision_request(
bus: &mut BusClient,
envelope: &Envelope,
agent_name: &str,
persona: &str,
memory: &str,
loop_guard: &mut LoopGuard,
) -> Result<()> {
let judge_input = JudgeInput {
conversation_id: envelope.conversation_id.clone(),
correlation_id: envelope.correlation_id.clone(),
content: envelope.payload.content().to_string(),
persona: persona.to_string(),
memory: memory.to_string(),
recent_messages: envelope.payload.context().to_vec(),
};
let output = judge(judge_input).await;
if output.speak
&& !loop_guard.should_block(&envelope.correlation_id, envelope.payload.content())
{
let rpc_request = PiRpcRequest {
agent_name: agent_name.to_string(),
persona: persona.to_string(),
memory: memory.to_string(),
conversation_id: envelope.conversation_id.clone(),
correlation_id: envelope.correlation_id.clone(),
content: envelope.payload.content().to_string(),
recent_messages: envelope.payload.context().to_vec(),
};
if let Ok(Ok(msg_payload)) = tokio::time::timeout(
Duration::from_millis(RESPONSE_TIMEOUT_MS),
call_pi_rpc_safe(rpc_request),
)
.await
{
loop_guard.record(&envelope.correlation_id, &msg_payload.content);
let response = Envelope {
id: generate_event_id(),
schema_version: "1.0".to_string(),
event_type: EventType::AgentMessage,
conversation_id: envelope.conversation_id.clone(),
correlation_id: envelope.correlation_id.clone(),
reply_to: Some(envelope.id.clone()),
trace_id: generate_trace_id(),
source: format!("agent:{}", agent_name),
target: "broadcast".to_string(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::Message(msg_payload),
};
bus.send(&response).await?;
}
}
Ok(())
}
async fn handle_force_respond(
bus: &mut BusClient,
envelope: &Envelope,
agent_name: &str,
persona: &str,
memory: &str,
) -> Result<()> {
let rpc_request = PiRpcRequest {
agent_name: agent_name.to_string(),
persona: persona.to_string(),
memory: memory.to_string(),
conversation_id: envelope.conversation_id.clone(),
correlation_id: envelope.correlation_id.clone(),
content: envelope.payload.content().to_string(),
recent_messages: envelope.payload.context().to_vec(),
};
if let Ok(Ok(msg_payload)) = tokio::time::timeout(
Duration::from_millis(RESPONSE_TIMEOUT_MS),
call_pi_rpc_safe(rpc_request),
)
.await
{
let response = Envelope {
id: generate_event_id(),
schema_version: "1.0".to_string(),
event_type: EventType::AgentMessage,
conversation_id: envelope.conversation_id.clone(),
correlation_id: envelope.correlation_id.clone(),
reply_to: Some(envelope.id.clone()),
trace_id: generate_trace_id(),
source: format!("agent:{}", agent_name),
target: "broadcast".to_string(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::Message(msg_payload),
};
bus.send(&response).await?;
}
Ok(())
}