ds-api 0.10.7

A Rust client library for the DeepSeek API with support for chat completions, streaming, and tools.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
//! Agent executor — pure business logic for driving one API turn.
//!
//! This module owns all the "do actual work" functions that were previously
//! scattered at the bottom of `stream.rs`:
//!
//! | Function | Responsibility |
//! |---|---|
//! | [`build_request`] | Assemble an [`ApiRequest`] from current history + tools. |
//! | [`run_summarize`] | Invoke `maybe_summarize` and hand the agent back. |
//! | [`fetch_response`] | Non-streaming API call; returns content + raw tool calls. |
//! | [`connect_stream`] | Open an SSE stream and hand back the `BoxStream`. |
//! | [`execute_tools`] | Dispatch all pending tool calls and collect results. |
//!
//! The streaming state machine in [`stream`][super::stream] is the only consumer of
//! this module; nothing in here knows about [`Poll`] or [`Context`].  That separation
//! makes it straightforward to add retry logic, timeouts, or parallel tool execution
//! in the future without touching the state machine.

use futures::stream::BoxStream;
use serde_json::Value;

use crate::agent::agent_core::{DeepseekAgent, ToolCallResult};
use crate::api::ApiRequest;
use crate::error::ApiError;
use crate::raw::ChatCompletionChunk;
use crate::raw::request::message::{FunctionCall, Message, Role, ToolCall, ToolType};

// ── Internal result types ─────────────────────────────────────────────────────

/// Outcome of a completed non-streaming API fetch.
///
/// Produced by [`fetch_response`] after the assistant message has already been
/// pushed onto the conversation history.
pub(crate) struct FetchResult {
    /// The assistant's text content, if any.
    pub(crate) content: Option<String>,
    /// Reasoning/thinking content produced by the model, if any.
    pub(crate) reasoning_content: Option<String>,
    /// Raw tool-call objects requested by the model; empty when the turn had
    /// no tool use.
    pub(crate) raw_tool_calls: Vec<ToolCall>,
}

/// Outcome of a completed tool-execution pass.
///
/// Produced by [`execute_tools`].  `results` may be shorter than the requested
/// call list: execution stops early (after recording an error-shaped result)
/// when an interrupt aborts the pass.
pub(crate) struct ToolsResult {
    /// One [`ToolCallResult`] per dispatched tool call, in call order.
    pub(crate) results: Vec<ToolCallResult>,
}

// ── Streaming accumulator ─────────────────────────────────────────────────────

/// Accumulates a single tool-call's incremental SSE deltas until the stream ends.
pub(crate) struct PartialToolCall {
    /// Tool-call id; the id may arrive on a later delta, so this can start empty.
    pub(crate) id: String,
    /// Function name; captured from the first delta seen for this call's index.
    pub(crate) name: String,
    /// JSON argument fragments concatenated in arrival order.
    pub(crate) arguments: String,
}

/// All mutable state needed while consuming an SSE stream.
///
/// Boxed by the caller so it fits neatly in one state-machine variant without
/// blowing up the size of every other variant.
pub(crate) struct StreamingData {
    /// The live SSE chunk stream for the in-flight turn.
    pub(crate) stream: BoxStream<'static, Result<ChatCompletionChunk, ApiError>>,
    /// The agent, owned here so the state machine can reclaim it when the
    /// stream finishes.
    pub(crate) agent: DeepseekAgent,
    /// Accumulated text content across all deltas for the current turn.
    pub(crate) content_buf: String,
    /// Accumulated reasoning/thinking content across all deltas for the current turn.
    pub(crate) reasoning_buf: String,
    /// Per-index partial tool-call buffers; sparse — may contain `None` gaps.
    pub(crate) tool_call_bufs: Vec<Option<PartialToolCall>>,
}

/// An incremental streaming event produced by [`apply_chunk_delta`].
pub(crate) enum ChunkEvent {
    /// A text fragment from the assistant.
    Token(String),
    /// A reasoning/thinking fragment (deepseek-reasoner).
    ReasoningToken(String),
    /// A tool call chunk: `(id, name, delta)`.  First emission has empty `delta`
    /// and is the signal that a new tool call has started; subsequent emissions
    /// carry incremental argument JSON fragments.
    ToolCallChunk {
        /// Tool-call id (may be empty on the very first emission if the
        /// provider sends the id on a later delta).
        id: String,
        /// Function name for this tool call.
        name: String,
        /// Incremental argument JSON fragment; empty on the start signal.
        delta: String,
        /// Zero-based position of this call within the turn's tool-call list.
        index: u32,
    },
}

// ── Type aliases for futures returned by this module ─────────────────────────
//
// All four are pinned, boxed, `Send` trait objects so the state machine can
// store them in enum variants without naming the concrete `async fn` future
// types.

