1use std::sync::Arc;
7
8use thiserror::Error;
9
10mod tooling;
11mod validation;
12
13#[cfg(test)]
14mod tests;
15
16pub use tooling::{get_available_tools, ToolDefinition};
17
18#[derive(Debug, Error)]
22pub enum ActionError {
23 #[error("Command not allowed: {0}")]
24 CommandNotAllowed(String),
25
26 #[error("Command execution failed: {0}")]
27 ExecutionFailed(String),
28
29 #[error("Timeout")]
30 Timeout,
31
32 #[error("Invalid arguments: {0}")]
33 InvalidArguments(String),
34
35 #[error("IO error: {0}")]
36 Io(#[from] std::io::Error),
37}
38
/// A request that [`ActionDispatcher::dispatch`] routes to the matching handler.
///
/// Every field is an owned string (or a collection/option of them), so the
/// type is totally comparable and derives `Eq` alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Action {
    /// Run `command` with `args` as a child process.
    ExecuteCommand { command: String, args: Vec<String> },
    /// Run a web search for `query`.
    WebSearch { query: String },
    /// Hand a task to the scheduling backend.
    ScheduleTask {
        /// Free-text description of the task.
        description: String,
        /// Optional schedule expression, forwarded to the backend unchanged.
        cron: Option<String>,
    },
    /// Store a subject / predicate / object triple in the memory backend.
    StoreFact {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Query the memory backend for stored facts.
    Recall { query: String },
    /// Send `content` to `recipient` over `channel` via the message backend.
    SendMessage {
        channel: String,
        recipient: String,
        content: String,
    },
}
68
/// Outcome of a dispatched action.
#[derive(Debug, Clone)]
pub struct ActionResult {
    /// `true` when the action completed successfully.
    pub success: bool,
    /// Text produced by the action.
    pub output: String,
    /// Failure description, when there is one.
    pub error: Option<String>,
}
76
/// One fact returned by [`MemoryBackend::recall`].
#[derive(Debug, Clone)]
pub struct MemoryFact {
    /// Namespace the fact is reported under.
    pub namespace: String,
    /// Subject of the triple.
    pub subject: String,
    /// Predicate of the triple.
    pub predicate: String,
    /// Object of the triple.
    pub object: String,
    /// Confidence value supplied by the backend; its range is not defined here.
    pub confidence: f64,
}
86
87#[async_trait::async_trait]
89pub trait MemoryBackend: Send + Sync {
90 async fn store_fact(
91 &self,
92 namespace: &str,
93 category: &str,
94 subject: &str,
95 predicate: &str,
96 object: &str,
97 ) -> Result<String, ActionError>;
98
99 async fn recall(
100 &self,
101 query: &str,
102 top_k: usize,
103 namespace: Option<&str>,
104 ) -> Result<Vec<MemoryFact>, ActionError>;
105}
106
/// One result returned by [`WebSearchBackend::search`].
#[derive(Debug, Clone)]
pub struct SearchHit {
    /// Title of the result.
    pub title: String,
    /// URL of the result.
    pub url: String,
    /// Snippet text shown after the title and URL in search output.
    pub snippet: String,
}
114
115#[async_trait::async_trait]
117pub trait WebSearchBackend: Send + Sync {
118 async fn search(&self, query: &str, top_k: usize) -> Result<Vec<SearchHit>, ActionError>;
119}
120
/// A page returned by [`UrlFetchBackend::fetch`].
#[derive(Debug, Clone)]
pub struct FetchedPage {
    /// URL reported for the page.
    pub url: String,
    /// Title reported for the page.
    pub title: String,
    /// Text content of the page.
    pub text: String,
}
130
131#[async_trait::async_trait]
136pub trait UrlFetchBackend: Send + Sync {
137 async fn fetch(&self, url: &str) -> Result<FetchedPage, ActionError>;
141}
142
/// Value returned by [`SchedulingBackend::schedule`] on success.
#[derive(Debug, Clone)]
pub struct ScheduleOutcome {
    /// Identifier reported as `id=` in the dispatcher's output.
    pub schedule_id: String,
    /// Backend-defined status text, reported as `status=`.
    pub status: String,
}
149
150#[async_trait::async_trait]
152pub trait SchedulingBackend: Send + Sync {
153 async fn schedule(
154 &self,
155 description: &str,
156 cron: Option<&str>,
157 namespace: &str,
158 ) -> Result<ScheduleOutcome, ActionError>;
159}
160
/// Value returned by [`MessageBackend::send`] on success.
#[derive(Debug, Clone)]
pub struct MessageOutcome {
    /// Identifier reported as `id=` in the dispatcher's output.
    pub delivery_id: String,
    /// Backend-defined status text, reported as `status=`.
    pub status: String,
}
167
168#[async_trait::async_trait]
170pub trait MessageBackend: Send + Sync {
171 async fn send(
172 &self,
173 channel: &str,
174 recipient: &str,
175 content: &str,
176 namespace: &str,
177 ) -> Result<MessageOutcome, ActionError>;
178}
179
180impl ActionResult {
181 pub fn success(output: impl Into<String>) -> Self {
183 Self {
184 success: true,
185 output: output.into(),
186 error: None,
187 }
188 }
189
190 pub fn failure(error: impl Into<String>) -> Self {
192 Self {
193 success: false,
194 output: String::new(),
195 error: Some(error.into()),
196 }
197 }
198}
199
/// Settings that control what the action dispatcher is allowed to do.
#[derive(Debug, Clone)]
pub struct ActionConfig {
    /// Commands that may be executed; compared to the requested command by
    /// exact string equality.
    pub command_allowlist: Vec<String>,
    /// Maximum time, in seconds, to wait for a command's output.
    pub command_timeout_secs: u64,
    /// Whether web-search actions are permitted.
    pub enable_web_search: bool,
    /// Whether schedule-task actions are permitted.
    pub enable_scheduling: bool,
    /// Whether send-message actions are permitted.
    pub enable_channel_send: bool,
    /// `top_k` handed to the web-search backend (the dispatcher raises 0 to 1).
    pub web_search_top_k: usize,
}

impl Default for ActionConfig {
    /// Returns the default settings: a seven-command allowlist, a 30 second
    /// command timeout, web search enabled with `top_k` 5, and scheduling and
    /// channel sending disabled.
    fn default() -> Self {
        const DEFAULT_COMMANDS: [&str; 7] = ["ls", "grep", "find", "git", "cargo", "rustc", "pwd"];

        let command_allowlist: Vec<String> = DEFAULT_COMMANDS
            .iter()
            .map(|name| (*name).to_owned())
            .collect();

        ActionConfig {
            command_allowlist,
            command_timeout_secs: 30,
            enable_web_search: true,
            enable_scheduling: false,
            enable_channel_send: false,
            web_search_top_k: 5,
        }
    }
}
239
240pub struct ActionDispatcher {
242 config: ActionConfig,
243 memory_backend: Option<Arc<dyn MemoryBackend>>,
244 web_search_backend: Option<Arc<dyn WebSearchBackend>>,
245 url_fetch_backend: Option<Arc<dyn UrlFetchBackend>>,
246 scheduling_backend: Option<Arc<dyn SchedulingBackend>>,
247 message_backend: Option<Arc<dyn MessageBackend>>,
248 namespace: String,
249}
250
251impl ActionDispatcher {
252 pub fn new(config: ActionConfig) -> Self {
254 Self {
255 config,
256 memory_backend: None,
257 web_search_backend: None,
258 url_fetch_backend: None,
259 scheduling_backend: None,
260 message_backend: None,
261 namespace: "personal".to_string(),
262 }
263 }
264
265 pub fn with_memory_backend(
267 config: ActionConfig,
268 memory_backend: Arc<dyn MemoryBackend>,
269 ) -> Self {
270 Self::new(config).with_memory(memory_backend)
271 }
272
273 pub fn with_defaults() -> Self {
275 Self::new(ActionConfig::default())
276 }
277
278 pub fn with_memory(mut self, memory_backend: Arc<dyn MemoryBackend>) -> Self {
280 self.memory_backend = Some(memory_backend);
281 self
282 }
283
284 pub fn with_web_search_backend(mut self, backend: Arc<dyn WebSearchBackend>) -> Self {
286 self.web_search_backend = Some(backend);
287 self
288 }
289
290 pub fn with_url_fetch_backend(mut self, backend: Arc<dyn UrlFetchBackend>) -> Self {
295 self.url_fetch_backend = Some(backend);
296 self
297 }
298
299 pub fn with_scheduling_backend(mut self, backend: Arc<dyn SchedulingBackend>) -> Self {
301 self.scheduling_backend = Some(backend);
302 self
303 }
304
305 pub fn with_message_backend(mut self, backend: Arc<dyn MessageBackend>) -> Self {
307 self.message_backend = Some(backend);
308 self
309 }
310
311 pub fn set_namespace(&mut self, namespace: impl Into<String>) {
313 self.namespace = namespace.into();
314 }
315
316 fn active_namespace(&self) -> &str {
317 let trimmed = self.namespace.trim();
318 if trimmed.is_empty() {
319 "personal"
320 } else {
321 trimmed
322 }
323 }
324
325 pub async fn dispatch(&self, action: &Action) -> ActionResult {
327 match action {
328 Action::ExecuteCommand { command, args } => self.execute_command(command, args).await,
329 Action::WebSearch { query } => self.web_search(query).await,
330 Action::ScheduleTask { description, cron } => {
331 self.schedule_task(description, cron.as_deref()).await
332 }
333 Action::StoreFact {
334 subject,
335 predicate,
336 object,
337 } => self.store_fact(subject, predicate, object).await,
338 Action::Recall { query } => self.recall(query).await,
339 Action::SendMessage {
340 channel,
341 recipient,
342 content,
343 } => self.send_message(channel, recipient, content).await,
344 }
345 }
346
347 async fn execute_command(&self, command: &str, args: &[String]) -> ActionResult {
349 if !self.config.command_allowlist.iter().any(|c| c == command) {
351 return ActionResult::failure(format!("Command '{command}' is not in the allowlist"));
352 }
353
354 if let Err(reason) = validation::validate_args(command, args) {
356 return ActionResult::failure(format!("Blocked: {}", reason));
357 }
358
359 let mut cmd = tokio::process::Command::new(command);
361 cmd.args(args)
362 .stdout(std::process::Stdio::piped())
363 .stderr(std::process::Stdio::piped());
364
365 match tokio::time::timeout(
367 tokio::time::Duration::from_secs(self.config.command_timeout_secs),
368 cmd.output(),
369 )
370 .await
371 {
372 Ok(Ok(output)) => {
373 let stdout = String::from_utf8_lossy(&output.stdout);
374 let stderr = String::from_utf8_lossy(&output.stderr);
375
376 if output.status.success() {
377 ActionResult::success(stdout.to_string())
378 } else {
379 ActionResult::failure(format!(
380 "Exit code: {:?}\nstderr: {}",
381 output.status.code(),
382 stderr
383 ))
384 }
385 }
386 Ok(Err(e)) => ActionResult::failure(format!("Failed to execute: {}", e)),
387 Err(_) => ActionResult::failure("Command timed out"),
388 }
389 }
390
391 async fn web_search(&self, query: &str) -> ActionResult {
396 if !self.config.enable_web_search {
397 return ActionResult::failure("Web search is disabled by config");
398 }
399 let Some(backend) = &self.web_search_backend else {
400 return ActionResult::failure("Web search backend not configured");
401 };
402 let top_k = self.config.web_search_top_k.max(1);
403 let urls = extract_urls(query);
404
405 let cleaned = strip_urls(query);
411 let search_query = if cleaned.trim().is_empty() {
412 urls.first()
413 .and_then(|u| url_hostname(u))
414 .unwrap_or_else(|| query.to_string())
415 } else {
416 cleaned
417 };
418
419 let search_future = backend.search(&search_query, top_k);
420 let fetch_future = self.fetch_urls(&urls);
421 let (search_result, fetched) = tokio::join!(search_future, fetch_future);
422
423 let mut out = String::new();
424 match search_result {
425 Ok(hits) if hits.is_empty() => {
426 out.push_str(&format!(
427 "web_search ok query=\"{}\" top_k={} hits=0\n",
428 search_query, top_k
429 ));
430 }
431 Ok(hits) => {
432 let lines = hits
433 .iter()
434 .enumerate()
435 .map(|(i, hit)| {
436 format!("{}. {} ({}) - {}", i + 1, hit.title, hit.url, hit.snippet)
437 })
438 .collect::<Vec<_>>()
439 .join("\n");
440 out.push_str(&format!(
441 "web_search ok query=\"{}\" top_k={} hits={}\n{}\n",
442 search_query,
443 top_k,
444 hits.len(),
445 lines
446 ));
447 }
448 Err(e) => {
449 out.push_str(&format!("web_search error: {e}\n"));
454 if fetched.is_empty() {
455 return ActionResult::failure(format!("Web search failed: {e}"));
456 }
457 }
458 }
459
460 if !fetched.is_empty() {
461 out.push_str("\nLinked sources (fetched directly):\n");
462 for (i, page) in fetched.iter().enumerate() {
463 out.push_str(&format!(
464 "--- [{}] {} ({})\n{}\n\n",
465 i + 1,
466 page.title,
467 page.url,
468 page.text
469 ));
470 }
471 }
472
473 ActionResult::success(out.trim_end().to_string())
474 }
475
476 async fn fetch_urls(&self, urls: &[String]) -> Vec<FetchedPage> {
480 const MAX_FETCH_URLS: usize = 4;
481 let Some(fetcher) = &self.url_fetch_backend else {
482 return Vec::new();
483 };
484 if urls.is_empty() {
485 return Vec::new();
486 }
487 let to_fetch: Vec<String> = urls.iter().take(MAX_FETCH_URLS).cloned().collect();
488 let futures = to_fetch.into_iter().map(|u| {
489 let fetcher = fetcher.clone();
490 async move {
491 match fetcher.fetch(&u).await {
492 Ok(page) => Some(page),
493 Err(e) => {
494 tracing::warn!(url = %u, error = %e, "URL fetch failed");
495 None
496 }
497 }
498 }
499 });
500 futures::future::join_all(futures)
501 .await
502 .into_iter()
503 .flatten()
504 .collect()
505 }
506
507 async fn schedule_task(&self, description: &str, cron: Option<&str>) -> ActionResult {
509 if !self.config.enable_scheduling {
510 return ActionResult::failure("Scheduling is disabled by config");
511 }
512 let Some(backend) = &self.scheduling_backend else {
513 return ActionResult::failure("Scheduling backend not configured");
514 };
515 let namespace = self.active_namespace();
516 match backend.schedule(description, cron, namespace).await {
517 Ok(outcome) => ActionResult::success(format!(
518 "schedule_task ok id={} status={} namespace={} cron={} description=\"{}\"",
519 outcome.schedule_id,
520 outcome.status,
521 namespace,
522 cron.unwrap_or("none"),
523 description
524 )),
525 Err(e) => ActionResult::failure(format!("Schedule task failed: {e}")),
526 }
527 }
528
529 async fn store_fact(&self, subject: &str, predicate: &str, object: &str) -> ActionResult {
531 let Some(memory) = &self.memory_backend else {
532 return ActionResult::failure("Memory backend not available");
533 };
534 let namespace = self.active_namespace();
535
536 match memory
537 .store_fact(namespace, "action", subject, predicate, object)
538 .await
539 {
540 Ok(id) => ActionResult::success(format!(
541 "Fact stored [{}] [{}]: {} {} {}",
542 id, namespace, subject, predicate, object
543 )),
544 Err(e) => ActionResult::failure(format!("Failed to store fact: {e}")),
545 }
546 }
547
548 async fn recall(&self, query: &str) -> ActionResult {
550 let Some(memory) = &self.memory_backend else {
551 return ActionResult::failure("Memory backend not available");
552 };
553 let namespace = self.active_namespace();
554
555 match memory.recall(query, 10, Some(namespace)).await {
556 Ok(results) if results.is_empty() => ActionResult::success("No matching facts found."),
557 Ok(results) => {
558 let lines = results
559 .iter()
560 .map(|r| {
561 format!(
562 "[{}] {} {} {} (confidence: {:.2})",
563 r.namespace, r.subject, r.predicate, r.object, r.confidence
564 )
565 })
566 .collect::<Vec<_>>()
567 .join("\n");
568 ActionResult::success(format!("Found {} fact(s):\n{}", results.len(), lines))
569 }
570 Err(e) => ActionResult::failure(format!("Recall failed: {e}")),
571 }
572 }
573
574 async fn send_message(&self, channel: &str, recipient: &str, content: &str) -> ActionResult {
576 if !self.config.enable_channel_send {
577 return ActionResult::failure("Channel sending is disabled by config");
578 }
579 let Some(backend) = &self.message_backend else {
580 return ActionResult::failure("Message backend not configured");
581 };
582 let namespace = self.active_namespace();
583 match backend.send(channel, recipient, content, namespace).await {
584 Ok(outcome) => ActionResult::success(format!(
585 "send_message ok id={} status={} channel={} recipient={} namespace={}",
586 outcome.delivery_id, outcome.status, channel, recipient, namespace
587 )),
588 Err(e) => ActionResult::failure(format!("Send message failed: {e}")),
589 }
590 }
591}
592
/// Collects the distinct `http://` / `https://` URLs in `text`, in order of
/// first appearance.
///
/// Tokens are delimited by whitespace and by `<` / `>`, so `<https://x.io>`
/// is recognised. Trailing punctuation (`. , ) ] } ; ' " ! ?`) is trimmed from
/// each URL. A token that is only a scheme (for example `https://`) is
/// ignored.
pub(crate) fn extract_urls(text: &str) -> Vec<String> {
    const SCHEMES: [&str; 2] = ["https://", "http://"];

    let mut out: Vec<String> = Vec::new();
    for token in text.split(|c: char| c.is_whitespace() || c == '<' || c == '>') {
        let Some(scheme) = SCHEMES.iter().find(|s| token.starts_with(**s)) else {
            continue;
        };
        let cleaned = token.trim_end_matches(|c: char| {
            matches!(
                c,
                '.' | ',' | ')' | ']' | '}' | ';' | '\'' | '"' | '!' | '?'
            )
        });
        // Require something after the scheme that actually matched. Measuring
        // against "https://" for both schemes would reject "http://a".
        if cleaned.len() > scheme.len() && !out.iter().any(|u| u.as_str() == cleaned) {
            out.push(cleaned.to_string());
        }
    }
    out
}
615
/// Returns `text` with `http://` / `https://` URLs removed and the remaining
/// whitespace-separated tokens joined by single spaces.
///
/// `extract_urls` also recognises URLs wrapped in `<` / `>`, so this function
/// looks inside such tokens too. Otherwise `<https://x.io>` would be extracted
/// as a URL yet remain in the stripped text. Within a token that contains a
/// bracket-delimited URL, non-URL pieces are kept only if they contain at
/// least one alphanumeric character, so stray punctuation such as the `.` in
/// `<https://x.io>.` is dropped. Tokens without a URL are kept verbatim.
pub(crate) fn strip_urls(text: &str) -> String {
    fn is_url(piece: &str) -> bool {
        piece.starts_with("http://") || piece.starts_with("https://")
    }
    fn is_bracket(c: char) -> bool {
        c == '<' || c == '>'
    }

    let mut kept: Vec<&str> = Vec::new();
    for token in text.split_whitespace() {
        if token.split(is_bracket).any(is_url) {
            kept.extend(
                token
                    .split(is_bracket)
                    .filter(|piece| !is_url(piece) && piece.chars().any(char::is_alphanumeric)),
            );
        } else {
            kept.push(token);
        }
    }
    kept.join(" ")
}
624
/// Extracts the host from `url`, or returns `None` when the host is empty.
///
/// Works with or without a `scheme://` prefix. The authority ends at the
/// first `/`, `?` or `#`. Userinfo (`user:pw@`) and a trailing `:port` are
/// removed. A bracketed IPv6 literal is returned with its brackets, for
/// example `[::1]`.
pub(crate) fn url_hostname(url: &str) -> Option<String> {
    let after_scheme = url.split_once("://").map_or(url, |(_, rest)| rest);

    // Cut at '?' and '#' as well as '/'. Otherwise a query such as
    // "?next=a@evil.test" would be parsed as userinfo and yield the wrong host.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);

    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host_port)| host_port);

    let host = if host_port.starts_with('[') {
        // IPv6 literal: the colons inside the brackets are not port separators.
        host_port.split_inclusive(']').next().unwrap_or(host_port)
    } else {
        host_port.split(':').next().unwrap_or(host_port)
    };

    if host.is_empty() {
        None
    } else {
        Some(host.to_string())
    }
}