1use crate::{
10 Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11 agent::tool::{ToolRegistry, ToolSender},
12 model::{Message, Model},
13 runtime::hook::Hook,
14};
15use anyhow::{Result, bail};
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::StreamExt;
19use std::{
20 collections::{BTreeMap, HashSet},
21 sync::{
22 Arc,
23 atomic::{AtomicU64, Ordering},
24 },
25};
26use tokio::sync::{Mutex, RwLock, mpsc};
27
28pub mod hook;
29pub mod session;
30
31pub use session::Session;
32
33pub struct Runtime<M: Model, H: Hook> {
39 pub model: M,
40 pub hook: H,
41 agents: BTreeMap<String, Agent<M>>,
42 sessions: RwLock<BTreeMap<u64, Arc<Mutex<Session>>>>,
43 next_session_id: AtomicU64,
44 tools: ToolRegistry,
45 tool_tx: Option<ToolSender>,
46 active_sessions: RwLock<HashSet<u64>>,
47}
48
49impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
50 pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
56 let mut tools = ToolRegistry::new();
57 hook.on_register_tools(&mut tools).await;
58 Self {
59 model,
60 hook,
61 agents: BTreeMap::new(),
62 sessions: RwLock::new(BTreeMap::new()),
63 next_session_id: AtomicU64::new(1),
64 tools,
65 tool_tx,
66 active_sessions: RwLock::new(HashSet::new()),
67 }
68 }
69
70 pub fn register_tool(&mut self, tool: crate::model::Tool) {
74 self.tools.insert(tool);
75 }
76
77 pub fn unregister_tool(&mut self, name: &str) -> bool {
79 self.tools.remove(name)
80 }
81
82 pub fn add_agent(&mut self, config: AgentConfig) {
89 let config = self.hook.on_build_agent(config);
90 let name = config.name.clone();
91 let tools = self.tools.filtered_snapshot(&config.tools);
92 let mut builder = AgentBuilder::new(self.model.clone())
93 .config(config)
94 .tools(tools);
95 if let Some(tx) = &self.tool_tx {
96 builder = builder.tool_tx(tx.clone());
97 }
98 let agent = builder.build();
99 self.agents.insert(name, agent);
100 }
101
102 pub fn agent(&self, name: &str) -> Option<AgentConfig> {
104 self.agents.get(name).map(|a| a.config.clone())
105 }
106
107 pub fn agents(&self) -> Vec<AgentConfig> {
109 self.agents.values().map(|a| a.config.clone()).collect()
110 }
111
112 pub fn get_agent(&self, name: &str) -> Option<&Agent<M>> {
114 self.agents.get(name)
115 }
116
117 pub async fn get_or_create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
125 if !self.agents.contains_key(agent) {
126 bail!("agent '{agent}' not registered");
127 }
128
129 {
131 let sessions = self.sessions.read().await;
132 for (id, session_mutex) in sessions.iter() {
133 let s = session_mutex.lock().await;
134 if s.agent == agent && s.created_by == created_by {
135 return Ok(*id);
136 }
137 }
138 }
139
140 if let Some(path) =
142 session::find_latest_session(&crate::paths::SESSIONS_DIR, agent, created_by)
143 && let Ok((meta, messages)) = Session::load_context(&path)
144 {
145 let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
146 let mut session = Session::new(id, agent, created_by);
147 session.history = messages;
148 session.title = meta.title;
149 session.uptime_secs = meta.uptime_secs;
150 session.file_path = Some(path);
151 self.sessions
152 .write()
153 .await
154 .insert(id, Arc::new(Mutex::new(session)));
155 return Ok(id);
156 }
157
158 self.create_session(agent, created_by).await
160 }
161
162 pub async fn create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
164 if !self.agents.contains_key(agent) {
165 bail!("agent '{agent}' not registered");
166 }
167 let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
168 let mut session = Session::new(id, agent, created_by);
169 session.init_file(&crate::paths::SESSIONS_DIR);
170 self.sessions
171 .write()
172 .await
173 .insert(id, Arc::new(Mutex::new(session)));
174 Ok(id)
175 }
176
177 pub async fn load_specific_session(&self, file_path: &std::path::Path) -> Result<u64> {
179 let (meta, messages) = Session::load_context(file_path)?;
180 if !self.agents.contains_key(&meta.agent) {
181 bail!("agent '{}' not registered", meta.agent);
182 }
183 let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
184 let mut session = Session::new(id, &meta.agent, &meta.created_by);
185 session.history = messages;
186 session.title = meta.title;
187 session.uptime_secs = meta.uptime_secs;
188 session.file_path = Some(file_path.to_path_buf());
189 self.sessions
190 .write()
191 .await
192 .insert(id, Arc::new(Mutex::new(session)));
193 Ok(id)
194 }
195
196 pub async fn close_session(&self, id: u64) -> bool {
198 self.sessions.write().await.remove(&id).is_some()
199 }
200
201 pub async fn session(&self, id: u64) -> Option<Arc<Mutex<Session>>> {
203 self.sessions.read().await.get(&id).cloned()
204 }
205
206 pub async fn sessions(&self) -> Vec<Arc<Mutex<Session>>> {
208 self.sessions.read().await.values().cloned().collect()
209 }
210
211 pub async fn is_active(&self, id: u64) -> bool {
213 self.active_sessions.read().await.contains(&id)
214 }
215
216 pub async fn active_session_count(&self) -> usize {
218 self.active_sessions.read().await.len()
219 }
220
221 pub async fn compact_session(&self, session_id: u64) -> Option<String> {
226 let (agent_name, history) = {
227 let session_mutex = self.sessions.read().await.get(&session_id)?.clone();
228 let session = session_mutex.lock().await;
229 if session.history.is_empty() {
230 return None;
231 }
232 (session.agent.clone(), session.history.clone())
233 };
234 self.agents.get(&agent_name)?.compact(&history).await
235 }
236
237 pub async fn transfer_sessions<M2: Model, H2: Hook>(&self, dest: &mut Runtime<M2, H2>) {
242 let sessions = self.sessions.read().await;
243 let dest_sessions = dest.sessions.get_mut();
244 for (id, session) in sessions.iter() {
245 dest_sessions.insert(*id, session.clone());
246 }
247 let next = self.next_session_id.load(Ordering::Relaxed);
248 dest.next_session_id.store(next, Ordering::Relaxed);
249 }
250
251 fn spawn_title_generation(&self, _session_id: u64, session_mutex: Arc<Mutex<Session>>) {
254 let model = self.model.clone();
255 tokio::spawn(async move {
256 let (user_msg, assistant_msg) = {
257 let session = session_mutex.lock().await;
258 let user = session
259 .history
260 .iter()
261 .find(|m| m.role == crate::model::Role::User && !m.auto_injected)
262 .map(|m| m.content.clone());
263 let assistant = session
264 .history
265 .iter()
266 .find(|m| m.role == crate::model::Role::Assistant)
267 .map(|m| m.content.clone());
268 (user, assistant)
269 };
270
271 let Some(user) = user_msg else { return };
272 let Some(assistant) = assistant_msg else {
273 return;
274 };
275
276 let user_snippet: String = user.chars().take(200).collect();
278 let assistant_snippet: String = assistant.chars().take(200).collect();
279
280 let prompt = format!(
281 "Summarize this conversation in 3-6 words as a short title. \
282 Return ONLY the title, nothing else.\n\n\
283 User: {user_snippet}\nAssistant: {assistant_snippet}"
284 );
285
286 let request = crate::model::Request::new(model.active_model())
287 .with_messages(vec![Message::user(&prompt)]);
288
289 match model.send(&request).await {
290 Ok(response) => {
291 if let Some(title) = response.content() {
292 let title = title.trim().trim_matches('"').to_string();
293 if !title.is_empty() {
294 let mut session = session_mutex.lock().await;
295 if session.title.is_empty() {
296 session.set_title(&title);
297 }
298 }
299 }
300 }
301 Err(e) => {
302 tracing::debug!("title generation failed: {e}");
303 }
304 }
305 });
306 }
307
308 fn prepare_history(&self, session: &mut Session, content: &str, sender: &str) -> String {
313 let content = self.hook.preprocess(&session.agent, content);
314 if sender.is_empty() {
315 session.history.push(Message::user(&content));
316 } else {
317 session
318 .history
319 .push(Message::user_with_sender(&content, sender));
320 }
321
322 session.history.retain(|m| !m.auto_injected);
324
325 let agent_name = session.agent.clone();
326 let recall_msgs = self
327 .hook
328 .on_before_run(&agent_name, session.id, &session.history);
329 if !recall_msgs.is_empty() {
330 let insert_pos = session.history.len().saturating_sub(1);
331 for (i, msg) in recall_msgs.into_iter().enumerate() {
332 session.history.insert(insert_pos + i, msg);
333 }
334 }
335 agent_name
336 }
337
338 pub async fn send_to(
340 &self,
341 session_id: u64,
342 content: &str,
343 sender: &str,
344 ) -> Result<AgentResponse> {
345 let session_mutex = self
346 .sessions
347 .read()
348 .await
349 .get(&session_id)
350 .cloned()
351 .ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;
352
353 let mut session = session_mutex.lock().await;
354 let pre_run_len = session.history.len();
355 let agent_name = self.prepare_history(&mut session, content, sender);
356 let agent_ref = self
357 .agents
358 .get(&session.agent)
359 .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", session.agent))?;
360
361 let (tx, mut rx) = mpsc::unbounded_channel();
362 let run_start = std::time::Instant::now();
363 self.active_sessions.write().await.insert(session_id);
364 let response = agent_ref.run(&mut session.history, tx, None).await;
365 self.active_sessions.write().await.remove(&session_id);
366 session.uptime_secs += run_start.elapsed().as_secs();
367
368 let mut compact_summary: Option<String> = None;
370 while let Ok(event) = rx.try_recv() {
371 if let AgentEvent::Compact { ref summary } = event {
372 compact_summary = Some(summary.clone());
373 }
374 self.hook.on_event(&agent_name, session_id, &event);
375 }
376
377 if let Some(summary) = compact_summary {
379 session.append_compact(&summary);
381 if session.history.len() > 1 {
383 session.append_messages(&session.history[1..]);
384 }
385 } else {
386 session.append_messages(&session.history[pre_run_len..]);
388 }
389
390 session.rewrite_meta();
392
393 if session.title.is_empty() && session.history.len() >= 2 {
395 self.spawn_title_generation(session_id, session_mutex.clone());
396 }
397 Ok(response)
398 }
399
400 pub fn stream_to(
402 &self,
403 session_id: u64,
404 content: &str,
405 sender: &str,
406 ) -> impl Stream<Item = AgentEvent> + '_ {
407 let content = content.to_owned();
408 let sender = sender.to_owned();
409 stream! {
410 let session_mutex = match self
411 .sessions
412 .read()
413 .await
414 .get(&session_id)
415 .cloned()
416 {
417 Some(m) => m,
418 None => {
419 let resp = AgentResponse {
420 final_response: None,
421 iterations: 0,
422 stop_reason: AgentStopReason::Error(
423 format!("session {session_id} not found"),
424 ),
425 steps: vec![],
426 };
427 yield AgentEvent::Done(resp);
428 return;
429 }
430 };
431
432 let mut session = session_mutex.lock().await;
433 let pre_run_len = session.history.len();
434 let agent_name = self.prepare_history(&mut session, &content, &sender);
435 let agent_ref = match self.agents.get(&session.agent) {
436 Some(a) => a,
437 None => {
438 let resp = AgentResponse {
439 final_response: None,
440 iterations: 0,
441 stop_reason: AgentStopReason::Error(
442 format!("agent '{}' not registered", session.agent),
443 ),
444 steps: vec![],
445 };
446 yield AgentEvent::Done(resp);
447 return;
448 }
449 };
450
451 let run_start = std::time::Instant::now();
452 self.active_sessions.write().await.insert(session_id);
453 let mut compact_summary: Option<String> = None;
454 let mut done_event: Option<AgentEvent> = None;
455 {
456 let mut event_stream = std::pin::pin!(agent_ref.run_stream(&mut session.history, Some(session_id)));
457 while let Some(event) = event_stream.next().await {
458 if let AgentEvent::Compact { ref summary } = event {
459 compact_summary = Some(summary.clone());
460 }
461 self.hook.on_event(&agent_name, session_id, &event);
462 if matches!(event, AgentEvent::Done(_)) {
464 done_event = Some(event);
465 } else {
466 yield event;
467 }
468 }
469 }
470 self.active_sessions.write().await.remove(&session_id);
472 session.uptime_secs += run_start.elapsed().as_secs();
473 if let Some(summary) = compact_summary {
474 session.append_compact(&summary);
475 if session.history.len() > 1 {
476 session.append_messages(&session.history[1..]);
477 }
478 } else {
479 session.append_messages(&session.history[pre_run_len..]);
480 }
481 session.rewrite_meta();
483
484 if session.title.is_empty() && session.history.len() >= 2 {
486 self.spawn_title_generation(session_id, session_mutex.clone());
487 }
488 if let Some(event) = done_event {
490 yield event;
491 }
492 }
493 }
494}