Skip to main content

walrus_core/runtime/
mod.rs

//! Runtime — agent registry, session management, and hook orchestration.
//!
//! [`Runtime`] holds agents as immutable definitions and sessions as
//! per-session `Arc<Mutex<Session>>` containers. Tool schemas are registered
//! once at startup via `hook.on_register_tools()`. Execution methods
//! (`send_to`, `stream_to`) take a session ID, lock the session, look up the
//! session's agent by reference, and run it against the session's history.
8
9use crate::{
10    Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11    agent::{
12        CompactHook,
13        tool::{ToolRegistry, ToolSender},
14    },
15    model::{Message, Model, Role},
16    runtime::hook::Hook,
17};
18use anyhow::{Result, bail};
19use async_stream::stream;
20use compact_str::CompactString;
21use futures_core::Stream;
22use futures_util::StreamExt;
23use std::{
24    collections::BTreeMap,
25    sync::{
26        Arc,
27        atomic::{AtomicU64, Ordering},
28    },
29};
30use tokio::sync::{Mutex, RwLock, mpsc};
31
32pub mod hook;
33pub mod session;
34
35pub use session::Session;
36
37/// The walrus runtime — agent registry, session store, and hook orchestration.
38///
39/// Agents are stored as plain immutable values. Sessions own conversation
40/// history behind per-session `Arc<Mutex<Session>>`. The sessions map uses
41/// `RwLock` for concurrent access without requiring `&mut self`.
42pub struct Runtime<M: Model, H: Hook> {
43    pub model: M,
44    pub hook: H,
45    agents: BTreeMap<CompactString, Agent<M>>,
46    sessions: RwLock<BTreeMap<u64, Arc<Mutex<Session>>>>,
47    next_session_id: AtomicU64,
48    tools: ToolRegistry,
49    tool_tx: Option<ToolSender>,
50    compact_hook: Option<Arc<dyn CompactHook>>,
51}
52
53impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
54    /// Create a new runtime with the given model and hook backend.
55    ///
56    /// Calls `hook.on_register_tools()` to populate the schema registry.
57    /// Pass `tool_tx` to enable tool dispatch from agents; `None` means agents
58    /// have no tool dispatch (e.g. CLI without a daemon).
59    pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
60        let mut tools = ToolRegistry::new();
61        hook.on_register_tools(&mut tools).await;
62        Self {
63            model,
64            hook,
65            agents: BTreeMap::new(),
66            sessions: RwLock::new(BTreeMap::new()),
67            next_session_id: AtomicU64::new(1),
68            tools,
69            tool_tx,
70            compact_hook: None,
71        }
72    }
73
    /// Set the compact hook used for auto-compaction enrichment.
    ///
    /// Replaces any previously stored hook. Within this file the stored hook
    /// is only read by `add_agent` at build time, so agents registered before
    /// this call are not updated by it.
    pub fn set_compact_hook(&mut self, hook: Arc<dyn CompactHook>) {
        self.compact_hook = Some(hook);
    }
78
79    // --- Tool registry ---
80
    /// Register a tool schema in the runtime's tool registry.
    ///
    /// Whatever `ToolRegistry::insert` returns is discarded.
    /// NOTE(review): `add_agent` takes a schema snapshot when an agent is
    /// built, so tools registered afterwards presumably do not reach agents
    /// that already exist — confirm against `filtered_snapshot`.
    pub fn register_tool(&mut self, tool: crate::model::Tool) {
        self.tools.insert(tool);
    }
85
    /// Remove a tool schema by name. Returns `true` if it existed.
    ///
    /// The return value is passed through unchanged from
    /// `ToolRegistry::remove`.
    pub fn unregister_tool(&mut self, name: &str) -> bool {
        self.tools.remove(name)
    }
90
91    // --- Agent registry ---
92
93    /// Register an agent from its configuration.
94    ///
95    /// Calls `hook.on_build_agent(config)` to enrich the config, then builds
96    /// the agent with a filtered schema snapshot and the runtime's `tool_tx`.
97    pub fn add_agent(&mut self, config: AgentConfig) {
98        let config = self.hook.on_build_agent(config);
99        let name = config.name.clone();
100        let tools = self.tools.filtered_snapshot(&config.tools);
101        let mut builder = AgentBuilder::new(self.model.clone())
102            .config(config)
103            .tools(tools);
104        if let Some(tx) = &self.tool_tx {
105            builder = builder.tool_tx(tx.clone());
106        }
107        if let Some(ref hook) = self.compact_hook {
108            builder = builder.compact_hook(Arc::clone(hook));
109        }
110        let agent = builder.build();
111        self.agents.insert(name, agent);
112    }
113
114    /// Get a registered agent's config by name (cloned).
115    pub fn agent(&self, name: &str) -> Option<AgentConfig> {
116        self.agents.get(name).map(|a| a.config.clone())
117    }
118
119    /// Get all registered agent configs (cloned, alphabetical order).
120    pub fn agents(&self) -> Vec<AgentConfig> {
121        self.agents.values().map(|a| a.config.clone()).collect()
122    }
123
    /// Get a reference to an agent by name.
    ///
    /// Returns `None` when no agent is registered under `name`. Unlike
    /// `agent`, this borrows the full [`Agent`] rather than cloning its
    /// config.
    pub fn get_agent(&self, name: &str) -> Option<&Agent<M>> {
        self.agents.get(name)
    }
