1use crate::{
10 Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11 agent::tool::{ToolRegistry, ToolSender},
12 model::{Message, Model},
13 runtime::hook::Hook,
14};
15use anyhow::{Result, bail};
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::StreamExt;
19use std::{
20 collections::BTreeMap,
21 sync::{
22 Arc,
23 atomic::{AtomicU64, Ordering},
24 },
25};
26use tokio::sync::{Mutex, RwLock, mpsc};
27
28pub mod hook;
29pub mod session;
30
31pub use session::Session;
32
33pub struct Runtime<M: Model, H: Hook> {
39 pub model: M,
40 pub hook: H,
41 agents: BTreeMap<String, Agent<M>>,
42 sessions: RwLock<BTreeMap<u64, Arc<Mutex<Session>>>>,
43 next_session_id: AtomicU64,
44 tools: ToolRegistry,
45 tool_tx: Option<ToolSender>,
46}
47
48impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
49 pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
55 let mut tools = ToolRegistry::new();
56 hook.on_register_tools(&mut tools).await;
57 Self {
58 model,
59 hook,
60 agents: BTreeMap::new(),
61 sessions: RwLock::new(BTreeMap::new()),
62 next_session_id: AtomicU64::new(1),
63 tools,
64 tool_tx,
65 }
66 }
67
68 pub fn register_tool(&mut self, tool: crate::model::Tool) {
72 self.tools.insert(tool);
73 }
74
75 pub fn unregister_tool(&mut self, name: &str) -> bool {
77 self.tools.remove(name)
78 }
79
80 pub fn add_agent(&mut self, config: AgentConfig) {
87 let config = self.hook.on_build_agent(config);
88 let name = config.name.clone();
89 let tools = self.tools.filtered_snapshot(&config.tools);
90 let mut builder = AgentBuilder::new(self.model.clone())
91 .config(config)
92 .tools(tools);
93 if let Some(tx) = &self.tool_tx {
94 builder = builder.tool_tx(tx.clone());
95 }
96 let agent = builder.build();
97 self.agents.insert(name, agent);
98 }
99
100 pub fn agent(&self, name: &str) -> Option<AgentConfig> {
102 self.agents.get(name).map(|a| a.config.clone())
103 }
104
105 pub fn agents(&self) -> Vec<AgentConfig> {
107 self.agents.values().map(|a| a.config.clone()).collect()
108 }
109
110 pub fn get_agent(&self, name: &str) -> Option<&Agent<M>> {
112 self.agents.get(name)
113 }
114
115 pub async fn create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
119 if !self.agents.contains_key(agent) {
120 bail!("agent '{agent}' not registered");
121 }
122 let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
123 let session = Session::new(id, agent, created_by);
124 self.sessions
125 .write()
126 .await
127 .insert(id, Arc::new(Mutex::new(session)));
128 Ok(id)
129 }
130
131 pub async fn close_session(&self, id: u64) -> bool {
133 self.sessions.write().await.remove(&id).is_some()
134 }
135
136 pub async fn session(&self, id: u64) -> Option<Arc<Mutex<Session>>> {
138 self.sessions.read().await.get(&id).cloned()
139 }
140
141 pub async fn sessions(&self) -> Vec<Arc<Mutex<Session>>> {
143 self.sessions.read().await.values().cloned().collect()
144 }
145
146 fn prepare_history(&self, session: &mut Session, content: &str, sender: &str) -> String {
151 let content = self.hook.preprocess(&session.agent, content);
152 if sender.is_empty() {
153 session.history.push(Message::user(&content));
154 } else {
155 session
156 .history
157 .push(Message::user_with_sender(&content, sender));
158 }
159
160 session.history.retain(|m| !m.auto_injected);
162
163 let agent_name = session.agent.clone();
164 let recall_msgs = self.hook.on_before_run(&agent_name, &session.history);
165 if !recall_msgs.is_empty() {
166 let insert_pos = session.history.len().saturating_sub(1);
167 for (i, msg) in recall_msgs.into_iter().enumerate() {
168 session.history.insert(insert_pos + i, msg);
169 }
170 }
171 agent_name
172 }
173
174 pub async fn send_to(
176 &self,
177 session_id: u64,
178 content: &str,
179 sender: &str,
180 ) -> Result<AgentResponse> {
181 let session_mutex = self
182 .sessions
183 .read()
184 .await
185 .get(&session_id)
186 .cloned()
187 .ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;
188
189 let mut session = session_mutex.lock().await;
190 let agent_name = self.prepare_history(&mut session, content, sender);
191 let agent_ref = self
192 .agents
193 .get(&session.agent)
194 .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", session.agent))?;
195
196 let (tx, mut rx) = mpsc::unbounded_channel();
197 let response = agent_ref.run(&mut session.history, tx).await;
198
199 while let Ok(event) = rx.try_recv() {
200 self.hook.on_event(&agent_name, &event);
201 }
202
203 Ok(response)
204 }
205
206 pub fn stream_to(
208 &self,
209 session_id: u64,
210 content: &str,
211 sender: &str,
212 ) -> impl Stream<Item = AgentEvent> + '_ {
213 let content = content.to_owned();
214 let sender = sender.to_owned();
215 stream! {
216 let session_mutex = match self
217 .sessions
218 .read()
219 .await
220 .get(&session_id)
221 .cloned()
222 {
223 Some(m) => m,
224 None => {
225 let resp = AgentResponse {
226 final_response: None,
227 iterations: 0,
228 stop_reason: AgentStopReason::Error(
229 format!("session {session_id} not found"),
230 ),
231 steps: vec![],
232 };
233 yield AgentEvent::Done(resp);
234 return;
235 }
236 };
237
238 let mut session = session_mutex.lock().await;
239 let agent_name = self.prepare_history(&mut session, &content, &sender);
240 let agent_ref = match self.agents.get(&session.agent) {
241 Some(a) => a,
242 None => {
243 let resp = AgentResponse {
244 final_response: None,
245 iterations: 0,
246 stop_reason: AgentStopReason::Error(
247 format!("agent '{}' not registered", session.agent),
248 ),
249 steps: vec![],
250 };
251 yield AgentEvent::Done(resp);
252 return;
253 }
254 };
255
256 let mut event_stream = std::pin::pin!(agent_ref.run_stream(&mut session.history));
257 while let Some(event) = event_stream.next().await {
258 self.hook.on_event(&agent_name, &event);
259 yield event;
260 }
261 }
262 }
263}