Skip to main content

ds_api/agent/
agent_core.rs

1use std::collections::HashMap;
2
3use crate::api::ApiClient;
4use crate::conversation::{Conversation, LlmSummarizer, Summarizer};
5use crate::raw::request::message::{Message, Role};
6use crate::tool_trait::Tool;
7use serde_json::Value;
8use tokio::sync::mpsc;
9
/// A tool call fragment emitted by [`AgentStream`][crate::agent::AgentStream].
///
/// In streaming mode multiple `ToolCallChunk`s are emitted per tool call:
/// the first has an empty `delta` (name is known, no args yet); subsequent
/// chunks carry incremental argument JSON.  In non-streaming mode a single
/// chunk is emitted with the complete argument JSON in `delta`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolCallChunk {
    /// Identifier of the tool call this fragment belongs to.  Concatenate the
    /// `delta`s of all chunks sharing an `id` to rebuild the argument JSON.
    pub id: String,
    /// Name of the tool function being called.
    pub name: String,
    /// Argument JSON fragment: empty on the first streaming chunk, incremental
    /// afterwards, or the complete argument string in non-streaming mode.
    pub delta: String,
}
22
23/// The result of a completed tool invocation.
24///
25/// Yielded as `AgentEvent::ToolResult` after the tool has finished executing.
26#[derive(Debug, Clone)]
27pub struct ToolCallResult {
28    pub id: String,
29    pub name: String,
30    pub args: String,
31    pub result: Value,
32}
33
34/// Events emitted by [`AgentStream`][crate::agent::AgentStream].
35///
36/// Each variant represents a distinct, self-contained event in the agent lifecycle:
37///
38/// - `Token(String)` — a text fragment from the assistant.  In streaming mode each
39///   `Token` is a single SSE delta; in non-streaming mode the full response text
40///   arrives as one `Token`.
41/// - `ToolCall(id, name, delta)` — a tool call fragment.  Behaves exactly like
42///   `Token`: in streaming mode one event is emitted per SSE chunk (first chunk has
43///   an empty `delta` and carries the tool name; subsequent chunks carry incremental
44///   argument JSON).  In non-streaming mode a single event is emitted with the
45///   complete arguments string.  Accumulate `delta` values by `id` to reconstruct
46///   the full argument JSON.  Execution begins after all chunks for a turn are
47///   delivered.
48/// - `ToolResult(ToolCallResult)` — a tool has finished executing.  One event is
49///   emitted per call, in the same order as the corresponding `ToolCall` events.
50#[derive(Debug, Clone)]
51pub enum AgentEvent {
52    Token(String),
53    /// Emitted when the model produces reasoning/thinking content (e.g. deepseek-reasoner).
54    /// In streaming mode this arrives token-by-token before the main reply.
55    ReasoningToken(String),
56    ToolCall(ToolCallChunk),
57    ToolResult(ToolCallResult),
58}
59
60/// An agent that combines a [`Conversation`] with a set of callable tools.
61///
62/// Build one with the fluent builder methods, then call [`chat`][DeepseekAgent::chat]
63/// to start a turn:
64///
65/// ```no_run
66/// use ds_api::{DeepseekAgent, tool};
67/// use serde_json::{Value, json};
68///
69/// struct MyTool;
70///
71/// #[tool]
72/// impl ds_api::Tool for MyTool {
73///     async fn greet(&self, name: String) -> Value {
74///         json!({ "greeting": format!("Hello, {name}!") })
75///     }
76/// }
77///
78/// # #[tokio::main] async fn main() {
79/// let agent = DeepseekAgent::new("sk-...")
80///     .add_tool(MyTool);
81/// # }
82/// ```
83pub struct DeepseekAgent {
84    /// The conversation manages history, the API client, and context-window compression.
85    pub(crate) conversation: Conversation,
86    pub(crate) tools: Vec<Box<dyn Tool>>,
87    pub(crate) tool_index: HashMap<String, usize>,
88    /// When `true` the agent uses SSE streaming for each API turn so `Token` events
89    /// arrive incrementally.  When `false` (default) the full response is awaited.
90    pub(crate) streaming: bool,
91    /// The model to use for every API turn.  Defaults to `"deepseek-chat"`.
92    pub(crate) model: String,
93    /// Optional channel for injecting user messages mid-loop.
94    /// Messages received here are drained after each tool-execution round and
95    /// appended to the conversation history as `Role::User` messages before the
96    /// next API turn begins.
97    pub(crate) interrupt_rx: Option<mpsc::UnboundedReceiver<String>>,
98}
99
100impl DeepseekAgent {
101    fn from_parts(client: ApiClient, model: impl Into<String>) -> Self {
102        let model = model.into();
103        let summarizer = LlmSummarizer::new(client.clone()).with_model(model.clone());
104        Self {
105            conversation: Conversation::new(client).with_summarizer(summarizer),
106            tools: vec![],
107            tool_index: HashMap::new(),
108            streaming: false,
109            model,
110            interrupt_rx: None,
111        }
112    }
113
114    /// Create a new agent targeting the DeepSeek API with `deepseek-chat`.
115    pub fn new(token: impl Into<String>) -> Self {
116        Self::from_parts(ApiClient::new(token), "deepseek-chat")
117    }
118
119    /// Create an agent targeting an OpenAI-compatible provider.
120    ///
121    /// All three parameters are set at construction time and never change:
122    ///
123    /// ```no_run
124    /// use ds_api::DeepseekAgent;
125    ///
126    /// let agent = DeepseekAgent::custom(
127    ///     "sk-or-...",
128    ///     "https://openrouter.ai/api/v1",
129    ///     "meta-llama/llama-3.3-70b-instruct:free",
130    /// );
131    /// ```
132    pub fn custom(
133        token: impl Into<String>,
134        base_url: impl Into<String>,
135        model: impl Into<String>,
136    ) -> Self {
137        let client = ApiClient::new(token).with_base_url(base_url);
138        Self::from_parts(client, model)
139    }
140
141    /// Register a tool (builder-style, supports chaining).
142    ///
143    /// The tool's protocol-level function names are indexed so incoming tool-call
144    /// requests from the model can be dispatched to the correct implementation.
145    pub fn add_tool<TT: Tool + 'static>(mut self, tool: TT) -> Self {
146        let idx = self.tools.len();
147        for raw in tool.raw_tools() {
148            self.tool_index.insert(raw.function.name.clone(), idx);
149        }
150        self.tools.push(Box::new(tool));
151        self
152    }
153
154    /// Push a user message and return an [`AgentStream`][crate::agent::AgentStream]
155    /// that drives the full agent loop (API calls + tool execution).
156    pub fn chat(mut self, user_message: &str) -> crate::agent::stream::AgentStream {
157        self.conversation.push_user_input(user_message);
158        crate::agent::stream::AgentStream::new(self)
159    }
160
161    /// Start an agent turn from the current history **without** pushing a new
162    /// user message first.
163    ///
164    /// Use this when you have already appended the user message manually (e.g.
165    /// via [`push_user_message_with_name`][Self::push_user_message_with_name])
166    /// and want to drive the agent loop from that point.
167    pub fn chat_from_history(self) -> crate::agent::stream::AgentStream {
168        crate::agent::stream::AgentStream::new(self)
169    }
170
171    /// Enable SSE streaming for each API turn (builder-style).
172    pub fn with_streaming(mut self) -> Self {
173        self.streaming = true;
174        self
175    }
176
177    /// Prepend a permanent system prompt to the conversation history (builder-style).
178    ///
179    /// System messages added this way are never removed by the built-in summarizers.
180    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
181        self.conversation
182            .add_message(Message::new(Role::System, &prompt.into()));
183        self
184    }
185
186    /// Replace the summarizer used for context-window management (builder-style).
187    pub fn with_summarizer(mut self, summarizer: impl Summarizer + 'static) -> Self {
188        self.conversation = self.conversation.with_summarizer(summarizer);
189        self
190    }
191
192    /// Seed the agent with an existing message history (builder-style).
193    ///
194    /// Used to restore a conversation from persistent storage (e.g. SQLite)
195    /// after a process restart.  The messages are set directly on the
196    /// underlying `Conversation` and will be included in the next API call.
197    ///
198    /// # Example
199    ///
200    /// ```no_run
201    /// use ds_api::DeepseekAgent;
202    /// use ds_api::raw::request::message::{Message, Role};
203    ///
204    /// # #[tokio::main] async fn main() {
205    /// let history = vec![
206    ///     Message::new(Role::User, "Hello"),
207    ///     Message::new(Role::Assistant, "Hi there!"),
208    /// ];
209    /// let agent = DeepseekAgent::new("sk-...").with_history(history);
210    /// # }
211    /// ```
212    pub fn with_history(mut self, history: Vec<crate::raw::request::message::Message>) -> Self {
213        self.conversation = self.conversation.with_history(history);
214        self
215    }
216
217    /// Append a user message with an optional display name to the conversation
218    /// history.
219    ///
220    /// The `name` field is passed through to the API as-is (OpenAI-compatible
221    /// providers use it to distinguish speakers in a shared channel).
222    ///
223    /// # Example
224    ///
225    /// ```no_run
226    /// use ds_api::DeepseekAgent;
227    ///
228    /// # #[tokio::main] async fn main() {
229    /// let mut agent = DeepseekAgent::new("sk-...");
230    /// agent.push_user_message_with_name("What time is it?", Some("alice"));
231    /// # }
232    /// ```
233    pub fn push_user_message_with_name(&mut self, text: &str, name: Option<&str>) {
234        use crate::raw::request::message::{Message, Role};
235        let mut msg = Message::new(Role::User, text);
236        msg.name = name.map(|n| n.to_string());
237        self.conversation.history_mut().push(msg);
238    }
239
240    /// Read-only view of the current conversation history.
241    ///
242    /// Returns all messages in order, including system prompts, user turns,
243    /// assistant replies, tool calls, and tool results.  Auto-summary messages
244    /// inserted by the built-in summarizers are also included.
245    ///
246    /// # Example
247    ///
248    /// ```no_run
249    /// use ds_api::DeepseekAgent;
250    ///
251    /// # #[tokio::main] async fn main() {
252    /// let agent = DeepseekAgent::new("sk-...");
253    /// for msg in agent.history() {
254    ///     println!("{:?}: {:?}", msg.role, msg.content);
255    /// }
256    /// # }
257    /// ```
258    pub fn history(&self) -> &[crate::raw::request::message::Message] {
259        self.conversation.history()
260    }
261
262    /// Attach an interrupt channel to the agent (builder-style).
263    ///
264    /// Returns the agent and the sender half of the channel.  Send any `String`
265    /// through the `UnboundedSender` at any time; the message will be picked up
266    /// after the current tool-execution round finishes and inserted into the
267    /// conversation history as a `Role::User` message before the next API turn.
268    ///
269    /// # Example
270    ///
271    /// ```no_run
272    /// use ds_api::DeepseekAgent;
273    /// use tokio::sync::mpsc;
274    ///
275    /// # #[tokio::main] async fn main() {
276    /// let (agent, tx) = DeepseekAgent::new("sk-...")
277    ///     .with_interrupt_channel();
278    ///
279    /// // In another task or callback:
280    /// tx.send("Actually, use Python instead.".into()).unwrap();
281    /// # }
282    /// ```
283    pub fn with_interrupt_channel(mut self) -> (Self, mpsc::UnboundedSender<String>) {
284        let (tx, rx) = mpsc::unbounded_channel();
285        self.interrupt_rx = Some(rx);
286        (self, tx)
287    }
288
289    /// Drain any pending messages from the interrupt channel and append them
290    /// to the conversation history as `Role::User` messages.
291    ///
292    /// Called by the state machine in [`AgentStream`] at the top of every
293    /// `Idle` transition so that injected messages are visible before each API
294    /// turn, not just after tool-execution rounds.
295    pub(crate) fn drain_interrupts(&mut self) {
296        // Strip reasoning_content from assistant messages that do NOT have
297        // tool_calls.  deepseek-reasoner requires reasoning_content to be
298        // present on every assistant message that *does* have tool_calls
299        // (it must be round-tripped within the same Turn so the model can
300        // continue reasoning after seeing the tool result).  However, for
301        // plain assistant messages (final replies with no tool_calls),
302        // reasoning_content must be omitted when starting a new Turn —
303        // sending it there causes a 400 "Missing reasoning_content" error
304        // on the *next* tool-calling assistant message in history.
305        for msg in self.conversation.history_mut().iter_mut() {
306            let has_tool_calls = msg
307                .tool_calls
308                .as_ref()
309                .map(|v| !v.is_empty())
310                .unwrap_or(false);
311            if !has_tool_calls {
312                msg.reasoning_content = None;
313            }
314        }
315
316        if let Some(rx) = self.interrupt_rx.as_mut() {
317            while let Ok(msg) = rx.try_recv() {
318                self.conversation
319                    .history_mut()
320                    .push(Message::new(Role::User, &msg));
321            }
322        }
323    }
324}