clawgarden-agent 0.4.0

Agent runtime with persona/memory loader, judge, and pi RPC for ClawGarden
Documentation
//! ClawGarden Agent — LLM-based natural group chat participant.

mod bus_client;
mod loop_guard;
mod memory;
mod persona;
mod pi_rpc;

use anyhow::Result;
use bus_client::BusClient;
use clap::Parser;
use clawgarden_proto::{
    generate_event_id, generate_trace_id, Envelope, EventType, MessagePayload, Payload,
};
use loop_guard::LoopGuard;
use memory::load_memory;
use persona::load_persona;
use pi_rpc::judge_and_respond;
use std::collections::VecDeque;
use std::sync::Mutex;

/// Max conversation history lines to keep per conversation
const HISTORY_CAPACITY: usize = 20;

/// Max consecutive agent-to-agent turns before forcing silence
const MAX_AGENT_TURNS: usize = 12;

/// Conversation history: conversation_id -> recent messages with speaker attribution
static HISTORY: once_cell::sync::Lazy<Mutex<VecDeque<(String, String)>>> = 
    once_cell::sync::Lazy::new(|| Mutex::new(VecDeque::with_capacity(200)));

/// Record a message in conversation history
fn record_history(conversation_id: &str, formatted_msg: &str) {
    let mut hist = HISTORY.lock().unwrap();
    // Keep last N total entries
    while hist.len() >= HISTORY_CAPACITY * 3 {
        hist.pop_front();
    }
    hist.push_back((conversation_id.to_string(), formatted_msg.to_string()));
}

/// Get recent history for a conversation
fn get_history(conversation_id: &str) -> Vec<String> {
    let hist = HISTORY.lock().unwrap();
    hist.iter()
        .filter(|(cid, _)| cid == conversation_id)
        .rev()
        .take(HISTORY_CAPACITY)
        .map(|(_, msg)| msg.clone())
        .collect::<Vec<_>>()
        .into_iter()
        .rev()
        .collect()
}

/// Count how many of the last N messages are from agents (not user)
fn count_consecutive_agent_turns(conversation_id: &str) -> usize {
    let hist = HISTORY.lock().unwrap();
    let mut count = 0usize;
    for (_, msg) in hist.iter().rev().filter(|(cid, _)| cid == conversation_id) {
        // Messages from agents start with "[agent_name]:", user starts with "[사용자]:"
        if msg.starts_with("[사용자]:") || msg.starts_with("[user]:") {
            break;
        }
        if msg.starts_with('[') {
            count += 1;
        } else {
            break;
        }
    }
    count
}
use std::time::Duration;
use tokio::time::interval;

const HEARTBEAT_INTERVAL_SECS: u64 = 5;
const RESPONSE_TIMEOUT_MS: u64 = 30_000;

#[derive(Parser, Debug)]
#[command(name = "clawgarden-agent")]
struct Opts {
    #[arg(long)]
    agent_name: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    let opts = Opts::parse();
    let name = opts.agent_name;
    log::info!("Starting agent: {}", name);

    let persona = load_persona(&name).await?;
    let memory = load_memory(&name).await?;
    if persona.is_empty() {
        log::warn!("No persona for {}", name);
    }

    let mut guard = LoopGuard::new(name.clone());

