walrus_core/runtime/
mod.rs1use crate::{
10 Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11 agent::tool::{ToolRegistry, ToolSender},
12 model::{Message, Model},
13 runtime::hook::Hook,
14};
15use anyhow::Result;
16use async_stream::stream;
17use compact_str::CompactString;
18use futures_core::Stream;
19use futures_util::StreamExt;
20use std::{collections::BTreeMap, sync::Arc};
21use tokio::sync::{Mutex, mpsc};
22
23pub mod hook;
24
25pub struct Runtime<M: Model, H: Hook> {
33 pub model: M,
34 pub hook: H,
35 agents: BTreeMap<CompactString, Arc<Mutex<Agent<M>>>>,
36 tools: ToolRegistry,
37 tool_tx: Option<ToolSender>,
38}
39
40impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
41 pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
47 let mut tools = ToolRegistry::new();
48 hook.on_register_tools(&mut tools).await;
49 Self {
50 model,
51 hook,
52 agents: BTreeMap::new(),
53 tools,
54 tool_tx,
55 }
56 }
57
58 pub fn register_tool(&mut self, tool: crate::model::Tool) {
62 self.tools.insert(tool);
63 }
64
65 pub fn unregister_tool(&mut self, name: &str) -> bool {
67 self.tools.remove(name)
68 }
69
70 pub fn add_agent(&mut self, config: AgentConfig) {
77 let config = self.hook.on_build_agent(config);
78 let name = config.name.clone();
79 let tools = self.tools.tools();
80 let mut builder = AgentBuilder::new(self.model.clone())
81 .config(config)
82 .tools(tools);
83 if let Some(tx) = &self.tool_tx {
84 builder = builder.tool_tx(tx.clone());
85 }
86 let agent = builder.build();
87 self.agents.insert(name, Arc::new(Mutex::new(agent)));
88 }
89
90 pub async fn agent(&self, name: &str) -> Option<AgentConfig> {
92 let mutex = self.agents.get(name)?;
93 Some(mutex.lock().await.config.clone())
94 }
95
96 pub async fn agents(&self) -> Vec<AgentConfig> {
98 let mut configs = Vec::with_capacity(self.agents.len());
99 for mutex in self.agents.values() {
100 configs.push(mutex.lock().await.config.clone());
101 }
102 configs
103 }
104
105 pub fn agent_mutex(&self, name: &str) -> Option<Arc<Mutex<Agent<M>>>> {
107 self.agents.get(name).cloned()
108 }
109
110 pub async fn send_to(&self, agent: &str, content: &str) -> Result<AgentResponse> {
117 let mutex = self
118 .agents
119 .get(agent)
120 .ok_or_else(|| anyhow::anyhow!("agent '{agent}' not registered"))?;
121
122 let mut guard = mutex.lock().await;
123 guard.push_message(Message::user(content));
124
125 let (tx, mut rx) = mpsc::unbounded_channel();
126 let response = guard.run(tx).await;
127
128 while let Ok(event) = rx.try_recv() {
129 self.hook.on_event(agent, &event);
130 }
131
132 Ok(response)
133 }
134
135 pub fn stream_to<'a>(
140 &'a self,
141 agent: &'a str,
142 content: &'a str,
143 ) -> impl Stream<Item = AgentEvent> + 'a {
144 stream! {
145 let mutex = match self.agents.get(agent) {
146 Some(m) => m,
147 None => {
148 let resp = AgentResponse {
149 final_response: None,
150 iterations: 0,
151 stop_reason: AgentStopReason::Error(
152 format!("agent '{agent}' not registered"),
153 ),
154 steps: vec![],
155 };
156 yield AgentEvent::Done(resp);
157 return;
158 }
159 };
160
161 let mut guard = mutex.lock().await;
162 guard.push_message(Message::user(content));
163
164 let mut event_stream = std::pin::pin!(guard.run_stream());
165 while let Some(event) = event_stream.next().await {
166 self.hook.on_event(agent, &event);
167 yield event;
168 }
169 }
170 }
171}