Skip to main content

crabtalk_core/runtime/
mod.rs

1//! Runtime — agent registry, session management, and hook orchestration.
2//!
3//! [`Runtime`] holds agents as immutable definitions and sessions as
4//! per-session `Arc<Mutex<Session>>` containers. Tool schemas are registered
5//! once at startup via `hook.on_register_tools()`. Execution methods
6//! (`send_to`, `stream_to`) take a session ID, lock the session, clone the
7//! agent, and run with the session's history.
8
9use crate::{
10    Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11    agent::tool::{ToolRegistry, ToolSender},
12    model::{Message, Model},
13    runtime::hook::Hook,
14};
15use anyhow::{Result, bail};
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::StreamExt;
19use std::{
20    collections::{BTreeMap, HashSet},
21    sync::{
22        Arc,
23        atomic::{AtomicU64, Ordering},
24    },
25};
26use tokio::sync::{Mutex, RwLock, mpsc};
27
28pub mod hook;
29pub mod session;
30
31pub use session::Session;
32
33/// The crabtalk runtime — agent registry, session store, and hook orchestration.
34///
35/// Agents are stored as plain immutable values. Sessions own conversation
36/// history behind per-session `Arc<Mutex<Session>>`. The sessions map uses
37/// `RwLock` for concurrent access without requiring `&mut self`.
38pub struct Runtime<M: Model, H: Hook> {
39    pub model: M,
40    pub hook: H,
41    agents: BTreeMap<String, Agent<M>>,
42    sessions: RwLock<BTreeMap<u64, Arc<Mutex<Session>>>>,
43    next_session_id: AtomicU64,
44    tools: ToolRegistry,
45    tool_tx: Option<ToolSender>,
46    active_sessions: RwLock<HashSet<u64>>,
47}
48
49impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
50    /// Create a new runtime with the given model and hook backend.
51    ///
52    /// Calls `hook.on_register_tools()` to populate the schema registry.
53    /// Pass `tool_tx` to enable tool dispatch from agents; `None` means agents
54    /// have no tool dispatch (e.g. CLI without a daemon).
55    pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
56        let mut tools = ToolRegistry::new();
57        hook.on_register_tools(&mut tools).await;
58        Self {
59            model,
60            hook,
61            agents: BTreeMap::new(),
62            sessions: RwLock::new(BTreeMap::new()),
63            next_session_id: AtomicU64::new(1),
64            tools,
65            tool_tx,
66            active_sessions: RwLock::new(HashSet::new()),
67        }
68    }
69
70    // --- Tool registry ---
71
72    /// Register a tool schema.
73    pub fn register_tool(&mut self, tool: crate::model::Tool) {
74        self.tools.insert(tool);
75    }
76
77    /// Remove a tool schema by name. Returns `true` if it existed.
78    pub fn unregister_tool(&mut self, name: &str) -> bool {
79        self.tools.remove(name)
80    }
81
82    // --- Agent registry ---
83
84    /// Register an agent from its configuration.
85    ///
86    /// Calls `hook.on_build_agent(config)` to enrich the config, then builds
87    /// the agent with a filtered schema snapshot and the runtime's `tool_tx`.
88    pub fn add_agent(&mut self, config: AgentConfig) {
89        let config = self.hook.on_build_agent(config);
90        let name = config.name.clone();
91        let tools = self.tools.filtered_snapshot(&config.tools);
92        let mut builder = AgentBuilder::new(self.model.clone())
93            .config(config)
94            .tools(tools);
95        if let Some(tx) = &self.tool_tx {
96            builder = builder.tool_tx(tx.clone());
97        }
98        let agent = builder.build();
99        self.agents.insert(name, agent);
100    }
101
102    /// Get a registered agent's config by name (cloned).
103    pub fn agent(&self, name: &str) -> Option<AgentConfig> {
104        self.agents.get(name).map(|a| a.config.clone())
105    }
106
107    /// Get all registered agent configs (cloned, alphabetical order).
108    pub fn agents(&self) -> Vec<AgentConfig> {
109        self.agents.values().map(|a| a.config.clone()).collect()
110    }
111
112    /// Get a reference to an agent by name.
113    pub fn get_agent(&self, name: &str) -> Option<&Agent<M>> {
114        self.agents.get(name)
115    }
116
117    // --- Session management ---
118
119    /// Get or create a session for the given (agent, created_by) identity.
120    ///
121    /// 1. Check in-memory sessions for a match → return existing ID.
122    /// 2. Check disk for a persisted session file → load context, return ID.
123    /// 3. Neither → create a new session with a fresh file.
124    pub async fn get_or_create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
125        if !self.agents.contains_key(agent) {
126            bail!("agent '{agent}' not registered");
127        }
128
129        // 1. In-memory lookup.
130        {
131            let sessions = self.sessions.read().await;
132            for (id, session_mutex) in sessions.iter() {
133                let s = session_mutex.lock().await;
134                if s.agent == agent && s.created_by == created_by {
135                    return Ok(*id);
136                }
137            }
138        }
139
140        // 2. Disk lookup — find latest session file for this identity.
141        if let Some(path) =
142            session::find_latest_session(&crate::paths::SESSIONS_DIR, agent, created_by)
143            && let Ok((meta, messages)) = Session::load_context(&path)
144        {
145            let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
146            let mut session = Session::new(id, agent, created_by);
147            session.history = messages;
148            session.title = meta.title;
149            session.uptime_secs = meta.uptime_secs;
150            session.file_path = Some(path);
151            self.sessions
152                .write()
153                .await
154                .insert(id, Arc::new(Mutex::new(session)));
155            return Ok(id);
156        }
157
158        // 3. Create new.
159        self.create_session(agent, created_by).await
160    }
161
162    /// Create a new session for the given agent. Returns the session ID.
163    pub async fn create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
164        if !self.agents.contains_key(agent) {
165            bail!("agent '{agent}' not registered");
166        }
167        let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
168        let mut session = Session::new(id, agent, created_by);
169        session.init_file(&crate::paths::SESSIONS_DIR);
170        self.sessions
171            .write()
172            .await
173            .insert(id, Arc::new(Mutex::new(session)));
174        Ok(id)
175    }
176
177    /// Load a specific session from a file path. Returns the session ID.
178    pub async fn load_specific_session(&self, file_path: &std::path::Path) -> Result<u64> {
179        let (meta, messages) = Session::load_context(file_path)?;
180        if !self.agents.contains_key(&meta.agent) {
181            bail!("agent '{}' not registered", meta.agent);
182        }
183        let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
184        let mut session = Session::new(id, &meta.agent, &meta.created_by);
185        session.history = messages;
186        session.title = meta.title;
187        session.uptime_secs = meta.uptime_secs;
188        session.file_path = Some(file_path.to_path_buf());
189        self.sessions
190            .write()
191            .await
192            .insert(id, Arc::new(Mutex::new(session)));
193        Ok(id)
194    }
195
196    /// Close (remove) a session by ID. Returns true if it existed.
197    pub async fn close_session(&self, id: u64) -> bool {
198        self.sessions.write().await.remove(&id).is_some()
199    }
200
201    /// Get a session mutex by ID.
202    pub async fn session(&self, id: u64) -> Option<Arc<Mutex<Session>>> {
203        self.sessions.read().await.get(&id).cloned()
204    }
205
206    /// Get all session mutexes (for iteration/listing).
207    pub async fn sessions(&self) -> Vec<Arc<Mutex<Session>>> {
208        self.sessions.read().await.values().cloned().collect()
209    }
210
211    /// Check if a session is currently active (running send_to or stream_to).
212    pub async fn is_active(&self, id: u64) -> bool {
213        self.active_sessions.read().await.contains(&id)
214    }
215
216    /// Number of currently active sessions.
217    pub async fn active_session_count(&self) -> usize {
218        self.active_sessions.read().await.len()
219    }
220
221    /// Compact a session's history into a concise summary.
222    ///
223    /// Clones history to release the lock before the LLM call.
224    /// Returns `None` if session/agent not found, history empty, or LLM fails.
225    pub async fn compact_session(&self, session_id: u64) -> Option<String> {
226        let (agent_name, history) = {
227            let session_mutex = self.sessions.read().await.get(&session_id)?.clone();
228            let session = session_mutex.lock().await;
229            if session.history.is_empty() {
230                return None;
231            }
232            (session.agent.clone(), session.history.clone())
233        };
234        self.agents.get(&agent_name)?.compact(&history).await
235    }
236
237    /// Move all sessions from this runtime into `dest`.
238    ///
239    /// Used during daemon reload to preserve gateway sessions. The `dest`
240    /// runtime must not yet be shared (call before wrapping in `Arc`).
241    pub async fn transfer_sessions<M2: Model, H2: Hook>(&self, dest: &mut Runtime<M2, H2>) {
242        let sessions = self.sessions.read().await;
243        let dest_sessions = dest.sessions.get_mut();
244        for (id, session) in sessions.iter() {
245            dest_sessions.insert(*id, session.clone());
246        }
247        let next = self.next_session_id.load(Ordering::Relaxed);
248        dest.next_session_id.store(next, Ordering::Relaxed);
249    }
250
251    /// Spawn a background task to generate a conversation title from the
252    /// first user+assistant exchange. Non-blocking — the main flow continues.
253    fn spawn_title_generation(&self, _session_id: u64, session_mutex: Arc<Mutex<Session>>) {
254        let model = self.model.clone();
255        tokio::spawn(async move {
256            let (user_msg, assistant_msg) = {
257                let session = session_mutex.lock().await;
258                let user = session
259                    .history
260                    .iter()
261                    .find(|m| m.role == crate::model::Role::User && !m.auto_injected)
262                    .map(|m| m.content.clone());
263                let assistant = session
264                    .history
265                    .iter()
266                    .find(|m| m.role == crate::model::Role::Assistant)
267                    .map(|m| m.content.clone());
268                (user, assistant)
269            };
270
271            let Some(user) = user_msg else { return };
272            let Some(assistant) = assistant_msg else {
273                return;
274            };
275
276            // Truncate to keep the title-generation request small.
277            let user_snippet: String = user.chars().take(200).collect();
278            let assistant_snippet: String = assistant.chars().take(200).collect();
279
280            let prompt = format!(
281                "Summarize this conversation in 3-6 words as a short title. \
282                 Return ONLY the title, nothing else.\n\n\
283                 User: {user_snippet}\nAssistant: {assistant_snippet}"
284            );
285
286            let request = crate::model::Request::new(model.active_model())
287                .with_messages(vec![Message::user(&prompt)]);
288
289            match model.send(&request).await {
290                Ok(response) => {
291                    if let Some(title) = response.content() {
292                        let title = title.trim().trim_matches('"').to_string();
293                        if !title.is_empty() {
294                            let mut session = session_mutex.lock().await;
295                            if session.title.is_empty() {
296                                session.set_title(&title);
297                            }
298                        }
299                    }
300                }
301                Err(e) => {
302                    tracing::debug!("title generation failed: {e}");
303                }
304            }
305        });
306    }
307
308    // --- Execution ---
309
310    /// Push the user message, strip old auto-injected messages, and inject
311    /// fresh ones via `on_before_run`. Returns the agent name.
312    fn prepare_history(&self, session: &mut Session, content: &str, sender: &str) -> String {
313        let content = self.hook.preprocess(&session.agent, content);
314        if sender.is_empty() {
315            session.history.push(Message::user(&content));
316        } else {
317            session
318                .history
319                .push(Message::user_with_sender(&content, sender));
320        }
321
322        // Strip previous auto-injected messages to avoid accumulation.
323        session.history.retain(|m| !m.auto_injected);
324
325        let agent_name = session.agent.clone();
326        let recall_msgs = self
327            .hook
328            .on_before_run(&agent_name, session.id, &session.history);
329        if !recall_msgs.is_empty() {
330            let insert_pos = session.history.len().saturating_sub(1);
331            for (i, msg) in recall_msgs.into_iter().enumerate() {
332                session.history.insert(insert_pos + i, msg);
333            }
334        }
335        agent_name
336    }
337
338    /// Send a message to a session and run to completion.
339    pub async fn send_to(
340        &self,
341        session_id: u64,
342        content: &str,
343        sender: &str,
344    ) -> Result<AgentResponse> {
345        let session_mutex = self
346            .sessions
347            .read()
348            .await
349            .get(&session_id)
350            .cloned()
351            .ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;
352
353        let mut session = session_mutex.lock().await;
354        let pre_run_len = session.history.len();
355        let agent_name = self.prepare_history(&mut session, content, sender);
356        let agent_ref = self
357            .agents
358            .get(&session.agent)
359            .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", session.agent))?;
360
361        let (tx, mut rx) = mpsc::unbounded_channel();
362        let run_start = std::time::Instant::now();
363        self.active_sessions.write().await.insert(session_id);
364        let response = agent_ref.run(&mut session.history, tx, None).await;
365        self.active_sessions.write().await.remove(&session_id);
366        session.uptime_secs += run_start.elapsed().as_secs();
367
368        // Drain events, stash compact summary if one occurred.
369        let mut compact_summary: Option<String> = None;
370        while let Ok(event) = rx.try_recv() {
371            if let AgentEvent::Compact { ref summary } = event {
372                compact_summary = Some(summary.clone());
373            }
374            self.hook.on_event(&agent_name, session_id, &event);
375        }
376
377        // Append-only persistence.
378        if let Some(summary) = compact_summary {
379            // Compaction happened: append compact marker + post-compact messages.
380            session.append_compact(&summary);
381            // history[0] is the summary-as-user-message; skip it (compact line serves that role).
382            if session.history.len() > 1 {
383                session.append_messages(&session.history[1..]);
384            }
385        } else {
386            // No compaction: append new messages since pre_run.
387            session.append_messages(&session.history[pre_run_len..]);
388        }
389
390        // Persist updated uptime to meta line.
391        session.rewrite_meta();
392
393        // Generate title in background if this is the first exchange.
394        if session.title.is_empty() && session.history.len() >= 2 {
395            self.spawn_title_generation(session_id, session_mutex.clone());
396        }
397        Ok(response)
398    }
399
400    /// Send a message to a session and stream response events.
401    pub fn stream_to(
402        &self,
403        session_id: u64,
404        content: &str,
405        sender: &str,
406    ) -> impl Stream<Item = AgentEvent> + '_ {
407        let content = content.to_owned();
408        let sender = sender.to_owned();
409        stream! {
410            let session_mutex = match self
411                .sessions
412                .read()
413                .await
414                .get(&session_id)
415                .cloned()
416            {
417                Some(m) => m,
418                None => {
419                    let resp = AgentResponse {
420                        final_response: None,
421                        iterations: 0,
422                        stop_reason: AgentStopReason::Error(
423                            format!("session {session_id} not found"),
424                        ),
425                        steps: vec![],
426                    };
427                    yield AgentEvent::Done(resp);
428                    return;
429                }
430            };
431
432            let mut session = session_mutex.lock().await;
433            let pre_run_len = session.history.len();
434            let agent_name = self.prepare_history(&mut session, &content, &sender);
435            let agent_ref = match self.agents.get(&session.agent) {
436                Some(a) => a,
437                None => {
438                    let resp = AgentResponse {
439                        final_response: None,
440                        iterations: 0,
441                        stop_reason: AgentStopReason::Error(
442                            format!("agent '{}' not registered", session.agent),
443                        ),
444                        steps: vec![],
445                    };
446                    yield AgentEvent::Done(resp);
447                    return;
448                }
449            };
450
451            let run_start = std::time::Instant::now();
452            self.active_sessions.write().await.insert(session_id);
453            let mut compact_summary: Option<String> = None;
454            let mut done_event: Option<AgentEvent> = None;
455            {
456                let mut event_stream = std::pin::pin!(agent_ref.run_stream(&mut session.history, Some(session_id)));
457                while let Some(event) = event_stream.next().await {
458                    if let AgentEvent::Compact { ref summary } = event {
459                        compact_summary = Some(summary.clone());
460                    }
461                    self.hook.on_event(&agent_name, session_id, &event);
462                    // Hold back Done — yield it after persistence.
463                    if matches!(event, AgentEvent::Done(_)) {
464                        done_event = Some(event);
465                    } else {
466                        yield event;
467                    }
468                }
469            }
470            // Borrow on session.history is released. Persist now.
471            self.active_sessions.write().await.remove(&session_id);
472            session.uptime_secs += run_start.elapsed().as_secs();
473            if let Some(summary) = compact_summary {
474                session.append_compact(&summary);
475                if session.history.len() > 1 {
476                    session.append_messages(&session.history[1..]);
477                }
478            } else {
479                session.append_messages(&session.history[pre_run_len..]);
480            }
481            // Persist updated uptime to meta line.
482            session.rewrite_meta();
483
484            // Generate title in background if this is the first exchange.
485            if session.title.is_empty() && session.history.len() >= 2 {
486                self.spawn_title_generation(session_id, session_mutex.clone());
487            }
488            // Now yield Done.
489            if let Some(event) = done_event {
490                yield event;
491            }
492        }
493    }
494}