cloakpipe_proxy/
streaming.rs1use cloakpipe_core::{rehydrator::Rehydrator, vault::Vault};
4use futures::stream::Stream;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8pub async fn rehydrate_stream(
10 response: reqwest::Response,
11 vault: Arc<Mutex<Vault>>,
12 request_id: String,
13) -> impl Stream<Item = Result<String, std::io::Error>> {
14 let mut buffer = String::new();
15
16 async_stream::stream! {
17 let byte_stream = response.text().await.unwrap_or_default();
18
19 for line in byte_stream.lines() {
21 if let Some(data) = line.strip_prefix("data: ") {
22
23 if data == "[DONE]" {
24 yield Ok("data: [DONE]\n\n".to_string());
25 continue;
26 }
27
28 if let Ok(mut chunk) = serde_json::from_str::<serde_json::Value>(data) {
30 if let Some(content) = chunk
32 .get("choices")
33 .and_then(|c| c.get(0))
34 .and_then(|c| c.get("delta"))
35 .and_then(|d| d.get("content"))
36 .and_then(|c| c.as_str())
37 .map(|s| s.to_string())
38 {
39 let vault_guard = vault.lock().await;
40 let (rehydrated, _) = Rehydrator::rehydrate_chunk(
41 &content,
42 &mut buffer,
43 &vault_guard,
44 )
45 .unwrap_or((content.clone(), false));
46
47 if !rehydrated.is_empty() {
48 if let Some(choices) = chunk.get_mut("choices").and_then(|c| c.as_array_mut()) {
50 if let Some(first) = choices.first_mut() {
51 if let Some(delta) = first.get_mut("delta") {
52 delta["content"] = serde_json::Value::String(rehydrated);
53 }
54 }
55 }
56
57 let serialized = serde_json::to_string(&chunk).unwrap_or_default();
58 yield Ok(format!("data: {}\n\n", serialized));
59 }
60 } else {
61 yield Ok(format!("data: {}\n\n", data));
63 }
64 } else {
65 yield Ok(format!("data: {}\n\n", data));
67 }
68 } else if !line.is_empty() {
69 yield Ok(format!("{}\n", line));
70 }
71 }
72
73 if !buffer.is_empty() {
75 tracing::debug!(request_id = %request_id, "Flushing remaining stream buffer");
76 }
77 }
78}