use anyhow::Result;
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::Duration;
use clawgarden_proto::{
generate_event_id, generate_trace_id, Envelope, EventType, MessagePayload, Payload,
};
use crate::bus_client::BusClient;
/// Agent tuning knobs, copied once from the `agent` section of the
/// application config (see `SETTINGS`).
///
/// Derives `Debug` and `Clone` so the effective settings can be logged and
/// snapshotted, matching the other public data structs in this module.
#[derive(Debug, Clone)]
pub struct AgentSettings {
    /// Maximum number of messages `get_history` returns per conversation.
    /// `record_history` bounds the shared buffer at three times this value.
    pub history_capacity: usize,
    /// Initial allocation size of the shared history buffer.
    pub history_buffer_capacity: usize,
    /// Not read in this module.
    /// NOTE(review): presumably the cap compared against
    /// `count_consecutive_agent_turns` by callers — confirm.
    pub max_agent_turns: usize,
    /// Not read in this module; milliseconds, judging by the name — confirm.
    pub response_timeout_ms: u64,
    /// Not read in this module; milliseconds, judging by the name — confirm.
    pub agent_loop_timeout_ms: u64,
    /// Forwarded to `LoopPolicy::hard_max_steps` by `loop_policy()`.
    pub loop_hard_max_steps: usize,
    /// Forwarded to `LoopPolicy::same_action_repeat_limit` by `loop_policy()`.
    pub loop_same_action_repeat_limit: usize,
    /// Forwarded to `LoopPolicy::consecutive_error_limit` by `loop_policy()`.
    pub loop_consecutive_error_limit: usize,
    /// Forwarded to `LoopPolicy::stall_window` by `loop_policy()`.
    pub loop_stall_window: usize,
    /// Forwarded to `LoopPolicy::stall_min_progress_sum` by `loop_policy()`.
    pub loop_stall_min_progress_sum: i32,
    /// Forwarded to `LoopPolicy::cycle_repeat_limit` by `loop_policy()`.
    pub loop_cycle_repeat_limit: usize,
    /// Forwarded to `LoopPolicy::checkpoint_enabled` by `loop_policy()`.
    pub loop_checkpoint_enabled: bool,
}
/// Process-wide agent settings, built on first access from
/// `AppConfig::load()` and never refreshed afterwards.
static SETTINGS: once_cell::sync::Lazy<AgentSettings> = once_cell::sync::Lazy::new(|| {
    let config = clawgarden_proto::AppConfig::load();
    // Every field is a plain copy of the same-named entry in the `agent` section.
    let agent = &config.agent;
    AgentSettings {
        history_capacity: agent.history_capacity,
        history_buffer_capacity: agent.history_buffer_capacity,
        max_agent_turns: agent.max_agent_turns,
        response_timeout_ms: agent.response_timeout_ms,
        agent_loop_timeout_ms: agent.agent_loop_timeout_ms,
        loop_hard_max_steps: agent.loop_hard_max_steps,
        loop_same_action_repeat_limit: agent.loop_same_action_repeat_limit,
        loop_consecutive_error_limit: agent.loop_consecutive_error_limit,
        loop_stall_window: agent.loop_stall_window,
        loop_stall_min_progress_sum: agent.loop_stall_min_progress_sum,
        loop_cycle_repeat_limit: agent.loop_cycle_repeat_limit,
        loop_checkpoint_enabled: agent.loop_checkpoint_enabled,
    }
});
/// Returns the shared agent settings, initialising them on first use.
pub fn settings() -> &'static AgentSettings {
    // Dereferencing the lazy cell triggers (or reuses) the one-time config load.
    let current: &'static AgentSettings = &SETTINGS;
    current
}
/// Builds the loop policy from the `loop_*` agent settings.
///
/// If the configured values fail `validate_runtime`, the problem is logged
/// and `LoopPolicy::defaults()` is returned instead.
pub fn loop_policy() -> crate::loop_policy::LoopPolicy {
    let cfg = settings();
    let candidate = crate::loop_policy::LoopPolicy {
        hard_max_steps: cfg.loop_hard_max_steps,
        same_action_repeat_limit: cfg.loop_same_action_repeat_limit,
        consecutive_error_limit: cfg.loop_consecutive_error_limit,
        stall_window: cfg.loop_stall_window,
        stall_min_progress_sum: cfg.loop_stall_min_progress_sum,
        cycle_repeat_limit: cfg.loop_cycle_repeat_limit,
        checkpoint_enabled: cfg.loop_checkpoint_enabled,
    };

    // Guard clause: an invalid configuration never reaches the caller.
    if let Err(e) = candidate.validate_runtime() {
        log::error!("Invalid loop policy config: {}. Falling back to defaults.", e);
        return crate::loop_policy::LoopPolicy::defaults();
    }
    candidate
}
/// Shared, mutex-guarded log of `(conversation_id, formatted_message)` pairs
/// for all conversations, oldest entry first.
static HISTORY: once_cell::sync::Lazy<Mutex<VecDeque<(String, String)>>> =
    once_cell::sync::Lazy::new(|| {
        // Pre-size the ring once; this is only an allocation hint, not a bound.
        let initial_capacity = SETTINGS.history_buffer_capacity;
        let buffer: VecDeque<(String, String)> = VecDeque::with_capacity(initial_capacity);
        Mutex::new(buffer)
    });
