Skip to main content

stream_rs/accumulators/
openai.rs

//! Accumulator for `OpenAI` `chat.completion.chunk` streaming deltas.
//!
//! `OpenAI` streams a chat completion as a sequence of chunks, each carrying a
//! `choices` array. Every choice has an `index` and a `delta` that may contain
//! a `role`, a `content` fragment, and/or `tool_calls` fragments. This type
//! folds those fragments into the final per-choice content and tool calls.
//!
//! It is JSON-library agnostic: parse each chunk however you like and call the
//! typed methods below. The accumulator never allocates per fragment beyond the
//! string growth it performs.
//!
//! Choices and tool calls are stored sparsely in a [`BTreeMap`] keyed by their
//! provider-supplied index, so a stream that reports a large or non-contiguous
//! `index` (whether by quirk or by a malicious server) costs only the entries
//! actually seen — it never forces a dense allocation up to that index.
//!
//! # Example
//!
//! ```
//! use stream_rs::accumulators::openai::OpenAiAccumulator;
//!
//! let mut acc = OpenAiAccumulator::new();
//! acc.push_role(0, "assistant");
//! acc.push_content(0, "Hel");
//! acc.push_content(0, "lo");
//! assert_eq!(acc.choice(0).unwrap().content, "Hello");
//! assert_eq!(acc.choice(0).unwrap().role.as_deref(), Some("assistant"));
//! ```
29
30use alloc::borrow::ToOwned;
31use alloc::collections::BTreeMap;
32use alloc::string::String;
33
/// A single tool call reconstructed from streamed `tool_calls` fragments.
///
/// A default value has no `id`, no `name`, and empty `arguments`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ToolCall {
    /// Identifier of the tool call; taken from the first fragment that has one.
    pub id: Option<String>,
    /// Name of the function being called; recorded once.
    pub name: Option<String>,
    /// Function arguments: the streamed JSON fragments joined in arrival order.
    pub arguments: String,
}
44
45/// The assembled state of a single `choices[index]`.
46#[derive(Debug, Clone, Default, PartialEq, Eq)]
47pub struct Choice {
48    /// The role from the first delta that carried one.
49    pub role: Option<String>,
50    /// The concatenated text content.
51    pub content: String,
52    /// Tool calls keyed by their `tool_calls[i].index`.
53    pub tool_calls: BTreeMap<usize, ToolCall>,
54    /// The `finish_reason`, set when the final chunk for this choice arrives.
55    pub finish_reason: Option<String>,
56}
57
58/// Accumulates `OpenAI` chat-completion streaming deltas into final choices.
59#[derive(Debug, Default)]
60pub struct OpenAiAccumulator {
61    choices: BTreeMap<usize, Choice>,
62}
63
64impl OpenAiAccumulator {
65    /// Create an empty accumulator.
66    #[must_use]
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Apply `delta.role` for `choices[index]`.
72    ///
73    /// The choice is created on first reference, so any `index` is valid; the
74    /// role is recorded only from the first delta that carries one.
75    pub fn push_role(&mut self, index: usize, role: &str) {
76        let choice = self.choice_mut(index);
77        if choice.role.is_none() {
78            choice.role = Some(role.to_owned());
79        }
80    }
81
82    /// Append a `delta.content` fragment for `choices[index]`.
83    ///
84    /// The choice is created on first reference, so any `index` is valid.
85    pub fn push_content(&mut self, index: usize, fragment: &str) {
86        self.choice_mut(index).content.push_str(fragment);
87    }
88
89    /// Set the `finish_reason` for `choices[index]`.
90    pub fn set_finish_reason(&mut self, index: usize, reason: &str) {
91        self.choice_mut(index).finish_reason = Some(reason.to_owned());
92    }
93
94    /// Apply a `tool_calls[tool_index]` fragment for `choices[index]`.
95    ///
96    /// Any of `id`, `name`, or `arguments` may be `None`/empty for a given
97    /// fragment; `arguments` fragments are concatenated in arrival order.
98    pub fn push_tool_call(
99        &mut self,
100        index: usize,
101        tool_index: usize,
102        id: Option<&str>,
103        name: Option<&str>,
104        arguments: Option<&str>,
105    ) {
106        let choice = self.choice_mut(index);
107        let tc = choice.tool_calls.entry(tool_index).or_default();
108        if let Some(id) = id {
109            if tc.id.is_none() {
110                tc.id = Some(id.to_owned());
111            }
112        }
113        if let Some(name) = name {
114            if tc.name.is_none() {
115                tc.name = Some(name.to_owned());
116            }
117        }
118        if let Some(args) = arguments {
119            tc.arguments.push_str(args);
120        }
121    }
122
123    /// Borrow the assembled `choices[index]`, if it exists.
124    #[must_use]
125    pub fn choice(&self, index: usize) -> Option<&Choice> {
126        self.choices.get(&index)
127    }
128
129    /// All assembled choices in index order, skipping gaps that were never seen.
130    pub fn choices(&self) -> impl Iterator<Item = (usize, &Choice)> {
131        self.choices.iter().map(|(&i, c)| (i, c))
132    }
133
134    fn choice_mut(&mut self, index: usize) -> &mut Choice {
135        self.choices.entry(index).or_default()
136    }
137}