use std::sync::Arc;
use thiserror::Error;
mod validation;
#[cfg(test)]
mod tests;
#[derive(Debug, Error)]
pub enum ActionError {
#[error("Command not allowed: {0}")]
CommandNotAllowed(String),
#[error("Command execution failed: {0}")]
ExecutionFailed(String),
#[error("Timeout")]
Timeout,
#[error("Invalid arguments: {0}")]
InvalidArguments(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
ExecuteCommand { command: String, args: Vec<String> },
WebSearch { query: String },
ScheduleTask {
description: String,
cron: Option<String>,
},
StoreFact {
subject: String,
predicate: String,
object: String,
},
Recall { query: String },
SendMessage {
channel: String,
recipient: String,
content: String,
},
NetDiagnostic { probe: NetProbe, target: String },
SecurityAudit,
AnalyzeLogs {
system: bool,
since: String,
lines: usize,
},
BaselineCapture { label: Option<String> },
BaselineDiff { from: Option<u32>, to: Option<u32> },
BaselineList,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetProbe {
Check,
Trace,
Cert,
}
#[derive(Debug, Clone)]
pub struct ActionResult {
pub success: bool,
pub output: String,
pub error: Option<String>,
}
#[derive(Debug, Clone)]
pub struct MemoryFact {
pub namespace: String,
pub subject: String,
pub predicate: String,
pub object: String,
pub confidence: f64,
}
#[async_trait::async_trait]
pub trait MemoryBackend: Send + Sync {
async fn store_fact(
&self,
namespace: &str,
category: &str,
subject: &str,
predicate: &str,
object: &str,
) -> Result<String, ActionError>;
async fn recall(
&self,
query: &str,
top_k: usize,
namespace: Option<&str>,
) -> Result<Vec<MemoryFact>, ActionError>;
}
#[derive(Debug, Clone)]
pub struct SearchHit {
pub title: String,
pub url: String,
pub snippet: String,
}
#[async_trait::async_trait]
pub trait WebSearchBackend: Send + Sync {
async fn search(&self, query: &str, top_k: usize) -> Result<Vec<SearchHit>, ActionError>;
}
#[derive(Debug, Clone)]
pub struct FetchedPage {
pub url: String,
pub title: String,
pub text: String,
}
#[async_trait::async_trait]
pub trait UrlFetchBackend: Send + Sync {
async fn fetch(&self, url: &str) -> Result<FetchedPage, ActionError>;
}
#[derive(Debug, Clone)]
pub struct ScheduleOutcome {
pub schedule_id: String,
pub status: String,
}
#[async_trait::async_trait]
pub trait SchedulingBackend: Send + Sync {
async fn schedule(
&self,
description: &str,
cron: Option<&str>,
namespace: &str,
) -> Result<ScheduleOutcome, ActionError>;
}
#[derive(Debug, Clone)]
pub struct MessageOutcome {
pub delivery_id: String,
pub status: String,
}
#[async_trait::async_trait]
pub trait MessageBackend: Send + Sync {
async fn send(
&self,
channel: &str,
recipient: &str,
content: &str,
namespace: &str,
) -> Result<MessageOutcome, ActionError>;
}
#[async_trait::async_trait]
pub trait NetDiagnosticsBackend: Send + Sync {
async fn check(&self, target: &str) -> Result<String, ActionError>;
async fn trace(&self, target: &str) -> Result<String, ActionError>;
async fn cert(&self, target: &str) -> Result<String, ActionError>;
}
#[async_trait::async_trait]
pub trait SecurityAuditBackend: Send + Sync {
async fn audit(&self) -> Result<String, ActionError>;
}
#[async_trait::async_trait]
pub trait LogAnalysisBackend: Send + Sync {
async fn analyze(&self, system: bool, since: &str, lines: usize)
-> Result<String, ActionError>;
}
#[async_trait::async_trait]
pub trait BaselineBackend: Send + Sync {
async fn capture(&self, label: Option<&str>) -> Result<String, ActionError>;
async fn diff(&self, from: Option<u32>, to: Option<u32>) -> Result<String, ActionError>;
async fn list(&self) -> Result<String, ActionError>;
}
impl ActionResult {
pub fn success(output: impl Into<String>) -> Self {
Self {
success: true,
output: output.into(),
error: None,
}
}
pub fn failure(error: impl Into<String>) -> Self {
Self {
success: false,
output: String::new(),
error: Some(error.into()),
}
}
}
#[derive(Debug, Clone)]
pub struct ActionConfig {
pub command_allowlist: Vec<String>,
pub command_timeout_secs: u64,
pub enable_web_search: bool,
pub enable_scheduling: bool,
pub enable_channel_send: bool,
pub web_search_top_k: usize,
}
impl Default for ActionConfig {
fn default() -> Self {
Self {
command_allowlist: vec![
"ls".to_string(),
"grep".to_string(),
"find".to_string(),
"git".to_string(),
"cargo".to_string(),
"rustc".to_string(),
"pwd".to_string(),
],
command_timeout_secs: 30,
enable_web_search: true,
enable_scheduling: false,
enable_channel_send: false,
web_search_top_k: 5,
}
}
}
pub struct ActionDispatcher {
config: ActionConfig,
memory_backend: Option<Arc<dyn MemoryBackend>>,
web_search_backend: Option<Arc<dyn WebSearchBackend>>,
url_fetch_backend: Option<Arc<dyn UrlFetchBackend>>,
scheduling_backend: Option<Arc<dyn SchedulingBackend>>,
message_backend: Option<Arc<dyn MessageBackend>>,
net_diagnostics_backend: Option<Arc<dyn NetDiagnosticsBackend>>,
security_audit_backend: Option<Arc<dyn SecurityAuditBackend>>,
log_analysis_backend: Option<Arc<dyn LogAnalysisBackend>>,
baseline_backend: Option<Arc<dyn BaselineBackend>>,
sandbox_executor: Option<Arc<dyn sandbox::SandboxExecutor>>,
namespace: String,
}
impl ActionDispatcher {
pub fn new(config: ActionConfig) -> Self {
Self {
config,
memory_backend: None,
web_search_backend: None,
url_fetch_backend: None,
scheduling_backend: None,
message_backend: None,
net_diagnostics_backend: None,
security_audit_backend: None,
log_analysis_backend: None,
baseline_backend: None,
sandbox_executor: None,
namespace: "personal".to_string(),
}
}
pub fn with_memory_backend(
config: ActionConfig,
memory_backend: Arc<dyn MemoryBackend>,
) -> Self {
Self::new(config).with_memory(memory_backend)
}
pub fn with_defaults() -> Self {
Self::new(ActionConfig::default())
}
pub fn with_memory(mut self, memory_backend: Arc<dyn MemoryBackend>) -> Self {
self.memory_backend = Some(memory_backend);
self
}
pub fn with_web_search_backend(mut self, backend: Arc<dyn WebSearchBackend>) -> Self {
self.web_search_backend = Some(backend);
self
}
pub fn with_url_fetch_backend(mut self, backend: Arc<dyn UrlFetchBackend>) -> Self {
self.url_fetch_backend = Some(backend);
self
}
pub fn with_scheduling_backend(mut self, backend: Arc<dyn SchedulingBackend>) -> Self {
self.scheduling_backend = Some(backend);
self
}
pub fn with_message_backend(mut self, backend: Arc<dyn MessageBackend>) -> Self {
self.message_backend = Some(backend);
self
}
pub fn with_net_diagnostics_backend(mut self, backend: Arc<dyn NetDiagnosticsBackend>) -> Self {
self.net_diagnostics_backend = Some(backend);
self
}
pub fn with_security_audit_backend(mut self, backend: Arc<dyn SecurityAuditBackend>) -> Self {
self.security_audit_backend = Some(backend);
self
}
pub fn with_log_analysis_backend(mut self, backend: Arc<dyn LogAnalysisBackend>) -> Self {
self.log_analysis_backend = Some(backend);
self
}
pub fn with_baseline_backend(mut self, backend: Arc<dyn BaselineBackend>) -> Self {
self.baseline_backend = Some(backend);
self
}
pub fn with_sandbox_executor(mut self, executor: Arc<dyn sandbox::SandboxExecutor>) -> Self {
self.sandbox_executor = Some(executor);
self
}
pub fn set_namespace(&mut self, namespace: impl Into<String>) {
self.namespace = namespace.into();
}
fn active_namespace(&self) -> &str {
let trimmed = self.namespace.trim();
if trimmed.is_empty() {
"personal"
} else {
trimmed
}
}
pub async fn dispatch(&self, action: &Action) -> ActionResult {
match action {
Action::ExecuteCommand { command, args } => self.execute_command(command, args).await,
Action::WebSearch { query } => self.web_search(query).await,
Action::ScheduleTask { description, cron } => {
self.schedule_task(description, cron.as_deref()).await
}
Action::StoreFact {
subject,
predicate,
object,
} => self.store_fact(subject, predicate, object).await,
Action::Recall { query } => self.recall(query).await,
Action::SendMessage {
channel,
recipient,
content,
} => self.send_message(channel, recipient, content).await,
Action::NetDiagnostic { probe, target } => self.net_diagnostic(*probe, target).await,
Action::SecurityAudit => self.security_audit().await,
Action::AnalyzeLogs {
system,
since,
lines,
} => self.analyze_logs(*system, since, *lines).await,
Action::BaselineCapture { label } => self.baseline_capture(label.as_deref()).await,
Action::BaselineDiff { from, to } => self.baseline_diff(*from, *to).await,
Action::BaselineList => self.baseline_list().await,
}
}
async fn analyze_logs(&self, system: bool, since: &str, lines: usize) -> ActionResult {
let Some(backend) = self.log_analysis_backend.as_ref() else {
return ActionResult::failure("log-analysis backend not configured in this deployment");
};
match backend.analyze(system, since, lines).await {
Ok(report) => ActionResult::success(report),
Err(e) => ActionResult::failure(e.to_string()),
}
}
async fn baseline_capture(&self, label: Option<&str>) -> ActionResult {
let Some(backend) = self.baseline_backend.as_ref() else {
return ActionResult::failure("baseline backend not configured in this deployment");
};
match backend.capture(label).await {
Ok(report) => ActionResult::success(report),
Err(e) => ActionResult::failure(e.to_string()),
}
}
async fn baseline_diff(&self, from: Option<u32>, to: Option<u32>) -> ActionResult {
let Some(backend) = self.baseline_backend.as_ref() else {
return ActionResult::failure("baseline backend not configured in this deployment");
};
match backend.diff(from, to).await {
Ok(report) => ActionResult::success(report),
Err(e) => ActionResult::failure(e.to_string()),
}
}
async fn baseline_list(&self) -> ActionResult {
let Some(backend) = self.baseline_backend.as_ref() else {
return ActionResult::failure("baseline backend not configured in this deployment");
};
match backend.list().await {
Ok(report) => ActionResult::success(report),
Err(e) => ActionResult::failure(e.to_string()),
}
}
async fn security_audit(&self) -> ActionResult {
let Some(backend) = self.security_audit_backend.as_ref() else {
return ActionResult::failure(
"security audit backend not configured in this deployment",
);
};
match backend.audit().await {
Ok(report) => ActionResult::success(report),
Err(e) => ActionResult::failure(e.to_string()),
}
}
async fn net_diagnostic(&self, probe: NetProbe, target: &str) -> ActionResult {
let Some(backend) = self.net_diagnostics_backend.as_ref() else {
return ActionResult::failure(
"network diagnostics backend not configured in this deployment",
);
};
let target = target.trim();
if target.is_empty() {
return ActionResult::failure("net diagnostic needs a target host");
}
let result = match probe {
NetProbe::Check => backend.check(target).await,
NetProbe::Trace => backend.trace(target).await,
NetProbe::Cert => backend.cert(target).await,
};
match result {
Ok(report) => ActionResult::success(report),
Err(e) => ActionResult::failure(e.to_string()),
}
}
async fn execute_command(&self, command: &str, args: &[String]) -> ActionResult {
if !self.config.command_allowlist.iter().any(|c| c == command) {
return ActionResult::failure(format!("Command '{command}' is not in the allowlist"));
}
if let Err(reason) = validation::validate_args(command, args) {
return ActionResult::failure(format!("Blocked: {}", reason));
}
let Some(executor) = self.sandbox_executor.as_ref() else {
tracing::warn!(
command,
"execute_command refused — no sandbox executor wired"
);
return ActionResult::failure(
"Sandbox not configured — refusing to execute commands without isolation",
);
};
let timeout = std::time::Duration::from_secs(self.config.command_timeout_secs);
let sandbox_command = sandbox::SandboxCommand::new(command, args.to_vec())
.with_workdir(std::env::current_dir().unwrap_or_default())
.with_timeout(timeout);
match executor.run(sandbox_command).await {
Ok(outcome) => {
if outcome.exit_code == 0 {
ActionResult::success(outcome.stdout)
} else {
ActionResult::failure(format!(
"Exit code: {}\nstderr: {}",
outcome.exit_code, outcome.stderr
))
}
}
Err(sandbox::SandboxError::Timeout(d)) => {
ActionResult::failure(format!("Command timed out after {:?}", d))
}
Err(sandbox::SandboxError::Forbidden(reason)) => {
ActionResult::failure(format!("Blocked by sandbox: {reason}"))
}
Err(sandbox::SandboxError::PathNotAllowed(p)) => {
ActionResult::failure(format!("Blocked by sandbox (path not allowed): {p}"))
}
Err(e) => ActionResult::failure(format!("Sandbox execution failed: {e}")),
}
}
async fn web_search(&self, query: &str) -> ActionResult {
if !self.config.enable_web_search {
return ActionResult::failure("Web search is disabled by config");
}
let Some(backend) = &self.web_search_backend else {
return ActionResult::failure("Web search backend not configured");
};
let top_k = self.config.web_search_top_k.max(1);
let urls = extract_urls(query);
let cleaned = strip_urls(query);
let search_query = if cleaned.trim().is_empty() {
urls.first()
.and_then(|u| url_hostname(u))
.unwrap_or_else(|| query.to_string())
} else {
cleaned
};
let search_future = backend.search(&search_query, top_k);
let fetch_future = self.fetch_urls(&urls);
let (search_result, fetched) = tokio::join!(search_future, fetch_future);
let mut out = String::new();
match search_result {
Ok(hits) if hits.is_empty() => {
out.push_str(&format!(
"web_search ok query=\"{}\" top_k={} hits=0\n",
search_query, top_k
));
}
Ok(hits) => {
let lines = hits
.iter()
.enumerate()
.map(|(i, hit)| {
format!("{}. {} ({}) - {}", i + 1, hit.title, hit.url, hit.snippet)
})
.collect::<Vec<_>>()
.join("\n");
out.push_str(&format!(
"web_search ok query=\"{}\" top_k={} hits={}\n{}\n",
search_query,
top_k,
hits.len(),
lines
));
}
Err(e) => {
out.push_str(&format!("web_search error: {e}\n"));
if fetched.is_empty() {
return ActionResult::failure(format!("Web search failed: {e}"));
}
}
}
if !fetched.is_empty() {
out.push_str("\nLinked sources (fetched directly):\n");
for (i, page) in fetched.iter().enumerate() {
out.push_str(&format!(
"--- [{}] {} ({})\n{}\n\n",
i + 1,
page.title,
page.url,
page.text
));
}
}
ActionResult::success(out.trim_end().to_string())
}
async fn fetch_urls(&self, urls: &[String]) -> Vec<FetchedPage> {
const MAX_FETCH_URLS: usize = 4;
let Some(fetcher) = &self.url_fetch_backend else {
return Vec::new();
};
if urls.is_empty() {
return Vec::new();
}
let to_fetch: Vec<String> = urls.iter().take(MAX_FETCH_URLS).cloned().collect();
let futures = to_fetch.into_iter().map(|u| {
let fetcher = fetcher.clone();
async move {
match fetcher.fetch(&u).await {
Ok(page) => Some(page),
Err(e) => {
tracing::warn!(url = %u, error = %e, "URL fetch failed");
None
}
}
}
});
futures::future::join_all(futures)
.await
.into_iter()
.flatten()
.collect()
}
async fn schedule_task(&self, description: &str, cron: Option<&str>) -> ActionResult {
if !self.config.enable_scheduling {
return ActionResult::failure("Scheduling is disabled by config");
}
let Some(backend) = &self.scheduling_backend else {
return ActionResult::failure("Scheduling backend not configured");
};
let namespace = self.active_namespace();
match backend.schedule(description, cron, namespace).await {
Ok(outcome) => ActionResult::success(format!(
"schedule_task ok id={} status={} namespace={} cron={} description=\"{}\"",
outcome.schedule_id,
outcome.status,
namespace,
cron.unwrap_or("none"),
description
)),
Err(e) => ActionResult::failure(format!("Schedule task failed: {e}")),
}
}
async fn store_fact(&self, subject: &str, predicate: &str, object: &str) -> ActionResult {
let Some(memory) = &self.memory_backend else {
return ActionResult::failure("Memory backend not available");
};
let namespace = self.active_namespace();
match memory
.store_fact(namespace, "action", subject, predicate, object)
.await
{
Ok(id) => ActionResult::success(format!(
"Fact stored [{}] [{}]: {} {} {}",
id, namespace, subject, predicate, object
)),
Err(e) => ActionResult::failure(format!("Failed to store fact: {e}")),
}
}
async fn recall(&self, query: &str) -> ActionResult {
let Some(memory) = &self.memory_backend else {
return ActionResult::failure("Memory backend not available");
};
let namespace = self.active_namespace();
match memory.recall(query, 10, Some(namespace)).await {
Ok(results) if results.is_empty() => ActionResult::success("No matching facts found."),
Ok(results) => {
let lines = results
.iter()
.map(|r| {
format!(
"[{}] {} {} {} (confidence: {:.2})",
r.namespace, r.subject, r.predicate, r.object, r.confidence
)
})
.collect::<Vec<_>>()
.join("\n");
ActionResult::success(format!("Found {} fact(s):\n{}", results.len(), lines))
}
Err(e) => ActionResult::failure(format!("Recall failed: {e}")),
}
}
async fn send_message(&self, channel: &str, recipient: &str, content: &str) -> ActionResult {
if !self.config.enable_channel_send {
return ActionResult::failure("Channel sending is disabled by config");
}
let Some(backend) = &self.message_backend else {
return ActionResult::failure("Message backend not configured");
};
let namespace = self.active_namespace();
match backend.send(channel, recipient, content, namespace).await {
Ok(outcome) => ActionResult::success(format!(
"send_message ok id={} status={} channel={} recipient={} namespace={}",
outcome.delivery_id, outcome.status, channel, recipient, namespace
)),
Err(e) => ActionResult::failure(format!("Send message failed: {e}")),
}
}
}
pub(crate) fn extract_urls(text: &str) -> Vec<String> {
let mut out = Vec::new();
for token in text.split(|c: char| c.is_whitespace() || c == '<' || c == '>') {
let t = token.trim();
if !(t.starts_with("http://") || t.starts_with("https://")) {
continue;
}
let cleaned = t.trim_end_matches(|c: char| {
matches!(
c,
'.' | ',' | ')' | ']' | '}' | ';' | '\'' | '"' | '!' | '?'
)
});
if cleaned.len() > "https://".len() && !out.iter().any(|u: &String| u == cleaned) {
out.push(cleaned.to_string());
}
}
out
}
pub(crate) fn strip_urls(text: &str) -> String {
text.split_whitespace()
.filter(|t| !t.starts_with("http://") && !t.starts_with("https://"))
.collect::<Vec<_>>()
.join(" ")
}
pub(crate) fn url_hostname(url: &str) -> Option<String> {
let after_scheme = url.split_once("://").map(|(_, r)| r).unwrap_or(url);
let host = after_scheme.split('/').next().unwrap_or(after_scheme);
let host = host.split('@').next_back().unwrap_or(host);
let host = host.split(':').next().unwrap_or(host);
if host.is_empty() {
None
} else {
Some(host.to_string())
}
}