use crate::core::broker::DbBroker;
use crate::core::error;
use crate::core::external_action;
use crate::core::schemas;
use crate::core::store::Store;
use crate::plugins::health;
use clap::{Parser, Subcommand};
use rusqlite::{Result, types::ToSql};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::env;
use std::path::{Path, PathBuf};
fn reflex_db_path(root: &Path) -> PathBuf {
root.join(schemas::AUTOMATION_DB_NAME)
}
pub fn initialize_reflex_db(root: &Path) -> Result<(), error::DecapodError> {
std::fs::create_dir_all(root).map_err(error::DecapodError::IoError)?;
let broker = DbBroker::new(root);
let db_path = reflex_db_path(root);
broker.with_conn(&db_path, "decapod", None, "reflex.init", |conn| {
conn.execute(schemas::REFLEX_DB_SCHEMA, [])
.map_err(error::DecapodError::RusqliteError)?;
Ok(())
})?;
Ok(())
}
fn now_iso() -> String {
crate::core::time::now_epoch_z()
}
const COMPONENT_NAMES: &[&str] = &[
"application_development",
"architecture",
"artificial_intelligence",
"design_and_style",
"development_lifecycle",
"documentation",
"languages",
"platform_engineering",
"project_management",
"specialized_domains",
];
fn scope_from_dir(p: &str) -> String {
let path = Path::new(p);
for component_name in COMPONENT_NAMES {
if path.file_name().map(|s| s.to_string_lossy().to_lowercase())
== Some(component_name.to_string())
|| path
.to_string_lossy()
.to_lowercase()
.contains(&format!("/{component_name}/"))
{
return component_name.to_string();
}
}
"root".to_string()
}
fn ulid_like() -> String {
crate::core::ulid::new_ulid()
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Reflex {
pub id: String,
pub name: String,
pub description: String,
pub trigger_type: String,
pub trigger_config: String,
pub action_type: String,
pub action_config: String,
pub status: String,
pub tags: String,
pub created_at: String,
pub updated_at: String,
pub dir_path: String,
pub scope: String,
}
#[derive(Parser, Debug)]
#[clap(
name = "reflex",
about = "Manage automated responses (reflexes) within the Decapod system."
)]
pub struct ReflexCli {
#[clap(subcommand)]
pub command: ReflexCommand,
}
#[derive(Subcommand, Debug)]
pub enum ReflexCommand {
Add {
#[clap(long)]
name: String,
#[clap(long, default_value = "")]
description: String,
#[clap(long)]
trigger_type: String,
#[clap(long, default_value = "{}")]
trigger_config: String,
#[clap(long)]
action_type: String,
#[clap(long)]
action_config: String,
#[clap(long, default_value = "active")]
status: String,
#[clap(long, default_value = "")]
tags: String,
#[clap(long)]
dir: Option<String>,
},
Update {
#[clap(long)]
id: String,
#[clap(long)]
name: Option<String>,
#[clap(long)]
description: Option<String>,
#[clap(long)]
trigger_type: Option<String>,
#[clap(long)]
trigger_config: Option<String>,
#[clap(long)]
action_type: Option<String>,
#[clap(long)]
action_config: Option<String>,
#[clap(long)]
status: Option<String>,
#[clap(long)]
tags: Option<String>,
},
Get {
#[clap(long)]
id: String,
},
List {
#[clap(long)]
status: Option<String>,
#[clap(long)]
scope: Option<String>,
#[clap(long)]
tags: Option<String>,
#[clap(long)]
name_search: Option<String>,
#[clap(long)]
dir: Option<String>,
},
Delete {
#[clap(long)]
id: String,
},
Run {
#[clap(long)]
id: Option<String>,
#[clap(long)]
trigger_type: Option<String>,
#[clap(long, default_value_t = 10)]
limit: usize,
},
AddHeartbeatLoop {
#[clap(long, default_value = "human-heartbeat-autoclaim")]
name: String,
#[clap(long)]
agent: Option<String>,
#[clap(long, default_value_t = 1)]
max_claims: usize,
#[clap(long, default_value = "")]
tags: String,
#[clap(long)]
dir: Option<String>,
},
AddHumanTriggerLoop {
#[clap(long, default_value = "human-trigger-task-loop")]
name: String,
#[clap(long)]
agent: Option<String>,
#[clap(long)]
task_title: String,
#[clap(long, default_value = "medium")]
priority: String,
#[clap(long, default_value_t = 1)]
max_tasks: usize,
#[clap(long, default_value = "")]
tags: String,
#[clap(long)]
dir: Option<String>,
},
AddHealthTrigger {
#[clap(long, default_value = "health-state-remediate")]
name: String,
#[clap(long)]
agent: Option<String>,
#[clap(long, default_value = "STALE,CONTRADICTED")]
watch_states: String,
#[clap(long, default_value = "high")]
priority: String,
#[clap(long, default_value = "")]
tags: String,
#[clap(long)]
dir: Option<String>,
},
}
pub fn schema() -> serde_json::Value {
serde_json::json!({
"name": "reflex",
"version": "0.1.0",
"description": "Manage automated responses",
"commands": [
{
"name": "add",
"description": "Add a new reflex entry",
"parameters": [
{"name": "name", "required": true, "description": "Unique reflex name identifier"},
{"name": "description", "required": false, "description": "Human-readable description of the reflex purpose", "default": ""},
{"name": "trigger_type", "required": true, "description": "Type of trigger (e.g., file_change, command_exit, schedule)"},
{"name": "trigger_config", "required": true, "description": "JSON configuration for trigger conditions", "default": "{}"},
{"name": "action_type", "required": true, "description": "Type of action to perform (e.g., notify, exec, webhook)"},
{"name": "action_config", "required": true, "description": "JSON configuration for the action to execute"},
{"name": "status", "required": false, "description": "Initial reflex status", "default": "active"},
{"name": "tags", "required": false, "description": "Comma-separated tags for categorization", "default": ""}
]
},
{
"name": "list",
"description": "List reflex entries",
"parameters": [
{"name": "status", "required": false, "description": "Filter by status (active, paused, disabled)"},
{"name": "scope", "required": false, "description": "Filter by scope directory"},
{"name": "tags", "required": false, "description": "Filter by comma-separated tags"}
]
},
{
"name": "get",
"description": "Retrieve a reflex entry by ID",
"parameters": [
{"name": "id", "required": true, "description": "Reflex entry ID to retrieve"}
]
},
{
"name": "update",
"description": "Update an existing reflex entry",
"parameters": [
{"name": "id", "required": true, "description": "Reflex entry ID to update"}
]
},
{
"name": "delete",
"description": "Delete a reflex entry",
"parameters": [
{"name": "id", "required": true, "description": "Reflex entry ID to delete"}
]
},
{
"name": "run",
"description": "Run active reflex actions by id or trigger type",
"parameters": [
{"name": "id", "required": false, "description": "Optional specific reflex ID to run"},
{"name": "trigger_type", "required": false, "description": "Optional trigger type filter (e.g. human)"},
{"name": "limit", "required": false, "description": "Maximum reflex actions to run", "default": 10}
]
},
{
"name": "add-heartbeat-loop",
"description": "Install a canonical human-triggered heartbeat autoclaim reflex",
"parameters": [
{"name": "name", "required": false, "description": "Reflex name", "default": "human-heartbeat-autoclaim"},
{"name": "agent", "required": false, "description": "Agent ID (defaults to DECAPOD_AGENT_ID or unknown)"},
{"name": "max_claims", "required": false, "description": "Maximum tasks to autoclaim per heartbeat run", "default": 1}
]
},
{
"name": "add-human-trigger-loop",
"description": "Install a human trigger -> task -> worker execution -> lesson reflex",
"parameters": [
{"name": "name", "required": false, "description": "Reflex name", "default": "human-trigger-task-loop"},
{"name": "agent", "required": false, "description": "Agent ID (defaults to DECAPOD_AGENT_ID or unknown)"},
{"name": "task_title", "required": true, "description": "Task title to create on trigger"},
{"name": "priority", "required": false, "description": "Task priority", "default": "medium"},
{"name": "max_tasks", "required": false, "description": "Worker max tasks per run", "default": 1}
]
},
{
"name": "add-health-trigger",
"description": "Install a condition-based health trigger that creates remediation tasks when claims degrade",
"parameters": [
{"name": "name", "required": false, "description": "Reflex name", "default": "health-state-remediate"},
{"name": "agent", "required": false, "description": "Agent ID (defaults to DECAPOD_AGENT_ID or unknown)"},
{"name": "watch_states", "required": false, "description": "Comma-separated health states to watch", "default": "STALE,CONTRADICTED"},
{"name": "priority", "required": false, "description": "Remediation task priority", "default": "high"}
]
}
],
"storage": ["reflex.db"]
})
}
pub fn run_reflex_cli(store: &Store, cli: ReflexCli) {
let root = &store.root;
let result = match cli.command {
ReflexCommand::Add {
name,
description,
trigger_type,
trigger_config,
action_type,
action_config,
status,
tags,
dir,
} => add_reflex(
root,
name,
description,
trigger_type,
trigger_config,
action_type,
action_config,
status,
tags,
dir,
),
ReflexCommand::Update {
id,
name,
description,
trigger_type,
trigger_config,
action_type,
action_config,
status,
tags,
} => update_reflex(
root,
id,
name,
description,
trigger_type,
trigger_config,
action_type,
action_config,
status,
tags,
),
ReflexCommand::Get { id } => get_reflex(root, id),
ReflexCommand::List {
status,
scope,
tags,
name_search,
dir,
} => list_reflexes(root, status, scope, tags, name_search, dir),
ReflexCommand::Delete { id } => delete_reflex(root, id),
ReflexCommand::Run {
id,
trigger_type,
limit,
} => run_reflex_actions(root, &id, &trigger_type, &limit),
ReflexCommand::AddHeartbeatLoop {
name,
agent,
max_claims,
tags,
dir,
} => add_heartbeat_loop_reflex(root, &name, &agent, &max_claims, &tags, &dir),
ReflexCommand::AddHumanTriggerLoop {
name,
agent,
task_title,
priority,
max_tasks,
tags,
dir,
} => add_human_trigger_loop_reflex(
root,
&name,
&agent,
&task_title,
&priority,
&max_tasks,
&tags,
&dir,
),
ReflexCommand::AddHealthTrigger {
name,
agent,
watch_states,
priority,
tags,
dir,
} => add_health_trigger_reflex(root, &name, &agent, &watch_states, &priority, &tags, &dir),
};
if let Err(e) = result {
eprintln!("Error: {e}");
}
}
fn parse_json_config(raw: &str, field: &str) -> Result<JsonValue, error::DecapodError> {
serde_json::from_str(raw)
.map_err(|e| error::DecapodError::ValidationError(format!("invalid {field} JSON: {e}")))
}
fn fetch_matching_reflexes(
root: &Path,
id: Option<String>,
trigger_type: Option<String>,
limit: usize,
) -> Result<Vec<Reflex>, error::DecapodError> {
let broker = DbBroker::new(root);
let db_path = reflex_db_path(root);
broker.with_conn(&db_path, "decapod", None, "reflex.run.scan", |conn| {
let mut query = "SELECT id, name, description, trigger_type, trigger_config, action_type, action_config, status, tags, created_at, updated_at, dir_path, scope FROM reflexes WHERE status = 'active'".to_string();
let mut params: Vec<Box<dyn ToSql>> = Vec::new();
if let Some(i) = id {
query.push_str(" AND id = ?");
params.push(Box::new(i));
}
if let Some(t) = trigger_type {
query.push_str(" AND trigger_type = ?");
params.push(Box::new(t));
}
query.push_str(" ORDER BY updated_at DESC LIMIT ?");
params.push(Box::new(limit as i64));
let params_as_dyn: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&query)?;
let rows = stmt.query_map(¶ms_as_dyn[..], |row| {
Ok(Reflex {
id: row.get(0)?,
name: row.get(1)?,
description: row.get(2)?,
trigger_type: row.get(3)?,
trigger_config: row.get(4)?,
action_type: row.get(5)?,
action_config: row.get(6)?,
status: row.get(7)?,
tags: row.get(8)?,
created_at: row.get(9)?,
updated_at: row.get(10)?,
dir_path: row.get(11)?,
scope: row.get(12)?,
})
})?;
let mut out = Vec::new();
for row in rows {
out.push(row?);
}
Ok(out)
})
}
fn run_decapod_command_json(
root: &Path,
scope: &str,
args: &[&str],
) -> Result<serde_json::Value, error::DecapodError> {
let decapod_bin = std::env::current_exe()
.map_err(error::DecapodError::IoError)?
.to_string_lossy()
.to_string();
let cwd = root
.parent()
.and_then(|p| p.parent())
.map(Path::to_path_buf)
.unwrap_or(std::env::current_dir().map_err(error::DecapodError::IoError)?);
let output = external_action::execute(
root,
external_action::ExternalCapability::VerificationExec,
scope,
decapod_bin.as_str(),
args,
&cwd,
)?;
let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
let payload = serde_json::from_str::<JsonValue>(&stdout)
.unwrap_or_else(|_| serde_json::json!({ "raw_stdout": stdout }));
Ok(serde_json::json!({
"status": if output.status.success() { "ok" } else { "error" },
"exit_code": output.status.code(),
"payload": payload,
"stderr": stderr
}))
}
fn execute_reflex_action(
root: &Path,
reflex: &Reflex,
) -> Result<serde_json::Value, error::DecapodError> {
match reflex.action_type.as_str() {
"todo.heartbeat.autoclaim" => {
let cfg = parse_json_config(&reflex.action_config, "action_config")?;
let default_agent =
env::var("DECAPOD_AGENT_ID").unwrap_or_else(|_| "unknown".to_string());
let agent = cfg
.get("agent")
.and_then(|v| v.as_str())
.unwrap_or(default_agent.as_str());
let max_claims = cfg.get("max_claims").and_then(|v| v.as_u64()).unwrap_or(1);
let max_claims_s = max_claims.to_string();
let args = vec![
"todo",
"heartbeat",
"--format",
"json",
"--agent",
agent,
"--autoclaim",
"--max-claims",
max_claims_s.as_str(),
];
run_decapod_command_json(root, "reflex.action.todo.heartbeat.autoclaim", &args)
}
"todo.human.trigger.loop" => {
let cfg = parse_json_config(&reflex.action_config, "action_config")?;
let default_agent =
env::var("DECAPOD_AGENT_ID").unwrap_or_else(|_| "unknown".to_string());
let agent = cfg
.get("agent")
.and_then(|v| v.as_str())
.unwrap_or(default_agent.as_str());
let task_title = cfg
.get("task_title")
.and_then(|v| v.as_str())
.ok_or_else(|| {
error::DecapodError::ValidationError(
"action_config.task_title is required".to_string(),
)
})?;
let priority = cfg
.get("priority")
.and_then(|v| v.as_str())
.unwrap_or("medium");
let tags = cfg.get("tags").and_then(|v| v.as_str()).unwrap_or("");
let max_tasks = cfg.get("max_tasks").and_then(|v| v.as_u64()).unwrap_or(1);
let max_tasks_s = max_tasks.to_string();
let mut add_args = vec![
"todo",
"add",
"--format",
"json",
task_title,
"--priority",
priority,
"--owner",
agent,
];
if !tags.is_empty() {
add_args.push("--tags");
add_args.push(tags);
}
let add_out =
run_decapod_command_json(root, "reflex.action.todo.human.add", &add_args)?;
let added_task_id = add_out
.get("payload")
.and_then(|p| p.get("id"))
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let mut worker_args = vec![
"todo",
"worker-run",
"--format",
"json",
"--agent",
agent,
"--max-tasks",
max_tasks_s.as_str(),
];
if let Some(task_id) = added_task_id.as_deref() {
worker_args.push("--task-id");
worker_args.push(task_id);
}
let worker_out = run_decapod_command_json(
root,
"reflex.action.todo.human.worker_run",
&worker_args,
)?;
Ok(serde_json::json!({
"status": "ok",
"add_task": add_out,
"worker_run": worker_out
}))
}
"todo.health.remediate" => {
let cfg = parse_json_config(&reflex.action_config, "action_config")?;
let default_agent =
env::var("DECAPOD_AGENT_ID").unwrap_or_else(|_| "unknown".to_string());
let agent = cfg
.get("agent")
.and_then(|v| v.as_str())
.unwrap_or(default_agent.as_str());
let priority = cfg
.get("priority")
.and_then(|v| v.as_str())
.unwrap_or("high");
let watch_states: Vec<String> = cfg
.get("watch_states")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_else(|| vec!["STALE".to_string(), "CONTRADICTED".to_string()]);
let store = Store {
kind: crate::core::store::StoreKind::Repo,
root: root.to_path_buf(),
};
health::initialize_health_db(&store.root)?;
let all_health = health::get_all_health(&store)?;
let mut triggered_claims = Vec::new();
for (claim_id, state, reason) in &all_health {
let state_str = format!("{state:?}");
if watch_states.iter().any(|ws| ws == &state_str) {
triggered_claims.push(serde_json::json!({
"claim_id": claim_id,
"state": state_str,
"reason": reason
}));
}
}
if triggered_claims.is_empty() {
return Ok(serde_json::json!({
"status": "ok",
"triggered": false,
"message": "No health claims match watched states",
"watched_states": watch_states
}));
}
let mut task_results = Vec::new();
for tc in &triggered_claims {
let claim_id = tc
.get("claim_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let state_str = tc
.get("state")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let reason = tc.get("reason").and_then(|v| v.as_str()).unwrap_or("");
let task_title = format!(
"Remediate {} health claim: {}",
state_str.to_lowercase(),
claim_id
);
let tags_val = format!("health-remediation,{}", state_str.to_lowercase());
let add_args = vec![
"todo",
"add",
"--format",
"json",
&task_title,
"--priority",
priority,
"--owner",
agent,
"--tags",
&tags_val,
];
let add_out = run_decapod_command_json(
root,
"reflex.action.todo.health.remediate",
&add_args,
)?;
task_results.push(serde_json::json!({
"claim_id": claim_id,
"state": state_str,
"reason": reason,
"task": add_out
}));
}
Ok(serde_json::json!({
"status": "ok",
"triggered": true,
"watched_states": watch_states,
"remediation_tasks": task_results
}))
}
other => Err(error::DecapodError::ValidationError(format!(
"unsupported reflex action_type '{other}'"
))),
}
}
fn run_reflex_actions(
root: &Path,
id: &Option<String>,
trigger_type: &Option<String>,
limit: &usize,
) -> Result<(), error::DecapodError> {
let reflexes = fetch_matching_reflexes(root, id.clone(), trigger_type.clone(), *limit)?;
let mut results = Vec::new();
for reflex in reflexes {
let action_result = execute_reflex_action(root, &reflex);
match action_result {
Ok(payload) => results.push(serde_json::json!({
"reflex_id": reflex.id,
"name": reflex.name,
"trigger_type": reflex.trigger_type,
"action_type": reflex.action_type,
"result": payload
})),
Err(e) => results.push(serde_json::json!({
"reflex_id": reflex.id,
"name": reflex.name,
"trigger_type": reflex.trigger_type,
"action_type": reflex.action_type,
"result": {
"status": "error",
"error": e.to_string()
}
})),
}
}
println!(
"{}",
serde_json::json!({
"ts": now_iso(),
"cmd": "reflex.run",
"status": "ok",
"count": results.len(),
"results": results
})
);
Ok(())
}
fn add_heartbeat_loop_reflex(
root: &Path,
name: &str,
agent: &Option<String>,
max_claims: &usize,
tags: &str,
dir: &Option<String>,
) -> Result<(), error::DecapodError> {
let default_agent = env::var("DECAPOD_AGENT_ID").unwrap_or_else(|_| "unknown".to_string());
let agent = agent.clone().unwrap_or(default_agent);
let trigger_config = serde_json::json!({
"source": "human",
"intent": "heartbeat_pull"
})
.to_string();
let action_config = serde_json::json!({
"agent": agent,
"max_claims": max_claims
})
.to_string();
add_reflex(
root,
name.to_string(),
"Human-triggered reflex that runs todo heartbeat autoclaim".to_string(),
"human".to_string(),
trigger_config,
"todo.heartbeat.autoclaim".to_string(),
action_config,
"active".to_string(),
tags.to_string(),
dir.clone(),
)
}
#[allow(clippy::too_many_arguments)]
fn add_human_trigger_loop_reflex(
root: &Path,
name: &str,
agent: &Option<String>,
task_title: &str,
priority: &str,
max_tasks: &usize,
tags: &str,
dir: &Option<String>,
) -> Result<(), error::DecapodError> {
let default_agent = env::var("DECAPOD_AGENT_ID").unwrap_or_else(|_| "unknown".to_string());
let agent = agent.clone().unwrap_or(default_agent);
let trigger_config = serde_json::json!({
"source": "human",
"intent": "task_execute_learn"
})
.to_string();
let action_config = serde_json::json!({
"agent": agent,
"task_title": task_title,
"priority": priority,
"max_tasks": max_tasks,
"tags": tags
})
.to_string();
add_reflex(
root,
name.to_string(),
"Human trigger loop: create task, execute worker loop, persist lesson".to_string(),
"human".to_string(),
trigger_config,
"todo.human.trigger.loop".to_string(),
action_config,
"active".to_string(),
tags.to_string(),
dir.clone(),
)
}
#[allow(clippy::too_many_arguments)]
fn add_reflex(
root: &Path,
name: String,
description: String,
trigger_type: String,
trigger_config: String,
action_type: String,
action_config: String,
status: String,
tags: String,
dir: Option<String>,
) -> Result<(), error::DecapodError> {
let dir_path = dir.unwrap_or_else(|| env::current_dir().unwrap().to_string_lossy().to_string());
let dir_abs = Path::new(&dir_path)
.canonicalize()
.map_err(error::DecapodError::IoError)?
.to_string_lossy()
.to_string();
let scope = scope_from_dir(&dir_abs);
let reflex_id = format!("REF_{}", ulid_like());
let now = now_iso();
let broker = DbBroker::new(root);
let db_path = reflex_db_path(root);
broker.with_conn(&db_path, "decapod", None, "reflex.add", |conn| {
conn.execute(
"INSERT INTO reflexes(id, name, description, trigger_type, trigger_config, action_type, action_config, status, tags, created_at, updated_at, dir_path, scope)
VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
rusqlite::params![reflex_id, name, description, trigger_type, trigger_config, action_type, action_config, status, tags, now, now, dir_abs, scope],
)?;
Ok(())
})?;
println!(
"{}",
serde_json::json!({
"ts": now_iso(),
"cmd": "add",
"id": reflex_id,
})
);
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn update_reflex(
root: &Path,
id: String,
name: Option<String>,
description: Option<String>,
trigger_type: Option<String>,
trigger_config: Option<String>,
action_type: Option<String>,
action_config: Option<String>,
status: Option<String>,
tags: Option<String>,
) -> Result<(), error::DecapodError> {
let broker = DbBroker::new(root);
let db_path = reflex_db_path(root);
broker.with_conn(&db_path, "decapod", None, "reflex.update", |conn| {
let mut set_clauses = Vec::new();
let mut params: Vec<Box<dyn ToSql>> = Vec::new();
if let Some(n) = name {
set_clauses.push("name = ?");
params.push(Box::new(n));
}
if let Some(d) = description {
set_clauses.push("description = ?");
params.push(Box::new(d));
}
if let Some(tt) = trigger_type {
set_clauses.push("trigger_type = ?");
params.push(Box::new(tt));
}
if let Some(tc) = trigger_config {
set_clauses.push("trigger_config = ?");
params.push(Box::new(tc));
}
if let Some(at) = action_type {
set_clauses.push("action_type = ?");
params.push(Box::new(at));
}
if let Some(ac) = action_config {
set_clauses.push("action_config = ?");
params.push(Box::new(ac));
}
if let Some(s) = status {
set_clauses.push("status = ?");
params.push(Box::new(s));
}
if let Some(t) = tags {
set_clauses.push("tags = ?");
params.push(Box::new(t));
}
if set_clauses.is_empty() {
println!(
"{}",
serde_json::json!({ "ts": now_iso(), "cmd": "update", "id": id, "status": "no_changes" })
);
return Ok(());
}
set_clauses.push("updated_at = ?");
params.push(Box::new(now_iso()));
params.push(Box::new(id.clone()));
let update_sql = format!(
"UPDATE reflexes SET {} WHERE id = ?",
set_clauses.join(", ")
);
let params_as_dyn: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
conn.execute(&update_sql, ¶ms_as_dyn[..])?;
println!(
"{}",
serde_json::json!({ "ts": now_iso(), "cmd": "update", "id": id, "status": "ok" })
);
Ok(())
})
}
fn get_reflex(root: &Path, id: String) -> Result<(), error::DecapodError> {
let broker = DbBroker::new(root);
let db_path = reflex_db_path(root);
broker.with_conn(&db_path, "decapod", None, "reflex.get", |conn| {
let mut stmt = conn.prepare("SELECT * FROM reflexes WHERE id = ?1")?;
let mut rows = stmt.query_map([&id], |row| {
Ok(Reflex {
id: row.get(0)?,
name: row.get(1)?,
description: row.get(2)?,
trigger_type: row.get(3)?,
trigger_config: row.get(4)?,
action_type: row.get(5)?,
action_config: row.get(6)?,
status: row.get(7)?,
tags: row.get(8)?,
created_at: row.get(9)?,
updated_at: row.get(10)?,
dir_path: row.get(11)?,
scope: row.get(12)?,
})
})?;
if let Some(reflex_result) = rows.next() {
match reflex_result {
Ok(reflex) => println!("{}", serde_json::to_string_pretty(&reflex).unwrap()),
Err(e) => eprintln!("Error reading reflex: {e}"),
}
} else {
println!(
"{}",
serde_json::json!({ "ts": now_iso(), "cmd": "get", "id": id, "status": "not_found" })
);
}
Ok(())
})
}
fn list_reflexes(
root: &Path,
status: Option<String>,
scope: Option<String>,
tags: Option<String>,
name_search: Option<String>,
dir: Option<String>,
) -> Result<(), error::DecapodError> {
let broker = DbBroker::new(root);
let db_path = reflex_db_path(root);
broker.with_conn(&db_path, "decapod", None, "reflex.list", |conn| {
let mut query = "SELECT * FROM reflexes WHERE 1=1".to_string();
let mut params: Vec<Box<dyn ToSql>> = Vec::new();
if let Some(s) = status {
query.push_str(" AND status = ?");
params.push(Box::new(s));
}
if let Some(s) = scope {
query.push_str(" AND scope = ?");
params.push(Box::new(s));
}
if let Some(t) = tags {
query.push_str(" AND tags LIKE ?");
params.push(Box::new(format!("%{t}%")));
}
if let Some(n) = name_search {
query.push_str(" AND name LIKE ?");
params.push(Box::new(format!("%{n}%")));
}
if let Some(d) = dir {
query.push_str(" AND dir_path = ?");
params.push(Box::new(d));
}
query.push_str(" ORDER BY updated_at DESC");
let mut stmt = conn.prepare(&query)?;
let params_as_dyn: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
let rows = stmt.query_map(¶ms_as_dyn[..], |row| {
Ok(Reflex {
id: row.get(0)?,
name: row.get(1)?,
description: row.get(2)?,
trigger_type: row.get(3)?,
trigger_config: row.get(4)?,
action_type: row.get(5)?,
action_config: row.get(6)?,
status: row.get(7)?,
tags: row.get(8)?,
created_at: row.get(9)?,
updated_at: row.get(10)?,
dir_path: row.get(11)?,
scope: row.get(12)?,
})
})?;
println!("Reflexes:");
for reflex in rows {
let r = reflex?;
println!("----------------------------------------------------");
println!(
"ID: {}\nName: {}\nTrigger: {} ({})\nAction: {} ({})\nStatus: {}\nScope: {} (Path: {})\nUpdated: {}",
r.id,
r.name,
r.trigger_type,
r.trigger_config,
r.action_type,
r.action_config,
r.status,
r.scope,
r.dir_path,
r.updated_at
);
}
println!("----------------------------------------------------");
Ok(())
})
}
#[allow(clippy::too_many_arguments)]
fn add_health_trigger_reflex(
root: &Path,
name: &str,
agent: &Option<String>,
watch_states: &str,
priority: &str,
tags: &str,
dir: &Option<String>,
) -> Result<(), error::DecapodError> {
let default_agent = env::var("DECAPOD_AGENT_ID").unwrap_or_else(|_| "unknown".to_string());
let agent = agent.clone().unwrap_or(default_agent);
let states: Vec<String> = watch_states
.split(',')
.map(|s| s.trim().to_uppercase())
.filter(|s| !s.is_empty())
.collect();
let valid_states = ["STALE", "CONTRADICTED", "ASSERTED"];
for s in &states {
if !valid_states.contains(&s.as_str()) {
return Err(error::DecapodError::ValidationError(format!(
"invalid watch_state '{}'; valid values: {}",
s,
valid_states.join(", ")
)));
}
}
let trigger_config = serde_json::json!({
"source": "health_state",
"watch_states": states,
"intent": "condition_based_remediation"
})
.to_string();
let action_config = serde_json::json!({
"agent": agent,
"priority": priority,
"watch_states": states
})
.to_string();
add_reflex(
root,
name.to_string(),
format!(
"Condition-based trigger: create remediation tasks when health claims enter {} states",
states.join("/")
),
"health_state".to_string(),
trigger_config,
"todo.health.remediate".to_string(),
action_config,
"active".to_string(),
tags.to_string(),
dir.clone(),
)
}
fn delete_reflex(root: &Path, id: String) -> Result<(), error::DecapodError> {
let broker = DbBroker::new(root);
let db_path = reflex_db_path(root);
broker.with_conn(&db_path, "decapod", None, "reflex.delete", |conn| {
conn.execute("DELETE FROM reflexes WHERE id = ?1", [&id])?;
Ok(())
})?;
println!(
"{}",
serde_json::json!({ "ts": now_iso(), "cmd": "delete", "id": id, "status": "ok" })
);
Ok(())
}