1use crate::{
10 Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11 agent::tool::{ToolRegistry, ToolSender},
12 model::{Message, Model},
13 runtime::hook::Hook,
14};
15use anyhow::{Result, bail};
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::StreamExt;
19use std::{
20 collections::{BTreeMap, HashSet},
21 sync::{
22 Arc,
23 atomic::{AtomicU64, Ordering},
24 },
25};
26use tokio::sync::{Mutex, RwLock, mpsc};
27
28pub mod hook;
29pub mod session;
30
31pub use session::Session;
32
33pub struct Runtime<M: Model, H: Hook> {
39 pub model: M,
40 pub hook: H,
41 agents: BTreeMap<String, Agent<M>>,
42 sessions: RwLock<BTreeMap<u64, Arc<Mutex<Session>>>>,
43 next_session_id: AtomicU64,
44 tools: ToolRegistry,
45 tool_tx: Option<ToolSender>,
46 active_sessions: RwLock<HashSet<u64>>,
47}
48
49impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
50 pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
56 let mut tools = ToolRegistry::new();
57 hook.on_register_tools(&mut tools).await;
58 Self {
59 model,
60 hook,
61 agents: BTreeMap::new(),
62 sessions: RwLock::new(BTreeMap::new()),
63 next_session_id: AtomicU64::new(1),
64 tools,
65 tool_tx,
66 active_sessions: RwLock::new(HashSet::new()),
67 }
68 }
69
70 pub fn register_tool(&mut self, tool: crate::model::Tool) {
74 self.tools.insert(tool);
75 }
76
77 pub fn unregister_tool(&mut self, name: &str) -> bool {
79 self.tools.remove(name)
80 }
81
82 pub fn add_agent(&mut self, config: AgentConfig) {
89 let config = self.hook.on_build_agent(config);
90 let name = config.name.clone();
91 let tools = self.tools.filtered_snapshot(&config.tools);
92 let mut builder = AgentBuilder::new(self.model.clone())
93 .config(config)
94 .tools(tools);
95 if let Some(tx) = &self.tool_tx {
96 builder = builder.tool_tx(tx.clone());
97 }
98 let agent = builder.build();
99 self.agents.insert(name, agent);
100 }
101
102 pub fn agent(&self, name: &str) -> Option<AgentConfig> {
104 self.agents.get(name).map(|a| a.config.clone())
105 }
106
107 pub fn agents(&self) -> Vec<AgentConfig> {
109 self.agents.values().map(|a| a.config.clone()).collect()
110 }
111
112 pub fn get_agent(&self, name: &str) -> Option<&Agent<M>> {
114 self.agents.get(name)
115 }
116
117 pub async fn create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
121 if !self.agents.contains_key(agent) {
122 bail!("agent '{agent}' not registered");
123 }
124 let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
125 let mut session = Session::new(id, agent, created_by);
126 session.init_file(&crate::paths::SESSIONS_DIR);
127 self.sessions
128 .write()
129 .await
130 .insert(id, Arc::new(Mutex::new(session)));
131 Ok(id)
132 }
133
134 pub async fn close_session(&self, id: u64) -> bool {
136 self.sessions.write().await.remove(&id).is_some()
137 }
138
139 pub async fn session(&self, id: u64) -> Option<Arc<Mutex<Session>>> {
141 self.sessions.read().await.get(&id).cloned()
142 }
143
144 pub async fn sessions(&self) -> Vec<Arc<Mutex<Session>>> {
146 self.sessions.read().await.values().cloned().collect()
147 }
148
149 pub async fn is_active(&self, id: u64) -> bool {
151 self.active_sessions.read().await.contains(&id)
152 }
153
154 fn prepare_history(&self, session: &mut Session, content: &str, sender: &str) -> String {
159 let content = self.hook.preprocess(&session.agent, content);
160 if sender.is_empty() {
161 session.history.push(Message::user(&content));
162 } else {
163 session
164 .history
165 .push(Message::user_with_sender(&content, sender));
166 }
167
168 session.history.retain(|m| !m.auto_injected);
170
171 let agent_name = session.agent.clone();
172 let recall_msgs = self
173 .hook
174 .on_before_run(&agent_name, session.id, &session.history);
175 if !recall_msgs.is_empty() {
176 let insert_pos = session.history.len().saturating_sub(1);
177 for (i, msg) in recall_msgs.into_iter().enumerate() {
178 session.history.insert(insert_pos + i, msg);
179 }
180 }
181 agent_name
182 }
183
184 pub async fn send_to(
186 &self,
187 session_id: u64,
188 content: &str,
189 sender: &str,
190 ) -> Result<AgentResponse> {
191 let session_mutex = self
192 .sessions
193 .read()
194 .await
195 .get(&session_id)
196 .cloned()
197 .ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;
198
199 let mut session = session_mutex.lock().await;
200 let agent_name = self.prepare_history(&mut session, content, sender);
201 let agent_ref = self
202 .agents
203 .get(&session.agent)
204 .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", session.agent))?;
205
206 let (tx, mut rx) = mpsc::unbounded_channel();
207 self.active_sessions.write().await.insert(session_id);
208 let response = agent_ref.run(&mut session.history, tx, None).await;
209 self.active_sessions.write().await.remove(&session_id);
210
211 while let Ok(event) = rx.try_recv() {
212 self.hook.on_event(&agent_name, session_id, &event);
213 }
214
215 session.persist();
216 Ok(response)
217 }
218
219 pub fn stream_to(
221 &self,
222 session_id: u64,
223 content: &str,
224 sender: &str,
225 ) -> impl Stream<Item = AgentEvent> + '_ {
226 let content = content.to_owned();
227 let sender = sender.to_owned();
228 stream! {
229 let session_mutex = match self
230 .sessions
231 .read()
232 .await
233 .get(&session_id)
234 .cloned()
235 {
236 Some(m) => m,
237 None => {
238 let resp = AgentResponse {
239 final_response: None,
240 iterations: 0,
241 stop_reason: AgentStopReason::Error(
242 format!("session {session_id} not found"),
243 ),
244 steps: vec![],
245 };
246 yield AgentEvent::Done(resp);
247 return;
248 }
249 };
250
251 let mut session = session_mutex.lock().await;
252 let agent_name = self.prepare_history(&mut session, &content, &sender);
253 let agent_ref = match self.agents.get(&session.agent) {
254 Some(a) => a,
255 None => {
256 let resp = AgentResponse {
257 final_response: None,
258 iterations: 0,
259 stop_reason: AgentStopReason::Error(
260 format!("agent '{}' not registered", session.agent),
261 ),
262 steps: vec![],
263 };
264 yield AgentEvent::Done(resp);
265 return;
266 }
267 };
268
269 self.active_sessions.write().await.insert(session_id);
270 {
271 let mut event_stream = std::pin::pin!(agent_ref.run_stream(&mut session.history, Some(session_id)));
272 while let Some(event) = event_stream.next().await {
273 self.hook.on_event(&agent_name, session_id, &event);
274 yield event;
275 }
276 }
277 self.active_sessions.write().await.remove(&session_id);
278 session.persist();
279 }
280 }
281}