ds_api/agent/agent_core.rs
1use std::collections::HashMap;
2
3use crate::api::ApiClient;
4use crate::conversation::{Conversation, LlmSummarizer, Summarizer};
5use crate::raw::request::message::{Message, Role};
6use crate::tool_trait::Tool;
7use serde_json::Value;
8use tokio::sync::mpsc;
9
10/// A tool call fragment emitted by [`AgentStream`][crate::agent::AgentStream].
11///
12/// In streaming mode multiple `ToolCallChunk`s are emitted per tool call:
13/// the first has an empty `delta` (name is known, no args yet); subsequent
14/// chunks carry incremental argument JSON. In non-streaming mode a single
15/// chunk is emitted with the complete argument JSON in `delta`.
16#[derive(Debug, Clone)]
17pub struct ToolCallChunk {
18 pub id: String,
19 pub name: String,
20 pub delta: String,
21}
22
23/// The result of a completed tool invocation.
24///
25/// Yielded as `AgentEvent::ToolResult` after the tool has finished executing.
26#[derive(Debug, Clone)]
27pub struct ToolCallResult {
28 pub id: String,
29 pub name: String,
30 pub args: String,
31 pub result: Value,
32}
33
34/// Events emitted by [`AgentStream`][crate::agent::AgentStream].
35///
36/// Each variant represents a distinct, self-contained event in the agent lifecycle:
37///
38/// - `Token(String)` — a text fragment from the assistant. In streaming mode each
39/// `Token` is a single SSE delta; in non-streaming mode the full response text
40/// arrives as one `Token`.
41/// - `ToolCall(id, name, delta)` — a tool call fragment. Behaves exactly like
42/// `Token`: in streaming mode one event is emitted per SSE chunk (first chunk has
43/// an empty `delta` and carries the tool name; subsequent chunks carry incremental
44/// argument JSON). In non-streaming mode a single event is emitted with the
45/// complete arguments string. Accumulate `delta` values by `id` to reconstruct
46/// the full argument JSON. Execution begins after all chunks for a turn are
47/// delivered.
48/// - `ToolResult(ToolCallResult)` — a tool has finished executing. One event is
49/// emitted per call, in the same order as the corresponding `ToolCall` events.
50#[derive(Debug, Clone)]
51pub enum AgentEvent {
52 Token(String),
53 /// Emitted when the model produces reasoning/thinking content (e.g. deepseek-reasoner).
54 /// In streaming mode this arrives token-by-token before the main reply.
55 ReasoningToken(String),
56 ToolCall(ToolCallChunk),
57 ToolResult(ToolCallResult),
58}
59
60/// An agent that combines a [`Conversation`] with a set of callable tools.
61///
62/// Build one with the fluent builder methods, then call [`chat`][DeepseekAgent::chat]
63/// to start a turn:
64///
65/// ```no_run
66/// use ds_api::{DeepseekAgent, tool};
67/// use serde_json::{Value, json};
68///
69/// struct MyTool;
70///
71/// #[tool]
72/// impl ds_api::Tool for MyTool {
73/// async fn greet(&self, name: String) -> Value {
74/// json!({ "greeting": format!("Hello, {name}!") })
75/// }
76/// }
77///
78/// # #[tokio::main] async fn main() {
79/// let agent = DeepseekAgent::new("sk-...")
80/// .add_tool(MyTool);
81/// # }
82/// ```
83pub struct DeepseekAgent {
84 /// The conversation manages history, the API client, and context-window compression.
85 pub(crate) conversation: Conversation,
86 pub(crate) tools: Vec<Box<dyn Tool>>,
87 pub(crate) tool_index: HashMap<String, usize>,
88 /// When `true` the agent uses SSE streaming for each API turn so `Token` events
89 /// arrive incrementally. When `false` (default) the full response is awaited.
90 pub(crate) streaming: bool,
91 /// The model to use for every API turn. Defaults to `"deepseek-chat"`.
92 pub(crate) model: String,
93 /// Optional channel for injecting user messages mid-loop.
94 /// Messages received here are drained after each tool-execution round and
95 /// appended to the conversation history as `Role::User` messages before the
96 /// next API turn begins.
97 pub(crate) interrupt_rx: Option<mpsc::UnboundedReceiver<String>>,
98 /// Optional map of extra top-level JSON fields to merge into the API request body.
99 /// This is used by the builder helpers below to attach custom provider-specific
100 /// fields that the typed request doesn't yet expose.
101 pub(crate) extra_body: Option<serde_json::Map<String, serde_json::Value>>,
102}
103
104impl DeepseekAgent {
105 fn from_parts(client: ApiClient, model: impl Into<String>) -> Self {
106 let model = model.into();
107 let summarizer = LlmSummarizer::new(client.clone()).with_model(model.clone());
108 Self {
109 conversation: Conversation::new(client).with_summarizer(summarizer),
110 tools: vec![],
111 tool_index: HashMap::new(),
112 streaming: false,
113 model,
114 interrupt_rx: None,
115 extra_body: None,
116 }
117 }
118
119 /// Create a new agent targeting the DeepSeek API with `deepseek-chat`.
120 pub fn new(token: impl Into<String>) -> Self {
121 Self::from_parts(ApiClient::new(token), "deepseek-chat")
122 }
123
124 /// Create an agent targeting an OpenAI-compatible provider.
125 ///
126 /// All three parameters are set at construction time and never change:
127 ///
128 /// ```no_run
129 /// use ds_api::DeepseekAgent;
130 ///
131 /// let agent = DeepseekAgent::custom(
132 /// "sk-or-...",
133 /// "https://openrouter.ai/api/v1",
134 /// "meta-llama/llama-3.3-70b-instruct:free",
135 /// );
136 /// ```
137 pub fn custom(
138 token: impl Into<String>,
139 base_url: impl Into<String>,
140 model: impl Into<String>,
141 ) -> Self {
142 let client = ApiClient::new(token).with_base_url(base_url);
143 Self::from_parts(client, model)
144 }
145
146 /// Register a tool (builder-style, supports chaining).
147 ///
148 /// The tool's protocol-level function names are indexed so incoming tool-call
149 /// requests from the model can be dispatched to the correct implementation.
150 pub fn add_tool<TT: Tool + 'static>(mut self, tool: TT) -> Self {
151 let idx = self.tools.len();
152 for raw in tool.raw_tools() {
153 self.tool_index.insert(raw.function.name.clone(), idx);
154 }
155 self.tools.push(Box::new(tool));
156 self
157 }
158
159 /// Push a user message and return an [`AgentStream`][crate::agent::AgentStream]
160 /// that drives the full agent loop (API calls + tool execution).
161 pub fn chat(mut self, user_message: &str) -> crate::agent::stream::AgentStream {
162 self.conversation.push_user_input(user_message);
163 crate::agent::stream::AgentStream::new(self)
164 }
165
166 /// Start an agent turn from the current history **without** pushing a new
167 /// user message first.
168 ///
169 /// Use this when you have already appended the user message manually (e.g.
170 /// via [`push_user_message_with_name`][Self::push_user_message_with_name])
171 /// and want to drive the agent loop from that point.
172 pub fn chat_from_history(self) -> crate::agent::stream::AgentStream {
173 crate::agent::stream::AgentStream::new(self)
174 }
175
176 /// Enable SSE streaming for each API turn (builder-style).
177 pub fn with_streaming(mut self) -> Self {
178 self.streaming = true;
179 self
180 }
181
182 /// Merge arbitrary top-level JSON key/value pairs into the request body for
183 /// the next API turn. The pairs are stored on the agent and later merged
184 /// into the `ApiRequest` raw body when a request is built.
185 ///
186 /// Example:
187 /// let mut map = serde_json::Map::new();
188 /// map.insert(\"foo\".to_string(), serde_json::json!(\"bar\"));
189 /// let agent = DeepseekAgent::new(\"sk-...\").extra_body(map);
190 pub fn extra_body(mut self, map: serde_json::Map<String, serde_json::Value>) -> Self {
191 if let Some(ref mut existing) = self.extra_body {
192 existing.extend(map);
193 } else {
194 self.extra_body = Some(map);
195 }
196 self
197 }
198
199 /// Add a single extra top-level field to be merged into the request body.
200 /// Convenience helper to avoid constructing a full map.
201 pub fn extra_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
202 if let Some(ref mut m) = self.extra_body {
203 m.insert(key.into(), value);
204 } else {
205 let mut m = serde_json::Map::new();
206 m.insert(key.into(), value);
207 self.extra_body = Some(m);
208 }
209 self
210 }
211
212 /// Prepend a permanent system prompt to the conversation history (builder-style).
213 ///
214 /// System messages added this way are never removed by the built-in summarizers.
215 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
216 self.conversation
217 .add_message(Message::new(Role::System, &prompt.into()));
218 self
219 }
220
221 /// Replace the summarizer used for context-window management (builder-style).
222 pub fn with_summarizer(mut self, summarizer: impl Summarizer + 'static) -> Self {
223 self.conversation = self.conversation.with_summarizer(summarizer);
224 self
225 }
226
227 /// Seed the agent with an existing message history (builder-style).
228 ///
229 /// Used to restore a conversation from persistent storage (e.g. SQLite)
230 /// after a process restart. The messages are set directly on the
231 /// underlying `Conversation` and will be included in the next API call.
232 ///
233 /// # Example
234 ///
235 /// ```no_run
236 /// use ds_api::DeepseekAgent;
237 /// use ds_api::raw::request::message::{Message, Role};
238 ///
239 /// # #[tokio::main] async fn main() {
240 /// let history = vec![
241 /// Message::new(Role::User, "Hello"),
242 /// Message::new(Role::Assistant, "Hi there!"),
243 /// ];
244 /// let agent = DeepseekAgent::new("sk-...").with_history(history);
245 /// # }
246 /// ```
247 pub fn with_history(mut self, history: Vec<crate::raw::request::message::Message>) -> Self {
248 self.conversation = self.conversation.with_history(history);
249 self
250 }
251
252 /// Append a user message with an optional display name to the conversation
253 /// history.
254 ///
255 /// The `name` field is passed through to the API as-is (OpenAI-compatible
256 /// providers use it to distinguish speakers in a shared channel).
257 ///
258 /// # Example
259 ///
260 /// ```no_run
261 /// use ds_api::DeepseekAgent;
262 ///
263 /// # #[tokio::main] async fn main() {
264 /// let mut agent = DeepseekAgent::new("sk-...");
265 /// agent.push_user_message_with_name("What time is it?", Some("alice"));
266 /// # }
267 /// ```
268 pub fn push_user_message_with_name(&mut self, text: &str, name: Option<&str>) {
269 use crate::raw::request::message::{Message, Role};
270 let mut msg = Message::new(Role::User, text);
271 msg.name = name.map(|n| n.to_string());
272 self.conversation.history_mut().push(msg);
273 }
274
275 /// Read-only view of the current conversation history.
276 ///
277 /// Returns all messages in order, including system prompts, user turns,
278 /// assistant replies, tool calls, and tool results. Auto-summary messages
279 /// inserted by the built-in summarizers are also included.
280 ///
281 /// # Example
282 ///
283 /// ```no_run
284 /// use ds_api::DeepseekAgent;
285 ///
286 /// # #[tokio::main] async fn main() {
287 /// let agent = DeepseekAgent::new("sk-...");
288 /// for msg in agent.history() {
289 /// println!("{:?}: {:?}", msg.role, msg.content);
290 /// }
291 /// # }
292 /// ```
293 pub fn history(&self) -> &[crate::raw::request::message::Message] {
294 self.conversation.history()
295 }
296
297 /// Attach an interrupt channel to the agent (builder-style).
298 ///
299 /// Returns the agent and the sender half of the channel. Send any `String`
300 /// through the `UnboundedSender` at any time; the message will be picked up
301 /// after the current tool-execution round finishes and inserted into the
302 /// conversation history as a `Role::User` message before the next API turn.
303 ///
304 /// # Example
305 ///
306 /// ```no_run
307 /// use ds_api::DeepseekAgent;
308 /// use tokio::sync::mpsc;
309 ///
310 /// # #[tokio::main] async fn main() {
311 /// let (agent, tx) = DeepseekAgent::new("sk-...")
312 /// .with_interrupt_channel();
313 ///
314 /// // In another task or callback:
315 /// tx.send("Actually, use Python instead.".into()).unwrap();
316 /// # }
317 /// ```
318 pub fn with_interrupt_channel(mut self) -> (Self, mpsc::UnboundedSender<String>) {
319 let (tx, rx) = mpsc::unbounded_channel();
320 self.interrupt_rx = Some(rx);
321 (self, tx)
322 }
323
324 /// Drain any pending messages from the interrupt channel and append them
325 /// to the conversation history as `Role::User` messages.
326 ///
327 /// Called by the state machine in [`AgentStream`] at the top of every
328 /// `Idle` transition so that injected messages are visible before each API
329 /// turn, not just after tool-execution rounds.
330 pub(crate) fn drain_interrupts(&mut self) {
331 if let Some(rx) = self.interrupt_rx.as_mut() {
332 while let Ok(msg) = rx.try_recv() {
333 self.conversation
334 .history_mut()
335 .push(Message::new(Role::User, &msg));
336 }
337 }
338 }
339}