1use std::sync::Arc;
7
8use thiserror::Error;
9
10mod validation;
11
12#[cfg(test)]
13mod tests;
14
15#[derive(Debug, Error)]
19pub enum ActionError {
20 #[error("Command not allowed: {0}")]
21 CommandNotAllowed(String),
22
23 #[error("Command execution failed: {0}")]
24 ExecutionFailed(String),
25
26 #[error("Timeout")]
27 Timeout,
28
29 #[error("Invalid arguments: {0}")]
30 InvalidArguments(String),
31
32 #[error("IO error: {0}")]
33 Io(#[from] std::io::Error),
34}
35
36#[derive(Debug, Clone, PartialEq)]
40pub enum Action {
41 ExecuteCommand { command: String, args: Vec<String> },
43 WebSearch { query: String },
45 ScheduleTask {
47 description: String,
48 cron: Option<String>,
49 },
50 StoreFact {
52 subject: String,
53 predicate: String,
54 object: String,
55 },
56 Recall { query: String },
58 SendMessage {
60 channel: String,
61 recipient: String,
62 content: String,
63 },
64 NetDiagnostic { probe: NetProbe, target: String },
66 SecurityAudit,
68 AnalyzeLogs {
71 system: bool,
72 since: String,
73 lines: usize,
74 },
75 BaselineCapture { label: Option<String> },
77 BaselineDiff { from: Option<u32>, to: Option<u32> },
79 BaselineList,
81}
82
/// Which network probe a `NetDiagnostic` action should run.
///
/// Each variant maps to the same-named method on the network diagnostics
/// backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetProbe {
    /// Dispatched to the backend's `check` method.
    Check,
    /// Dispatched to the backend's `trace` method.
    Trace,
    /// Dispatched to the backend's `cert` method.
    Cert,
}
94
/// Outcome of a dispatched action, in a form that is easy to render.
#[derive(Debug, Clone)]
pub struct ActionResult {
    /// `true` when the action completed successfully.
    pub success: bool,
    /// Text produced by the action; empty for results built with `failure`.
    pub output: String,
    /// Failure description; `None` for results built with `success`.
    pub error: Option<String>,
}
102
/// A fact returned by a memory backend's `recall`.
///
/// The dispatcher renders it as
/// `[namespace] subject predicate object (confidence: N.NN)`.
#[derive(Debug, Clone)]
pub struct MemoryFact {
    /// Namespace the fact belongs to.
    pub namespace: String,
    /// Subject of the triple.
    pub subject: String,
    /// Predicate of the triple.
    pub predicate: String,
    /// Object of the triple.
    pub object: String,
    /// Backend-supplied confidence score (range not established here).
    pub confidence: f64,
}
112
113#[async_trait::async_trait]
115pub trait MemoryBackend: Send + Sync {
116 async fn store_fact(
117 &self,
118 namespace: &str,
119 category: &str,
120 subject: &str,
121 predicate: &str,
122 object: &str,
123 ) -> Result<String, ActionError>;
124
125 async fn recall(
126 &self,
127 query: &str,
128 top_k: usize,
129 namespace: Option<&str>,
130 ) -> Result<Vec<MemoryFact>, ActionError>;
131}
132
/// One web-search result.
///
/// The dispatcher renders it as `N. title (url) - snippet`.
#[derive(Debug, Clone)]
pub struct SearchHit {
    /// Title of the result.
    pub title: String,
    /// Address of the result.
    pub url: String,
    /// Short excerpt accompanying the result.
    pub snippet: String,
}
140
141#[async_trait::async_trait]
143pub trait WebSearchBackend: Send + Sync {
144 async fn search(&self, query: &str, top_k: usize) -> Result<Vec<SearchHit>, ActionError>;
145}
146
/// Content retrieved for a URL by a fetch backend.
///
/// The dispatcher appends it to web-search output as a "linked source".
#[derive(Debug, Clone)]
pub struct FetchedPage {
    /// Address reported for the page.
    pub url: String,
    /// Title of the page.
    pub title: String,
    /// Text body of the page.
    pub text: String,
}
156
157#[async_trait::async_trait]
162pub trait UrlFetchBackend: Send + Sync {
163 async fn fetch(&self, url: &str) -> Result<FetchedPage, ActionError>;
167}
168
/// What a scheduling backend reports after accepting a task.
#[derive(Debug, Clone)]
pub struct ScheduleOutcome {
    /// Identifier of the created schedule.
    pub schedule_id: String,
    /// Backend-defined status text.
    pub status: String,
}
175
176#[async_trait::async_trait]
178pub trait SchedulingBackend: Send + Sync {
179 async fn schedule(
180 &self,
181 description: &str,
182 cron: Option<&str>,
183 namespace: &str,
184 ) -> Result<ScheduleOutcome, ActionError>;
185}
186
/// What a message backend reports after handling a send request.
#[derive(Debug, Clone)]
pub struct MessageOutcome {
    /// Identifier of the delivery.
    pub delivery_id: String,
    /// Backend-defined status text.
    pub status: String,
}
193
194#[async_trait::async_trait]
196pub trait MessageBackend: Send + Sync {
197 async fn send(
198 &self,
199 channel: &str,
200 recipient: &str,
201 content: &str,
202 namespace: &str,
203 ) -> Result<MessageOutcome, ActionError>;
204}
205
206#[async_trait::async_trait]
212pub trait NetDiagnosticsBackend: Send + Sync {
213 async fn check(&self, target: &str) -> Result<String, ActionError>;
215 async fn trace(&self, target: &str) -> Result<String, ActionError>;
217 async fn cert(&self, target: &str) -> Result<String, ActionError>;
219}
220
221#[async_trait::async_trait]
225pub trait SecurityAuditBackend: Send + Sync {
226 async fn audit(&self) -> Result<String, ActionError>;
227}
228
229#[async_trait::async_trait]
234pub trait LogAnalysisBackend: Send + Sync {
235 async fn analyze(&self, system: bool, since: &str, lines: usize)
236 -> Result<String, ActionError>;
237}
238
239#[async_trait::async_trait]
243pub trait BaselineBackend: Send + Sync {
244 async fn capture(&self, label: Option<&str>) -> Result<String, ActionError>;
245 async fn diff(&self, from: Option<u32>, to: Option<u32>) -> Result<String, ActionError>;
246 async fn list(&self) -> Result<String, ActionError>;
247}
248
249impl ActionResult {
250 pub fn success(output: impl Into<String>) -> Self {
252 Self {
253 success: true,
254 output: output.into(),
255 error: None,
256 }
257 }
258
259 pub fn failure(error: impl Into<String>) -> Self {
261 Self {
262 success: false,
263 output: String::new(),
264 error: Some(error.into()),
265 }
266 }
267}
268
/// Policy knobs consulted by the dispatcher before it touches a backend.
#[derive(Debug, Clone)]
pub struct ActionConfig {
    /// Commands that may be executed; compared by exact string equality.
    pub command_allowlist: Vec<String>,
    /// Timeout, in seconds, handed to the sandbox for each command.
    pub command_timeout_secs: u64,
    /// When `false`, `WebSearch` actions fail without touching the backend.
    pub enable_web_search: bool,
    /// When `false`, `ScheduleTask` actions fail without touching the backend.
    pub enable_scheduling: bool,
    /// When `false`, `SendMessage` actions fail without touching the backend.
    pub enable_channel_send: bool,
    /// Requested number of search hits; the dispatcher raises `0` to `1`.
    pub web_search_top_k: usize,
}
287
288impl Default for ActionConfig {
289 fn default() -> Self {
290 Self {
291 command_allowlist: vec![
292 "ls".to_string(),
293 "grep".to_string(),
294 "find".to_string(),
295 "git".to_string(),
296 "cargo".to_string(),
297 "rustc".to_string(),
298 "pwd".to_string(),
299 ],
300 command_timeout_secs: 30,
301 enable_web_search: true,
302 enable_scheduling: false,
303 enable_channel_send: false,
304 web_search_top_k: 5,
305 }
306 }
307}
308
309pub struct ActionDispatcher {
311 config: ActionConfig,
312 memory_backend: Option<Arc<dyn MemoryBackend>>,
313 web_search_backend: Option<Arc<dyn WebSearchBackend>>,
314 url_fetch_backend: Option<Arc<dyn UrlFetchBackend>>,
315 scheduling_backend: Option<Arc<dyn SchedulingBackend>>,
316 message_backend: Option<Arc<dyn MessageBackend>>,
317 net_diagnostics_backend: Option<Arc<dyn NetDiagnosticsBackend>>,
318 security_audit_backend: Option<Arc<dyn SecurityAuditBackend>>,
319 log_analysis_backend: Option<Arc<dyn LogAnalysisBackend>>,
320 baseline_backend: Option<Arc<dyn BaselineBackend>>,
321 sandbox_executor: Option<Arc<dyn sandbox::SandboxExecutor>>,
325 namespace: String,
326}
327
328impl ActionDispatcher {
329 pub fn new(config: ActionConfig) -> Self {
331 Self {
332 config,
333 memory_backend: None,
334 web_search_backend: None,
335 url_fetch_backend: None,
336 scheduling_backend: None,
337 message_backend: None,
338 net_diagnostics_backend: None,
339 security_audit_backend: None,
340 log_analysis_backend: None,
341 baseline_backend: None,
342 sandbox_executor: None,
343 namespace: "personal".to_string(),
344 }
345 }
346
347 pub fn with_memory_backend(
349 config: ActionConfig,
350 memory_backend: Arc<dyn MemoryBackend>,
351 ) -> Self {
352 Self::new(config).with_memory(memory_backend)
353 }
354
355 pub fn with_defaults() -> Self {
357 Self::new(ActionConfig::default())
358 }
359
360 pub fn with_memory(mut self, memory_backend: Arc<dyn MemoryBackend>) -> Self {
362 self.memory_backend = Some(memory_backend);
363 self
364 }
365
366 pub fn with_web_search_backend(mut self, backend: Arc<dyn WebSearchBackend>) -> Self {
368 self.web_search_backend = Some(backend);
369 self
370 }
371
372 pub fn with_url_fetch_backend(mut self, backend: Arc<dyn UrlFetchBackend>) -> Self {
377 self.url_fetch_backend = Some(backend);
378 self
379 }
380
381 pub fn with_scheduling_backend(mut self, backend: Arc<dyn SchedulingBackend>) -> Self {
383 self.scheduling_backend = Some(backend);
384 self
385 }
386
387 pub fn with_message_backend(mut self, backend: Arc<dyn MessageBackend>) -> Self {
389 self.message_backend = Some(backend);
390 self
391 }
392
393 pub fn with_net_diagnostics_backend(mut self, backend: Arc<dyn NetDiagnosticsBackend>) -> Self {
395 self.net_diagnostics_backend = Some(backend);
396 self
397 }
398
399 pub fn with_security_audit_backend(mut self, backend: Arc<dyn SecurityAuditBackend>) -> Self {
401 self.security_audit_backend = Some(backend);
402 self
403 }
404
405 pub fn with_log_analysis_backend(mut self, backend: Arc<dyn LogAnalysisBackend>) -> Self {
407 self.log_analysis_backend = Some(backend);
408 self
409 }
410
411 pub fn with_baseline_backend(mut self, backend: Arc<dyn BaselineBackend>) -> Self {
413 self.baseline_backend = Some(backend);
414 self
415 }
416
417 pub fn with_sandbox_executor(mut self, executor: Arc<dyn sandbox::SandboxExecutor>) -> Self {
421 self.sandbox_executor = Some(executor);
422 self
423 }
424
425 pub fn set_namespace(&mut self, namespace: impl Into<String>) {
427 self.namespace = namespace.into();
428 }
429
430 fn active_namespace(&self) -> &str {
431 let trimmed = self.namespace.trim();
432 if trimmed.is_empty() {
433 "personal"
434 } else {
435 trimmed
436 }
437 }
438
439 pub async fn dispatch(&self, action: &Action) -> ActionResult {
441 match action {
442 Action::ExecuteCommand { command, args } => self.execute_command(command, args).await,
443 Action::WebSearch { query } => self.web_search(query).await,
444 Action::ScheduleTask { description, cron } => {
445 self.schedule_task(description, cron.as_deref()).await
446 }
447 Action::StoreFact {
448 subject,
449 predicate,
450 object,
451 } => self.store_fact(subject, predicate, object).await,
452 Action::Recall { query } => self.recall(query).await,
453 Action::SendMessage {
454 channel,
455 recipient,
456 content,
457 } => self.send_message(channel, recipient, content).await,
458 Action::NetDiagnostic { probe, target } => self.net_diagnostic(*probe, target).await,
459 Action::SecurityAudit => self.security_audit().await,
460 Action::AnalyzeLogs {
461 system,
462 since,
463 lines,
464 } => self.analyze_logs(*system, since, *lines).await,
465 Action::BaselineCapture { label } => self.baseline_capture(label.as_deref()).await,
466 Action::BaselineDiff { from, to } => self.baseline_diff(*from, *to).await,
467 Action::BaselineList => self.baseline_list().await,
468 }
469 }
470
471 async fn analyze_logs(&self, system: bool, since: &str, lines: usize) -> ActionResult {
474 let Some(backend) = self.log_analysis_backend.as_ref() else {
475 return ActionResult::failure("log-analysis backend not configured in this deployment");
476 };
477 match backend.analyze(system, since, lines).await {
478 Ok(report) => ActionResult::success(report),
479 Err(e) => ActionResult::failure(e.to_string()),
480 }
481 }
482
483 async fn baseline_capture(&self, label: Option<&str>) -> ActionResult {
485 let Some(backend) = self.baseline_backend.as_ref() else {
486 return ActionResult::failure("baseline backend not configured in this deployment");
487 };
488 match backend.capture(label).await {
489 Ok(report) => ActionResult::success(report),
490 Err(e) => ActionResult::failure(e.to_string()),
491 }
492 }
493
494 async fn baseline_diff(&self, from: Option<u32>, to: Option<u32>) -> ActionResult {
496 let Some(backend) = self.baseline_backend.as_ref() else {
497 return ActionResult::failure("baseline backend not configured in this deployment");
498 };
499 match backend.diff(from, to).await {
500 Ok(report) => ActionResult::success(report),
501 Err(e) => ActionResult::failure(e.to_string()),
502 }
503 }
504
505 async fn baseline_list(&self) -> ActionResult {
507 let Some(backend) = self.baseline_backend.as_ref() else {
508 return ActionResult::failure("baseline backend not configured in this deployment");
509 };
510 match backend.list().await {
511 Ok(report) => ActionResult::success(report),
512 Err(e) => ActionResult::failure(e.to_string()),
513 }
514 }
515
516 async fn security_audit(&self) -> ActionResult {
520 let Some(backend) = self.security_audit_backend.as_ref() else {
521 return ActionResult::failure(
522 "security audit backend not configured in this deployment",
523 );
524 };
525 match backend.audit().await {
526 Ok(report) => ActionResult::success(report),
527 Err(e) => ActionResult::failure(e.to_string()),
528 }
529 }
530
531 async fn net_diagnostic(&self, probe: NetProbe, target: &str) -> ActionResult {
535 let Some(backend) = self.net_diagnostics_backend.as_ref() else {
536 return ActionResult::failure(
537 "network diagnostics backend not configured in this deployment",
538 );
539 };
540 let target = target.trim();
541 if target.is_empty() {
542 return ActionResult::failure("net diagnostic needs a target host");
543 }
544 let result = match probe {
545 NetProbe::Check => backend.check(target).await,
546 NetProbe::Trace => backend.trace(target).await,
547 NetProbe::Cert => backend.cert(target).await,
548 };
549 match result {
550 Ok(report) => ActionResult::success(report),
551 Err(e) => ActionResult::failure(e.to_string()),
552 }
553 }
554
555 async fn execute_command(&self, command: &str, args: &[String]) -> ActionResult {
566 if !self.config.command_allowlist.iter().any(|c| c == command) {
567 return ActionResult::failure(format!("Command '{command}' is not in the allowlist"));
568 }
569
570 if let Err(reason) = validation::validate_args(command, args) {
571 return ActionResult::failure(format!("Blocked: {}", reason));
572 }
573
574 let Some(executor) = self.sandbox_executor.as_ref() else {
575 tracing::warn!(
576 command,
577 "execute_command refused — no sandbox executor wired"
578 );
579 return ActionResult::failure(
580 "Sandbox not configured — refusing to execute commands without isolation",
581 );
582 };
583
584 let timeout = std::time::Duration::from_secs(self.config.command_timeout_secs);
585 let sandbox_command = sandbox::SandboxCommand::new(command, args.to_vec())
586 .with_workdir(std::env::current_dir().unwrap_or_default())
587 .with_timeout(timeout);
588
589 match executor.run(sandbox_command).await {
590 Ok(outcome) => {
591 if outcome.exit_code == 0 {
592 ActionResult::success(outcome.stdout)
593 } else {
594 ActionResult::failure(format!(
595 "Exit code: {}\nstderr: {}",
596 outcome.exit_code, outcome.stderr
597 ))
598 }
599 }
600 Err(sandbox::SandboxError::Timeout(d)) => {
601 ActionResult::failure(format!("Command timed out after {:?}", d))
602 }
603 Err(sandbox::SandboxError::Forbidden(reason)) => {
604 ActionResult::failure(format!("Blocked by sandbox: {reason}"))
605 }
606 Err(sandbox::SandboxError::PathNotAllowed(p)) => {
607 ActionResult::failure(format!("Blocked by sandbox (path not allowed): {p}"))
608 }
609 Err(e) => ActionResult::failure(format!("Sandbox execution failed: {e}")),
610 }
611 }
612
613 async fn web_search(&self, query: &str) -> ActionResult {
618 if !self.config.enable_web_search {
619 return ActionResult::failure("Web search is disabled by config");
620 }
621 let Some(backend) = &self.web_search_backend else {
622 return ActionResult::failure("Web search backend not configured");
623 };
624 let top_k = self.config.web_search_top_k.max(1);
625 let urls = extract_urls(query);
626
627 let cleaned = strip_urls(query);
633 let search_query = if cleaned.trim().is_empty() {
634 urls.first()
635 .and_then(|u| url_hostname(u))
636 .unwrap_or_else(|| query.to_string())
637 } else {
638 cleaned
639 };
640
641 let search_future = backend.search(&search_query, top_k);
642 let fetch_future = self.fetch_urls(&urls);
643 let (search_result, fetched) = tokio::join!(search_future, fetch_future);
644
645 let mut out = String::new();
646 match search_result {
647 Ok(hits) if hits.is_empty() => {
648 out.push_str(&format!(
649 "web_search ok query=\"{}\" top_k={} hits=0\n",
650 search_query, top_k
651 ));
652 }
653 Ok(hits) => {
654 let lines = hits
655 .iter()
656 .enumerate()
657 .map(|(i, hit)| {
658 format!("{}. {} ({}) - {}", i + 1, hit.title, hit.url, hit.snippet)
659 })
660 .collect::<Vec<_>>()
661 .join("\n");
662 out.push_str(&format!(
663 "web_search ok query=\"{}\" top_k={} hits={}\n{}\n",
664 search_query,
665 top_k,
666 hits.len(),
667 lines
668 ));
669 }
670 Err(e) => {
671 out.push_str(&format!("web_search error: {e}\n"));
676 if fetched.is_empty() {
677 return ActionResult::failure(format!("Web search failed: {e}"));
678 }
679 }
680 }
681
682 if !fetched.is_empty() {
683 out.push_str("\nLinked sources (fetched directly):\n");
684 for (i, page) in fetched.iter().enumerate() {
685 out.push_str(&format!(
686 "--- [{}] {} ({})\n{}\n\n",
687 i + 1,
688 page.title,
689 page.url,
690 page.text
691 ));
692 }
693 }
694
695 ActionResult::success(out.trim_end().to_string())
696 }
697
698 async fn fetch_urls(&self, urls: &[String]) -> Vec<FetchedPage> {
702 const MAX_FETCH_URLS: usize = 4;
703 let Some(fetcher) = &self.url_fetch_backend else {
704 return Vec::new();
705 };
706 if urls.is_empty() {
707 return Vec::new();
708 }
709 let to_fetch: Vec<String> = urls.iter().take(MAX_FETCH_URLS).cloned().collect();
710 let futures = to_fetch.into_iter().map(|u| {
711 let fetcher = fetcher.clone();
712 async move {
713 match fetcher.fetch(&u).await {
714 Ok(page) => Some(page),
715 Err(e) => {
716 tracing::warn!(url = %u, error = %e, "URL fetch failed");
717 None
718 }
719 }
720 }
721 });
722 futures::future::join_all(futures)
723 .await
724 .into_iter()
725 .flatten()
726 .collect()
727 }
728
729 async fn schedule_task(&self, description: &str, cron: Option<&str>) -> ActionResult {
731 if !self.config.enable_scheduling {
732 return ActionResult::failure("Scheduling is disabled by config");
733 }
734 let Some(backend) = &self.scheduling_backend else {
735 return ActionResult::failure("Scheduling backend not configured");
736 };
737 let namespace = self.active_namespace();
738 match backend.schedule(description, cron, namespace).await {
739 Ok(outcome) => ActionResult::success(format!(
740 "schedule_task ok id={} status={} namespace={} cron={} description=\"{}\"",
741 outcome.schedule_id,
742 outcome.status,
743 namespace,
744 cron.unwrap_or("none"),
745 description
746 )),
747 Err(e) => ActionResult::failure(format!("Schedule task failed: {e}")),
748 }
749 }
750
751 async fn store_fact(&self, subject: &str, predicate: &str, object: &str) -> ActionResult {
753 let Some(memory) = &self.memory_backend else {
754 return ActionResult::failure("Memory backend not available");
755 };
756 let namespace = self.active_namespace();
757
758 match memory
759 .store_fact(namespace, "action", subject, predicate, object)
760 .await
761 {
762 Ok(id) => ActionResult::success(format!(
763 "Fact stored [{}] [{}]: {} {} {}",
764 id, namespace, subject, predicate, object
765 )),
766 Err(e) => ActionResult::failure(format!("Failed to store fact: {e}")),
767 }
768 }
769
770 async fn recall(&self, query: &str) -> ActionResult {
772 let Some(memory) = &self.memory_backend else {
773 return ActionResult::failure("Memory backend not available");
774 };
775 let namespace = self.active_namespace();
776
777 match memory.recall(query, 10, Some(namespace)).await {
778 Ok(results) if results.is_empty() => ActionResult::success("No matching facts found."),
779 Ok(results) => {
780 let lines = results
781 .iter()
782 .map(|r| {
783 format!(
784 "[{}] {} {} {} (confidence: {:.2})",
785 r.namespace, r.subject, r.predicate, r.object, r.confidence
786 )
787 })
788 .collect::<Vec<_>>()
789 .join("\n");
790 ActionResult::success(format!("Found {} fact(s):\n{}", results.len(), lines))
791 }
792 Err(e) => ActionResult::failure(format!("Recall failed: {e}")),
793 }
794 }
795
796 async fn send_message(&self, channel: &str, recipient: &str, content: &str) -> ActionResult {
798 if !self.config.enable_channel_send {
799 return ActionResult::failure("Channel sending is disabled by config");
800 }
801 let Some(backend) = &self.message_backend else {
802 return ActionResult::failure("Message backend not configured");
803 };
804 let namespace = self.active_namespace();
805 match backend.send(channel, recipient, content, namespace).await {
806 Ok(outcome) => ActionResult::success(format!(
807 "send_message ok id={} status={} channel={} recipient={} namespace={}",
808 outcome.delivery_id, outcome.status, channel, recipient, namespace
809 )),
810 Err(e) => ActionResult::failure(format!("Send message failed: {e}")),
811 }
812 }
813}
814
/// Collects the distinct `http://` / `https://` URLs mentioned in `text`,
/// in order of first appearance.
///
/// Tokens are delimited by whitespace and by `<` / `>`, so a URL written as
/// `<https://a.example>` is recognised. Trailing sentence punctuation and
/// closing brackets or quotes (`. , ) ] } ; ' " ! ?`) are trimmed from each
/// URL. A bare scheme with nothing after it is ignored.
pub(crate) fn extract_urls(text: &str) -> Vec<String> {
    const SCHEMES: [&str; 2] = ["https://", "http://"];

    let mut found: Vec<String> = Vec::new();
    for token in text.split(|c: char| c.is_whitespace() || c == '<' || c == '>') {
        let candidate = token.trim().trim_end_matches(|c: char| {
            matches!(
                c,
                '.' | ',' | ')' | ']' | '}' | ';' | '\'' | '"' | '!' | '?'
            )
        });

        // Require something after whichever scheme is actually present.
        // Measuring both schemes against `"https://".len()` would drop
        // `http://` URLs whose remainder is a single byte.
        let has_host = SCHEMES.iter().any(|scheme| {
            candidate
                .strip_prefix(*scheme)
                .map_or(false, |rest| !rest.is_empty())
        });
        if !has_host {
            continue;
        }

        // Linear de-duplication keeps first-seen order; URL lists are short.
        if found.iter().all(|seen| seen != candidate) {
            found.push(candidate.to_string());
        }
    }
    found
}
837
/// Returns `text` with every `http://` / `https://` URL removed and the
/// remaining words joined by single spaces.
///
/// URLs are recognised the same way `extract_urls` finds them: a URL may be
/// delimited by `<` / `>` as well as by whitespace, so `<https://a.example>`
/// is removed too. Words that contain no URL are kept verbatim, angle
/// brackets included (e.g. `Vec<T>`).
pub(crate) fn strip_urls(text: &str) -> String {
    fn is_bracket(c: char) -> bool {
        c == '<' || c == '>'
    }
    fn is_url(fragment: &str) -> bool {
        fragment.starts_with("http://") || fragment.starts_with("https://")
    }

    let mut kept: Vec<&str> = Vec::new();
    for word in text.split_whitespace() {
        if word.split(is_bracket).any(is_url) {
            // The word embeds a URL: keep only its non-URL, non-empty pieces.
            kept.extend(
                word.split(is_bracket)
                    .filter(|piece| !piece.is_empty() && !is_url(piece)),
            );
        } else {
            kept.push(word);
        }
    }
    kept.join(" ")
}
846
/// Extracts the host portion of `url`, or `None` when it is empty.
///
/// The scheme (`…://`) is optional. The authority ends at the first `/`,
/// `?` or `#`, so a query or fragment is never mistaken for part of the
/// host. Any `user:pass@` prefix and any `:port` suffix are dropped. A
/// bracketed IPv6 literal is returned with its brackets, e.g. `[::1]`.
pub(crate) fn url_hostname(url: &str) -> Option<String> {
    let after_scheme = url.split_once("://").map_or(url, |(_, rest)| rest);

    // Cut at the first path, query or fragment delimiter.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);

    // Userinfo, if present, is everything up to the last '@'.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);

    let host = match host_port.find(']') {
        // IPv6 literal: its colons belong to the address, not to a port.
        Some(close) if host_port.starts_with('[') => &host_port[..=close],
        _ => host_port.split(':').next().unwrap_or(host_port),
    };

    if host.is_empty() {
        None
    } else {
        Some(host.to_string())
    }
}