Skip to main content

cloakpipe_proxy/
streaming.rs

//! SSE streaming rehydration for chat completion responses.

3use cloakpipe_core::{rehydrator::Rehydrator, vault::Vault};
4use futures::stream::Stream;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8/// Consume an upstream SSE response and produce a rehydrated SSE stream.
9pub async fn rehydrate_stream(
10    response: reqwest::Response,
11    vault: Arc<Mutex<Vault>>,
12    request_id: String,
13) -> impl Stream<Item = Result<String, std::io::Error>> {
14    let mut buffer = String::new();
15
16    async_stream::stream! {
17        let byte_stream = response.text().await.unwrap_or_default();
18
19        // Split SSE response into lines and process events
20        for line in byte_stream.lines() {
21            if let Some(data) = line.strip_prefix("data: ") {
22
23                if data == "[DONE]" {
24                    yield Ok("data: [DONE]\n\n".to_string());
25                    continue;
26                }
27
28                // Parse the SSE JSON chunk
29                if let Ok(mut chunk) = serde_json::from_str::<serde_json::Value>(data) {
30                    // Extract delta content
31                    if let Some(content) = chunk
32                        .get("choices")
33                        .and_then(|c| c.get(0))
34                        .and_then(|c| c.get("delta"))
35                        .and_then(|d| d.get("content"))
36                        .and_then(|c| c.as_str())
37                        .map(|s| s.to_string())
38                    {
39                        let vault_guard = vault.lock().await;
40                        let (rehydrated, _) = Rehydrator::rehydrate_chunk(
41                            &content,
42                            &mut buffer,
43                            &vault_guard,
44                        )
45                        .unwrap_or((content.clone(), false));
46
47                        if !rehydrated.is_empty() {
48                            // Update the delta content with rehydrated text
49                            if let Some(choices) = chunk.get_mut("choices").and_then(|c| c.as_array_mut()) {
50                                if let Some(first) = choices.first_mut() {
51                                    if let Some(delta) = first.get_mut("delta") {
52                                        delta["content"] = serde_json::Value::String(rehydrated);
53                                    }
54                                }
55                            }
56
57                            let serialized = serde_json::to_string(&chunk).unwrap_or_default();
58                            yield Ok(format!("data: {}\n\n", serialized));
59                        }
60                    } else {
61                        // Non-content chunk (role, finish_reason, etc.) — pass through
62                        yield Ok(format!("data: {}\n\n", data));
63                    }
64                } else {
65                    // Unparseable data — pass through
66                    yield Ok(format!("data: {}\n\n", data));
67                }
68            } else if !line.is_empty() {
69                yield Ok(format!("{}\n", line));
70            }
71        }
72
73        // Flush any remaining buffer
74        if !buffer.is_empty() {
75            tracing::debug!(request_id = %request_id, "Flushing remaining stream buffer");
76        }
77    }
78}