Skip to main content

cloakpipe_proxy/
streaming.rs

1//! SSE streaming rehydration for chat completion responses.
2
3use cloakpipe_core::{rehydrator::Rehydrator, vault::Vault};
4use futures::stream::Stream;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8/// Consume an upstream SSE response and produce a rehydrated SSE stream.
9pub async fn rehydrate_stream(
10    response: reqwest::Response,
11    vault: Arc<Mutex<Vault>>,
12    request_id: String,
13) -> impl Stream<Item = Result<String, std::io::Error>> {
14    let mut buffer = String::new();
15
16    async_stream::stream! {
17        let byte_stream = response.text().await.unwrap_or_default();
18
19        // Split SSE response into lines and process events
20        for line in byte_stream.lines() {
21            if line.starts_with("data: ") {
22                let data = &line[6..];
23
24                if data == "[DONE]" {
25                    yield Ok("data: [DONE]\n\n".to_string());
26                    continue;
27                }
28
29                // Parse the SSE JSON chunk
30                if let Ok(mut chunk) = serde_json::from_str::<serde_json::Value>(data) {
31                    // Extract delta content
32                    if let Some(content) = chunk
33                        .get("choices")
34                        .and_then(|c| c.get(0))
35                        .and_then(|c| c.get("delta"))
36                        .and_then(|d| d.get("content"))
37                        .and_then(|c| c.as_str())
38                        .map(|s| s.to_string())
39                    {
40                        let vault_guard = vault.lock().await;
41                        let (rehydrated, _) = Rehydrator::rehydrate_chunk(
42                            &content,
43                            &mut buffer,
44                            &vault_guard,
45                        )
46                        .unwrap_or((content.clone(), false));
47
48                        if !rehydrated.is_empty() {
49                            // Update the delta content with rehydrated text
50                            if let Some(choices) = chunk.get_mut("choices").and_then(|c| c.as_array_mut()) {
51                                if let Some(first) = choices.first_mut() {
52                                    if let Some(delta) = first.get_mut("delta") {
53                                        delta["content"] = serde_json::Value::String(rehydrated);
54                                    }
55                                }
56                            }
57
58                            let serialized = serde_json::to_string(&chunk).unwrap_or_default();
59                            yield Ok(format!("data: {}\n\n", serialized));
60                        }
61                    } else {
62                        // Non-content chunk (role, finish_reason, etc.) — pass through
63                        yield Ok(format!("data: {}\n\n", data));
64                    }
65                } else {
66                    // Unparseable data — pass through
67                    yield Ok(format!("data: {}\n\n", data));
68                }
69            } else if !line.is_empty() {
70                yield Ok(format!("{}\n", line));
71            }
72        }
73
74        // Flush any remaining buffer
75        if !buffer.is_empty() {
76            tracing::debug!(request_id = %request_id, "Flushing remaining stream buffer");
77        }
78    }
79}