// brainos_cortex/actions.rs

//! Action dispatch — tool execution.
//!
//! Dispatches tool calls from the LLM: command execution (allowlisted,
//! argument-validated and time-limited), web search with optional fetching
//! of linked URLs, scheduling, memory operations, and message sending.

6use std::sync::Arc;
7
8use thiserror::Error;
9
10mod tooling;
11mod validation;
12
13#[cfg(test)]
14mod tests;
15
16pub use tooling::{get_available_tools, ToolDefinition};
17
18// ─── Errors ─────────────────────────────────────────────────────────────────
19
20/// Errors from action execution.
21#[derive(Debug, Error)]
22pub enum ActionError {
23    #[error("Command not allowed: {0}")]
24    CommandNotAllowed(String),
25
26    #[error("Command execution failed: {0}")]
27    ExecutionFailed(String),
28
29    #[error("Timeout")]
30    Timeout,
31
32    #[error("Invalid arguments: {0}")]
33    InvalidArguments(String),
34
35    #[error("IO error: {0}")]
36    Io(#[from] std::io::Error),
37}
38
// ─── Action Types ───────────────────────────────────────────────────────────

/// A tool invocation requested by the LLM, executed by
/// [`ActionDispatcher::dispatch`].
///
/// Every field is a `String`, `Vec<String>` or `Option<String>`, so `Eq` and
/// `Hash` are derived alongside `PartialEq`. Identical tool calls can thus be
/// compared exactly or de-duplicated (e.g. in a `HashSet`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Action {
    /// Run `command` with `args`, subject to the configured allowlist and
    /// argument validation.
    ExecuteCommand { command: String, args: Vec<String> },
    /// Search the web for `query`.
    WebSearch { query: String },
    /// Schedule a task described by `description`. `cron` is an optional
    /// schedule spec forwarded to the scheduling backend (presumably a cron
    /// expression — confirm with the backend).
    ScheduleTask {
        description: String,
        cron: Option<String>,
    },
    /// Store a subject–predicate–object fact in semantic memory.
    StoreFact {
        subject: String,
        predicate: String,
        object: String,
    },
    /// Recall facts matching `query` from memory.
    Recall { query: String },
    /// Send `content` to `recipient` over `channel` (via protocol adapters).
    SendMessage {
        channel: String,
        recipient: String,
        content: String,
    },
}
68
/// Outcome of dispatching an [`Action`].
///
/// The `success` / `failure` constructors leave `error` as `None` for a
/// success and `output` empty for a failure.
#[derive(Debug, Clone)]
pub struct ActionResult {
    /// `true` when the action completed.
    pub success: bool,
    /// Text produced by the action.
    pub output: String,
    /// Human-readable failure reason, if the action failed.
    pub error: Option<String>,
}
76
77/// Normalized memory fact used by action backends.
78#[derive(Debug, Clone)]
79pub struct MemoryFact {
80    pub namespace: String,
81    pub subject: String,
82    pub predicate: String,
83    pub object: String,
84    pub confidence: f64,
85}
86
87/// Optional backend that provides real memory read/write operations.
88#[async_trait::async_trait]
89pub trait MemoryBackend: Send + Sync {
90    async fn store_fact(
91        &self,
92        namespace: &str,
93        category: &str,
94        subject: &str,
95        predicate: &str,
96        object: &str,
97    ) -> Result<String, ActionError>;
98
99    async fn recall(
100        &self,
101        query: &str,
102        top_k: usize,
103        namespace: Option<&str>,
104    ) -> Result<Vec<MemoryFact>, ActionError>;
105}
106
107/// Structured web-search hit returned by WebSearchBackend.
108#[derive(Debug, Clone)]
109pub struct SearchHit {
110    pub title: String,
111    pub url: String,
112    pub snippet: String,
113}
114
115/// Optional backend for web search actions.
116#[async_trait::async_trait]
117pub trait WebSearchBackend: Send + Sync {
118    async fn search(&self, query: &str, top_k: usize) -> Result<Vec<SearchHit>, ActionError>;
119}
120
121/// Result of fetching a single URL: cleaned, bounded text content the
122/// LLM can be given as grounding. `text` is plain text — HTML tags and
123/// scripts have been stripped by the backend.
124#[derive(Debug, Clone)]
125pub struct FetchedPage {
126    pub url: String,
127    pub title: String,
128    pub text: String,
129}
130
131/// Optional backend for fetching the body of a URL the user (or an
132/// upstream search hit) handed us. Kept separate from `WebSearchBackend`
133/// so a deployment can have search without fetch (or vice versa) and so
134/// the two contracts can evolve independently.
135#[async_trait::async_trait]
136pub trait UrlFetchBackend: Send + Sync {
137    /// Fetch a single URL. The backend is responsible for timeouts, body
138    /// size caps, and HTML-to-text reduction so the returned page is
139    /// safe to pass straight into an LLM context window.
140    async fn fetch(&self, url: &str) -> Result<FetchedPage, ActionError>;
141}
142
143/// Structured scheduling outcome returned by SchedulingBackend.
144#[derive(Debug, Clone)]
145pub struct ScheduleOutcome {
146    pub schedule_id: String,
147    pub status: String,
148}
149
150/// Optional backend for scheduling actions.
151#[async_trait::async_trait]
152pub trait SchedulingBackend: Send + Sync {
153    async fn schedule(
154        &self,
155        description: &str,
156        cron: Option<&str>,
157        namespace: &str,
158    ) -> Result<ScheduleOutcome, ActionError>;
159}
160
161/// Structured message-delivery outcome returned by MessageBackend.
162#[derive(Debug, Clone)]
163pub struct MessageOutcome {
164    pub delivery_id: String,
165    pub status: String,
166}
167
168/// Optional backend for outbound message actions.
169#[async_trait::async_trait]
170pub trait MessageBackend: Send + Sync {
171    async fn send(
172        &self,
173        channel: &str,
174        recipient: &str,
175        content: &str,
176        namespace: &str,
177    ) -> Result<MessageOutcome, ActionError>;
178}
179
180impl ActionResult {
181    /// Create a successful result.
182    pub fn success(output: impl Into<String>) -> Self {
183        Self {
184            success: true,
185            output: output.into(),
186            error: None,
187        }
188    }
189
190    /// Create a failed result.
191    pub fn failure(error: impl Into<String>) -> Self {
192        Self {
193            success: false,
194            output: String::new(),
195            error: Some(error.into()),
196        }
197    }
198}
199
// ─── Action Dispatcher ──────────────────────────────────────────────────────