/// Future produced by [`fetch_response`].
pub(crate) type FetchFuture = std::pin::Pin<
    Box<dyn std::future::Future<Output = (Result<FetchResult, ApiError>, DeepseekAgent)> + Send>,
>;

/// Future produced by [`connect_stream`].
pub(crate) type ConnectFuture = std::pin::Pin<
    Box<
        dyn std::future::Future<
                Output = (
                    Result<BoxStream<'static, Result<ChatCompletionChunk, ApiError>>, ApiError>,
                    DeepseekAgent,
                ),
            > + Send,
    >,
>;

/// Future produced by [`execute_tools`].
pub(crate) type ExecFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = (ToolsResult, DeepseekAgent)> + Send>>;

/// Future produced by [`run_summarize`].
pub(crate) type SummarizeFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = DeepseekAgent> + Send>>;

// ── Public helpers ────────────────────────────────────────────────────────────

// ── Business-logic functions ──────────────────────────────────────────────────

/// Assemble an [`ApiRequest`] from the agent's current conversation history and
/// registered tools.
///
/// If the agent has at least one tool registered, `tool_choice` is set to `auto`
/// so the model can freely decide whether to call a tool.
pub(crate) fn build_request(agent: &DeepseekAgent) -> ApiRequest {
    // deepseek-reasoner rules for reasoning_content in history:
    //
    //  1. An assistant message with tool_calls MUST include reasoning_content
    //     (the model needs it to continue reasoning after seeing tool results).
    //  2. An assistant message without tool_calls MUST NOT include
    //     reasoning_content (sending it causes a 400 on the next tool-calling
    //     turn: "Missing reasoning_content").
    //  3. Among messages that do have tool_calls, only the last one's
    //     reasoning_content should be kept; earlier ones can be stripped to
    //     reduce prompt size.
    //
    // We apply this filter here — on the copy sent to the API — rather than
    // mutating the stored history, so the original reasoning content is
    // preserved in the conversation for persistence / display.
    let history = agent.conversation.history();

    // DeepSeek Reasoner rules:
    //  - assistant message WITH tool_calls  → reasoning_content must be present
    //  - assistant message WITHOUT tool_calls → reasoning_content must be absent
    // We preserve the real reasoning when available, and only strip/fill as needed.
    let messages: Vec<Message> = history
        .iter()
        .map(|m| {
            if !matches!(m.role, Role::Assistant) {
                return m.clone();
            }
            let has_tool_calls = m.tool_calls.as_ref().map(|v| !v.is_empty()).unwrap_or(false);
            if has_tool_calls {
                // Must have reasoning_content; keep real value or fall back to empty string.
                Message {
                    reasoning_content: Some(
                        m.reasoning_content.clone().unwrap_or_default()
                    ),
                    ..m.clone()
                }
            } else {
                // Must NOT have reasoning_content.
                Message {
                    reasoning_content: None,
                    ..m.clone()
                }
            }
        })
        .collect();

    let mut req = ApiRequest::builder()
        .with_model(agent.model.clone())
        .messages(messages);
    for tool in &agent.tools {
        for raw in tool.raw_tools() {
            req = req.add_tool(raw);
        }
    }
    if !agent.tools.is_empty() {
        req = req.tool_choice_auto();
    }

    // Merge any extra_body fields stored on the agent into the ApiRequest.
    // Clone because `agent` is borrowed immutably here.
    if let Some(ref map) = agent.extra_body {
        req = req.extra_body(map.clone());
    }

    req
}

/// Run `maybe_summarize` on the agent's conversation and return the agent.
///
/// Ownership of the agent is taken so the future can be stored in the state
/// machine without lifetime complications.
pub(crate) async fn run_summarize(mut agent: DeepseekAgent) -> DeepseekAgent {
    // NOTE(review): presumably `maybe_summarize` decides internally whether the
    // history is long enough to compress (it may be a no-op) — confirm in its
    // implementation.
    agent.conversation.maybe_summarize().await;
    agent
}

/// Perform a single non-streaming API turn.
///
/// On success, the assistant's reply message is appended to the conversation
/// history before returning.  On failure, the agent is returned alongside the
/// error so the state machine can store it safely.
///
/// Returns `(Result<FetchResult, ApiError>, DeepseekAgent)` so ownership is
/// always transferred back to the caller regardless of outcome.
pub(crate) async fn fetch_response(
    mut agent: DeepseekAgent,
) -> (Result<FetchResult, ApiError>, DeepseekAgent) {
    let request = build_request(&agent);

    let response = match agent.conversation.client.send(request).await {
        Ok(r) => r,
        Err(e) => return (Err(e), agent),
    };

    // A response with zero choices gives us nothing to record or return.
    let Some(choice) = response.choices.into_iter().next() else {
        return (
            Err(ApiError::Other("empty response: no choices".into())),
            agent,
        );
    };

    let message = choice.message;
    let fetched = FetchResult {
        content: message.content.clone(),
        reasoning_content: message.reasoning_content.clone(),
        raw_tool_calls: message.tool_calls.clone().unwrap_or_default(),
    };

    // Keep reasoning_content in history so it can be sent back within the same
    // Turn (required by deepseek-reasoner when tool calls are involved).
    // It will be stripped at the start of the next Turn in drain_interrupts.
    agent.conversation.history_mut().push(message);

    (Ok(fetched), agent)
}

