Skip to main content

walrus_core/agent/
mod.rs

1//! Immutable agent definition and execution methods.
2//!
3//! [`Agent`] owns its configuration, model, tool schemas, and an optional
4//! [`ToolSender`] for dispatching tool calls to the runtime. Conversation
5//! history is passed in externally — the agent itself is stateless.
6//! It drives LLM execution through [`Agent::step`], [`Agent::run`], and
7//! [`Agent::run_stream`]. `run_stream()` is the canonical step loop —
8//! `run()` collects its events and returns the final response.
9
10use crate::model::{
11    Choice, CompletionMeta, Delta, Message, MessageBuilder, Model, Request, Response, Role, Tool,
12    Usage,
13};
14use anyhow::Result;
15use async_stream::stream;
16pub use builder::AgentBuilder;
17pub use config::AgentConfig;
18use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
19use futures_core::Stream;
20use futures_util::StreamExt;
21use tokio::sync::{mpsc, oneshot};
22pub use tool::{AsTool, ToolDescription, ToolRequest, ToolSender};
23
24mod builder;
25mod compact;
26pub mod config;
27pub mod event;
28pub mod tool;
29
30/// Extract sender from the last user message in history.
31fn last_sender(history: &[Message]) -> String {
32    history
33        .iter()
34        .rev()
35        .find(|m| m.role == Role::User)
36        .map(|m| m.sender.clone())
37        .unwrap_or_default()
38}
39
40/// An immutable agent definition.
41///
42/// Generic over `M: Model` — stores the model provider alongside config,
43/// tool schemas, and an optional sender for tool dispatch. Conversation
44/// history is owned externally and passed into execution methods.
45/// Callers drive execution via `step()` (single LLM round), `run()` (loop to
46/// completion), or `run_stream()` (yields events as a stream).
47pub struct Agent<M: Model> {
48    /// Agent configuration (name, prompt, model, limits, tool_choice).
49    pub config: AgentConfig,
50    /// The model provider for LLM calls.
51    model: M,
52    /// Tool schemas advertised to the LLM. Set once at build time.
53    tools: Vec<Tool>,
54    /// Sender for dispatching tool calls to the runtime. None = no tools.
55    tool_tx: Option<ToolSender>,
56}
57
58impl<M: Model> Agent<M> {
59    /// Build a request from config state (system prompt + history + tool schemas).
60    fn build_request(&self, history: &[Message]) -> Request {
61        let model_name = self
62            .config
63            .model
64            .clone()
65            .unwrap_or_else(|| self.model.active_model());
66
67        let mut messages = Vec::with_capacity(1 + history.len());
68        if !self.config.system_prompt.is_empty() {
69            messages.push(Message::system(&self.config.system_prompt));
70        }
71        messages.extend(history.iter().cloned());
72
73        let mut request = Request::new(model_name)
74            .with_messages(messages)
75            .with_tool_choice(self.config.tool_choice.clone())
76            .with_think(self.config.thinking);
77        if !self.tools.is_empty() {
78            request = request.with_tools(self.tools.clone());
79        }
80        request
81    }
82
83    /// Perform a single LLM round: send request, dispatch tools, return step.
84    ///
85    /// Composes a [`Request`] from config state (system prompt + history +
86    /// tool schemas), calls the stored model, dispatches any tool calls via
87    /// the [`ToolSender`] channel, and appends results to history.
88    pub async fn step(&self, history: &mut Vec<Message>) -> Result<AgentStep> {
89        let request = self.build_request(history);
90        let response = self.model.send(&request).await?;
91        let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
92
93        if let Some(msg) = response.message() {
94            history.push(msg);
95        }
96
97        let mut tool_results = Vec::new();
98        if !tool_calls.is_empty() {
99            let sender = last_sender(history);
100            for tc in &tool_calls {
101                let result = self
102                    .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender)
103                    .await;
104                let msg = Message::tool(&result, tc.id.clone());
105                history.push(msg.clone());
106                tool_results.push(msg);
107            }
108        }
109
110        Ok(AgentStep {
111            response,
112            tool_calls,
113            tool_results,
114        })
115    }
116
117    /// Dispatch a single tool call via the tool sender channel.
118    ///
119    /// Returns the result string. If no sender is configured, returns an error
120    /// message without panicking.
121    async fn dispatch_tool(&self, name: &str, args: &str, sender: &str) -> String {
122        let Some(tx) = &self.tool_tx else {
123            return format!("tool '{name}' called but no tool sender configured");
124        };
125        let (reply_tx, reply_rx) = oneshot::channel();
126        let req = ToolRequest {
127            name: name.to_owned(),
128            args: args.to_owned(),
129            agent: self.config.name.to_string(),
130            reply: reply_tx,
131            task_id: None,
132            sender: sender.into(),
133        };
134        if tx.send(req).is_err() {
135            return format!("tool channel closed while calling '{name}'");
136        }
137        reply_rx
138            .await
139            .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
140    }
141
142    /// Determine the stop reason for a step with no tool calls.
143    fn stop_reason(step: &AgentStep) -> AgentStopReason {
144        if step.response.content().is_some() {
145            AgentStopReason::TextResponse
146        } else {
147            AgentStopReason::NoAction
148        }
149    }
150
151    /// Run the agent loop to completion, returning the final response.
152    ///
153    /// Wraps [`Agent::run_stream`] — collects all events, sends each through
154    /// `events`, and extracts the `Done` response.
155    pub async fn run(
156        &self,
157        history: &mut Vec<Message>,
158        events: mpsc::UnboundedSender<AgentEvent>,
159    ) -> AgentResponse {
160        let mut stream = std::pin::pin!(self.run_stream(history));
161        let mut response = None;
162        while let Some(event) = stream.next().await {
163            if let AgentEvent::Done(ref resp) = event {
164                response = Some(resp.clone());
165            }
166            let _ = events.send(event);
167        }
168
169        response.unwrap_or_else(|| AgentResponse {
170            final_response: None,
171            iterations: 0,
172            stop_reason: AgentStopReason::Error("stream ended without Done".into()),
173            steps: vec![],
174        })
175    }
176
177    /// Run the agent loop as a stream of [`AgentEvent`]s.
178    ///
179    /// Uses the model's streaming API so text deltas are yielded token-by-token.
180    /// Tool call responses are dispatched after the stream completes (arguments
181    /// arrive incrementally and must be fully accumulated first).
182    pub fn run_stream<'a>(
183        &'a self,
184        history: &'a mut Vec<Message>,
185    ) -> impl Stream<Item = AgentEvent> + 'a {
186        stream! {
187            let mut steps = Vec::new();
188            let max = self.config.max_iterations;
189
190            for _ in 0..max {
191                let request = self.build_request(history);
192
193                // Stream from the model, yielding text deltas as they arrive.
194                let mut builder = MessageBuilder::new(Role::Assistant);
195                let mut finish_reason = None;
196                let mut last_meta = CompletionMeta::default();
197                let mut last_usage = None;
198                let mut stream_error = None;
199
200                {
201                    let mut chunk_stream = std::pin::pin!(self.model.stream(request));
202                    while let Some(result) = chunk_stream.next().await {
203                        match result {
204                            Ok(chunk) => {
205                                if let Some(text) = chunk.content() {
206                                    yield AgentEvent::TextDelta(text.to_owned());
207                                }
208                                if let Some(reason) = chunk.reasoning_content() {
209                                    yield AgentEvent::ThinkingDelta(reason.to_owned());
210                                }
211                                if let Some(r) = chunk.reason() {
212                                    finish_reason = Some(r.clone());
213                                }
214                                last_meta = chunk.meta.clone();
215                                if chunk.usage.is_some() {
216                                    last_usage = chunk.usage.clone();
217                                }
218                                builder.accept(&chunk);
219                            }
220                            Err(e) => {
221                                stream_error = Some(e.to_string());
222                                break;
223                            }
224                        }
225                    }
226                }
227                if let Some(e) = stream_error {
228                    yield AgentEvent::Done(AgentResponse {
229                        final_response: None,
230                        iterations: steps.len(),
231                        stop_reason: AgentStopReason::Error(e),
232                        steps,
233                    });
234                    return;
235                }
236
237                // Build the accumulated message and response.
238                let msg = builder.build();
239                let tool_calls = msg.tool_calls.to_vec();
240                let content = if msg.content.is_empty() {
241                    None
242                } else {
243                    Some(msg.content.clone())
244                };
245
246                let response = Response {
247                    meta: last_meta,
248                    choices: vec![Choice {
249                        index: 0,
250                        delta: Delta {
251                            role: Some(Role::Assistant),
252                            content: content.clone(),
253                            reasoning_content: if msg.reasoning_content.is_empty() {
254                                None
255                            } else {
256                                Some(msg.reasoning_content.clone())
257                            },
258                            tool_calls: if tool_calls.is_empty() {
259                                None
260                            } else {
261                                Some(tool_calls.clone())
262                            },
263                        },
264                        finish_reason,
265                        logprobs: None,
266                    }],
267                    usage: last_usage.unwrap_or(Usage {
268                        prompt_tokens: 0,
269                        completion_tokens: 0,
270                        total_tokens: 0,
271                        prompt_cache_hit_tokens: None,
272                        prompt_cache_miss_tokens: None,
273                        completion_tokens_details: None,
274                    }),
275                };
276
277                history.push(msg);
278                let has_tool_calls = !tool_calls.is_empty();
279
280                // Dispatch tool calls if any.
281                let mut tool_results = Vec::new();
282                if has_tool_calls {
283                    let sender = last_sender(history);
284                    yield AgentEvent::ToolCallsStart(tool_calls.clone());
285                    for tc in &tool_calls {
286                        let result = self
287                            .dispatch_tool(&tc.function.name, &tc.function.arguments, &sender)
288                            .await;
289                        let msg = Message::tool(&result, tc.id.clone());
290                        history.push(msg.clone());
291                        tool_results.push(msg);
292                        yield AgentEvent::ToolResult {
293                            call_id: tc.id.clone(),
294                            output: result,
295                        };
296                    }
297                    yield AgentEvent::ToolCallsComplete;
298                }
299
300                // Auto-compaction: check token estimate after each step.
301                if let Some(threshold) = self.config.compact_threshold
302                    && Self::estimate_tokens(history) > threshold
303                {
304                    if let Some(summary) = self.compact(history).await {
305                        yield AgentEvent::Compact { summary: summary.clone() };
306                        *history = vec![Message::user(&summary)];
307                        yield AgentEvent::TextDelta(
308                            "\n[context compacted]\n".to_owned(),
309                        );
310                    }
311                    continue;
312                }
313
314                let step = AgentStep {
315                    response,
316                    tool_calls,
317                    tool_results,
318                };
319
320                if !has_tool_calls {
321                    let stop_reason = Self::stop_reason(&step);
322                    steps.push(step);
323                    yield AgentEvent::Done(AgentResponse {
324                        final_response: content,
325                        iterations: steps.len(),
326                        stop_reason,
327                        steps,
328                    });
329                    return;
330                }
331
332                steps.push(step);
333            }
334
335            let final_response = steps.last().and_then(|s| s.response.content().cloned());
336            yield AgentEvent::Done(AgentResponse {
337                final_response,
338                iterations: steps.len(),
339                stop_reason: AgentStopReason::MaxIterations,
340                steps,
341            });
342        }
343    }
344}