/// Settings that decide which actions the dispatcher will run, and how.
#[derive(Debug, Clone)]
pub struct ActionConfig {
    /// Executable names `ExecuteCommand` may run; compared by exact equality.
    pub command_allowlist: Vec<String>,
    /// Wall-clock limit for one command run, in seconds.
    pub command_timeout_secs: u64,
    /// Permit the `WebSearch` action.
    pub enable_web_search: bool,
    /// Permit the `ScheduleTask` action.
    pub enable_scheduling: bool,
    /// Permit the `SendMessage` action.
    pub enable_channel_send: bool,
    /// Number of hits requested from the web-search backend by default.
    pub web_search_top_k: usize,
}

impl Default for ActionConfig {
    /// The defaults are:
    /// - a small allowlist of developer tools;
    /// - a 30-second command timeout;
    /// - web search enabled;
    /// - scheduling and outbound messages disabled;
    /// - five search hits.
    fn default() -> Self {
        const DEFAULT_COMMANDS: [&str; 7] = ["ls", "grep", "find", "git", "cargo", "rustc", "pwd"];

        let command_allowlist: Vec<String> = DEFAULT_COMMANDS
            .iter()
            .map(|&name| String::from(name))
            .collect();

        Self {
            command_allowlist,
            command_timeout_secs: 30,
            enable_web_search: true,
            enable_scheduling: false,
            enable_channel_send: false,
            web_search_top_k: 5,
        }
    }
}
239
240/// Dispatches actions/tools.
241pub struct ActionDispatcher {
242    config: ActionConfig,
243    memory_backend: Option<Arc<dyn MemoryBackend>>,
244    web_search_backend: Option<Arc<dyn WebSearchBackend>>,
245    url_fetch_backend: Option<Arc<dyn UrlFetchBackend>>,
246    scheduling_backend: Option<Arc<dyn SchedulingBackend>>,
247    message_backend: Option<Arc<dyn MessageBackend>>,
248    namespace: String,
249}
250
251impl ActionDispatcher {
252    /// Create a new dispatcher.
253    pub fn new(config: ActionConfig) -> Self {
254        Self {
255            config,
256            memory_backend: None,
257            web_search_backend: None,
258            url_fetch_backend: None,
259            scheduling_backend: None,
260            message_backend: None,
261            namespace: "personal".to_string(),
262        }
263    }
264
265    /// Create a new dispatcher with a memory backend attached.
266    pub fn with_memory_backend(
267        config: ActionConfig,
268        memory_backend: Arc<dyn MemoryBackend>,
269    ) -> Self {
270        Self::new(config).with_memory(memory_backend)
271    }
272
273    /// Create with default config.
274    pub fn with_defaults() -> Self {
275        Self::new(ActionConfig::default())
276    }
277
278    /// Attach a memory backend.
279    pub fn with_memory(mut self, memory_backend: Arc<dyn MemoryBackend>) -> Self {
280        self.memory_backend = Some(memory_backend);
281        self
282    }
283
284    /// Attach a web-search backend.
285    pub fn with_web_search_backend(mut self, backend: Arc<dyn WebSearchBackend>) -> Self {
286        self.web_search_backend = Some(backend);
287        self
288    }
289
290    /// Attach a URL-fetch backend so user-provided links can be enriched
291    /// inline with web-search results. Optional — without it, URLs in the
292    /// query are still surfaced as part of the search query string but
293    /// not fetched.
294    pub fn with_url_fetch_backend(mut self, backend: Arc<dyn UrlFetchBackend>) -> Self {
295        self.url_fetch_backend = Some(backend);
296        self
297    }
298
299    /// Attach a scheduling backend.
300    pub fn with_scheduling_backend(mut self, backend: Arc<dyn SchedulingBackend>) -> Self {
301        self.scheduling_backend = Some(backend);
302        self
303    }
304
305    /// Attach a message backend.
306    pub fn with_message_backend(mut self, backend: Arc<dyn MessageBackend>) -> Self {
307        self.message_backend = Some(backend);
308        self
309    }
310
311    /// Set the default namespace used by action backends.
312    pub fn set_namespace(&mut self, namespace: impl Into<String>) {
313        self.namespace = namespace.into();
314    }
315
316    fn active_namespace(&self) -> &str {
317        let trimmed = self.namespace.trim();
318        if trimmed.is_empty() {
319            "personal"
320        } else {
321            trimmed
322        }
323    }
324
325    /// Execute an action.
326    pub async fn dispatch(&self, action: &Action) -> ActionResult {
327        match action {
328            Action::ExecuteCommand { command, args } => self.execute_command(command, args).await,
329            Action::WebSearch { query } => self.web_search(query).await,
330            Action::ScheduleTask { description, cron } => {
331                self.schedule_task(description, cron.as_deref()).await
332            }
333            Action::StoreFact {
334                subject,
335                predicate,
336                object,
337            } => self.store_fact(subject, predicate, object).await,
338            Action::Recall { query } => self.recall(query).await,
339            Action::SendMessage {
340                channel,
341                recipient,
342                content,
343            } => self.send_message(channel, recipient, content).await,
344        }
345    }
346
347    /// Execute a sandboxed command.
348    async fn execute_command(&self, command: &str, args: &[String]) -> ActionResult {
349        // Check allowlist
350        if !self.config.command_allowlist.iter().any(|c| c == command) {
351            return ActionResult::failure(format!("Command '{command}' is not in the allowlist"));
352        }
353
354        // Validate arguments against deny-lists
355        if let Err(reason) = validation::validate_args(command, args) {
356            return ActionResult::failure(format!("Blocked: {}", reason));
357        }
358
359        // Build command
360        let mut cmd = tokio::process::Command::new(command);
361        cmd.args(args)
362            .stdout(std::process::Stdio::piped())
363            .stderr(std::process::Stdio::piped());
364
365        // Execute with timeout
366        match tokio::time::timeout(
367            tokio::time::Duration::from_secs(self.config.command_timeout_secs),
368            cmd.output(),
369        )
370        .await
371        {
372            Ok(Ok(output)) => {
373                let stdout = String::from_utf8_lossy(&output.stdout);
374                let stderr = String::from_utf8_lossy(&output.stderr);
375
376                if output.status.success() {
377                    ActionResult::success(stdout.to_string())
378                } else {
379                    ActionResult::failure(format!(
380                        "Exit code: {:?}\nstderr: {}",
381                        output.status.code(),
382                        stderr
383                    ))
384                }
385            }
386            Ok(Err(e)) => ActionResult::failure(format!("Failed to execute: {}", e)),
387            Err(_) => ActionResult::failure("Command timed out"),
388        }
389    }
390
391    /// Search the web. If the query contains URLs, fetch their bodies
392    /// in parallel and append them as a `Linked sources:` block so the
393    /// downstream LLM can ground its answer in what the user actually
394    /// pasted, not just what the search engine surfaced.
395    async fn web_search(&self, query: &str) -> ActionResult {
396        if !self.config.enable_web_search {
397            return ActionResult::failure("Web search is disabled by config");
398        }
399        let Some(backend) = &self.web_search_backend else {
400            return ActionResult::failure("Web search backend not configured");
401        };
402        let top_k = self.config.web_search_top_k.max(1);
403        let urls = extract_urls(query);
404
405        // Strip the URLs out of the search query so we send the engine
406        // the actual semantic question, not a wall of links it will
407        // tokenize into noise. If nothing else remains, fall back to
408        // searching for the first URL's hostname (which usually still
409        // returns the canonical landing page).
410        let cleaned = strip_urls(query);
411        let search_query = if cleaned.trim().is_empty() {
412            urls.first()
413                .and_then(|u| url_hostname(u))
414                .unwrap_or_else(|| query.to_string())
415        } else {
416            cleaned
417        };
418
419        let search_future = backend.search(&search_query, top_k);
420        let fetch_future = self.fetch_urls(&urls);
421        let (search_result, fetched) = tokio::join!(search_future, fetch_future);
422
423        let mut out = String::new();
424        match search_result {
425            Ok(hits) if hits.is_empty() => {
426                out.push_str(&format!(
427                    "web_search ok query=\"{}\" top_k={} hits=0\n",
428                    search_query, top_k
429                ));
430            }
431            Ok(hits) => {
432                let lines = hits
433                    .iter()
434                    .enumerate()
435                    .map(|(i, hit)| {
436                        format!("{}. {} ({}) - {}", i + 1, hit.title, hit.url, hit.snippet)
437                    })
438                    .collect::<Vec<_>>()
439                    .join("\n");
440                out.push_str(&format!(
441                    "web_search ok query=\"{}\" top_k={} hits={}\n{}\n",
442                    search_query,
443                    top_k,
444                    hits.len(),
445                    lines
446                ));
447            }
448            Err(e) => {
449                // Search failure is not fatal if we managed to fetch the
450                // user's pasted URLs — the LLM can still answer from
451                // those. Surface the search error inline so the caller
452                // can see what happened.
453                out.push_str(&format!("web_search error: {e}\n"));
454                if fetched.is_empty() {
455                    return ActionResult::failure(format!("Web search failed: {e}"));
456                }
457            }
458        }
459
460        if !fetched.is_empty() {
461            out.push_str("\nLinked sources (fetched directly):\n");
462            for (i, page) in fetched.iter().enumerate() {
463                out.push_str(&format!(
464                    "--- [{}] {} ({})\n{}\n\n",
465                    i + 1,
466                    page.title,
467                    page.url,
468                    page.text
469                ));
470            }
471        }
472
473        ActionResult::success(out.trim_end().to_string())
474    }
475
476    /// Fetch up to `MAX_FETCH_URLS` URLs in parallel using the configured
477    /// fetch backend. Returns successfully fetched pages only — failures
478    /// are logged and dropped so a single bad URL doesn't block the rest.
479    async fn fetch_urls(&self, urls: &[String]) -> Vec<FetchedPage> {
480        const MAX_FETCH_URLS: usize = 4;
481        let Some(fetcher) = &self.url_fetch_backend else {
482            return Vec::new();
483        };
484        if urls.is_empty() {
485            return Vec::new();
486        }
487        let to_fetch: Vec<String> = urls.iter().take(MAX_FETCH_URLS).cloned().collect();
488        let futures = to_fetch.into_iter().map(|u| {
489            let fetcher = fetcher.clone();
490            async move {
491                match fetcher.fetch(&u).await {
492                    Ok(page) => Some(page),
493                    Err(e) => {
494                        tracing::warn!(url = %u, error = %e, "URL fetch failed");
495                        None
496                    }
497                }
498            }
499        });
500        futures::future::join_all(futures)
501            .await
502            .into_iter()
503            .flatten()
504            .collect()
505    }
506
507    /// Schedule a task.
508    async fn schedule_task(&self, description: &str, cron: Option<&str>) -> ActionResult {
509        if !self.config.enable_scheduling {
510            return ActionResult::failure("Scheduling is disabled by config");
511        }
512        let Some(backend) = &self.scheduling_backend else {
513            return ActionResult::failure("Scheduling backend not configured");
514        };
515        let namespace = self.active_namespace();
516        match backend.schedule(description, cron, namespace).await {
517            Ok(outcome) => ActionResult::success(format!(
518                "schedule_task ok id={} status={} namespace={} cron={} description=\"{}\"",
519                outcome.schedule_id,
520                outcome.status,
521                namespace,
522                cron.unwrap_or("none"),
523                description
524            )),
525            Err(e) => ActionResult::failure(format!("Schedule task failed: {e}")),
526        }
527    }
528
529    /// Store a fact in semantic memory.
530    async fn store_fact(&self, subject: &str, predicate: &str, object: &str) -> ActionResult {
531        let Some(memory) = &self.memory_backend else {
532            return ActionResult::failure("Memory backend not available");
533        };
534        let namespace = self.active_namespace();
535
536        match memory
537            .store_fact(namespace, "action", subject, predicate, object)
538            .await
539        {
540            Ok(id) => ActionResult::success(format!(
541                "Fact stored [{}] [{}]: {} {} {}",
542                id, namespace, subject, predicate, object
543            )),
544            Err(e) => ActionResult::failure(format!("Failed to store fact: {e}")),
545        }
546    }
547
548    /// Recall from memory.
549    async fn recall(&self, query: &str) -> ActionResult {
550        let Some(memory) = &self.memory_backend else {
551            return ActionResult::failure("Memory backend not available");
552        };
553        let namespace = self.active_namespace();
554
555        match memory.recall(query, 10, Some(namespace)).await {
556            Ok(results) if results.is_empty() => ActionResult::success("No matching facts found."),
557            Ok(results) => {
558                let lines = results
559                    .iter()
560                    .map(|r| {
561                        format!(
562                            "[{}] {} {} {} (confidence: {:.2})",
563                            r.namespace, r.subject, r.predicate, r.object, r.confidence
564                        )
565                    })
566                    .collect::<Vec<_>>()
567                    .join("\n");
568                ActionResult::success(format!("Found {} fact(s):\n{}", results.len(), lines))
569            }
570            Err(e) => ActionResult::failure(format!("Recall failed: {e}")),
571        }
572    }
573
574    /// Send a message via channel.
575    async fn send_message(&self, channel: &str, recipient: &str, content: &str) -> ActionResult {
576        if !self.config.enable_channel_send {
577            return ActionResult::failure("Channel sending is disabled by config");
578        }
579        let Some(backend) = &self.message_backend else {
580            return ActionResult::failure("Message backend not configured");
581        };
582        let namespace = self.active_namespace();
583        match backend.send(channel, recipient, content, namespace).await {
584            Ok(outcome) => ActionResult::success(format!(
585                "send_message ok id={} status={} channel={} recipient={} namespace={}",
586                outcome.delivery_id, outcome.status, channel, recipient, namespace
587            )),
588            Err(e) => ActionResult::failure(format!("Send message failed: {e}")),
589        }
590    }
591}
592
/// Extract `http(s)://` URLs from free-form text, in order of first
/// appearance and without duplicates.
///
/// Candidates are delimited by whitespace and by `<` / `>`, so
/// `<https://…>` autolinks are recognised. Candidates are cleaned as follows:
/// - Leading `(`, `[`, `{`, `'` and `"` that wrap a pasted link are dropped.
/// - Trailing `.`, `,`, `;`, `'`, `"`, `!` and `?` are dropped.
/// - Trailing `)`, `]` and `}` are dropped only while *unbalanced*, so
///   `https://en.wikipedia.org/wiki/Rust_(programming_language)` keeps its
///   final `)`.
///
/// Candidates with no host (e.g. a bare `https://`) are ignored.
pub(crate) fn extract_urls(text: &str) -> Vec<String> {
    let mut out: Vec<String> = Vec::new();
    for piece in text.split(|c: char| c.is_whitespace() || c == '<' || c == '>') {
        let Some(candidate) = url_candidate(piece) else {
            continue;
        };
        let url = trim_trailing_url_punct(candidate);
        if url_hostname(url).is_none() || out.iter().any(|seen| seen == url) {
            continue;
        }
        out.push(url.to_string());
    }
    out
}

