Skip to main content

walrus_core/runtime/
mod.rs

//! Runtime — agent registry, schema store, and hook orchestration.
//!
//! [`Runtime`] holds agents in a `BTreeMap` with per-agent `Mutex` for
//! concurrent execution. Tool schemas are registered once at startup via
//! `hook.on_register_tools()` and stored in a plain [`ToolRegistry`].
//! Each agent receives a filtered schema snapshot and a [`ToolSender`] at
//! build time — the runtime holds no handlers or closures.
8
9use crate::{
10    Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11    agent::tool::{ToolRegistry, ToolSender},
12    model::{Message, Model},
13    runtime::hook::Hook,
14};
15use anyhow::Result;
16use async_stream::stream;
17use compact_str::CompactString;
18use futures_core::Stream;
19use futures_util::StreamExt;
20use std::{collections::BTreeMap, sync::Arc};
21use tokio::sync::{Mutex, mpsc};
22
23pub mod hook;
24
25/// The walrus runtime — agent registry, schema store, and hook orchestration.
26///
27/// Tool schemas are registered once at construction via `hook.on_register_tools()`.
28/// Each agent is built with a filtered schema snapshot and a `ToolSender` so it
29/// can dispatch tool calls back to the runtime without going through the runtime.
30/// `Runtime::new()` is async — it calls `hook.on_register_tools()` during
31/// construction to populate the schema registry.
32pub struct Runtime<M: Model, H: Hook> {
33    pub model: M,
34    pub hook: H,
35    agents: BTreeMap<CompactString, Arc<Mutex<Agent<M>>>>,
36    tools: ToolRegistry,
37    tool_tx: Option<ToolSender>,
38}
39
40impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
41    /// Create a new runtime with the given model and hook backend.
42    ///
43    /// Calls `hook.on_register_tools()` to populate the schema registry.
44    /// Pass `tool_tx` to enable tool dispatch from agents; `None` means agents
45    /// have no tool dispatch (e.g. CLI without a daemon).
46    pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
47        let mut tools = ToolRegistry::new();
48        hook.on_register_tools(&mut tools).await;
49        Self {
50            model,
51            hook,
52            agents: BTreeMap::new(),
53            tools,
54            tool_tx,
55        }
56    }
57
58    // --- Tool registry ---
59
60    /// Register a tool schema.
61    pub fn register_tool(&mut self, tool: crate::model::Tool) {
62        self.tools.insert(tool);
63    }
64
65    /// Remove a tool schema by name. Returns `true` if it existed.
66    pub fn unregister_tool(&mut self, name: &str) -> bool {
67        self.tools.remove(name)
68    }
69
70    // --- Agent registry ---
71
72    /// Register an agent from its configuration.
73    ///
74    /// Calls `hook.on_build_agent(config)` to enrich the config, then builds
75    /// the agent with a filtered schema snapshot and the runtime's `tool_tx`.
76    pub fn add_agent(&mut self, config: AgentConfig) {
77        let config = self.hook.on_build_agent(config);
78        let name = config.name.clone();
79        let tools = self.tools.tools();
80        let mut builder = AgentBuilder::new(self.model.clone())
81            .config(config)
82            .tools(tools);
83        if let Some(tx) = &self.tool_tx {
84            builder = builder.tool_tx(tx.clone());
85        }
86        let agent = builder.build();
87        self.agents.insert(name, Arc::new(Mutex::new(agent)));
88    }
89
90    /// Get a registered agent's config by name (cloned).
91    pub async fn agent(&self, name: &str) -> Option<AgentConfig> {
92        let mutex = self.agents.get(name)?;
93        Some(mutex.lock().await.config.clone())
94    }
95
96    /// Get all registered agent configs (cloned, alphabetical order).
97    pub async fn agents(&self) -> Vec<AgentConfig> {
98        let mut configs = Vec::with_capacity(self.agents.len());
99        for mutex in self.agents.values() {
100            configs.push(mutex.lock().await.config.clone());
101        }
102        configs
103    }
104
105    /// Get the per-agent mutex by name.
106    pub fn agent_mutex(&self, name: &str) -> Option<Arc<Mutex<Agent<M>>>> {
107        self.agents.get(name).cloned()
108    }
109
110    // --- Execution ---
111
112    /// Send a message to an agent and run to completion.
113    ///
114    /// Locks the per-agent mutex, pushes the user message, delegates to
115    /// `agent.run()`, and forwards all events to `hook.on_event()`.
116    pub async fn send_to(&self, agent: &str, content: &str) -> Result<AgentResponse> {
117        let mutex = self
118            .agents
119            .get(agent)
120            .ok_or_else(|| anyhow::anyhow!("agent '{agent}' not registered"))?;
121
122        let mut guard = mutex.lock().await;
123        guard.push_message(Message::user(content));
124
125        let (tx, mut rx) = mpsc::unbounded_channel();
126        let response = guard.run(tx).await;
127
128        while let Ok(event) = rx.try_recv() {
129            self.hook.on_event(agent, &event);
130        }
131
132        Ok(response)
133    }
134
135    /// Send a message to an agent and stream response events.
136    ///
137    /// Locks the per-agent mutex, pushes the user message, and streams events
138    /// forwarded to `hook.on_event()`.
139    pub fn stream_to<'a>(
140        &'a self,
141        agent: &'a str,
142        content: &'a str,
143    ) -> impl Stream<Item = AgentEvent> + 'a {
144        stream! {
145            let mutex = match self.agents.get(agent) {
146                Some(m) => m,
147                None => {
148                    let resp = AgentResponse {
149                        final_response: None,
150                        iterations: 0,
151                        stop_reason: AgentStopReason::Error(
152                            format!("agent '{agent}' not registered"),
153                        ),
154                        steps: vec![],
155                    };
156                    yield AgentEvent::Done(resp);
157                    return;
158                }
159            };
160
161            let mut guard = mutex.lock().await;
162            guard.push_message(Message::user(content));
163
164            let mut event_stream = std::pin::pin!(guard.run_stream());
165            while let Some(event) = event_stream.next().await {
166                self.hook.on_event(agent, &event);
167                yield event;
168            }
169        }
170    }
171}