1use crate::{
10 Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11 agent::{
12 CompactHook,
13 tool::{ToolRegistry, ToolSender},
14 },
15 model::{Message, Model, Role},
16 runtime::hook::Hook,
17};
18use anyhow::{Result, bail};
19use async_stream::stream;
20use compact_str::CompactString;
21use futures_core::Stream;
22use futures_util::StreamExt;
23use std::{
24 collections::BTreeMap,
25 sync::{
26 Arc,
27 atomic::{AtomicU64, Ordering},
28 },
29};
30use tokio::sync::{Mutex, RwLock, mpsc};
31
32pub mod hook;
33pub mod session;
34
35pub use session::Session;
36
37pub struct Runtime<M: Model, H: Hook> {
43 pub model: M,
44 pub hook: H,
45 agents: BTreeMap<CompactString, Agent<M>>,
46 sessions: RwLock<BTreeMap<u64, Arc<Mutex<Session>>>>,
47 next_session_id: AtomicU64,
48 tools: ToolRegistry,
49 tool_tx: Option<ToolSender>,
50 compact_hook: Option<Arc<dyn CompactHook>>,
51}
52
53impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
54 pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
60 let mut tools = ToolRegistry::new();
61 hook.on_register_tools(&mut tools).await;
62 Self {
63 model,
64 hook,
65 agents: BTreeMap::new(),
66 sessions: RwLock::new(BTreeMap::new()),
67 next_session_id: AtomicU64::new(1),
68 tools,
69 tool_tx,
70 compact_hook: None,
71 }
72 }
73
74 pub fn set_compact_hook(&mut self, hook: Arc<dyn CompactHook>) {
76 self.compact_hook = Some(hook);
77 }
78
79 pub fn register_tool(&mut self, tool: crate::model::Tool) {
83 self.tools.insert(tool);
84 }
85
86 pub fn unregister_tool(&mut self, name: &str) -> bool {
88 self.tools.remove(name)
89 }
90
91 pub fn add_agent(&mut self, config: AgentConfig) {
98 let config = self.hook.on_build_agent(config);
99 let name = config.name.clone();
100 let tools = self.tools.filtered_snapshot(&config.tools);
101 let mut builder = AgentBuilder::new(self.model.clone())
102 .config(config)
103 .tools(tools);
104 if let Some(tx) = &self.tool_tx {
105 builder = builder.tool_tx(tx.clone());
106 }
107 if let Some(ref hook) = self.compact_hook {
108 builder = builder.compact_hook(Arc::clone(hook));
109 }
110 let agent = builder.build();
111 self.agents.insert(name, agent);
112 }
113
114 pub fn agent(&self, name: &str) -> Option<AgentConfig> {
116 self.agents.get(name).map(|a| a.config.clone())
117 }
118
119 pub fn agents(&self) -> Vec<AgentConfig> {
121 self.agents.values().map(|a| a.config.clone()).collect()
122 }
123
124 pub fn get_agent(&self, name: &str) -> Option<&Agent<M>> {
126 self.agents.get(name)
127 }
128
129 pub async fn create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
133 if !self.agents.contains_key(agent) {
134 bail!("agent '{agent}' not registered");
135 }
136 let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
137 let session = Session::new(id, agent, created_by);
138 self.sessions
139 .write()
140 .await
141 .insert(id, Arc::new(Mutex::new(session)));
142 Ok(id)
143 }
144
145 pub async fn close_session(&self, id: u64) -> bool {
147 self.sessions.write().await.remove(&id).is_some()
148 }
149
150 pub async fn session(&self, id: u64) -> Option<Arc<Mutex<Session>>> {
152 self.sessions.read().await.get(&id).cloned()
153 }
154
155 pub async fn sessions(&self) -> Vec<Arc<Mutex<Session>>> {
157 self.sessions.read().await.values().cloned().collect()
158 }
159
160 pub async fn send_to(
167 &self,
168 session_id: u64,
169 content: &str,
170 sender: &str,
171 ) -> Result<AgentResponse> {
172 let session_mutex = self
173 .sessions
174 .read()
175 .await
176 .get(&session_id)
177 .cloned()
178 .ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;
179
180 let mut session = session_mutex.lock().await;
181 let agent_ref = self
182 .agents
183 .get(&session.agent)
184 .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", session.agent))?;
185
186 if sender.is_empty() {
187 session.history.push(Message::user(content));
188 } else {
189 session
190 .history
191 .push(Message::user_with_sender(content, sender));
192 }
193
194 let (tx, mut rx) = mpsc::unbounded_channel();
195 let agent_name = session.agent.clone();
196
197 session
199 .history
200 .retain(|m| !(m.role == Role::User && m.content.starts_with("<recall>")));
201
202 let recall_msgs = self.hook.on_before_run(&agent_name, &session.history);
203 if !recall_msgs.is_empty() {
204 let insert_pos = session.history.len().saturating_sub(1);
205 for (i, msg) in recall_msgs.into_iter().enumerate() {
206 session.history.insert(insert_pos + i, msg);
207 }
208 }
209
210 let response = agent_ref.run(&mut session.history, tx).await;
211
212 while let Ok(event) = rx.try_recv() {
213 self.hook.on_event(&agent_name, &event);
214 }
215
216 let system_prompt = &agent_ref.config.system_prompt;
217 self.hook
218 .on_after_run(&agent_name, &session.history, system_prompt);
219
220 Ok(response)
221 }
222
223 pub fn stream_to(
228 &self,
229 session_id: u64,
230 content: &str,
231 sender: &str,
232 ) -> impl Stream<Item = AgentEvent> + '_ {
233 let content = content.to_owned();
234 let sender = sender.to_owned();
235 stream! {
236 let session_mutex = match self
237 .sessions
238 .read()
239 .await
240 .get(&session_id)
241 .cloned()
242 {
243 Some(m) => m,
244 None => {
245 let resp = AgentResponse {
246 final_response: None,
247 iterations: 0,
248 stop_reason: AgentStopReason::Error(
249 format!("session {session_id} not found"),
250 ),
251 steps: vec![],
252 };
253 yield AgentEvent::Done(resp);
254 return;
255 }
256 };
257
258 let mut session = session_mutex.lock().await;
259 let agent_ref = match self.agents.get(&session.agent) {
260 Some(a) => a,
261 None => {
262 let resp = AgentResponse {
263 final_response: None,
264 iterations: 0,
265 stop_reason: AgentStopReason::Error(
266 format!("agent '{}' not registered", session.agent),
267 ),
268 steps: vec![],
269 };
270 yield AgentEvent::Done(resp);
271 return;
272 }
273 };
274
275 if sender.is_empty() {
276 session.history.push(Message::user(&content));
277 } else {
278 session.history.push(Message::user_with_sender(&content, &sender));
279 }
280 let agent_name = session.agent.clone();
281
282 session
284 .history
285 .retain(|m| !(m.role == Role::User && m.content.starts_with("<recall>")));
286
287 let recall_msgs = self.hook.on_before_run(&agent_name, &session.history);
288 if !recall_msgs.is_empty() {
289 let insert_pos = session.history.len().saturating_sub(1);
290 for (i, msg) in recall_msgs.into_iter().enumerate() {
291 session.history.insert(insert_pos + i, msg);
292 }
293 }
294
295 {
296 let mut event_stream = std::pin::pin!(agent_ref.run_stream(&mut session.history));
297 while let Some(event) = event_stream.next().await {
298 self.hook.on_event(&agent_name, &event);
299 yield event;
300 }
301 }
302
303 let system_prompt = &agent_ref.config.system_prompt;
304 self.hook
305 .on_after_run(&agent_name, &session.history, system_prompt);
306 }
307 }
308}