    // Connect to bus
    let mut bus = BusClient::new();
    loop {
        match bus.connect().await {
            Ok(_) => break,
            Err(e) => {
                log::error!("Bus connect failed: {}, retry 5s", e);
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        }
    }
    log::info!("Connected to bus");

    // Subscribe for event push
    bus.send(&Envelope {
        id: generate_event_id(),
        schema_version: "1.0".into(),
        event_type: EventType::SystemNotice,
        conversation_id: format!("subscribe:{}", name),
        correlation_id: format!("sub_{}", uuid::Uuid::new_v4()),
        reply_to: None,
        trace_id: generate_trace_id(),
        source: format!("agent:{}", name),
        target: "bus".into(),
        created_at: chrono::Utc::now().timestamp(),
        deadline_ms: 0,
        payload: Payload::SystemNotice {
            notice_type: "subscribe".into(),
            message: format!("main from {}", name),
        },
    }).await?;

    // Heartbeat
    let hb_name = name.clone();
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
        loop {
            ticker.tick().await;
            let mut hb = BusClient::new();
            if let Ok(()) = hb.connect().await {
                let _ = hb.send(&Envelope {
                    id: generate_event_id(),
                    schema_version: "1.0".into(),
                    event_type: EventType::SystemNotice,
                    conversation_id: format!("heartbeat:{}", hb_name),
                    correlation_id: format!("hb_{}", uuid::Uuid::new_v4()),
                    reply_to: None,
                    trace_id: generate_trace_id(),
                    source: format!("agent:{}", hb_name),
                    target: "bus".into(),
                    created_at: chrono::Utc::now().timestamp(),
                    deadline_ms: 0,
                    payload: Payload::SystemNotice {
                        notice_type: "heartbeat".into(),
                        message: format!("hb from {}", hb_name),
                    },
                }).await;
            }
        }
    });

    // Event loop
    loop {
        if !bus.is_connected() {
            log::warn!("Bus disconnected, reconnecting...");
            match bus.connect().await {
                Ok(_) => log::info!("Reconnected"),
                Err(e) => {
                    log::error!("Reconnect failed: {}", e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }
            }
        }

        let env = match tokio::time::timeout(Duration::from_secs(1), bus.recv()).await {
            Ok(Ok(env)) => env,
            Ok(Err(e)) => {
                log::error!("Recv failed: {}", e);
                bus.disconnect();
                continue;
            }
            Err(_) => continue,
        };

        if let Err(e) = handle(&mut bus, &env, &name, &persona, &memory, &mut guard).await {
            log::error!("Handle error: {}", e);
        }
    }
}

async fn handle(
    bus: &mut BusClient,
    env: &Envelope,
    name: &str,
    persona: &str,
    memory: &str,
    guard: &mut LoopGuard,
) -> Result<()> {
    // Skip own messages
    if env.source == format!("agent:{}", name) {
        return Ok(());
    }

    match env.event_type {
        EventType::UserMessage => {
            let speaker = format_speaker(&env.source);
            let msg = format!("[{}]: {}", speaker, env.payload.content());
            log::info!("UserMsg: {}", msg.chars().take(60).collect::<String>());
            record_history(&env.conversation_id, &msg);
            let history = get_history(&env.conversation_id);
            let mentioned = is_mentioned(name, env.payload.content());
            respond_to(bus, env, name, persona, memory, guard, &msg, &history, mentioned).await?;
        }
        EventType::AgentMessage | EventType::AgentWhisper => {
            let speaker = format_speaker(&env.source);
            let msg = format!("[{}]: {}", speaker, env.payload.content());
            log::info!("AgentMsg: {}", msg.chars().take(60).collect::<String>());
            record_history(&env.conversation_id, &msg);

            // Don't respond to own messages
            if env.source == format!("agent:{}", name) {
                return Ok(());
            }

            // Safety: prevent infinite agent-to-agent loops
            let depth = count_consecutive_agent_turns(&env.conversation_id);
            if depth >= MAX_AGENT_TURNS {
                log::info!("Agent turn depth {} >= {}, staying silent", depth, MAX_AGENT_TURNS);
                return Ok(());
            }

            let history = get_history(&env.conversation_id);

            // If our name is explicitly mentioned, treat as high-priority (force)
            let mentioned = is_mentioned(name, env.payload.content());
            if mentioned {
                log::info!("Name mentioned in agent message, high priority");
            }
            respond_to(bus, env, name, persona, memory, guard, &msg, &history, mentioned).await?;
        }
        EventType::ForceRespond => {
            if env.target == name || env.target == "broadcast" {
                let msg = format!("[{}]: {}", format_speaker(&env.source), env.payload.content());
                log::info!("ForceRespond: {}", msg.chars().take(60).collect::<String>());
                let history = get_history(&env.conversation_id);
                respond_to(bus, env, name, persona, memory, guard, &msg, &history, true).await?;
            }
        }
        EventType::DecisionOnly => {
            let msg = format!("[{}]: {}", format_speaker(&env.source), env.payload.content());
            let history = get_history(&env.conversation_id);
            respond_to(bus, env, name, persona, memory, guard, &msg, &history, false).await?;
        }
        EventType::TaskCompleted => {
            let fwd = make_envelope(env, name, EventType::AgentMessage, "broadcast",
                MessagePayload { content: format!("Task done: {}", env.payload.content()), context: vec![] });
            bus.send(&fwd).await?;
        }
        EventType::SystemNotice => {
            log::debug!("SysNotice: {}", env.payload.content().chars().take(60).collect::<String>());
        }
        EventType::ScheduleTriggered => {}
    }
    Ok(())
}

