//! ds_api/agent/agent_core.rs — core agent types: [`DeepseekAgent`], tool-call
//! metadata, and the [`AgentEvent`] stream vocabulary.
1use std::collections::HashMap;
2
3use crate::api::ApiClient;
4use crate::conversation::{Conversation, LlmSummarizer, Summarizer};
5use crate::raw::request::message::{Message, Role};
6use crate::tool_trait::Tool;
7use serde_json::Value;
8use tokio::sync::mpsc;
9
/// Information about a tool call requested by the model.
///
/// Yielded as `AgentEvent::ToolCall` when the model requests a tool invocation.
/// At this point the tool has not yet been executed.
#[derive(Debug, Clone)]
pub struct ToolCallInfo {
    /// Identifier correlating this call with its later `ToolResult` event.
    pub id: String,
    /// Protocol-level function name (the key used for tool dispatch).
    pub name: String,
    /// The call's arguments as parsed JSON.
    pub args: Value,
}
20
/// The result of a completed tool invocation.
///
/// Yielded as `AgentEvent::ToolResult` after the tool has finished executing.
#[derive(Debug, Clone)]
pub struct ToolCallResult {
    /// Identifier matching the originating `ToolCall` event.
    pub id: String,
    /// Protocol-level function name that was invoked.
    pub name: String,
    /// The arguments the tool was called with, as parsed JSON.
    pub args: Value,
    /// The JSON value the tool returned.
    pub result: Value,
}
31
/// Events emitted by [`AgentStream`][crate::agent::AgentStream].
///
/// Each variant represents a distinct, self-contained event in the agent lifecycle:
///
/// - `Token(String)` — a text fragment from the assistant. In streaming mode each
///   `Token` is a single SSE delta; in non-streaming mode the full response text
///   arrives as one `Token`.
/// - `ToolCallStart { id, name }` — emitted the moment a tool call's name is known
///   during streaming, before any arguments have arrived. Allows UIs to show the
///   tool name immediately. Only emitted in streaming mode.
/// - `ToolCallArgsDelta { id, delta }` — an incremental fragment of the tool call's
///   JSON arguments, emitted once per SSE chunk during streaming. Accumulate these
///   to reconstruct the full arguments string. Only emitted in streaming mode.
/// - `ToolCall(ToolCallInfo)` — the model has requested a tool invocation, emitted
///   once the full arguments are assembled (end of stream). In non-streaming mode
///   this is the only tool-call event. Execution begins after this event.
/// - `ToolResult(ToolCallResult)` — a tool has finished executing. One event is
///   emitted per call, in the same order as the corresponding `ToolCall` events.
#[derive(Debug, Clone)]
pub enum AgentEvent {
    /// A text fragment from the assistant (one SSE delta, or the whole reply).
    Token(String),
    /// Emitted in streaming mode the instant a tool call's id+name are known.
    ToolCallStart {
        id: String,
        name: String,
    },
    /// Emitted in streaming mode for each incremental fragment of the arguments JSON.
    ToolCallArgsDelta {
        id: String,
        delta: String,
    },
    /// The model requested a tool invocation; full arguments are assembled.
    ToolCall(ToolCallInfo),
    /// A tool finished executing; ordering matches the `ToolCall` events.
    ToolResult(ToolCallResult),
}
66
/// An agent that combines a [`Conversation`] with a set of callable tools.
///
/// Build one with the fluent builder methods, then call [`chat`][DeepseekAgent::chat]
/// to start a turn:
///
/// ```no_run
/// use ds_api::{DeepseekAgent, tool};
/// use serde_json::{Value, json};
///
/// struct MyTool;
///
/// #[tool]
/// impl ds_api::Tool for MyTool {
///     async fn greet(&self, name: String) -> Value {
///         json!({ "greeting": format!("Hello, {name}!") })
///     }
/// }
///
/// # #[tokio::main] async fn main() {
/// let agent = DeepseekAgent::new("sk-...")
///     .add_tool(MyTool);
/// # }
/// ```
pub struct DeepseekAgent {
    /// The conversation manages history, the API client, and context-window compression.
    pub(crate) conversation: Conversation,
    /// Registered tool implementations, in insertion order.
    pub(crate) tools: Vec<Box<dyn Tool>>,
    /// Maps a protocol-level function name to an index into `tools` for dispatch.
    pub(crate) tool_index: HashMap<String, usize>,
    /// When `true` the agent uses SSE streaming for each API turn so `Token` events
    /// arrive incrementally. When `false` (default) the full response is awaited.
    pub(crate) streaming: bool,
    /// The model to use for every API turn. Defaults to `"deepseek-chat"`.
    pub(crate) model: String,
    /// Optional channel for injecting user messages mid-loop.
    /// Messages received here are drained after each tool-execution round and
    /// appended to the conversation history as `Role::User` messages before the
    /// next API turn begins.
    pub(crate) interrupt_rx: Option<mpsc::UnboundedReceiver<String>>,
}
106
107impl DeepseekAgent {
108 fn from_parts(client: ApiClient, model: impl Into<String>) -> Self {
109 let model = model.into();
110 let summarizer = LlmSummarizer::new(client.clone()).with_model(model.clone());
111 Self {
112 conversation: Conversation::new(client).with_summarizer(summarizer),
113 tools: vec![],
114 tool_index: HashMap::new(),
115 streaming: false,
116 model,
117 interrupt_rx: None,
118 }
119 }
120
121 /// Create a new agent targeting the DeepSeek API with `deepseek-chat`.
122 pub fn new(token: impl Into<String>) -> Self {
123 Self::from_parts(ApiClient::new(token), "deepseek-chat")
124 }
125
126 /// Create an agent targeting an OpenAI-compatible provider.
127 ///
128 /// All three parameters are set at construction time and never change:
129 ///
130 /// ```no_run
131 /// use ds_api::DeepseekAgent;
132 ///
133 /// let agent = DeepseekAgent::custom(
134 /// "sk-or-...",
135 /// "https://openrouter.ai/api/v1",
136 /// "meta-llama/llama-3.3-70b-instruct:free",
137 /// );
138 /// ```
139 pub fn custom(
140 token: impl Into<String>,
141 base_url: impl Into<String>,
142 model: impl Into<String>,
143 ) -> Self {
144 let client = ApiClient::new(token).with_base_url(base_url);
145 Self::from_parts(client, model)
146 }
147
148 /// Register a tool (builder-style, supports chaining).
149 ///
150 /// The tool's protocol-level function names are indexed so incoming tool-call
151 /// requests from the model can be dispatched to the correct implementation.
152 pub fn add_tool<TT: Tool + 'static>(mut self, tool: TT) -> Self {
153 let idx = self.tools.len();
154 for raw in tool.raw_tools() {
155 self.tool_index.insert(raw.function.name.clone(), idx);
156 }
157 self.tools.push(Box::new(tool));
158 self
159 }
160
161 /// Push a user message and return an [`AgentStream`][crate::agent::AgentStream]
162 /// that drives the full agent loop (API calls + tool execution).
163 pub fn chat(mut self, user_message: &str) -> crate::agent::stream::AgentStream {
164 self.conversation.push_user_input(user_message);
165 crate::agent::stream::AgentStream::new(self)
166 }
167
168 /// Start an agent turn from the current history **without** pushing a new
169 /// user message first.
170 ///
171 /// Use this when you have already appended the user message manually (e.g.
172 /// via [`push_user_message_with_name`][Self::push_user_message_with_name])
173 /// and want to drive the agent loop from that point.
174 pub fn chat_from_history(self) -> crate::agent::stream::AgentStream {
175 crate::agent::stream::AgentStream::new(self)
176 }
177
178 /// Enable SSE streaming for each API turn (builder-style).
179 pub fn with_streaming(mut self) -> Self {
180 self.streaming = true;
181 self
182 }
183
184 /// Prepend a permanent system prompt to the conversation history (builder-style).
185 ///
186 /// System messages added this way are never removed by the built-in summarizers.
187 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
188 self.conversation
189 .add_message(Message::new(Role::System, &prompt.into()));
190 self
191 }
192
193 /// Replace the summarizer used for context-window management (builder-style).
194 pub fn with_summarizer(mut self, summarizer: impl Summarizer + 'static) -> Self {
195 self.conversation = self.conversation.with_summarizer(summarizer);
196 self
197 }
198
199 /// Seed the agent with an existing message history (builder-style).
200 ///
201 /// Used to restore a conversation from persistent storage (e.g. SQLite)
202 /// after a process restart. The messages are set directly on the
203 /// underlying `Conversation` and will be included in the next API call.
204 ///
205 /// # Example
206 ///
207 /// ```no_run
208 /// use ds_api::DeepseekAgent;
209 /// use ds_api::raw::request::message::{Message, Role};
210 ///
211 /// # #[tokio::main] async fn main() {
212 /// let history = vec![
213 /// Message::new(Role::User, "Hello"),
214 /// Message::new(Role::Assistant, "Hi there!"),
215 /// ];
216 /// let agent = DeepseekAgent::new("sk-...").with_history(history);
217 /// # }
218 /// ```
219 pub fn with_history(mut self, history: Vec<crate::raw::request::message::Message>) -> Self {
220 self.conversation = self.conversation.with_history(history);
221 self
222 }
223
224 /// Append a user message with an optional display name to the conversation
225 /// history.
226 ///
227 /// The `name` field is passed through to the API as-is (OpenAI-compatible
228 /// providers use it to distinguish speakers in a shared channel).
229 ///
230 /// # Example
231 ///
232 /// ```no_run
233 /// use ds_api::DeepseekAgent;
234 ///
235 /// # #[tokio::main] async fn main() {
236 /// let mut agent = DeepseekAgent::new("sk-...");
237 /// agent.push_user_message_with_name("What time is it?", Some("alice"));
238 /// # }
239 /// ```
240 pub fn push_user_message_with_name(&mut self, text: &str, name: Option<&str>) {
241 use crate::raw::request::message::{Message, Role};
242 let mut msg = Message::new(Role::User, text);
243 msg.name = name.map(|n| n.to_string());
244 self.conversation.history_mut().push(msg);
245 }
246
247 /// Read-only view of the current conversation history.
248 ///
249 /// Returns all messages in order, including system prompts, user turns,
250 /// assistant replies, tool calls, and tool results. Auto-summary messages
251 /// inserted by the built-in summarizers are also included.
252 ///
253 /// # Example
254 ///
255 /// ```no_run
256 /// use ds_api::DeepseekAgent;
257 ///
258 /// # #[tokio::main] async fn main() {
259 /// let agent = DeepseekAgent::new("sk-...");
260 /// for msg in agent.history() {
261 /// println!("{:?}: {:?}", msg.role, msg.content);
262 /// }
263 /// # }
264 /// ```
265 pub fn history(&self) -> &[crate::raw::request::message::Message] {
266 self.conversation.history()
267 }
268
269 /// Attach an interrupt channel to the agent (builder-style).
270 ///
271 /// Returns the agent and the sender half of the channel. Send any `String`
272 /// through the `UnboundedSender` at any time; the message will be picked up
273 /// after the current tool-execution round finishes and inserted into the
274 /// conversation history as a `Role::User` message before the next API turn.
275 ///
276 /// # Example
277 ///
278 /// ```no_run
279 /// use ds_api::DeepseekAgent;
280 /// use tokio::sync::mpsc;
281 ///
282 /// # #[tokio::main] async fn main() {
283 /// let (agent, tx) = DeepseekAgent::new("sk-...")
284 /// .with_interrupt_channel();
285 ///
286 /// // In another task or callback:
287 /// tx.send("Actually, use Python instead.".into()).unwrap();
288 /// # }
289 /// ```
290 pub fn with_interrupt_channel(mut self) -> (Self, mpsc::UnboundedSender<String>) {
291 let (tx, rx) = mpsc::unbounded_channel();
292 self.interrupt_rx = Some(rx);
293 (self, tx)
294 }
295
296 /// Drain any pending messages from the interrupt channel and append them
297 /// to the conversation history as `Role::User` messages.
298 ///
299 /// Called by the state machine in [`AgentStream`] at the top of every
300 /// `Idle` transition so that injected messages are visible before each API
301 /// turn, not just after tool-execution rounds.
302 pub(crate) fn drain_interrupts(&mut self) {
303 if let Some(rx) = self.interrupt_rx.as_mut() {
304 while let Ok(msg) = rx.try_recv() {
305 self.conversation
306 .history_mut()
307 .push(Message::new(Role::User, &msg));
308 }
309 }
310 }
311}