Skip to main content

walrus_core/agent/
mod.rs

1//! Stateful agent execution unit.
2//!
3//! [`Agent`] owns its configuration, model, message history, tool schemas,
4//! and an optional [`ToolSender`] for dispatching tool calls to the runtime.
5//! It drives LLM execution through [`Agent::step`], [`Agent::run`], and
6//! [`Agent::run_stream`]. `run_stream()` is the canonical step loop —
7//! `run()` collects its events and returns the final response.
8
9use crate::model::{Message, Model, Request, Tool};
10use anyhow::Result;
11use async_stream::stream;
12use event::{AgentEvent, AgentResponse, AgentStep, AgentStopReason};
13use futures_core::Stream;
14use tokio::sync::{mpsc, oneshot};
15use tool::{ToolRequest, ToolSender};
16
17pub use builder::AgentBuilder;
18pub use config::AgentConfig;
19pub use parser::parse_agent_md;
20
21mod builder;
22pub mod config;
23pub mod event;
24mod parser;
25pub mod tool;
26
27/// A stateful agent execution unit.
28///
29/// Generic over `M: Model` — stores the model provider alongside config,
30/// conversation history, tool schemas, and an optional sender for tool dispatch.
31/// Callers drive execution via `step()` (single LLM round), `run()` (loop to
32/// completion), or `run_stream()` (yields events as a stream).
33pub struct Agent<M: Model> {
34    /// Agent configuration (name, prompt, model, limits, tool_choice).
35    pub config: AgentConfig,
36    /// The model provider for LLM calls.
37    model: M,
38    /// Conversation history (user/assistant/tool messages).
39    pub(crate) history: Vec<Message>,
40    /// Tool schemas advertised to the LLM. Set once at build time.
41    tools: Vec<Tool>,
42    /// Sender for dispatching tool calls to the runtime. None = no tools.
43    tool_tx: Option<ToolSender>,
44}
45
46impl<M: Model> Agent<M> {
47    /// Push a message into the conversation history.
48    pub fn push_message(&mut self, message: Message) {
49        self.history.push(message);
50    }
51
52    /// Return a reference to the conversation history.
53    pub fn messages(&self) -> &[Message] {
54        &self.history
55    }
56
57    /// Clear the conversation history, keeping configuration intact.
58    pub fn clear_history(&mut self) {
59        self.history.clear();
60    }
61
62    /// Perform a single LLM round: send request, dispatch tools, return step.
63    ///
64    /// Composes a [`Request`] from config state (system prompt + history +
65    /// tool schemas), calls the stored model, dispatches any tool calls via
66    /// the [`ToolSender`] channel, and appends results to history.
67    pub async fn step(&mut self) -> Result<AgentStep> {
68        let model_name = self
69            .config
70            .model
71            .clone()
72            .unwrap_or_else(|| self.model.active_model());
73
74        let mut messages = Vec::with_capacity(1 + self.history.len());
75        if !self.config.system_prompt.is_empty() {
76            messages.push(Message::system(&self.config.system_prompt));
77        }
78        messages.extend(self.history.iter().cloned());
79
80        let mut request = Request::new(model_name)
81            .with_messages(messages)
82            .with_tool_choice(self.config.tool_choice.clone());
83        if !self.tools.is_empty() {
84            request = request.with_tools(self.tools.clone());
85        }
86
87        let response = self.model.send(&request).await?;
88        let tool_calls = response.tool_calls().unwrap_or_default().to_vec();
89
90        if let Some(msg) = response.message() {
91            self.history.push(msg);
92        }
93
94        let mut tool_results = Vec::new();
95        if !tool_calls.is_empty() {
96            for tc in &tool_calls {
97                let result = self
98                    .dispatch_tool(&tc.function.name, &tc.function.arguments)
99                    .await;
100                let msg = Message::tool(&result, tc.id.clone());
101                self.history.push(msg.clone());
102                tool_results.push(msg);
103            }
104        }
105
106        Ok(AgentStep {
107            response,
108            tool_calls,
109            tool_results,
110        })
111    }
112
113    /// Dispatch a single tool call via the tool sender channel.
114    ///
115    /// Returns the result string. If no sender is configured, returns an error
116    /// message without panicking.
117    async fn dispatch_tool(&self, name: &str, args: &str) -> String {
118        let Some(tx) = &self.tool_tx else {
119            return format!("tool '{name}' called but no tool sender configured");
120        };
121        let (reply_tx, reply_rx) = oneshot::channel();
122        let req = ToolRequest {
123            name: name.to_owned(),
124            args: args.to_owned(),
125            reply: reply_tx,
126        };
127        if tx.send(req).is_err() {
128            return format!("tool channel closed while calling '{name}'");
129        }
130        reply_rx
131            .await
132            .unwrap_or_else(|_| format!("tool '{name}' dropped reply"))
133    }
134
135    /// Determine the stop reason for a step with no tool calls.
136    fn stop_reason(step: &AgentStep) -> AgentStopReason {
137        if step.response.content().is_some() {
138            AgentStopReason::TextResponse
139        } else {
140            AgentStopReason::NoAction
141        }
142    }
143
144    /// Run the agent loop to completion, returning the final response.
145    ///
146    /// Wraps [`Agent::run_stream`] — collects all events, sends each through
147    /// `events`, and extracts the `Done` response.
148    pub async fn run(&mut self, events: mpsc::UnboundedSender<AgentEvent>) -> AgentResponse {
149        use futures_util::StreamExt;
150
151        let mut stream = std::pin::pin!(self.run_stream());
152        let mut response = None;
153        while let Some(event) = stream.next().await {
154            if let AgentEvent::Done(ref resp) = event {
155                response = Some(resp.clone());
156            }
157            let _ = events.send(event);
158        }
159
160        response.unwrap_or_else(|| AgentResponse {
161            final_response: None,
162            iterations: 0,
163            stop_reason: AgentStopReason::Error("stream ended without Done".into()),
164            steps: vec![],
165        })
166    }
167
168    /// Run the agent loop as a stream of [`AgentEvent`]s.
169    ///
170    /// The canonical step loop. Calls [`Agent::step`] up to `max_iterations`
171    /// times, yielding events as they are produced. Always finishes with a
172    /// `Done` event containing the [`AgentResponse`].
173    pub fn run_stream(&mut self) -> impl Stream<Item = AgentEvent> + '_ {
174        stream! {
175            let mut steps = Vec::new();
176            let max = self.config.max_iterations;
177
178            for _ in 0..max {
179                match self.step().await {
180                    Ok(step) => {
181                        let has_tool_calls = !step.tool_calls.is_empty();
182                        let text = step.response.content().cloned();
183
184                        if let Some(ref t) = text {
185                            yield AgentEvent::TextDelta(t.clone());
186                        }
187
188                        if has_tool_calls {
189                            yield AgentEvent::ToolCallsStart(step.tool_calls.clone());
190                            for (tc, result) in step.tool_calls.iter().zip(&step.tool_results) {
191                                yield AgentEvent::ToolResult {
192                                    call_id: tc.id.clone(),
193                                    output: result.content.clone(),
194                                };
195                            }
196                            yield AgentEvent::ToolCallsComplete;
197                        }
198
199                        if !has_tool_calls {
200                            let stop_reason = Self::stop_reason(&step);
201                            steps.push(step);
202                            yield AgentEvent::Done(AgentResponse {
203                                final_response: text,
204                                iterations: steps.len(),
205                                stop_reason,
206                                steps,
207                            });
208                            return;
209                        }
210
211                        steps.push(step);
212                    }
213                    Err(e) => {
214                        yield AgentEvent::Done(AgentResponse {
215                            final_response: None,
216                            iterations: steps.len(),
217                            stop_reason: AgentStopReason::Error(e.to_string()),
218                            steps,
219                        });
220                        return;
221                    }
222                }
223            }
224
225            let final_response = steps.last().and_then(|s| s.response.content().cloned());
226            yield AgentEvent::Done(AgentResponse {
227                final_response,
228                iterations: steps.len(),
229                stop_reason: AgentStopReason::MaxIterations,
230                steps,
231            });
232        }
233    }
234}