// embacle_server/streaming.rs
1// ABOUTME: Bridges embacle ChatStream to OpenAI-compatible Server-Sent Events format
2// ABOUTME: Converts StreamChunk items to "data: {json}\n\n" SSE with [DONE] terminator
3//
4// SPDX-License-Identifier: Apache-2.0
5// Copyright (c) 2026 dravr.ai
6
7use std::convert::Infallible;
8
9use axum::response::sse::{Event, Sse};
10use axum::response::{IntoResponse, Response};
11use embacle::types::ChatStream;
12use futures::StreamExt;
13
14use crate::completions::{generate_id, unix_timestamp};
15use crate::openai_types::{ChatCompletionChunk, ChunkChoice, Delta};
16
17/// Convert a `ChatStream` into an SSE response in `OpenAI` streaming format
18///
19/// Emits:
20/// 1. An initial chunk with role="assistant" and empty content
21/// 2. Content delta chunks as they arrive from the provider
22/// 3. A final chunk with `finish_reason`
23/// 4. `data: [DONE]` terminator
24pub fn sse_response(stream: ChatStream, model: &str) -> Response {
25 let completion_id = generate_id();
26 let created = unix_timestamp();
27 let model = model.to_owned();
28
29 let sse_stream = {
30 let mut sent_role = false;
31
32 stream.map(move |chunk_result| {
33 match chunk_result {
34 Ok(chunk) => {
35 let (role, content, finish_reason) = if !sent_role {
36 sent_role = true;
37 if chunk.delta.is_empty() && !chunk.is_final {
38 // First chunk: role announcement only
39 (Some("assistant"), None, None)
40 } else {
41 // First chunk has content: send role + content
42 (Some("assistant"), Some(chunk.delta), chunk.finish_reason)
43 }
44 } else if chunk.is_final {
45 (
46 None,
47 if chunk.delta.is_empty() {
48 None
49 } else {
50 Some(chunk.delta)
51 },
52 Some(chunk.finish_reason.unwrap_or_else(|| "stop".to_owned())),
53 )
54 } else {
55 (None, Some(chunk.delta), None)
56 };
57
58 // LinesStream strips trailing \n from each line. Restore it
59 // so concatenated SSE deltas preserve original line breaks.
60 let content = content.map(|c| {
61 if !c.is_empty() && !c.ends_with('\n') {
62 let mut normalized = c;
63 normalized.push('\n');
64 normalized
65 } else {
66 c
67 }
68 });
69
70 let data = ChatCompletionChunk {
71 id: completion_id.clone(),
72 object: "chat.completion.chunk",
73 created,
74 model: model.clone(),
75 choices: vec![ChunkChoice {
76 index: 0,
77 delta: Delta { role, content },
78 finish_reason,
79 }],
80 };
81
82 let json = serde_json::to_string(&data).unwrap_or_default();
83 Ok::<_, Infallible>(Event::default().data(json))
84 }
85 Err(e) => {
86 let error_json = serde_json::json!({
87 "error": {
88 "message": e.message,
89 "type": "stream_error"
90 }
91 });
92 Ok(Event::default().data(error_json.to_string()))
93 }
94 }
95 })
96 };
97
98 // Append the [DONE] sentinel after the stream completes
99 let done_stream =
100 futures::stream::once(async { Ok::<_, Infallible>(Event::default().data("[DONE]")) });
101
102 let combined = sse_stream.chain(done_stream);
103
104 Sse::new(combined)
105 .keep_alive(axum::response::sse::KeepAlive::default())
106 .into_response()
107}