stream_rs/accumulators/openai.rs
1//! Accumulator for `OpenAI` `chat.completion.chunk` streaming deltas.
2//!
3//! `OpenAI` streams a chat completion as a sequence of chunks, each carrying a
4//! `choices` array. Every choice has an `index` and a `delta` that may contain
5//! a `role`, a `content` fragment, and/or `tool_calls` fragments. This type
6//! folds those fragments into the final per-choice content and tool calls.
7//!
8//! It is JSON-library agnostic: parse each chunk however you like and call the
9//! typed methods below. The accumulator never allocates per fragment beyond the
10//! string growth it performs.
11//!
12//! Choices and tool calls are stored sparsely in a [`BTreeMap`] keyed by their
13//! provider-supplied index, so a stream that reports a large or non-contiguous
14//! `index` (whether by quirk or by a malicious server) costs only the entries
15//! actually seen — it never forces a dense allocation up to that index.
16//!
17//! # Example
18//!
19//! ```
20//! use stream_rs::accumulators::openai::OpenAiAccumulator;
21//!
22//! let mut acc = OpenAiAccumulator::new();
23//! acc.push_role(0, "assistant");
24//! acc.push_content(0, "Hel");
25//! acc.push_content(0, "lo");
26//! assert_eq!(acc.choice(0).unwrap().content, "Hello");
27//! assert_eq!(acc.choice(0).unwrap().role.as_deref(), Some("assistant"));
28//! ```
29
30use alloc::borrow::ToOwned;
31use alloc::collections::BTreeMap;
32use alloc::string::String;
33
/// A single tool call reassembled from streamed `tool_calls` fragments.
///
/// A default-constructed value is the "nothing received yet" state: no id, no
/// name, and empty arguments.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ToolCall {
    /// Identifier of the tool call, taken from the first fragment that
    /// carries one; `None` until then.
    pub id: Option<String>,
    /// Name of the function to call; assigned at most once.
    pub name: Option<String>,
    /// Function arguments: the streamed JSON fragments joined in the order
    /// they arrived. The text is stored as-is and is not parsed here.
    pub arguments: String,
}
44
45/// The assembled state of a single `choices[index]`.
46#[derive(Debug, Clone, Default, PartialEq, Eq)]
47pub struct Choice {
48 /// The role from the first delta that carried one.
49 pub role: Option<String>,
50 /// The concatenated text content.
51 pub content: String,
52 /// Tool calls keyed by their `tool_calls[i].index`.
53 pub tool_calls: BTreeMap<usize, ToolCall>,
54 /// The `finish_reason`, set when the final chunk for this choice arrives.
55 pub finish_reason: Option<String>,
56}
57
58/// Accumulates `OpenAI` chat-completion streaming deltas into final choices.
59#[derive(Debug, Default)]
60pub struct OpenAiAccumulator {
61 choices: BTreeMap<usize, Choice>,
62}
63
64impl OpenAiAccumulator {
65 /// Create an empty accumulator.
66 #[must_use]
67 pub fn new() -> Self {
68 Self::default()
69 }
70
71 /// Apply `delta.role` for `choices[index]`.
72 ///
73 /// The choice is created on first reference, so any `index` is valid; the
74 /// role is recorded only from the first delta that carries one.
75 pub fn push_role(&mut self, index: usize, role: &str) {
76 let choice = self.choice_mut(index);
77 if choice.role.is_none() {
78 choice.role = Some(role.to_owned());
79 }
80 }
81
82 /// Append a `delta.content` fragment for `choices[index]`.
83 ///
84 /// The choice is created on first reference, so any `index` is valid.
85 pub fn push_content(&mut self, index: usize, fragment: &str) {
86 self.choice_mut(index).content.push_str(fragment);
87 }
88
89 /// Set the `finish_reason` for `choices[index]`.
90 pub fn set_finish_reason(&mut self, index: usize, reason: &str) {
91 self.choice_mut(index).finish_reason = Some(reason.to_owned());
92 }
93
94 /// Apply a `tool_calls[tool_index]` fragment for `choices[index]`.
95 ///
96 /// Any of `id`, `name`, or `arguments` may be `None`/empty for a given
97 /// fragment; `arguments` fragments are concatenated in arrival order.
98 pub fn push_tool_call(
99 &mut self,
100 index: usize,
101 tool_index: usize,
102 id: Option<&str>,
103 name: Option<&str>,
104 arguments: Option<&str>,
105 ) {
106 let choice = self.choice_mut(index);
107 let tc = choice.tool_calls.entry(tool_index).or_default();
108 if let Some(id) = id {
109 if tc.id.is_none() {
110 tc.id = Some(id.to_owned());
111 }
112 }
113 if let Some(name) = name {
114 if tc.name.is_none() {
115 tc.name = Some(name.to_owned());
116 }
117 }
118 if let Some(args) = arguments {
119 tc.arguments.push_str(args);
120 }
121 }
122
123 /// Borrow the assembled `choices[index]`, if it exists.
124 #[must_use]
125 pub fn choice(&self, index: usize) -> Option<&Choice> {
126 self.choices.get(&index)
127 }
128
129 /// All assembled choices in index order, skipping gaps that were never seen.
130 pub fn choices(&self) -> impl Iterator<Item = (usize, &Choice)> {
131 self.choices.iter().map(|(&i, c)| (i, c))
132 }
133
134 fn choice_mut(&mut self, index: usize) -> &mut Choice {
135 self.choices.entry(index).or_default()
136 }
137}