/// Appends a formatted message to the shared history buffer.
///
/// The buffer holds at most `history_capacity * 3` entries across all
/// conversations; the oldest entries are evicted to make room. When
/// `history_capacity` is zero nothing is recorded.
///
/// A poisoned mutex is recovered rather than propagated, so history keeps
/// working after a panic in another thread.
pub fn record_history(conversation_id: &str, formatted_msg: &str) {
    // Saturate instead of overflowing on absurdly large configured capacities.
    let max_entries = settings().history_capacity.saturating_mul(3);
    if max_entries == 0 {
        // With a zero bound the eviction loop below could never finish:
        // `len() >= 0` always holds and popping an empty deque is a no-op.
        return;
    }
    // NOTE(review): the buffer is pre-sized from `history_buffer_capacity` but
    // bounded by `history_capacity * 3` — confirm the two are meant to differ.
    let mut hist = HISTORY.lock().unwrap_or_else(|e| e.into_inner());
    while hist.len() >= max_entries {
        hist.pop_front();
    }
    hist.push_back((conversation_id.to_string(), formatted_msg.to_string()));
}
/// Returns the most recent messages of one conversation, oldest first.
///
/// At most `history_capacity` messages are returned; entries belonging to
/// other conversations are skipped.
pub fn get_history(conversation_id: &str) -> Vec<String> {
    let hist = HISTORY.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    let limit = settings().history_capacity;

    // Walk from newest to oldest, keeping only this conversation's messages.
    let mut recent: Vec<String> = Vec::new();
    for (cid, msg) in hist.iter().rev() {
        if recent.len() == limit {
            break;
        }
        if cid == conversation_id {
            recent.push(msg.clone());
        }
    }

    // Collected newest-first; flip back into chronological order.
    recent.reverse();
    recent
}
/// Counts how many of the newest messages in a conversation were written by
/// known agents, without interruption.
///
/// Scanning runs from the newest message backwards and stops at the first
/// message that does not start with `[`, or whose bracketed name is not
/// accepted by `is_known_agent_name`. A message that starts with `[` but has
/// no closing `]` is still counted.
pub fn count_consecutive_agent_turns(conversation_id: &str) -> usize {
    let hist = HISTORY.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    hist.iter()
        .rev()
        .filter(|(cid, _)| cid == conversation_id)
        .take_while(|(_, msg)| match msg.strip_prefix('[') {
            // No leading bracket: not an agent-formatted line.
            None => false,
            Some(rest) => match rest.find(']') {
                Some(end) => is_known_agent_name(&rest[..end]),
                // Unterminated bracket: counted without a name check.
                None => true,
            },
        })
        .count()
}
/// Reports whether `name` identifies a configured agent.
///
/// A name is known when it equals
/// * the part before the first `:` (or the whole entry, if there is no `:`)
///   of any comma-separated entry in the `TEAM_MEMBERS` environment
///   variable, compared after trimming surrounding whitespace, or
/// * the value of the `TELEGRAM_BOT_USERNAME` environment variable.
///
/// An empty `name` is never known. Without this guard an empty
/// `TEAM_MEMBERS` value or a trailing comma yields an empty entry that
/// would match a message tagged `[]`.
pub fn is_known_agent_name(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }

    let listed_in_team = std::env::var("TEAM_MEMBERS")
        .map(|members| {
            members
                .split(',')
                // Keep only the name part; whatever follows the first ':' is ignored.
                .map(|entry| entry.split(':').next().unwrap_or(entry).trim())
                .any(|member| member == name)
        })
        .unwrap_or(false);
    if listed_in_team {
        return true;
    }

    std::env::var("TELEGRAM_BOT_USERNAME").map_or(false, |username| username == name)
}
/// Builds a message envelope sent by `agent_name` in response to `reply_to`.
///
/// The conversation and correlation ids are inherited from `reply_to`, whose
/// own id becomes the new envelope's `reply_to`. A fresh event id and trace id
/// are generated, the source is `agent:<agent_name>`, the creation time is the
/// current UTC timestamp and no deadline is set (`deadline_ms` is 0).
pub fn make_envelope(
    reply_to: &Envelope,
    agent_name: &str,
    event_type: EventType,
    target: &str,
    payload: MessagePayload,
) -> Envelope {
    // Values are produced in the same order the envelope fields are declared.
    let id = generate_event_id();
    let conversation_id = reply_to.conversation_id.clone();
    let correlation_id = reply_to.correlation_id.clone();
    let parent_id = reply_to.id.clone();
    let trace_id = generate_trace_id();
    let source = format!("agent:{}", agent_name);
    let created_at = chrono::Utc::now().timestamp();

    Envelope {
        id,
        schema_version: "1.0".into(),
        event_type,
        conversation_id,
        correlation_id,
        reply_to: Some(parent_id),
        trace_id,
        source,
        target: target.into(),
        created_at,
        deadline_ms: 0,
        payload: Payload::Message(payload),
    }
}
/// Outcome of a shell command run (or rejected) by `execute_command`.
#[derive(Debug, Clone)]
pub struct ExecCommandResult {
    /// Captured standard output, lossily decoded as UTF-8; empty when the
    /// command was rejected, failed to run, or timed out.
    pub stdout: String,
    /// Captured standard error, lossily decoded as UTF-8; carries the
    /// rejection message when the command was blocked by validation.
    pub stderr: String,
    /// Process exit code when one is available; `Some(126)` for a rejected
    /// command, `None` on execution error or timeout.
    pub exit_code: Option<i32>,
    /// `true` when the command did not finish within the requested timeout.
    pub timed_out: bool,
    /// Elapsed wall-clock time in milliseconds (0 for rejected commands).
    pub duration_ms: u64,
    /// Rejection reason or execution error text; `None` when the command ran.
    pub spawn_error: Option<String>,
}
/// Substrings that cause `validate_command` to reject a command.
///
/// Matching is case-insensitive and purely textual, and the first matching
/// entry (in this order) is the one reported. Trailing spaces are
/// significant: `"su "` matches `su root` but not `sum`.
const BLOCKED_PATTERNS: &[&str] = &[
    // Container and cluster tooling.
    "docker",
    "kubectl",
    "crictl",
    // Privilege and ownership changes.
    "sudo ",
    "su ",
    "chmod u+s",
    "chown root",
    // Downloads to the filesystem root and raw network tools.
    "curl -o /",
    "wget -O /",
    "nc -e",
    "ncat",
    // Init process and kernel pseudo-filesystems.
    "kill -9 1",
    "/proc/",
    "/sys/",
    // Destructive disk and filesystem operations.
    "rm -rf /",
    "mkfs.",
    "dd if=",
    // Credential files.
    "/etc/shadow",
    "/etc/passwd",
    ".ssh/",
    // Package installation.
    "apt-get install",
    "yum install",
    "pip install",
    "npm install -g",
    "cargo install",
];
fn cached_blocked_patterns() -> &'static Vec<String> {
use std::sync::OnceLock;
static CACHED: OnceLock<Vec<String>> = OnceLock::new();
CACHED.get_or_init(|| {
BLOCKED_PATTERNS.iter().map(|p| p.to_lowercase()).collect()
})
}
/// Checks a shell command and its working directory against the sandbox rules.
///
/// The command is rejected when
/// * `workdir` is not one of the allowed roots (the configured workspace,
///   `/tmp`, `/app`) or a directory beneath one, or contains a `..` component;
/// * the lower-cased command contains any entry of `BLOCKED_PATTERNS`; or
/// * the command contains `../` together with `/etc/`, `/root/` or `/home/`.
///
/// The workdir comparison is component-wise, so `/tmpfoo` is not treated as
/// being inside `/tmp`. It is lexical only: symlinks are not resolved.
///
/// Pattern matching is a plain substring test, so it also rejects harmless
/// commands that merely contain a pattern (`truncate` contains `ncat`).
///
/// # Errors
/// Returns a human-readable reason describing the first rule that failed.
fn validate_command(command: &str, workdir: &str) -> Result<(), String> {
    use std::path::{Component, Path};

    let cmd_lower = command.to_lowercase();
    let allowed_workdirs = {
        let config = clawgarden_proto::AppConfig::load();
        vec![
            config.paths.workspace.clone(),
            "/tmp".to_string(),
            "/app".to_string(),
        ]
    };

    let workdir_path = Path::new(workdir);
    // `/tmp/../etc` shares a prefix with `/tmp` but resolves outside of it.
    let escapes_upward = workdir_path
        .components()
        .any(|part| matches!(part, Component::ParentDir));
    let inside_allowed_root = allowed_workdirs
        .iter()
        // An empty root (e.g. an unset workspace) would match every path.
        .filter(|root| !root.is_empty())
        .any(|root| workdir_path.starts_with(root));
    if escapes_upward || !inside_allowed_root {
        return Err(format!(
            "Workdir '{}' is outside allowed directories ({})",
            workdir,
            allowed_workdirs.join(", ")
        ));
    }

    // Match against the lower-cased pattern, but report the original spelling.
    for (pattern, lowered) in BLOCKED_PATTERNS.iter().zip(cached_blocked_patterns()) {
        if cmd_lower.contains(lowered.as_str()) {
            return Err(format!(
                "Command blocked: contains prohibited pattern '{}'",
                pattern.trim()
            ));
        }
    }

    let targets_restricted_dir =
        command.contains("/etc/") || command.contains("/root/") || command.contains("/home/");
    if command.contains("../") && targets_restricted_dir {
        return Err("Command blocked: path traversal to restricted directory".to_string());
    }
    Ok(())
}
/// Runs `command` through `sh -c` and captures its output.
///
/// * `workdir` — directory to run in; defaults to `/workspace` when `None`.
/// * `timeout_secs` — maximum time to wait for the command to finish.
///
/// The command is first checked by `validate_command`. A rejected command is
/// never started and is reported with exit code 126, the reason in `stderr`
/// and `spawn_error`, and a duration of 0.
///
/// Otherwise the result reflects one of three outcomes: the command finished
/// (captured output and exit code), it could not be run (`spawn_error` set),
/// or it exceeded the timeout (`timed_out` set, output discarded).
///
/// This function never returns an error or panics on command failure; every
/// outcome is encoded in the returned `ExecCommandResult`.
pub async fn execute_command(
    command: &str,
    workdir: Option<&str>,
    timeout_secs: u64,
) -> ExecCommandResult {
    let workdir = workdir.unwrap_or("/workspace");
    if let Err(reason) = validate_command(command, workdir) {
        log::warn!("Command blocked: {} — command: {}", reason, command);
        return ExecCommandResult {
            stdout: String::new(),
            stderr: format!("Command rejected: {}", reason),
            exit_code: Some(126),
            timed_out: false,
            duration_ms: 0,
            spawn_error: Some(reason),
        };
    }

    let started = std::time::Instant::now();
    let result = tokio::time::timeout(
        Duration::from_secs(timeout_secs),
        tokio::process::Command::new("sh")
            .arg("-c")
            .arg(command)
            .current_dir(workdir)
            // Without this the shell keeps running after a timeout, because
            // dropping the pending future does not kill the child by default.
            // NOTE(review): this targets the `sh` process itself; processes
            // it spawned may survive — confirm whether that matters here.
            .kill_on_drop(true)
            .output(),
    )
    .await;

    match result {
        // The command ran to completion within the timeout.
        Ok(Ok(output)) => ExecCommandResult {
            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
            exit_code: output.status.code(),
            timed_out: false,
            duration_ms: started.elapsed().as_millis() as u64,
            spawn_error: None,
        },
        // The shell could not be started or its output could not be collected.
        Ok(Err(e)) => ExecCommandResult {
            stdout: String::new(),
            stderr: String::new(),
            exit_code: None,
            timed_out: false,
            duration_ms: started.elapsed().as_millis() as u64,
            spawn_error: Some(format!("Execution error: {}", e)),
        },
        // The timeout elapsed first.
        Err(_) => ExecCommandResult {
            stdout: String::new(),
            stderr: String::new(),
            exit_code: None,
            timed_out: true,
            duration_ms: started.elapsed().as_millis() as u64,
            spawn_error: None,
        },
    }
}
pub async fn create_skill(
bus: &mut BusClient,
agent_name: &str,
_env: &Envelope,
skill_name: &str,
description: &str,
skill_md: &str,
) -> Result<()> {
use clawgarden_proto::generate_correlation_id;
let correlation_id = generate_correlation_id();
let create_env = Envelope::new_skill_create(
correlation_id.clone(),
clawgarden_proto::generate_trace_id(),
format!("agent:{}", agent_name),
skill_name.to_string(),
description.to_string(),
skill_md.to_string(),
);
bus.send(&create_env).await?;
log::info!("Sent SkillCreate for '{}'", skill_name);
Ok(())
}
/// Turns an envelope source into a display name.
///
/// * `agent:<name>` becomes `<name>`.
/// * `telegram:<name>` becomes `User` when `<name>` starts with `user_`,
///   otherwise `<name>`.
/// * Anything else is returned unchanged.
pub fn format_speaker(source: &str) -> String {
    let speaker = match source.strip_prefix("agent:") {
        Some(agent) => agent,
        None => match source.strip_prefix("telegram:") {
            Some(handle) if handle.starts_with("user_") => "User",
            Some(handle) => handle,
            None => source,
        },
    };
    speaker.to_owned()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Validates the command and returns the rejection reason, failing the
    /// test if the command was accepted.
    fn rejection_reason(command: &str, workdir: &str) -> String {
        validate_command(command, workdir).expect_err("command should have been rejected")
    }

    #[test]
    fn test_validate_command_allowed() {
        let accepted = [
            ("ls -la", "/workspace"),
            ("cat README.md", "/workspace"),
            ("rg 'fn main'", "/workspace/src"),
            ("echo hello", "/tmp"),
        ];
        for (command, workdir) in accepted.iter() {
            assert!(validate_command(command, workdir).is_ok());
        }
    }

    #[test]
    fn test_validate_command_blocked_docker() {
        let reason = rejection_reason("docker run alpine", "/workspace");
        assert!(reason.contains("docker"));
    }

    #[test]
    fn test_validate_command_blocked_sudo() {
        let reason = rejection_reason("sudo rm -rf /", "/workspace");
        assert!(reason.contains("sudo"));
    }

    #[test]
    fn test_validate_command_blocked_credential_access() {
        let reason = rejection_reason("cat /etc/shadow", "/workspace");
        assert!(reason.contains("/etc/shadow"));
    }

    #[test]
    fn test_validate_command_blocked_package_install() {
        assert!(validate_command("apt-get install nmap", "/workspace").is_err());
    }

    #[test]
    fn test_validate_command_blocked_workdir() {
        let reason = rejection_reason("ls", "/etc");
        assert!(reason.contains("outside allowed"));
    }

    #[test]
    fn test_validate_command_allowed_tmp() {
        assert!(validate_command("mktemp", "/tmp").is_ok());
    }

    #[test]
    fn test_validate_command_blocked_destructive() {
        assert!(validate_command("rm -rf /", "/workspace").is_err());
    }

    // A blocked command must be reported through the result, never executed.
    #[test]
    fn test_execute_command_blocked_returns_error() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let outcome = runtime.block_on(execute_command("sudo whoami", None, 5));
        assert_eq!(outcome.exit_code, Some(126));
        assert!(outcome.spawn_error.is_some());
        assert!(outcome.stderr.contains("rejected"));
    }
}