Skip to main content

crabtalk_core/agent/
mod.rs

//! Immutable agent definition and execution methods.
//!
//! [`Agent`] owns its configuration, model, tool schemas, and an optional
//! [`ToolSender`] for dispatching tool calls to the runtime. Conversation
//! history is passed in externally — the agent itself is stateless.
//! It drives LLM execution through [`Agent::step`], [`Agent::run`], and
//! [`Agent::run_stream`]. `run_stream()` is the canonical step loop —
//! `run()` collects its events and returns the final response.
9
10use crate::model::{
11    Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
12    Usage,
13};
14use anyhow::Result;
15use async_stream::stream;
16pub use builder::AgentBuilder;
17pub use config::AgentConfig;
18use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
19use futures_core::Stream;
20use futures_util::StreamExt;
21use tokio::sync::{mpsc, oneshot};
22pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
23
24mod builder;
25mod compact;
26pub mod config;
27pub mod event;
28pub mod tool;
29
30/// Extract sender from the last user message in history.
31fn last_sender(history: &[Message]) -> String {
32    history
33        .iter()
34        .rev()
35        .find(|m| m.role == Role::User)
36        .map(|m| m.sender.clone())
37        .unwrap_or_default()
38}
39
/// An immutable agent definition.
///
/// Generic over `M: Model` — stores the model provider alongside config,
/// tool schemas, and an optional sender for tool dispatch. Conversation
/// history is owned externally and passed into execution methods.
/// Callers drive execution via `step()` (single LLM round), `run()` (loop to
/// completion), or `run_stream()` (yields events as a stream).
pub struct Agent<M: Model> {
    /// Agent configuration (name, prompt, model, limits, tool_choice).
    pub config: AgentConfig,
    /// The model provider used for every LLM call.
    model: M,
    /// Tool schemas advertised to the LLM. Set once at build time.
    tools: Vec<Tool>,
    /// Sender for dispatching tool calls to the runtime. `None` means the
    /// agent has no tools; `dispatch_tool` then returns an error string
    /// instead of panicking.
    tool_tx: Option<ToolSender>,
}
57
58impl<M: Model> Agent<M> {
59    /// Build a request from config state (system prompt + history + tool schemas).
60    fn build_request(&self, history: &[Message]) -> Request {
61        let model_name = self
62            .config
63            .model
64            .clone()
65            .unwrap_or_else(|| self.model.active_model());
66
67        let mut messages = Vec::with_capacity(1 + history.len());
68        if !self.config.system_prompt.is_empty() {
69            messages.push(Message::system(&self.config.system_prompt));
70        }
71        messages.extend(history.iter().cloned());
72
73        let mut request = Request::new(model_name)
74            .with_messages(messages)
75            .with_tool_choice(self.config.tool_choice.clone())
76            .with_think(self.config.thinking);
77        if !self.tools.is_empty() {
78            request = request.with_tools(self.tools.clone());
79        }
80        request
81    }
82
83    /// Perform a single LLM round: send request, dispatch tools, return step.
84    ///
85    /// Composes a [`Request`] from config state (system prompt + history +
86    /// tool schemas), calls the stored model, dispatches any tool calls via
87    /// the [`ToolSender`] channel, and appends results to history.
88    pub async fn step(
89        &self,
90        history: &mut Vec<Message>,
91        session_id: Option<u64>,
92    ) -> Result<AgentStep> {
93        let request = self.build_request(history);
94        let response = self.model.send(&request).await?;
95        let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
96
97        if let Some(msg) = response.message() {
98            history.push(msg);
99        }
100
101        let mut tool_results = Vec::new();
102        if !tool_calls.is_empty() {
103            let sender = last_sender(history);
104            for tc in &tool_calls {
105                let result = self
106                    .dispatch_tool(
107                        &tc.function.name,
108                        &tc.function.arguments,
109                        &sender,
110                        session_id,
111                    )
112                    .await;
113                let msg = Message::tool(&result, tc.id.clone());
114                history.push(msg.clone());
115                tool_results.push(msg);
116            }
117        }
118
119        Ok(AgentStep {
120            response,
121            tool_calls,
122            tool_results,
123        })
124    }
125
126    /// Dispatch a single tool call via the tool sender channel.
127    ///
128    /// Returns the result string. If no sender is configured, returns an error
129    /// message without panicking.
130    async fn dispatch_tool(
131        &self,
132        name: &str,
133        args: &str,
134        sender: &str,
135        session_id: Option<u64>,
136    ) -> String {
137        let Some(tx) = &self.tool_tx else {
138            return format!("tool '{name}' called but no tool sender configured");
139        };
140        let (reply_tx, reply_rx) = oneshot::channel();
141        let req = ToolRequest {
142            name: name.to_owned(),
143            args: args.to_owned(),
144            agent: self.config.name.to_string(),
145            reply: reply_tx,
146            task_id: None,
147            sender: sender.into(),
148            session_id,
149        };
150        if tx.send(req).is_err() {
151            return format!("tool channel closed while calling '{name}'");
152        }
153        reply_rx
154            .await
155            .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
156    }
157
158    /// Determine the stop reason for a step with no tool calls.
159    fn stop_reason(step: &AgentStep) -> AgentStopReason {
160        if step.response.content().is_some() {
161            AgentStopReason::TextResponse
162        } else {
163            AgentStopReason::NoAction
164        }
165    }
166
167    /// Run the agent loop to completion, returning the final response.
168    ///
169    /// Wraps [`Agent::run_stream`] — collects all events, sends each through
170    /// `events`, and extracts the `Done` response.
171    pub async fn run(
172        &self,
173        history: &mut Vec<Message>,
174        events: mpsc::UnboundedSender<AgentEvent>,
175        session_id: Option<u64>,
176    ) -> AgentResponse {
177        let mut stream = std::pin::pin!(self.run_stream(history, session_id));
178        let mut response = None;
179        while let Some(event) = stream.next().await {
180            if let AgentEvent::Done(ref resp) = event {
181                response = Some(resp.clone());
182            }
183            let _ = events.send(event);
184        }
185
186        response.unwrap_or_else(|| AgentResponse {
187            final_response: None,
188            iterations: 0,
189            stop_reason: AgentStopReason::Error("stream ended without Done".into()),
190            steps: vec![],
191        })
192    }
193
    /// Run the agent loop as a stream of [`AgentEvent`]s.
    ///
    /// Uses the model's streaming API so text deltas are yielded token-by-token.
    /// Tool call responses are dispatched after the stream completes (arguments
    /// arrive incrementally and must be fully accumulated first).
    ///
    /// Event order per iteration: `TextDelta`/`ThinkingDelta`/`ToolCallsBegin`
    /// while streaming, then `ToolCallsStart` → `ToolResult`* →
    /// `ToolCallsComplete` during dispatch, optionally `Compact`, and finally
    /// a single `Done` (text response, no action, error, or max iterations).
    pub fn run_stream<'a>(
        &'a self,
        history: &'a mut Vec<Message>,
        session_id: Option<u64>,
    ) -> impl Stream<Item = AgentEvent> + 'a {
        stream! {
            let mut steps = Vec::new();
            let max = self.config.max_iterations;

            for _ in 0..max {
                let request = self.build_request(history);

                // Stream from the model, yielding text deltas as they arrive.
                let mut builder = MessageBuilder::new(Role::Assistant);
                let mut finish_reason = None;
                let mut last_meta = CompletionMeta::default();
                let mut last_usage = None;
                let mut stream_error = None;
                let mut tool_begin_emitted = false;

                // Block scope: the pinned chunk stream is dropped here, so any
                // borrows it holds end before `history` is mutated below.
                {
                    let mut chunk_stream = std::pin::pin!(self.model.stream(request));
                    while let Some(result) = chunk_stream.next().await {
                        match result {
                            Ok(chunk) => {
                                if let Some(text) = chunk.content() {
                                    yield AgentEvent::TextDelta(text.to_owned());
                                }
                                if let Some(reason) = chunk.reasoning_content() {
                                    yield AgentEvent::ThinkingDelta(reason.to_owned());
                                }
                                if let Some(r) = chunk.reason() {
                                    finish_reason = Some(r.clone());
                                }
                                // Meta is overwritten per chunk; usage only when present,
                                // so the last reported usage wins.
                                last_meta = chunk.meta.clone();
                                if chunk.usage.is_some() {
                                    last_usage = chunk.usage.clone();
                                }
                                builder.accept(&chunk);
                                // Emit ToolCallsBegin as soon as tool names appear
                                // in the builder, so the CLI can show markers while
                                // args are still streaming. Uses current builder
                                // state, which may already have partial/full args.
                                if !tool_begin_emitted {
                                    let calls = builder.peek_tool_calls();
                                    if !calls.is_empty() {
                                        tool_begin_emitted = true;
                                        yield AgentEvent::ToolCallsBegin(calls);
                                    }
                                }
                            }
                            Err(e) => {
                                // First stream error aborts the iteration; handled below.
                                stream_error = Some(e.to_string());
                                break;
                            }
                        }
                    }
                }
                if let Some(e) = stream_error {
                    // NOTE(review): partial text already streamed before the error
                    // was accepted into `builder` but is never pushed to history.
                    yield AgentEvent::Done(AgentResponse {
                        final_response: None,
                        iterations: steps.len(),
                        stop_reason: AgentStopReason::Error(e),
                        steps,
                    });
                    return;
                }

                // Build the accumulated message and synthesize a non-streaming
                // Response from it; empty strings map to None.
                let msg = builder.build();
                let tool_calls = msg.tool_calls.to_vec();
                let content = if msg.content.is_empty() {
                    None
                } else {
                    Some(msg.content.clone())
                };

                let response = Response {
                    meta: last_meta,
                    choices: vec![Choice {
                        index: 0,
                        delta: Delta {
                            role: Some(Role::Assistant),
                            content: content.clone(),
                            reasoning_content: if msg.reasoning_content.is_empty() {
                                None
                            } else {
                                Some(msg.reasoning_content.clone())
                            },
                            tool_calls: if tool_calls.is_empty() {
                                None
                            } else {
                                Some(tool_calls.clone())
                            },
                        },
                        finish_reason,
                        logprobs: None,
                    }],
                    // Zeroed usage stands in when the provider never reported any.
                    usage: last_usage.unwrap_or(Usage {
                        prompt_tokens: 0,
                        completion_tokens: 0,
                        total_tokens: 0,
                        prompt_cache_hit_tokens: None,
                        prompt_cache_miss_tokens: None,
                        completion_tokens_details: None,
                    }),
                };

                history.push(msg);
                let has_tool_calls = !tool_calls.is_empty();

                // Dispatch tool calls, if any, sequentially in the order the
                // model emitted them; each result is appended to history and
                // echoed as a ToolResult event with its wall-clock duration.
                let mut tool_results = Vec::new();
                if has_tool_calls {
                    let sender = last_sender(history);
                    yield AgentEvent::ToolCallsStart(tool_calls.clone());
                    for tc in &tool_calls {
                        let tool_start = std::time::Instant::now();
                        let result = self
                            .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender, session_id)
                            .await;
                        let duration_ms = tool_start.elapsed().as_millis() as u64;
                        let msg = Message::tool(&result, tc.id.clone());
                        history.push(msg.clone());
                        tool_results.push(msg);
                        yield AgentEvent::ToolResult {
                            call_id: tc.id.clone(),
                            output: result,
                            duration_ms,
                        };
                    }
                    yield AgentEvent::ToolCallsComplete;
                }

                // Auto-compaction: check token estimate after each step.
                // NOTE(review): when the threshold is exceeded, this iteration's
                // step is never pushed to `steps` (undercounting `iterations`),
                // and the loop `continue`s even when the model produced a final
                // text answer with no tool calls, and even when `compact()`
                // returned None — confirm all three are intentional.
                if let Some(threshold) = self.config.compact_threshold
                    && Self::estimate_tokens(history) > threshold
                {
                    if let Some(summary) = self.compact(history).await {
                        yield AgentEvent::Compact { summary: summary.clone() };
                        // History is replaced wholesale by the summary message.
                        *history = vec![Message::user(&summary)];
                        yield AgentEvent::TextDelta(
                            "\n[context compacted]\n".to_owned(),
                        );
                    }
                    continue;
                }

                let step = AgentStep {
                    response,
                    tool_calls,
                    tool_results,
                };

                // No tool calls means the loop is done: text answer or no action.
                if !has_tool_calls {
                    let stop_reason = Self::stop_reason(&step);
                    steps.push(step);
                    yield AgentEvent::Done(AgentResponse {
                        final_response: content,
                        iterations: steps.len(),
                        stop_reason,
                        steps,
                    });
                    return;
                }

                steps.push(step);
            }

            // Iteration budget exhausted: report the last step's content, if any.
            let final_response = steps.last().and_then(|s| s.response.content().cloned());
            yield AgentEvent::Done(AgentResponse {
                final_response,
                iterations: steps.len(),
                stop_reason: AgentStopReason::MaxIterations,
                steps,
            });
        }
    }
379}