/// Open an SSE stream for the current turn.
///
/// The [`ApiRequest`] is built from the agent's current state.  The agent is
/// returned alongside the stream so the state machine can transition into
/// [`StreamingChunks`][super::stream::AgentStreamState::StreamingChunks].
///
/// Returns `(Result<BoxStream<…>, ApiError>, DeepseekAgent)` for the same
/// ownership-transfer reason as [`fetch_response`].
pub(crate) async fn connect_stream(
    agent: DeepseekAgent,
) -> (
    Result<BoxStream<'static, Result<ChatCompletionChunk, ApiError>>, ApiError>,
    DeepseekAgent,
) {
    let request = build_request(&agent);
    // `into_stream` consumes the client, so it is called on a clone; the
    // Result is handed straight through to the caller together with the agent.
    let outcome = agent.conversation.client.clone().into_stream(request).await;
    (outcome, agent)
}

/// Execute all pending tool calls sequentially and collect results.
///
/// For each [`ToolCall`]:
/// 1. The corresponding tool implementation is looked up by name.
/// 2. The tool is called with the parsed argument [`Value`].
/// 3. A `Role::Tool` message is appended to the conversation history so the
///    model can see the result on the next turn.
/// 4. A [`ToolCallResult`] is pushed to the results list.
///
/// Each tool call is raced against the interrupt channel: if an interrupt
/// arrives mid-call, the call is abandoned, an error-shaped result is
/// recorded, and the remaining calls are skipped.
///
/// Unknown tool names produce an error-shaped JSON result rather than panicking,
/// so a misconfigured agent degrades gracefully.
///
/// Returns `(ToolsResult, DeepseekAgent)` — the agent is returned so the state
/// machine can reclaim ownership after the future resolves.
pub(crate) async fn execute_tools(
    mut agent: DeepseekAgent,
    raw_tool_calls: Vec<ToolCall>,
) -> (ToolsResult, DeepseekAgent) {
    let mut results = Vec::with_capacity(raw_tool_calls.len());
    // Buffer any interrupts that arrive during tool execution so they are
    // appended to history only after the tool-execution pass completes.
    // This prevents interrupt messages from being injected mid-request and
    // causing mismatches between tool_call messages and their corresponding
    // tool results.
    let mut buffered_interrupts: Vec<String> = Vec::new();

    for tc in raw_tool_calls {
        // Malformed argument JSON degrades to Null rather than failing the turn.
        let args: Value = serde_json::from_str(&tc.function.arguments).unwrap_or(Value::Null);

        // Prepare a result value and an abort flag.
        let mut result = serde_json::json!({ "error": "unknown tool" });
        let mut aborted = false;

        if let Some(&idx) = agent.tool_index.get(&tc.function.name) {
            // Race the tool call against the interrupt channel; whichever
            // future completes first wins and the other is dropped.
            tokio::select! {
                res = agent.tools[idx].call(&tc.function.name, args.clone()) => {
                    result = res;
                }
                maybe_msg = agent.interrupt_rx.recv() => {
                    if let Some(msg) = maybe_msg {
                        buffered_interrupts.push(msg);
                        // Greedily drain any further interrupts already queued.
                        while let Ok(more) = agent.interrupt_rx.try_recv() {
                            buffered_interrupts.push(more);
                        }
                    }
                    // NOTE(review): if the interrupt channel is closed, recv()
                    // resolves to None immediately and this branch still aborts
                    // the pass even though no interrupt was received — confirm
                    // the sender always outlives tool execution, or handle the
                    // None case separately.
                    result = serde_json::json!({ "error": "aborted by interrupt" });
                    aborted = true;
                }
            }
        } else {
            result = serde_json::json!({ "error": format!("unknown tool: {}", tc.function.name) });
        }

        // Record the tool result in history so the model sees it next turn.
        agent.conversation.history_mut().push(Message {
            role: Role::Tool,
            content: Some(result.to_string()),
            tool_call_id: Some(tc.id.clone()),
            ..Default::default()
        });

        results.push(ToolCallResult {
            id: tc.id,
            name: tc.function.name,
            args: tc.function.arguments,
            result,
        });

        // An aborted call ends the pass; later calls are never dispatched.
        if aborted {
            break;
        }
    }

    // Drain any user messages that arrived while tools were executing and
    // buffer them so they are appended after the tool-execution pass finishes.
    while let Ok(msg) = agent.interrupt_rx.try_recv() {
        buffered_interrupts.push(msg);
    }

    // Append buffered interrupts to the conversation history in order.
    for msg in buffered_interrupts {
        agent.conversation.history_mut().push(Message::user(&msg));
    }

    (ToolsResult { results }, agent)
}