128
129    // --- Session management ---
130
131    /// Create a new session for the given agent. Returns the session ID.
132    pub async fn create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
133        if !self.agents.contains_key(agent) {
134            bail!("agent '{agent}' not registered");
135        }
136        let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
137        let session = Session::new(id, agent, created_by);
138        self.sessions
139            .write()
140            .await
141            .insert(id, Arc::new(Mutex::new(session)));
142        Ok(id)
143    }
144
    /// Close (remove) a session by ID. Returns true if it existed.
    ///
    /// Takes the write lock on the session map and removes the entry. Only
    /// the map's handle is dropped here: any `Arc` clone of the session that
    /// was handed out earlier keeps the session alive until that clone is
    /// dropped.
    pub async fn close_session(&self, id: u64) -> bool {
        self.sessions.write().await.remove(&id).is_some()
    }
149
150    /// Get a session mutex by ID.
151    pub async fn session(&self, id: u64) -> Option<Arc<Mutex<Session>>> {
152        self.sessions.read().await.get(&id).cloned()
153    }
154
155    /// Get all session mutexes (for iteration/listing).
156    pub async fn sessions(&self) -> Vec<Arc<Mutex<Session>>> {
157        self.sessions.read().await.values().cloned().collect()
158    }
159
160    // --- Execution ---
161
162    /// Send a message to a session and run to completion.
163    ///
164    /// Locks the session, looks up the agent, pushes the user message,
165    /// delegates to `agent.run()`, and forwards events to `hook.on_event()`.
166    pub async fn send_to(
167        &self,
168        session_id: u64,
169        content: &str,
170        sender: &str,
171    ) -> Result<AgentResponse> {
172        let session_mutex = self
173            .sessions
174            .read()
175            .await
176            .get(&session_id)
177            .cloned()
178            .ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;
179
180        let mut session = session_mutex.lock().await;
181        let agent_ref = self
182            .agents
183            .get(&session.agent)
184            .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", session.agent))?;
185
186        if sender.is_empty() {
187            session.history.push(Message::user(content));
188        } else {
189            session
190                .history
191                .push(Message::user_with_sender(content, sender));
192        }
193
194        let (tx, mut rx) = mpsc::unbounded_channel();
195        let agent_name = session.agent.clone();
196
197        // Strip previous auto-recall messages to avoid accumulation.
198        session
199            .history
200            .retain(|m| !(m.role == Role::User && m.content.starts_with("<recall>")));
201
202        let recall_msgs = self.hook.on_before_run(&agent_name, &session.history);
203        if !recall_msgs.is_empty() {
204            let insert_pos = session.history.len().saturating_sub(1);
205            for (i, msg) in recall_msgs.into_iter().enumerate() {
206                session.history.insert(insert_pos + i, msg);
207            }
208        }
209
210        let response = agent_ref.run(&mut session.history, tx).await;
211
212        while let Ok(event) = rx.try_recv() {
213            self.hook.on_event(&agent_name, &event);
214        }
215
216        let system_prompt = &agent_ref.config.system_prompt;
217        self.hook
218            .on_after_run(&agent_name, &session.history, system_prompt);
219
220        Ok(response)
221    }
222
223    /// Send a message to a session and stream response events.
224    ///
225    /// Locks the session, looks up the agent, pushes the user message, and
226    /// streams events forwarded to `hook.on_event()`.
227    pub fn stream_to(
228        &self,
229        session_id: u64,
230        content: &str,
231        sender: &str,
232    ) -> impl Stream<Item = AgentEvent> + '_ {
233        let content = content.to_owned();
234        let sender = sender.to_owned();
235        stream! {
236            let session_mutex = match self
237                .sessions
238                .read()
239                .await
240                .get(&session_id)
241                .cloned()
242            {
243                Some(m) => m,
244                None => {
245                    let resp = AgentResponse {
246                        final_response: None,
247                        iterations: 0,
248                        stop_reason: AgentStopReason::Error(
249                            format!("session {session_id} not found"),
250                        ),
251                        steps: vec![],
252                    };
253                    yield AgentEvent::Done(resp);
254                    return;
255                }
256            };
257
258            let mut session = session_mutex.lock().await;
259            let agent_ref = match self.agents.get(&session.agent) {
260                Some(a) => a,
261                None => {
262                    let resp = AgentResponse {
263                        final_response: None,
264                        iterations: 0,
265                        stop_reason: AgentStopReason::Error(
266                            format!("agent '{}' not registered", session.agent),
267                        ),
268                        steps: vec![],
269                    };
270                    yield AgentEvent::Done(resp);
271                    return;
272                }
273            };
274
275            if sender.is_empty() {
276                session.history.push(Message::user(&content));
277            } else {
278                session.history.push(Message::user_with_sender(&content, &sender));
279            }
280            let agent_name = session.agent.clone();
281
282            // Strip previous auto-recall messages to avoid accumulation.
283            session
284                .history
285                .retain(|m| !(m.role == Role::User && m.content.starts_with("<recall>")));
286
287            let recall_msgs = self.hook.on_before_run(&agent_name, &session.history);
288            if !recall_msgs.is_empty() {
289                let insert_pos = session.history.len().saturating_sub(1);
290                for (i, msg) in recall_msgs.into_iter().enumerate() {
291                    session.history.insert(insert_pos + i, msg);
292                }
293            }
294
295            {
296                let mut event_stream = std::pin::pin!(agent_ref.run_stream(&mut session.history));
297                while let Some(event) = event_stream.next().await {
298                    self.hook.on_event(&agent_name, &event);
299                    yield event;
300                }
301            }
302
303            let system_prompt = &agent_ref.config.system_prompt;
304            self.hook
305                .on_after_run(&agent_name, &session.history, system_prompt);
306        }
307    }
308}