// embacle_server/streaming.rs
// ABOUTME: Bridges embacle ChatStream to OpenAI-compatible Server-Sent Events format
// ABOUTME: Converts StreamChunk items to "data: {json}\n\n" SSE with [DONE] terminator
//
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026 dravr.ai

use std::convert::Infallible;

use axum::response::sse::{Event, Sse};
use axum::response::{IntoResponse, Response};
use embacle::types::ChatStream;
use futures::StreamExt;

use crate::completions::{generate_id, unix_timestamp};
use crate::openai_types::{ChatCompletionChunk, ChunkChoice, Delta};

17/// Convert a `ChatStream` into an SSE response in `OpenAI` streaming format
18///
19/// Emits:
20/// 1. An initial chunk with role="assistant" and empty content
21/// 2. Content delta chunks as they arrive from the provider
22/// 3. A final chunk with `finish_reason`
23/// 4. `data: [DONE]` terminator
24pub fn sse_response(stream: ChatStream, model: &str) -> Response {
25    let completion_id = generate_id();
26    let created = unix_timestamp();
27    let model = model.to_owned();
28
29    let sse_stream = {
30        let mut sent_role = false;
31
32        stream.map(move |chunk_result| {
33            match chunk_result {
34                Ok(chunk) => {
35                    let (role, content, finish_reason) = if !sent_role {
36                        sent_role = true;
37                        if chunk.delta.is_empty() && !chunk.is_final {
38                            // First chunk: role announcement only
39                            (Some("assistant"), None, None)
40                        } else {
41                            // First chunk has content: send role + content
42                            (Some("assistant"), Some(chunk.delta), chunk.finish_reason)
43                        }
44                    } else if chunk.is_final {
45                        (
46                            None,
47                            if chunk.delta.is_empty() {
48                                None
49                            } else {
50                                Some(chunk.delta)
51                            },
52                            Some(chunk.finish_reason.unwrap_or_else(|| "stop".to_owned())),
53                        )
54                    } else {
55                        (None, Some(chunk.delta), None)
56                    };
57
58                    let data = ChatCompletionChunk {
59                        id: completion_id.clone(),
60                        object: "chat.completion.chunk",
61                        created,
62                        model: model.clone(),
63                        choices: vec![ChunkChoice {
64                            index: 0,
65                            delta: Delta { role, content },
66                            finish_reason,
67                        }],
68                    };
69
70                    let json = serde_json::to_string(&data).unwrap_or_default();
71                    Ok::<_, Infallible>(Event::default().data(json))
72                }
73                Err(e) => {
74                    let error_json = serde_json::json!({
75                        "error": {
76                            "message": e.message,
77                            "type": "stream_error"
78                        }
79                    });
80                    Ok(Event::default().data(error_json.to_string()))
81                }
82            }
83        })
84    };
85
86    // Append the [DONE] sentinel after the stream completes
87    let done_stream =
88        futures::stream::once(async { Ok::<_, Infallible>(Event::default().data("[DONE]")) });
89
90    let combined = sse_stream.chain(done_stream);
91
92    Sse::new(combined)
93        .keep_alive(axum::response::sse::KeepAlive::default())
94        .into_response()
95}