// embacle_server/streaming.rs

use std::convert::Infallible;

use axum::response::sse::{Event, Sse};
use axum::response::{IntoResponse, Response};
use embacle::types::ChatStream;
use futures::StreamExt;

use crate::completions::{generate_id, unix_timestamp};
use crate::openai_types::{ChatCompletionChunk, ChunkChoice, Delta};
17pub fn sse_response(stream: ChatStream, model: &str) -> Response {
25 let completion_id = generate_id();
26 let created = unix_timestamp();
27 let model = model.to_owned();
28
29 let sse_stream = {
30 let mut sent_role = false;
31
32 stream.map(move |chunk_result| {
33 match chunk_result {
34 Ok(chunk) => {
35 let (role, content, finish_reason) = if !sent_role {
36 sent_role = true;
37 if chunk.delta.is_empty() && !chunk.is_final {
38 (Some("assistant"), None, None)
40 } else {
41 (Some("assistant"), Some(chunk.delta), chunk.finish_reason)
43 }
44 } else if chunk.is_final {
45 (
46 None,
47 if chunk.delta.is_empty() {
48 None
49 } else {
50 Some(chunk.delta)
51 },
52 Some(chunk.finish_reason.unwrap_or_else(|| "stop".to_owned())),
53 )
54 } else {
55 (None, Some(chunk.delta), None)
56 };
57
58 let data = ChatCompletionChunk {
59 id: completion_id.clone(),
60 object: "chat.completion.chunk",
61 created,
62 model: model.clone(),
63 choices: vec![ChunkChoice {
64 index: 0,
65 delta: Delta { role, content },
66 finish_reason,
67 }],
68 };
69
70 let json = serde_json::to_string(&data).unwrap_or_default();
71 Ok::<_, Infallible>(Event::default().data(json))
72 }
73 Err(e) => {
74 let error_json = serde_json::json!({
75 "error": {
76 "message": e.message,
77 "type": "stream_error"
78 }
79 });
80 Ok(Event::default().data(error_json.to_string()))
81 }
82 }
83 })
84 };
85
86 let done_stream =
88 futures::stream::once(async { Ok::<_, Infallible>(Event::default().data("[DONE]")) });
89
90 let combined = sse_stream.chain(done_stream);
91
92 Sse::new(combined)
93 .keep_alive(axum::response::sse::KeepAlive::default())
94 .into_response()
95}