Skip to main content

embacle_server/
streaming.rs

1// ABOUTME: Bridges embacle ChatStream to OpenAI-compatible Server-Sent Events format
2// ABOUTME: Converts StreamChunk items to "data: {json}\n\n" SSE with [DONE] terminator
3//
4// SPDX-License-Identifier: Apache-2.0
5// Copyright (c) 2026 dravr.ai
6
7use std::convert::Infallible;
8
9use axum::response::sse::{Event, Sse};
10use axum::response::{IntoResponse, Response};
11use embacle::types::ChatStream;
12use futures::StreamExt;
13
14use crate::completions::{generate_id, unix_timestamp};
15use crate::openai_types::{ChatCompletionChunk, ChunkChoice, Delta};
16
17/// Convert a `ChatStream` into an SSE response in `OpenAI` streaming format
18///
19/// Emits:
20/// 1. An initial chunk with role="assistant" and empty content
21/// 2. Content delta chunks as they arrive from the provider
22/// 3. A final chunk with `finish_reason`
23/// 4. `data: [DONE]` terminator
24pub fn sse_response(stream: ChatStream, model: &str) -> Response {
25    let completion_id = generate_id();
26    let created = unix_timestamp();
27    let model = model.to_owned();
28
29    let sse_stream = {
30        let mut sent_role = false;
31
32        stream.map(move |chunk_result| {
33            match chunk_result {
34                Ok(chunk) => {
35                    let (role, content, finish_reason) = if !sent_role {
36                        sent_role = true;
37                        if chunk.delta.is_empty() && !chunk.is_final {
38                            // First chunk: role announcement only
39                            (Some("assistant"), None, None)
40                        } else {
41                            // First chunk has content: send role + content
42                            (Some("assistant"), Some(chunk.delta), chunk.finish_reason)
43                        }
44                    } else if chunk.is_final {
45                        (
46                            None,
47                            if chunk.delta.is_empty() {
48                                None
49                            } else {
50                                Some(chunk.delta)
51                            },
52                            Some(chunk.finish_reason.unwrap_or_else(|| "stop".to_owned())),
53                        )
54                    } else {
55                        (None, Some(chunk.delta), None)
56                    };
57
58                    // LinesStream strips trailing \n from each line. Restore it
59                    // so concatenated SSE deltas preserve original line breaks.
60                    let content = content.map(|c| {
61                        if !c.is_empty() && !c.ends_with('\n') {
62                            let mut normalized = c;
63                            normalized.push('\n');
64                            normalized
65                        } else {
66                            c
67                        }
68                    });
69
70                    let data = ChatCompletionChunk {
71                        id: completion_id.clone(),
72                        object: "chat.completion.chunk",
73                        created,
74                        model: model.clone(),
75                        choices: vec![ChunkChoice {
76                            index: 0,
77                            delta: Delta { role, content },
78                            finish_reason,
79                        }],
80                    };
81
82                    let json = serde_json::to_string(&data).unwrap_or_default();
83                    Ok::<_, Infallible>(Event::default().data(json))
84                }
85                Err(e) => {
86                    let error_json = serde_json::json!({
87                        "error": {
88                            "message": e.message,
89                            "type": "stream_error"
90                        }
91                    });
92                    Ok(Event::default().data(error_json.to_string()))
93                }
94            }
95        })
96    };
97
98    // Append the [DONE] sentinel after the stream completes
99    let done_stream =
100        futures::stream::once(async { Ok::<_, Infallible>(Event::default().data("[DONE]")) });
101
102    let combined = sse_stream.chain(done_stream);
103
104    Sse::new(combined)
105        .keep_alive(axum::response::sse::KeepAlive::default())
106        .into_response()
107}