mod bus_client;
mod loop_guard;
mod memory;
mod persona;
mod pi_rpc;
use anyhow::Result;
use bus_client::BusClient;
use clap::Parser;
use clawgarden_proto::{
generate_event_id, generate_trace_id, Envelope, EventType, MessagePayload, Payload,
};
use loop_guard::LoopGuard;
use memory::load_memory;
use persona::load_persona;
use pi_rpc::judge_and_respond;
use std::collections::VecDeque;
use std::sync::Mutex;
/// Maximum number of lines `get_history` returns for a single conversation.
/// `record_history` also uses three times this value as the bound on the
/// shared buffer, counted across all conversations.
const HISTORY_CAPACITY: usize = 20;
/// Number of consecutive agent-authored lines (as counted by
/// `count_consecutive_agent_turns`) at which `handle` stops replying to
/// agent messages.
const MAX_AGENT_TURNS: usize = 12;
/// Process-wide, mutex-protected log of `(conversation_id, formatted message)`
/// pairs, oldest first. It is created lazily on first access with room
/// preallocated for 200 entries.
static HISTORY: once_cell::sync::Lazy<Mutex<VecDeque<(String, String)>>> =
once_cell::sync::Lazy::new(|| Mutex::new(VecDeque::with_capacity(200)));
/// Appends one formatted line to the shared history for `conversation_id`.
///
/// The buffer is shared by every conversation and is kept strictly below
/// `HISTORY_CAPACITY * 3` entries before the push, so the oldest lines
/// (regardless of which conversation they belong to) are evicted first.
///
/// # Panics
/// Panics if the history mutex has been poisoned.
fn record_history(conversation_id: &str, formatted_msg: &str) {
    let max_entries = HISTORY_CAPACITY * 3;
    let entry = (conversation_id.to_owned(), formatted_msg.to_owned());

    let mut log = HISTORY.lock().unwrap();
    // Make room for the new entry: drop however many of the oldest lines are
    // needed so that at most `max_entries - 1` remain before pushing.
    let excess = (log.len() + 1).saturating_sub(max_entries);
    log.drain(..excess);
    log.push_back(entry);
}
/// Returns up to `HISTORY_CAPACITY` of the most recent lines recorded for
/// `conversation_id`, ordered oldest to newest.
///
/// # Panics
/// Panics if the history mutex has been poisoned.
fn get_history(conversation_id: &str) -> Vec<String> {
    let log = HISTORY.lock().unwrap();

    // Walk from newest to oldest, keeping only this conversation's lines,
    // and stop as soon as the per-conversation limit is reached.
    let mut recent: Vec<String> = Vec::new();
    for (cid, text) in log.iter().rev() {
        if recent.len() == HISTORY_CAPACITY {
            break;
        }
        if cid.as_str() == conversation_id {
            recent.push(text.clone());
        }
    }

    // Collected newest-first; callers expect chronological order.
    recent.reverse();
    recent
}
/// Counts how many of the most recent lines in `conversation_id` were
/// written by agents, scanning backwards from the newest line.
///
/// The scan stops at the first line that starts with `[사용자]:` or `[user]:`
/// (a user turn), or at the first line that does not start with `[` at all.
///
/// # Panics
/// Panics if the history mutex has been poisoned.
fn count_consecutive_agent_turns(conversation_id: &str) -> usize {
    let log = HISTORY.lock().unwrap();
    log.iter()
        .rev()
        .filter(|(cid, _)| cid == conversation_id)
        .take_while(|(_, text)| {
            let from_user = text.starts_with("[사용자]:") || text.starts_with("[user]:");
            !from_user && text.starts_with('[')
        })
        .count()
}
use std::time::Duration;
use tokio::time::interval;
/// Period, in seconds, of the heartbeat ticker started in `main`.
const HEARTBEAT_INTERVAL_SECS: u64 = 5;
/// Upper bound, in milliseconds, that `respond_to` waits for
/// `judge_and_respond` before logging a timeout.
const RESPONSE_TIMEOUT_MS: u64 = 30_000;
// Command-line options for the agent binary, parsed with clap's derive API.
// NOTE: plain `//` comments are used here on purpose; clap turns `///` doc
// comments on this struct and its fields into `--help` text.
#[derive(Parser, Debug)]
#[command(name = "clawgarden-agent")]
struct Opts {
// Agent name (`--agent-name`). `main` uses it to load the persona and memory,
// to build the `agent:<name>` envelope source, and for mention detection.
#[arg(long)]
agent_name: String,
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opts = Opts::parse();
let name = opts.agent_name;
log::info!("Starting agent: {}", name);
let persona = load_persona(&name).await?;
let memory = load_memory(&name).await?;
if persona.is_empty() {
log::warn!("No persona for {}", name);
}
let mut guard = LoopGuard::new(name.clone());
let mut bus = BusClient::new();
loop {
match bus.connect().await {
Ok(_) => break,
Err(e) => {
log::error!("Bus connect failed: {}, retry 5s", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
log::info!("Connected to bus");
bus.send(&Envelope {
id: generate_event_id(),
schema_version: "1.0".into(),
event_type: EventType::SystemNotice,
conversation_id: format!("subscribe:{}", name),
correlation_id: format!("sub_{}", uuid::Uuid::new_v4()),
reply_to: None,
trace_id: generate_trace_id(),
source: format!("agent:{}", name),
target: "bus".into(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::SystemNotice {
notice_type: "subscribe".into(),
message: format!("main from {}", name),
},
}).await?;
let hb_name = name.clone();
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
loop {
ticker.tick().await;
let mut hb = BusClient::new();
if let Ok(()) = hb.connect().await {
let _ = hb.send(&Envelope {
id: generate_event_id(),
schema_version: "1.0".into(),
event_type: EventType::SystemNotice,
conversation_id: format!("heartbeat:{}", hb_name),
correlation_id: format!("hb_{}", uuid::Uuid::new_v4()),
reply_to: None,
trace_id: generate_trace_id(),
source: format!("agent:{}", hb_name),
target: "bus".into(),
created_at: chrono::Utc::now().timestamp(),
deadline_ms: 0,
payload: Payload::SystemNotice {
notice_type: "heartbeat".into(),
message: format!("hb from {}", hb_name),
},
}).await;
}
}
});
loop {
if !bus.is_connected() {
log::warn!("Bus disconnected, reconnecting...");
match bus.connect().await {
Ok(_) => log::info!("Reconnected"),
Err(e) => {
log::error!("Reconnect failed: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
let env = match tokio::time::timeout(Duration::from_secs(1), bus.recv()).await {
Ok(Ok(env)) => env,
Ok(Err(e)) => {
log::error!("Recv failed: {}", e);
bus.disconnect();
continue;
}
Err(_) => continue,
};
if let Err(e) = handle(&mut bus, &env, &name, &persona, &memory, &mut guard).await {
log::error!("Handle error: {}", e);
}
}
}
async fn handle(
bus: &mut BusClient,
env: &Envelope,
name: &str,
persona: &str,
memory: &str,
guard: &mut LoopGuard,
) -> Result<()> {
if env.source == format!("agent:{}", name) {
return Ok(());
}
match env.event_type {
EventType::UserMessage => {
let speaker = format_speaker(&env.source);
let msg = format!("[{}]: {}", speaker, env.payload.content());
log::info!("UserMsg: {}", msg.chars().take(60).collect::<String>());
record_history(&env.conversation_id, &msg);
let history = get_history(&env.conversation_id);
let mentioned = is_mentioned(name, env.payload.content());
respond_to(bus, env, name, persona, memory, guard, &msg, &history, mentioned).await?;
}
EventType::AgentMessage | EventType::AgentWhisper => {
let speaker = format_speaker(&env.source);
let msg = format!("[{}]: {}", speaker, env.payload.content());
log::info!("AgentMsg: {}", msg.chars().take(60).collect::<String>());
record_history(&env.conversation_id, &msg);
if env.source == format!("agent:{}", name) {
return Ok(());
}
let depth = count_consecutive_agent_turns(&env.conversation_id);
if depth >= MAX_AGENT_TURNS {
log::info!("Agent turn depth {} >= {}, staying silent", depth, MAX_AGENT_TURNS);
return Ok(());
}
let history = get_history(&env.conversation_id);
let mentioned = is_mentioned(name, env.payload.content());
if mentioned {
log::info!("Name mentioned in agent message, high priority");
}
respond_to(bus, env, name, persona, memory, guard, &msg, &history, mentioned).await?;
}
EventType::ForceRespond => {
if env.target == name || env.target == "broadcast" {
let msg = format!("[{}]: {}", format_speaker(&env.source), env.payload.content());
log::info!("ForceRespond: {}", msg.chars().take(60).collect::<String>());
let history = get_history(&env.conversation_id);
respond_to(bus, env, name, persona, memory, guard, &msg, &history, true).await?;
}
}
EventType::DecisionOnly => {
let msg = format!("[{}]: {}", format_speaker(&env.source), env.payload.content());
let history = get_history(&env.conversation_id);
respond_to(bus, env, name, persona, memory, guard, &msg, &history, false).await?;
}
EventType::TaskCompleted => {
let fwd = make_envelope(env, name, EventType::AgentMessage, "broadcast",
MessagePayload { content: format!("Task done: {}", env.payload.content()), context: vec![] });
bus.send(&fwd).await?;
}
EventType::SystemNotice => {
log::debug!("SysNotice: {}", env.payload.content().chars().take(60).collect::<String>());
}
EventType::ScheduleTriggered => {}
}
Ok(())
}
/// Asks the model whether (and how) to reply to `msg`, and broadcasts the
/// reply if there is one.
///
/// Nothing happens when the loop guard blocks the envelope. The model call
/// is bounded by `RESPONSE_TIMEOUT_MS`; a timeout, a model error, or a
/// decision to stay silent is logged and treated as success. When a reply is
/// produced it is registered with the loop guard and the history before it
/// is sent as a broadcast `AgentMessage`.
///
/// # Errors
/// Returns an error only if sending the reply on the bus fails.
async fn respond_to(
    bus: &mut BusClient,
    env: &Envelope,
    name: &str,
    persona: &str,
    memory: &str,
    guard: &mut LoopGuard,
    msg: &str,
    context: &[String],
    force: bool,
) -> Result<()> {
    if guard.should_block(&env.correlation_id, env.payload.content()) {
        return Ok(());
    }

    let deadline = Duration::from_millis(RESPONSE_TIMEOUT_MS);
    let verdict = tokio::time::timeout(
        deadline,
        judge_and_respond(name, persona, memory, msg, context, force),
    )
    .await;

    // Every outcome other than "here is a reply" ends the call early.
    let reply = match verdict {
        Ok(Ok(Some(reply))) => reply,
        Ok(Ok(None)) => {
            log::debug!("Staying silent");
            return Ok(());
        }
        Ok(Err(e)) => {
            log::error!("LLM error: {}", e);
            return Ok(());
        }
        Err(_) => {
            log::error!("LLM timeout");
            return Ok(());
        }
    };

    guard.record(&env.correlation_id, &reply.content);
    let own_line = format!("[{}]: {}", name, reply.content);
    record_history(&env.conversation_id, &own_line);

    let outbound = make_envelope(env, name, EventType::AgentMessage, "broadcast", reply);
    bus.send(&outbound).await?;
    log::info!("Sent response (broadcast)");
    Ok(())
}
/// Maps an envelope source to the speaker label used in history lines.
///
/// * `agent:<name>` becomes `<name>`.
/// * Sources starting with `telegram:` or `user` become the fixed label `사용자`.
/// * Anything else is returned unchanged.
fn format_speaker(source: &str) -> &str {
    match source.strip_prefix("agent:") {
        Some(agent) => agent,
        None if source.starts_with("telegram:") || source.starts_with("user") => "사용자",
        None => source,
    }
}
/// Reports whether `content` mentions this agent, ignoring case.
///
/// A mention is a case-insensitive substring match of either `agent_name`
/// or the value of the `TELEGRAM_BOT_USERNAME` environment variable (when it
/// is set and valid Unicode). An empty name or username is never treated as
/// a mention, because the empty string is a substring of every message.
fn is_mentioned(agent_name: &str, content: &str) -> bool {
    let haystack = content.to_lowercase();
    // Both sides are lowercased so that names containing uppercase letters
    // still match the lowercased content.
    let mentions = |needle: &str| -> bool {
        !needle.is_empty() && haystack.contains(&needle.to_lowercase())
    };

    if mentions(agent_name) {
        return true;
    }
    match std::env::var("TELEGRAM_BOT_USERNAME") {
        Ok(username) => mentions(&username),
        Err(_) => false,
    }
}
/// Builds an outgoing message envelope from this agent in reply to `reply_to`.
///
/// The new envelope gets a fresh event id and trace id and the current UTC
/// timestamp. It reuses the conversation and correlation ids of `reply_to`
/// and points its `reply_to` field at that envelope's id. The source is
/// `agent:<agent_name>` and `payload` is wrapped as `Payload::Message`.
fn make_envelope(
    reply_to: &Envelope,
    agent_name: &str,
    event_type: EventType,
    target: &str,
    payload: MessagePayload,
) -> Envelope {
    // Generated values first, in the same order the fields are declared.
    let id = generate_event_id();
    let trace_id = generate_trace_id();
    let created_at = chrono::Utc::now().timestamp();

    // Values carried over from the envelope being answered.
    let conversation_id = reply_to.conversation_id.clone();
    let correlation_id = reply_to.correlation_id.clone();
    let parent_id = reply_to.id.clone();

    Envelope {
        id,
        schema_version: "1.0".into(),
        event_type,
        conversation_id,
        correlation_id,
        reply_to: Some(parent_id),
        trace_id,
        source: format!("agent:{}", agent_name),
        target: target.into(),
        created_at,
        deadline_ms: 0,
        payload: Payload::Message(payload),
    }
}