Skip to main content

walrus_runtime/
lib.rs

1//! Walrus runtime: agent registry, tool registry, and hook orchestration.
2//!
3//! The [`Runtime`] holds agents in a plain `BTreeMap` with per-agent
4//! `Mutex` for concurrent execution. Tools are stored in a shared
5//! [`ToolRegistry`] behind `Arc<RwLock>` supporting post-startup
6//! registration (e.g. MCP hot-reload).
7
8pub use memory::{InMemory, Memory, NoEmbedder};
9pub use wcore::{
10    AgentConfig, Handler, Hook, ToolRegistry,
11    model::{Message, Request, Response, Role, StreamChunk, Tool},
12};
13
14use anyhow::Result;
15use async_stream::stream;
16use compact_str::CompactString;
17use futures_core::Stream;
18use futures_util::StreamExt;
19use std::{collections::BTreeMap, sync::Arc};
20use tokio::sync::{Mutex, RwLock, mpsc};
21use wcore::AgentEvent;
22
23/// Re-exports of the most commonly used types.
24pub mod prelude {
25    pub use crate::{
26        AgentConfig, Handler, Hook, InMemory, Message, Request, Response, Role, Runtime,
27        StreamChunk, Tool, ToolRegistry,
28    };
29}
30
31/// The walrus runtime — agent registry, tool registry, and hook orchestration.
32///
33/// Each agent is wrapped in a per-agent `Mutex` for concurrent execution.
34/// Tools are stored in a shared `ToolRegistry` behind `Arc<RwLock>`.
35/// `Runtime::new()` is async — it calls `hook.on_register_tools()` during
36/// construction so hooks self-register their tools.
37pub struct Runtime<M: wcore::model::Model, H: Hook> {
38    pub model: M,
39    pub hook: H,
40    agents: BTreeMap<CompactString, Arc<Mutex<wcore::Agent<M>>>>,
41    tools: Arc<RwLock<ToolRegistry>>,
42}
43
44impl<M: wcore::model::Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
45    /// Create a new runtime with the given model and hook backend.
46    ///
47    /// Calls `hook.on_register_tools()` to populate the tool registry before
48    /// returning. All hook crates self-register their tools here.
49    pub async fn new(model: M, hook: H) -> Self {
50        let mut registry = ToolRegistry::new();
51        hook.on_register_tools(&mut registry).await;
52        Self {
53            model,
54            hook,
55            agents: BTreeMap::new(),
56            tools: Arc::new(RwLock::new(registry)),
57        }
58    }
59
60    // --- Tool registry ---
61
62    /// Register a tool with its handler.
63    ///
64    /// Works both before and after wrapping in `Arc` (the registry is
65    /// behind `RwLock`). Used for hot-reload (MCP add/remove/reload).
66    pub async fn register_tool(&self, tool: Tool, handler: Handler) {
67        self.tools.write().await.insert(tool, handler);
68    }
69
70    /// Remove a tool by name. Returns `true` if it existed.
71    pub async fn unregister_tool(&self, name: &str) -> bool {
72        self.tools.write().await.remove(name)
73    }
74
75    /// Atomically replace a set of tools.
76    ///
77    /// Removes `old_names` and inserts `new_tools` under a single write lock
78    /// — no window where agents see partial state.
79    pub async fn replace_tools(
80        &self,
81        old_names: &[CompactString],
82        new_tools: Vec<(Tool, Handler)>,
83    ) {
84        let mut registry = self.tools.write().await;
85        for name in old_names {
86            registry.remove(name);
87        }
88        for (tool, handler) in new_tools {
89            registry.insert(tool, handler);
90        }
91    }
92
93    /// Build a filtered [`ToolRegistry`] snapshot for the named agent.
94    ///
95    /// Reads the agent's `config.tools` list and filters the shared registry.
96    /// If the list is empty, all registered tools are included.
97    async fn dispatcher_for(&self, agent: &str) -> ToolRegistry {
98        let registry = self.tools.read().await;
99
100        let filter: Vec<CompactString> = self
101            .agents
102            .get(agent)
103            .and_then(|m| m.try_lock().ok())
104            .map(|g| g.config.tools.to_vec())
105            .unwrap_or_default();
106
107        registry.filtered_snapshot(&filter)
108    }
109
110    // --- Agent registry ---
111
112    /// Register an agent from its configuration.
113    ///
114    /// Must be called before wrapping the runtime in `Arc`. Calls
115    /// `hook.on_build_agent(config)` to enrich the config before building.
116    pub fn add_agent(&mut self, config: AgentConfig) {
117        let config = self.hook.on_build_agent(config);
118        let name = config.name.clone();
119        let agent = wcore::AgentBuilder::new(self.model.clone())
120            .config(config)
121            .build();
122        self.agents.insert(name, Arc::new(Mutex::new(agent)));
123    }
124
125    /// Get a registered agent's config by name (cloned).
126    pub async fn agent(&self, name: &str) -> Option<AgentConfig> {
127        let mutex = self.agents.get(name)?;
128        Some(mutex.lock().await.config.clone())
129    }
130
131    /// Get all registered agent configs (cloned, alphabetical order).
132    pub async fn agents(&self) -> Vec<AgentConfig> {
133        let mut configs = Vec::with_capacity(self.agents.len());
134        for mutex in self.agents.values() {
135            configs.push(mutex.lock().await.config.clone());
136        }
137        configs
138    }
139
140    /// Get the per-agent mutex by name.
141    pub fn agent_mutex(&self, name: &str) -> Option<Arc<Mutex<wcore::Agent<M>>>> {
142        self.agents.get(name).cloned()
143    }
144
145    /// Clear the conversation history for a named agent.
146    pub async fn clear_session(&self, agent: &str) {
147        if let Some(mutex) = self.agents.get(agent) {
148            mutex.lock().await.clear_history();
149        }
150    }
151
152    // --- Execution ---
153
154    /// Send a message to an agent and run to completion.
155    ///
156    /// Builds a dispatcher snapshot from the tool registry, locks the per-agent
157    /// mutex, pushes the user message, delegates to `agent.run()`, and forwards
158    /// all events to `hook.on_event()`.
159    pub async fn send_to(&self, agent: &str, content: &str) -> Result<wcore::AgentResponse> {
160        let mutex = self
161            .agents
162            .get(agent)
163            .ok_or_else(|| anyhow::anyhow!("agent '{agent}' not registered"))?;
164
165        let dispatcher = self.dispatcher_for(agent).await;
166        let mut guard = mutex.lock().await;
167        guard.push_message(Message::user(content));
168
169        let (tx, mut rx) = mpsc::unbounded_channel();
170        let response = guard.run(&dispatcher, tx).await;
171
172        while let Ok(event) = rx.try_recv() {
173            self.hook.on_event(agent, &event);
174        }
175
176        Ok(response)
177    }
178
179    /// Send a message to an agent and stream response events.
180    ///
181    /// Builds a dispatcher snapshot from the tool registry, locks the per-agent
182    /// mutex, delegates to `agent.run_stream()`, and forwards each event to
183    /// `hook.on_event()`.
184    pub fn stream_to<'a>(
185        &'a self,
186        agent: &'a str,
187        content: &'a str,
188    ) -> impl Stream<Item = AgentEvent> + 'a {
189        stream! {
190            let mutex = match self.agents.get(agent) {
191                Some(m) => m,
192                None => {
193                    let resp = wcore::AgentResponse {
194                        final_response: None,
195                        iterations: 0,
196                        stop_reason: wcore::AgentStopReason::Error(
197                            format!("agent '{agent}' not registered"),
198                        ),
199                        steps: vec![],
200                    };
201                    yield AgentEvent::Done(resp);
202                    return;
203                }
204            };
205
206            let dispatcher = self.dispatcher_for(agent).await;
207            let mut guard = mutex.lock().await;
208            guard.push_message(Message::user(content));
209
210            let mut event_stream = std::pin::pin!(guard.run_stream(&dispatcher));
211            while let Some(event) = event_stream.next().await {
212                self.hook.on_event(agent, &event);
213                yield event;
214            }
215        }
216    }
217}