ccs_proxy/proxy/
sse_tap.rs1use crate::capture::{CaptureError, ErrorKind};
7use crate::provider::{ProviderKind, claude::ClaudeReassembler, codex::CodexReassembler};
8use bytes::Bytes;
9use futures::StreamExt;
10use futures::stream::Stream;
11use tokio::sync::mpsc;
12use tokio_stream::wrappers::ReceiverStream;
13
14pub type TapReceiver = mpsc::Receiver<Bytes>;
15
16pub type ClientChunk = Result<Bytes, std::io::Error>;
18
19pub fn tee<S>(upstream: S) -> (impl Stream<Item = ClientChunk>, TapReceiver)
26where
27 S: Stream<Item = Result<Bytes, reqwest::Error>> + Send + 'static,
28{
29 let (tap_tx, tap_rx) = mpsc::channel::<Bytes>(64);
30 let (out_tx, out_rx) = mpsc::channel::<Result<Bytes, std::io::Error>>(64);
31 tokio::spawn(async move {
32 let mut up = std::pin::pin!(upstream);
33 while let Some(item) = up.next().await {
34 match item {
35 Ok(chunk) => {
36 let _ = tap_tx.try_send(chunk.clone());
38 if out_tx.send(Ok(chunk)).await.is_err() {
39 break;
40 }
41 }
42 Err(err) => {
43 let io_err = std::io::Error::other(err);
44 let _ = out_tx.send(Err(io_err)).await;
45 break;
46 }
47 }
48 }
49 drop(tap_tx);
50 });
51 (ReceiverStream::new(out_rx), tap_rx)
52}
53
54pub async fn reassemble(
58 provider: ProviderKind,
59 mut rx: TapReceiver,
60) -> (Option<serde_json::Value>, u64, Option<CaptureError>) {
61 match provider {
62 ProviderKind::Claude => {
63 let mut reasm = ClaudeReassembler::new();
64 while let Some(chunk) = rx.recv().await {
65 reasm.feed(&chunk);
66 }
67 let count = reasm.frames_count();
68 match reasm.finish() {
69 Some(msg) => (Some(msg.to_json()), count, None),
70 None => (
71 None,
72 count,
73 Some(CaptureError {
74 kind: ErrorKind::ReassembleFailed,
75 message: "no SSE frames parsed".into(),
76 }),
77 ),
78 }
79 }
80 ProviderKind::Codex => {
81 let mut reasm = CodexReassembler::new();
82 while let Some(chunk) = rx.recv().await {
83 reasm.feed(&chunk);
84 }
85 let count = reasm.frames_count();
86 match reasm.finish() {
87 Some(msg) => (Some(msg.to_json()), count, None),
88 None => (
89 None,
90 count,
91 Some(CaptureError {
92 kind: ErrorKind::ReassembleFailed,
93 message: "no SSE frames parsed".into(),
94 }),
95 ),
96 }
97 }
98 }
99}