/// Remove every whitespace-separated token that contains an `http(s)://`
/// link, so a query passed to a search engine isn't dominated by the link
/// wall. Links wrapped in `<…>`, `(…)` or quotes are removed too, matching
/// what `extract_urls` recognises. The remaining tokens are re-joined with
/// single spaces.
pub(crate) fn strip_urls(text: &str) -> String {
    text.split_whitespace()
        .filter(|token| {
            !token
                .split(|c: char| c == '<' || c == '>')
                .any(|piece| url_candidate(piece).is_some())
        })
        .collect::<Vec<_>>()
        .join(" ")
}

/// If `piece`, after dropping any leading `(`, `[`, `{`, `'` or `"`, starts
/// with `http://` or `https://`, return that scheme-prefixed remainder.
fn url_candidate(piece: &str) -> Option<&str> {
    let rest = piece.trim_start_matches(|c: char| matches!(c, '(' | '[' | '{' | '\'' | '"'));
    (rest.starts_with("http://") || rest.starts_with("https://")).then_some(rest)
}

/// Drop trailing sentence punctuation from a URL candidate.
///
/// A closing bracket is dropped only while it has no matching opener inside
/// the candidate, so balanced pairs that belong to the path survive.
fn trim_trailing_url_punct(mut url: &str) -> &str {
    while let Some(last) = url.chars().next_back() {
        let strip = match last {
            '.' | ',' | ';' | '\'' | '"' | '!' | '?' => true,
            ')' => url.matches(')').count() > url.matches('(').count(),
            ']' => url.matches(']').count() > url.matches('[').count(),
            '}' => url.matches('}').count() > url.matches('{').count(),
            _ => false,
        };
        if !strip {
            break;
        }
        url = &url[..url.len() - last.len_utf8()];
    }
    url
}

/// Best-effort hostname extraction (no `url` crate dependency).
///
/// Used in two places:
/// - as a fallback search query when the user pasted only links and no
///   surrounding question;
/// - to reject host-less URL candidates in `extract_urls`.
///
/// The host is taken from the authority, i.e. everything after `scheme://`
/// up to the first `/`, `?` or `#`. Any `user:pass@` prefix and `:port`
/// suffix are removed. Bracketed IPv6 literals keep their brackets
/// (`[::1]`). Returns `None` when no host remains.
pub(crate) fn url_hostname(url: &str) -> Option<String> {
    let after_scheme = url.split_once("://").map_or(url, |(_, rest)| rest);
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);
    // Userinfo precedes the host; the last `@` separates them.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);
    let host = match host_port.strip_prefix('[') {
        // IPv6 literal: the port separator sits after the closing bracket.
        Some(bracketed) => match bracketed.split_once(']') {
            Some((inner, _)) if !inner.is_empty() => &host_port[..inner.len() + 2],
            _ => "",
        },
        None => host_port.split(':').next().unwrap_or(host_port),
    };
    (!host.is_empty()).then(|| host.to_string())
}