/// Finalize a completed SSE stream by assembling full [`ToolCall`] objects from
/// the per-index [`PartialToolCall`] buffers and recording the assistant turn in
/// history.
///
/// Returns the assembled raw tool calls (empty vec if the turn had no tool use).
pub(crate) fn finalize_stream(data: &mut StreamingData) -> Vec<ToolCall> {
    // Drain the sparse buffers, skipping `None` gaps, into concrete tool calls.
    let mut raw_tool_calls = Vec::new();
    for partial in data.tool_call_bufs.drain(..).flatten() {
        raw_tool_calls.push(ToolCall {
            id: partial.id,
            r#type: ToolType::Function,
            function: FunctionCall {
                name: partial.name,
                arguments: partial.arguments,
            },
        });
    }

    // Take both accumulators, leaving empty strings behind for the next turn.
    let content = std::mem::take(&mut data.content_buf);
    let reasoning = std::mem::take(&mut data.reasoning_buf);

    // Keep reasoning_content in history so it can be sent back within the
    // same Turn (required by deepseek-reasoner when tool calls are involved).
    // It will be stripped at the start of the next Turn in drain_interrupts.
    let assistant_msg = Message {
        role: Role::Assistant,
        content: (!content.is_empty()).then_some(content),
        reasoning_content: (!reasoning.is_empty()).then_some(reasoning),
        tool_calls: (!raw_tool_calls.is_empty()).then(|| raw_tool_calls.clone()),
        ..Default::default()
    };
    data.agent.conversation.history_mut().push(assistant_msg);

    raw_tool_calls
}

/// Apply a single SSE chunk delta to the [`StreamingData`] accumulator.
///
/// Returns a list of zero or more [`ChunkEvent`]s to be yielded to the caller.
/// In practice at most one event is returned per chunk, but the vec keeps the
/// API uniform across all cases.
pub(crate) fn apply_chunk_delta(
    data: &mut StreamingData,
    chunk: crate::raw::ChatCompletionChunk,
) -> Vec<ChunkEvent> {
    // A chunk with no choices carries nothing actionable.
    let Some(choice) = chunk.choices.into_iter().next() else {
        return Vec::new();
    };
    let delta = choice.delta;

    // Tool-call deltas take priority over content/reasoning fragments.
    if let Some(deltas) = delta.tool_calls {
        let mut events = Vec::new();
        for tc_delta in deltas {
            let slot = tc_delta.index as usize;
            // Grow the sparse buffer so `slot` is addressable.
            if data.tool_call_bufs.len() <= slot {
                data.tool_call_bufs.resize_with(slot + 1, || None);
            }
            let buf = &mut data.tool_call_bufs[slot];

            if buf.is_none() {
                // First delta for this slot — the id and function name arrive
                // here.  Announce the new tool call with an empty `delta`.
                let id = tc_delta.id.clone().unwrap_or_default();
                let name = match tc_delta.function.as_ref() {
                    Some(func) => func.name.clone().unwrap_or_default(),
                    None => String::new(),
                };
                events.push(ChunkEvent::ToolCallChunk {
                    id: id.clone(),
                    name: name.clone(),
                    delta: String::new(),
                    index: slot as u32,
                });
                *buf = Some(PartialToolCall {
                    id,
                    name,
                    arguments: String::new(),
                });
            }

            if let Some(partial) = buf.as_mut() {
                // A late-arriving id fills an empty one; it never overwrites.
                if partial.id.is_empty() {
                    if let Some(id) = tc_delta.id {
                        partial.id = id;
                    }
                }
                // Non-empty argument fragments are accumulated and re-emitted.
                if let Some(func) = tc_delta.function {
                    if let Some(args) = func.arguments {
                        if !args.is_empty() {
                            partial.arguments.push_str(&args);
                            events.push(ChunkEvent::ToolCallChunk {
                                id: partial.id.clone(),
                                name: partial.name.clone(),
                                delta: args,
                                index: slot as u32,
                            });
                        }
                    }
                }
            }
        }
        return events;
    }

    if let Some(reasoning) = delta.reasoning_content {
        if !reasoning.is_empty() {
            data.reasoning_buf.push_str(&reasoning);
            return vec![ChunkEvent::ReasoningToken(reasoning)];
        }
    }

    if let Some(content) = delta.content {
        if !content.is_empty() {
            data.content_buf.push_str(&content);
            return vec![ChunkEvent::Token(content)];
        }
    }

    Vec::new()
}