cloakpipe_proxy/
streaming.rs1use cloakpipe_core::{rehydrator::Rehydrator, vault::Vault};
4use futures::stream::Stream;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8pub async fn rehydrate_stream(
10 response: reqwest::Response,
11 vault: Arc<Mutex<Vault>>,
12 request_id: String,
13) -> impl Stream<Item = Result<String, std::io::Error>> {
14 let mut buffer = String::new();
15
16 async_stream::stream! {
17 let byte_stream = response.text().await.unwrap_or_default();
18
19 for line in byte_stream.lines() {
21 if line.starts_with("data: ") {
22 let data = &line[6..];
23
24 if data == "[DONE]" {
25 yield Ok("data: [DONE]\n\n".to_string());
26 continue;
27 }
28
29 if let Ok(mut chunk) = serde_json::from_str::<serde_json::Value>(data) {
31 if let Some(content) = chunk
33 .get("choices")
34 .and_then(|c| c.get(0))
35 .and_then(|c| c.get("delta"))
36 .and_then(|d| d.get("content"))
37 .and_then(|c| c.as_str())
38 .map(|s| s.to_string())
39 {
40 let vault_guard = vault.lock().await;
41 let (rehydrated, _) = Rehydrator::rehydrate_chunk(
42 &content,
43 &mut buffer,
44 &vault_guard,
45 )
46 .unwrap_or((content.clone(), false));
47
48 if !rehydrated.is_empty() {
49 if let Some(choices) = chunk.get_mut("choices").and_then(|c| c.as_array_mut()) {
51 if let Some(first) = choices.first_mut() {
52 if let Some(delta) = first.get_mut("delta") {
53 delta["content"] = serde_json::Value::String(rehydrated);
54 }
55 }
56 }
57
58 let serialized = serde_json::to_string(&chunk).unwrap_or_default();
59 yield Ok(format!("data: {}\n\n", serialized));
60 }
61 } else {
62 yield Ok(format!("data: {}\n\n", data));
64 }
65 } else {
66 yield Ok(format!("data: {}\n\n", data));
68 }
69 } else if !line.is_empty() {
70 yield Ok(format!("{}\n", line));
71 }
72 }
73
74 if !buffer.is_empty() {
76 tracing::debug!(request_id = %request_id, "Flushing remaining stream buffer");
77 }
78 }
79}