Skip to main content

crabtalk_core/agent/
mod.rs

1//! Immutable agent definition and execution methods.
2//!
3//! [`Agent`] owns its configuration, model, tool schemas, and an optional
4//! [`ToolSender`] for dispatching tool calls to the runtime. Conversation
5//! history is passed in externally — the agent itself is stateless.
6//! It drives LLM execution through [`Agent::step`], [`Agent::run`], and
7//! [`Agent::run_stream`]. `run_stream()` is the canonical step loop —
8//! `run()` collects its events and returns the final response.
9
10use crate::model::{
11    Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
12    Usage,
13};
14use anyhow::Result;
15use async_stream::stream;
16pub use builder::AgentBuilder;
17pub use config::AgentConfig;
18use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
19use futures_core::Stream;
20use futures_util::StreamExt;
21use tokio::sync::{mpsc, oneshot};
22pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
23
24mod builder;
25mod compact;
26pub mod config;
27pub mod event;
28pub mod tool;
29
30/// Extract sender from the last user message in history.
31fn last_sender(history: &[Message]) -> String {
32    history
33        .iter()
34        .rev()
35        .find(|m| m.role == Role::User)
36        .map(|m| m.sender.clone())
37        .unwrap_or_default()
38}
39
40/// An immutable agent definition.
41///
42/// Generic over `M: Model` — stores the model provider alongside config,
43/// tool schemas, and an optional sender for tool dispatch. Conversation
44/// history is owned externally and passed into execution methods.
45/// Callers drive execution via `step()` (single LLM round), `run()` (loop to
46/// completion), or `run_stream()` (yields events as a stream).
47pub struct Agent<M: Model> {
48    /// Agent configuration (name, prompt, model, limits, tool_choice).
49    pub config: AgentConfig,
50    /// The model provider for LLM calls.
51    model: M,
52    /// Tool schemas advertised to the LLM. Set once at build time.
53    tools: Vec<Tool>,
54    /// Sender for dispatching tool calls to the runtime. None = no tools.
55    tool_tx: Option<ToolSender>,
56}
57
58impl<M: Model> Agent<M> {
59    /// Build a request from config state (system prompt + history + tool schemas).
60    fn build_request(&self, history: &[Message]) -> Request {
61        let model_name = self
62            .config
63            .model
64            .clone()
65            .unwrap_or_else(|| self.model.active_model());
66
67        let mut messages = Vec::with_capacity(1 + history.len());
68        if !self.config.system_prompt.is_empty() {
69            messages.push(Message::system(&self.config.system_prompt));
70        }
71        messages.extend(history.iter().cloned());
72
73        let mut request = Request::new(model_name)
74            .with_messages(messages)
75            .with_tool_choice(self.config.tool_choice.clone())
76            .with_think(self.config.thinking);
77        if !self.tools.is_empty() {
78            request = request.with_tools(self.tools.clone());
79        }
80        request
81    }
82
83    /// Perform a single LLM round: send request, dispatch tools, return step.
84    ///
85    /// Composes a [`Request`] from config state (system prompt + history +
86    /// tool schemas), calls the stored model, dispatches any tool calls via
87    /// the [`ToolSender`] channel, and appends results to history.
88    pub async fn step(
89        &self,
90        history: &mut Vec<Message>,
91        session_id: Option<u64>,
92    ) -> Result<AgentStep> {
93        let request = self.build_request(history);
94        let response = self.model.send(&request).await?;
95        let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
96
97        if let Some(msg) = response.message() {
98            history.push(msg);
99        }
100
101        let mut tool_results = Vec::new();
102        if !tool_calls.is_empty() {
103            let sender = last_sender(history);
104            for tc in &tool_calls {
105                let result = self
106                    .dispatch_tool(
107                        &tc.function.name,
108                        &tc.function.arguments,
109                        &sender,
110                        session_id,
111                    )
112                    .await;
113                let msg = Message::tool(&result, tc.id.clone());
114                history.push(msg.clone());
115                tool_results.push(msg);
116            }
117        }
118
119        Ok(AgentStep {
120            response,
121            tool_calls,
122            tool_results,
123        })
124    }
125
126    /// Dispatch a single tool call via the tool sender channel.
127    ///
128    /// Returns the result string. If no sender is configured, returns an error
129    /// message without panicking.
130    async fn dispatch_tool(
131        &self,
132        name: &str,
133        args: &str,
134        sender: &str,
135        session_id: Option<u64>,
136    ) -> String {
137        let Some(tx) = &self.tool_tx else {
138            return format!("tool '{name}' called but no tool sender configured");
139        };
140        let (reply_tx, reply_rx) = oneshot::channel();
141        let req = ToolRequest {
142            name: name.to_owned(),
143            args: args.to_owned(),
144            agent: self.config.name.to_string(),
145            reply: reply_tx,
146            task_id: None,
147            sender: sender.into(),
148            session_id,
149        };
150        if tx.send(req).is_err() {
151            return format!("tool channel closed while calling '{name}'");
152        }
153        reply_rx
154            .await
155            .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
156    }
157
158    /// Determine the stop reason for a step with no tool calls.
159    fn stop_reason(step: &AgentStep) -> AgentStopReason {
160        if step.response.content().is_some() {
161            AgentStopReason::TextResponse
162        } else {
163            AgentStopReason::NoAction
164        }
165    }
166
167    /// Run the agent loop to completion, returning the final response.
168    ///
169    /// Wraps [`Agent::run_stream`] — collects all events, sends each through
170    /// `events`, and extracts the `Done` response.
171    pub async fn run(
172        &self,
173        history: &mut Vec<Message>,
174        events: mpsc::UnboundedSender<AgentEvent>,
175        session_id: Option<u64>,
176    ) -> AgentResponse {
177        let mut stream = std::pin::pin!(self.run_stream(history, session_id));
178        let mut response = None;
179        while let Some(event) = stream.next().await {
180            if let AgentEvent::Done(ref resp) = event {
181                response = Some(resp.clone());
182            }
183            let _ = events.send(event);
184        }
185
186        response.unwrap_or_else(|| AgentResponse {
187            final_response: None,
188            iterations: 0,
189            stop_reason: AgentStopReason::Error("stream ended without Done".into()),
190            steps: vec![],
191        })
192    }
193
194    /// Run the agent loop as a stream of [`AgentEvent`]s.
195    ///
196    /// Uses the model's streaming API so text deltas are yielded token-by-token.
197    /// Tool call responses are dispatched after the stream completes (arguments
198    /// arrive incrementally and must be fully accumulated first).
199    pub fn run_stream<'a>(
200        &'a self,
201        history: &'a mut Vec<Message>,
202        session_id: Option<u64>,
203    ) -> impl Stream<Item = AgentEvent> + 'a {
204        stream! {
205            let mut steps = Vec::new();
206            let max = self.config.max_iterations;
207
208            for _ in 0..max {
209                let request = self.build_request(history);
210
211                // Stream from the model, yielding text deltas as they arrive.
212                let mut builder = MessageBuilder::new(Role::Assistant);
213                let mut finish_reason = None;
214                let mut last_meta = CompletionMeta::default();
215                let mut last_usage = None;
216                let mut stream_error = None;
217
218                {
219                    let mut chunk_stream = std::pin::pin!(self.model.stream(request));
220                    while let Some(result) = chunk_stream.next().await {
221                        match result {
222                            Ok(chunk) => {
223                                if let Some(text) = chunk.content() {
224                                    yield AgentEvent::TextDelta(text.to_owned());
225                                }
226                                if let Some(reason) = chunk.reasoning_content() {
227                                    yield AgentEvent::ThinkingDelta(reason.to_owned());
228                                }
229                                if let Some(r) = chunk.reason() {
230                                    finish_reason = Some(r.clone());
231                                }
232                                last_meta = chunk.meta.clone();
233                                if chunk.usage.is_some() {
234                                    last_usage = chunk.usage.clone();
235                                }
236                                builder.accept(&chunk);
237                            }
238                            Err(e) => {
239                                stream_error = Some(e.to_string());
240                                break;
241                            }
242                        }
243                    }
244                }
245                if let Some(e) = stream_error {
246                    yield AgentEvent::Done(AgentResponse {
247                        final_response: None,
248                        iterations: steps.len(),
249                        stop_reason: AgentStopReason::Error(e),
250                        steps,
251                    });
252                    return;
253                }
254
255                // Build the accumulated message and response.
256                let msg = builder.build();
257                let tool_calls = msg.tool_calls.to_vec();
258                let content = if msg.content.is_empty() {
259                    None
260                } else {
261                    Some(msg.content.clone())
262                };
263
264                let response = Response {
265                    meta: last_meta,
266                    choices: vec![Choice {
267                        index: 0,
268                        delta: Delta {
269                            role: Some(Role::Assistant),
270                            content: content.clone(),
271                            reasoning_content: if msg.reasoning_content.is_empty() {
272                                None
273                            } else {
274                                Some(msg.reasoning_content.clone())
275                            },
276                            tool_calls: if tool_calls.is_empty() {
277                                None
278                            } else {
279                                Some(tool_calls.clone())
280                            },
281                        },
282                        finish_reason,
283                        logprobs: None,
284                    }],
285                    usage: last_usage.unwrap_or(Usage {
286                        prompt_tokens: 0,
287                        completion_tokens: 0,
288                        total_tokens: 0,
289                        prompt_cache_hit_tokens: None,
290                        prompt_cache_miss_tokens: None,
291                        completion_tokens_details: None,
292                    }),
293                };
294
295                history.push(msg);
296                let has_tool_calls = !tool_calls.is_empty();
297
298                // Dispatch tool calls if any.
299                let mut tool_results = Vec::new();
300                if has_tool_calls {
301                    let sender = last_sender(history);
302                    yield AgentEvent::ToolCallsStart(tool_calls.clone());
303                    for tc in &tool_calls {
304                        let result = self
305                            .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender, session_id)
306                            .await;
307                        let msg = Message::tool(&result, tc.id.clone());
308                        history.push(msg.clone());
309                        tool_results.push(msg);
310                        yield AgentEvent::ToolResult {
311                            call_id: tc.id.clone(),
312                            output: result,
313                        };
314                    }
315                    yield AgentEvent::ToolCallsComplete;
316                }
317
318                // Auto-compaction: check token estimate after each step.
319                if let Some(threshold) = self.config.compact_threshold
320                    && Self::estimate_tokens(history) > threshold
321                {
322                    if let Some(summary) = self.compact(history).await {
323                        yield AgentEvent::Compact { summary: summary.clone() };
324                        *history = vec![Message::user(&summary)];
325                        yield AgentEvent::TextDelta(
326                            "\n[context compacted]\n".to_owned(),
327                        );
328                    }
329                    continue;
330                }
331
332                let step = AgentStep {
333                    response,
334                    tool_calls,
335                    tool_results,
336                };
337
338                if !has_tool_calls {
339                    let stop_reason = Self::stop_reason(&step);
340                    steps.push(step);
341                    yield AgentEvent::Done(AgentResponse {
342                        final_response: content,
343                        iterations: steps.len(),
344                        stop_reason,
345                        steps,
346                    });
347                    return;
348                }
349
350                steps.push(step);
351            }
352
353            let final_response = steps.last().and_then(|s| s.response.content().cloned());
354            yield AgentEvent::Done(AgentResponse {
355                final_response,
356                iterations: steps.len(),
357                stop_reason: AgentStopReason::MaxIterations,
358                steps,
359            });
360        }
361    }
362}