Skip to main content

crabtalk_core/runtime/
mod.rs

1//! Runtime — agent registry, session management, and hook orchestration.
2//!
3//! [`Runtime`] holds agents as immutable definitions and sessions as
4//! per-session `Arc<Mutex<Session>>` containers. Tool schemas are registered
5//! once at startup via `hook.on_register_tools()`. Execution methods
6//! (`send_to`, `stream_to`) take a session ID, lock the session, clone the
7//! agent, and run with the session's history.
8
9use crate::{
10    Agent, AgentBuilder, AgentConfig, AgentEvent, AgentResponse, AgentStopReason,
11    agent::tool::{ToolRegistry, ToolSender},
12    model::{Message, Model},
13    runtime::hook::Hook,
14};
15use anyhow::{Result, bail};
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::StreamExt;
19use std::{
20    collections::{BTreeMap, HashSet},
21    sync::{
22        Arc,
23        atomic::{AtomicU64, Ordering},
24    },
25};
26use tokio::sync::{Mutex, RwLock, mpsc};
27
28pub mod hook;
29pub mod session;
30
31pub use session::Session;
32
33/// The crabtalk runtime — agent registry, session store, and hook orchestration.
34///
35/// Agents are stored as plain immutable values. Sessions own conversation
36/// history behind per-session `Arc<Mutex<Session>>`. The sessions map uses
37/// `RwLock` for concurrent access without requiring `&mut self`.
38pub struct Runtime<M: Model, H: Hook> {
39    pub model: M,
40    pub hook: H,
41    agents: BTreeMap<String, Agent<M>>,
42    sessions: RwLock<BTreeMap<u64, Arc<Mutex<Session>>>>,
43    next_session_id: AtomicU64,
44    tools: ToolRegistry,
45    tool_tx: Option<ToolSender>,
46    active_sessions: RwLock<HashSet<u64>>,
47}
48
49impl<M: Model + Send + Sync + Clone + 'static, H: Hook + 'static> Runtime<M, H> {
50    /// Create a new runtime with the given model and hook backend.
51    ///
52    /// Calls `hook.on_register_tools()` to populate the schema registry.
53    /// Pass `tool_tx` to enable tool dispatch from agents; `None` means agents
54    /// have no tool dispatch (e.g. CLI without a daemon).
55    pub async fn new(model: M, hook: H, tool_tx: Option<ToolSender>) -> Self {
56        let mut tools = ToolRegistry::new();
57        hook.on_register_tools(&mut tools).await;
58        Self {
59            model,
60            hook,
61            agents: BTreeMap::new(),
62            sessions: RwLock::new(BTreeMap::new()),
63            next_session_id: AtomicU64::new(1),
64            tools,
65            tool_tx,
66            active_sessions: RwLock::new(HashSet::new()),
67        }
68    }
69
70    // --- Tool registry ---
71
72    /// Register a tool schema.
73    pub fn register_tool(&mut self, tool: crate::model::Tool) {
74        self.tools.insert(tool);
75    }
76
77    /// Remove a tool schema by name. Returns `true` if it existed.
78    pub fn unregister_tool(&mut self, name: &str) -> bool {
79        self.tools.remove(name)
80    }
81
82    // --- Agent registry ---
83
84    /// Register an agent from its configuration.
85    ///
86    /// Calls `hook.on_build_agent(config)` to enrich the config, then builds
87    /// the agent with a filtered schema snapshot and the runtime's `tool_tx`.
88    pub fn add_agent(&mut self, config: AgentConfig) {
89        let config = self.hook.on_build_agent(config);
90        let name = config.name.clone();
91        let tools = self.tools.filtered_snapshot(&config.tools);
92        let mut builder = AgentBuilder::new(self.model.clone())
93            .config(config)
94            .tools(tools);
95        if let Some(tx) = &self.tool_tx {
96            builder = builder.tool_tx(tx.clone());
97        }
98        let agent = builder.build();
99        self.agents.insert(name, agent);
100    }
101
102    /// Get a registered agent's config by name (cloned).
103    pub fn agent(&self, name: &str) -> Option<AgentConfig> {
104        self.agents.get(name).map(|a| a.config.clone())
105    }
106
107    /// Get all registered agent configs (cloned, alphabetical order).
108    pub fn agents(&self) -> Vec<AgentConfig> {
109        self.agents.values().map(|a| a.config.clone()).collect()
110    }
111
112    /// Get a reference to an agent by name.
113    pub fn get_agent(&self, name: &str) -> Option<&Agent<M>> {
114        self.agents.get(name)
115    }
116
117    // --- Session management ---
118
119    /// Create a new session for the given agent. Returns the session ID.
120    pub async fn create_session(&self, agent: &str, created_by: &str) -> Result<u64> {
121        if !self.agents.contains_key(agent) {
122            bail!("agent '{agent}' not registered");
123        }
124        let id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
125        let mut session = Session::new(id, agent, created_by);
126        session.init_file(&crate::paths::SESSIONS_DIR);
127        self.sessions
128            .write()
129            .await
130            .insert(id, Arc::new(Mutex::new(session)));
131        Ok(id)
132    }
133
134    /// Close (remove) a session by ID. Returns true if it existed.
135    pub async fn close_session(&self, id: u64) -> bool {
136        self.sessions.write().await.remove(&id).is_some()
137    }
138
139    /// Get a session mutex by ID.
140    pub async fn session(&self, id: u64) -> Option<Arc<Mutex<Session>>> {
141        self.sessions.read().await.get(&id).cloned()
142    }
143
144    /// Get all session mutexes (for iteration/listing).
145    pub async fn sessions(&self) -> Vec<Arc<Mutex<Session>>> {
146        self.sessions.read().await.values().cloned().collect()
147    }
148
149    /// Check if a session is currently active (running send_to or stream_to).
150    pub async fn is_active(&self, id: u64) -> bool {
151        self.active_sessions.read().await.contains(&id)
152    }
153
154    // --- Execution ---
155
156    /// Push the user message, strip old auto-injected messages, and inject
157    /// fresh ones via `on_before_run`. Returns the agent name.
158    fn prepare_history(&self, session: &mut Session, content: &str, sender: &str) -> String {
159        let content = self.hook.preprocess(&session.agent, content);
160        if sender.is_empty() {
161            session.history.push(Message::user(&content));
162        } else {
163            session
164                .history
165                .push(Message::user_with_sender(&content, sender));
166        }
167
168        // Strip previous auto-injected messages to avoid accumulation.
169        session.history.retain(|m| !m.auto_injected);
170
171        let agent_name = session.agent.clone();
172        let recall_msgs = self
173            .hook
174            .on_before_run(&agent_name, session.id, &session.history);
175        if !recall_msgs.is_empty() {
176            let insert_pos = session.history.len().saturating_sub(1);
177            for (i, msg) in recall_msgs.into_iter().enumerate() {
178                session.history.insert(insert_pos + i, msg);
179            }
180        }
181        agent_name
182    }
183
184    /// Send a message to a session and run to completion.
185    pub async fn send_to(
186        &self,
187        session_id: u64,
188        content: &str,
189        sender: &str,
190    ) -> Result<AgentResponse> {
191        let session_mutex = self
192            .sessions
193            .read()
194            .await
195            .get(&session_id)
196            .cloned()
197            .ok_or_else(|| anyhow::anyhow!("session {session_id} not found"))?;
198
199        let mut session = session_mutex.lock().await;
200        let agent_name = self.prepare_history(&mut session, content, sender);
201        let agent_ref = self
202            .agents
203            .get(&session.agent)
204            .ok_or_else(|| anyhow::anyhow!("agent '{}' not registered", session.agent))?;
205
206        let (tx, mut rx) = mpsc::unbounded_channel();
207        self.active_sessions.write().await.insert(session_id);
208        let response = agent_ref.run(&mut session.history, tx, None).await;
209        self.active_sessions.write().await.remove(&session_id);
210
211        while let Ok(event) = rx.try_recv() {
212            self.hook.on_event(&agent_name, session_id, &event);
213        }
214
215        session.persist();
216        Ok(response)
217    }
218
219    /// Send a message to a session and stream response events.
220    pub fn stream_to(
221        &self,
222        session_id: u64,
223        content: &str,
224        sender: &str,
225    ) -> impl Stream<Item = AgentEvent> + '_ {
226        let content = content.to_owned();
227        let sender = sender.to_owned();
228        stream! {
229            let session_mutex = match self
230                .sessions
231                .read()
232                .await
233                .get(&session_id)
234                .cloned()
235            {
236                Some(m) => m,
237                None => {
238                    let resp = AgentResponse {
239                        final_response: None,
240                        iterations: 0,
241                        stop_reason: AgentStopReason::Error(
242                            format!("session {session_id} not found"),
243                        ),
244                        steps: vec![],
245                    };
246                    yield AgentEvent::Done(resp);
247                    return;
248                }
249            };
250
251            let mut session = session_mutex.lock().await;
252            let agent_name = self.prepare_history(&mut session, &content, &sender);
253            let agent_ref = match self.agents.get(&session.agent) {
254                Some(a) => a,
255                None => {
256                    let resp = AgentResponse {
257                        final_response: None,
258                        iterations: 0,
259                        stop_reason: AgentStopReason::Error(
260                            format!("agent '{}' not registered", session.agent),
261                        ),
262                        steps: vec![],
263                    };
264                    yield AgentEvent::Done(resp);
265                    return;
266                }
267            };
268
269            self.active_sessions.write().await.insert(session_id);
270            {
271                let mut event_stream = std::pin::pin!(agent_ref.run_stream(&mut session.history, Some(session_id)));
272                while let Some(event) = event_stream.next().await {
273                    self.hook.on_event(&agent_name, session_id, &event);
274                    yield event;
275                }
276            }
277            self.active_sessions.write().await.remove(&session_id);
278            session.persist();
279        }
280    }
281}