async fn respond_to(
    bus: &mut BusClient,
    env: &Envelope,
    name: &str,
    persona: &str,
    memory: &str,
    guard: &mut LoopGuard,
    msg: &str,
    context: &[String],
    force: bool,
) -> Result<()> {
    if guard.should_block(&env.correlation_id, env.payload.content()) {
        return Ok(());
    }

    let result = tokio::time::timeout(
        Duration::from_millis(RESPONSE_TIMEOUT_MS),
        judge_and_respond(name, persona, memory, msg, context, force),
    ).await;

    match result {
        Ok(Ok(Some(payload))) => {
            guard.record(&env.correlation_id, &payload.content);

            // Record own response in conversation history
            let self_msg = format!("[{}]: {}", name, payload.content);
            record_history(&env.conversation_id, &self_msg);

            let resp = make_envelope(env, name, EventType::AgentMessage, "broadcast", payload);
            bus.send(&resp).await?;
            log::info!("Sent response (broadcast)");
        }
        Ok(Ok(None)) => {
            log::debug!("Staying silent");
        }
        Ok(Err(e)) => {
            log::error!("LLM error: {}", e);
        }
        Err(_) => {
            log::error!("LLM timeout");
        }
    }
    Ok(())
}

/// Format envelope source into a human-readable speaker name.
fn format_speaker(source: &str) -> &str {
    // "agent:eleven" → "eleven", "telegram:user_123" → "사용자"
    if let Some(name) = source.strip_prefix("agent:") {
        name
    } else if source.starts_with("telegram:") || source.starts_with("user") {
        "사용자"
    } else {
        source
    }
}

/// Check if this agent's name appears in the message content.
/// Just a simple substring check — the LLM handles all the nuanced interpretation.
fn is_mentioned(agent_name: &str, content: &str) -> bool {
    let lower = content.to_lowercase();
    if lower.contains(agent_name) {
        return true;
    }
    // Check Telegram @username (e.g. "@claw_eleven_bot")
    if let Ok(username) = std::env::var("TELEGRAM_BOT_USERNAME") {
        if lower.contains(&username.to_lowercase()) {
            return true;
        }
    }
    false
}

fn make_envelope(
    reply_to: &Envelope,
    agent_name: &str,
    event_type: EventType,
    target: &str,
    payload: MessagePayload,
) -> Envelope {
    Envelope {
        id: generate_event_id(),
        schema_version: "1.0".into(),
        event_type,
        conversation_id: reply_to.conversation_id.clone(),
        correlation_id: reply_to.correlation_id.clone(),
        reply_to: Some(reply_to.id.clone()),
        trace_id: generate_trace_id(),
        source: format!("agent:{}", agent_name),
        target: target.into(),
        created_at: chrono::Utc::now().timestamp(),
        deadline_ms: 0,
        payload: Payload::Message(payload),
    }
}