Skip to main content

aiclient_api/convert/
stream.rs

1//! SSE stream chunk conversion helpers.
2//! Functions to convert SSE chunks between OpenAI and Anthropic formats.
3
4/// Convert a raw SSE chunk to OpenAI format.
5/// If the chunk already looks like OpenAI format (has "choices"), return as-is.
6/// Handles `data: [DONE]` terminator.
7pub fn chunk_to_openai(chunk: &[u8], _model: &str) -> Vec<u8> {
8    let text = match std::str::from_utf8(chunk) {
9        Ok(s) => s,
10        Err(_) => return chunk.to_vec(),
11    };
12
13    let mut result = Vec::new();
14    for line in text.lines() {
15        if let Some(data) = line.strip_prefix("data: ") {
16            if data.trim() == "[DONE]" {
17                result.extend_from_slice(b"data: [DONE]\n\n");
18                continue;
19            }
20            // Try to parse as JSON
21            if let Ok(val) = serde_json::from_str::<serde_json::Value>(data) {
22                // If already OpenAI format (has "choices"), pass through
23                if val.get("choices").is_some() {
24                    result.extend_from_slice(line.as_bytes());
25                    result.extend_from_slice(b"\n\n");
26                } else {
27                    // Try to convert from Anthropic streaming format
28                    let converted = convert_anthropic_chunk_to_openai(&val, _model);
29                    if let Some(c) = converted {
30                        let serialized = serde_json::to_string(&c).unwrap_or_default();
31                        result.extend_from_slice(b"data: ");
32                        result.extend_from_slice(serialized.as_bytes());
33                        result.extend_from_slice(b"\n\n");
34                    } else {
35                        // Pass through unknown format
36                        result.extend_from_slice(line.as_bytes());
37                        result.extend_from_slice(b"\n\n");
38                    }
39                }
40            } else {
41                // Pass through non-JSON data lines
42                result.extend_from_slice(line.as_bytes());
43                result.extend_from_slice(b"\n\n");
44            }
45        } else if !line.is_empty() {
46            result.extend_from_slice(line.as_bytes());
47            result.extend_from_slice(b"\n");
48        }
49    }
50    result
51}
52
53/// Convert a raw SSE chunk to Anthropic format.
54/// If the chunk already looks like Anthropic format (has event type fields), return as-is.
55/// Handles `data: [DONE]` terminator.
56pub fn chunk_to_anthropic(chunk: &[u8], _model: &str) -> Vec<u8> {
57    let text = match std::str::from_utf8(chunk) {
58        Ok(s) => s,
59        Err(_) => return chunk.to_vec(),
60    };
61
62    let mut result = Vec::new();
63    for line in text.lines() {
64        if let Some(data) = line.strip_prefix("data: ") {
65            if data.trim() == "[DONE]" {
66                // OpenAI uses [DONE], Anthropic uses message_stop event
67                // We'll emit the Anthropic message_stop event
68                let stop_event = serde_json::json!({"type": "message_stop"});
69                let serialized = serde_json::to_string(&stop_event).unwrap_or_default();
70                result.extend_from_slice(b"event: message_stop\n");
71                result.extend_from_slice(b"data: ");
72                result.extend_from_slice(serialized.as_bytes());
73                result.extend_from_slice(b"\n\n");
74                continue;
75            }
76            // Try to parse as JSON
77            if let Ok(val) = serde_json::from_str::<serde_json::Value>(data) {
78                // If already Anthropic format (has "type" field that's an event type), pass through
79                if let Some(event_type) = val.get("type").and_then(|t| t.as_str()) {
80                    if matches!(
81                        event_type,
82                        "message_start"
83                            | "content_block_start"
84                            | "content_block_delta"
85                            | "content_block_stop"
86                            | "message_delta"
87                            | "message_stop"
88                    ) {
89                        result.extend_from_slice(b"data: ");
90                        result.extend_from_slice(data.as_bytes());
91                        result.extend_from_slice(b"\n\n");
92                        continue;
93                    }
94                }
95                // Try to convert from OpenAI streaming format
96                let converted = convert_openai_chunk_to_anthropic(&val, _model);
97                if let Some(c) = converted {
98                    for event in c {
99                        let serialized = serde_json::to_string(&event).unwrap_or_default();
100                        result.extend_from_slice(b"data: ");
101                        result.extend_from_slice(serialized.as_bytes());
102                        result.extend_from_slice(b"\n\n");
103                    }
104                } else {
105                    result.extend_from_slice(line.as_bytes());
106                    result.extend_from_slice(b"\n\n");
107                }
108            } else {
109                result.extend_from_slice(line.as_bytes());
110                result.extend_from_slice(b"\n\n");
111            }
112        } else if !line.is_empty() {
113            result.extend_from_slice(line.as_bytes());
114            result.extend_from_slice(b"\n");
115        }
116    }
117    result
118}
119
120fn convert_anthropic_chunk_to_openai(
121    val: &serde_json::Value,
122    model: &str,
123) -> Option<serde_json::Value> {
124    let event_type = val.get("type").and_then(|t| t.as_str())?;
125
126    match event_type {
127        "content_block_delta" => {
128            let delta = val.get("delta")?;
129            let delta_type = delta.get("type").and_then(|t| t.as_str())?;
130            if delta_type == "text_delta" {
131                let text = delta.get("text").and_then(|t| t.as_str()).unwrap_or("");
132                let chunk = serde_json::json!({
133                    "id": "chatcmpl-stream",
134                    "object": "chat.completion.chunk",
135                    "created": 0,
136                    "model": model,
137                    "choices": [{
138                        "index": 0,
139                        "delta": {
140                            "content": text,
141                        },
142                        "finish_reason": null,
143                    }]
144                });
145                Some(chunk)
146            } else {
147                None
148            }
149        }
150        "message_delta" => {
151            let delta = val.get("delta")?;
152            let stop_reason = delta.get("stop_reason").and_then(|r| r.as_str());
153            let finish_reason = stop_reason.map(|s| match s {
154                "end_turn" => "stop",
155                "max_tokens" => "length",
156                "tool_use" => "tool_calls",
157                other => other,
158            });
159            let chunk = serde_json::json!({
160                "id": "chatcmpl-stream",
161                "object": "chat.completion.chunk",
162                "created": 0,
163                "model": model,
164                "choices": [{
165                    "index": 0,
166                    "delta": {},
167                    "finish_reason": finish_reason,
168                }]
169            });
170            Some(chunk)
171        }
172        _ => None,
173    }
174}
175
176fn convert_openai_chunk_to_anthropic(
177    val: &serde_json::Value,
178    _model: &str,
179) -> Option<Vec<serde_json::Value>> {
180    let choices = val.get("choices").and_then(|c| c.as_array())?;
181    let first_choice = choices.first()?;
182    let delta = first_choice.get("delta")?;
183
184    let mut events = Vec::new();
185
186    if let Some(content) = delta.get("content").and_then(|c| c.as_str()) {
187        if !content.is_empty() {
188            let event = serde_json::json!({
189                "type": "content_block_delta",
190                "index": 0,
191                "delta": {
192                    "type": "text_delta",
193                    "text": content,
194                }
195            });
196            events.push(event);
197        }
198    }
199
200    if let Some(finish_reason) = first_choice.get("finish_reason").and_then(|f| f.as_str()) {
201        let stop_reason = match finish_reason {
202            "stop" => "end_turn",
203            "length" => "max_tokens",
204            "tool_calls" => "tool_use",
205            other => other,
206        };
207        let event = serde_json::json!({
208            "type": "message_delta",
209            "delta": {
210                "type": "message_delta",
211                "stop_reason": stop_reason,
212            }
213        });
214        events.push(event);
215    }
216
217    if events.is_empty() {
218        None
219    } else {
220        Some(events)
221    }
222}