Skip to main content

ccs_proxy/proxy/
sse_tap.rs

1//! Split the upstream byte stream so that every chunk is sent to the HTTP
2//! client without buffering AND a clone of every chunk is delivered to a
3//! background reassembler that produces the final JSON message used by the
4//! capture record.
5
6use crate::capture::{CaptureError, ErrorKind};
7use crate::provider::{ProviderKind, claude::ClaudeReassembler, codex::CodexReassembler};
8use bytes::Bytes;
9use futures::StreamExt;
10use futures::stream::Stream;
11use tokio::sync::mpsc;
12use tokio_stream::wrappers::ReceiverStream;
13
/// Receiving half of the tap channel created by `tee`; it yields the cloned
/// upstream chunks that `reassemble` drains in the background.
14pub type TapReceiver = mpsc::Receiver<Bytes>;
15
16/// One item of the client-side stream returned by `tee`.
/// `Ok` carries a forwarded upstream chunk; `Err` carries the upstream
/// failure wrapped in a `std::io::Error`.
17pub type ClientChunk = Result<Bytes, std::io::Error>;
18
19/// Splits an upstream byte stream into:
20/// - the returned `Stream` that forwards every chunk to the HTTP client (no
21///   buffering — the stream is forwarded as it arrives, preserving TTFT)
22/// - a `TapReceiver` that yields a (cheap, `Arc`-backed) clone of every chunk
23///   for background reassembly. If the tap channel fills up the chunk is
24///   dropped — the client copy is never delayed by the tap.
25pub fn tee<S>(upstream: S) -> (impl Stream<Item = ClientChunk>, TapReceiver)
26where
27    S: Stream<Item = Result<Bytes, reqwest::Error>> + Send + 'static,
28{
29    let (tap_tx, tap_rx) = mpsc::channel::<Bytes>(64);
30    let (out_tx, out_rx) = mpsc::channel::<Result<Bytes, std::io::Error>>(64);
31    tokio::spawn(async move {
32        let mut up = std::pin::pin!(upstream);
33        while let Some(item) = up.next().await {
34            match item {
35                Ok(chunk) => {
36                    // tap is best-effort; never block the client side
37                    let _ = tap_tx.try_send(chunk.clone());
38                    if out_tx.send(Ok(chunk)).await.is_err() {
39                        break;
40                    }
41                }
42                Err(err) => {
43                    let io_err = std::io::Error::other(err);
44                    let _ = out_tx.send(Err(io_err)).await;
45                    break;
46                }
47            }
48        }
49        drop(tap_tx);
50    });
51    (ReceiverStream::new(out_rx), tap_rx)
52}
53
54/// Drain the tap stream into the provider-specific reassembler, then return
55/// the final reassembled JSON message (when available), the frame count, and
56/// an optional partial-capture error.
57pub async fn reassemble(
58    provider: ProviderKind,
59    mut rx: TapReceiver,
60) -> (Option<serde_json::Value>, u64, Option<CaptureError>) {
61    match provider {
62        ProviderKind::Claude => {
63            let mut reasm = ClaudeReassembler::new();
64            while let Some(chunk) = rx.recv().await {
65                reasm.feed(&chunk);
66            }
67            let count = reasm.frames_count();
68            match reasm.finish() {
69                Some(msg) => (Some(msg.to_json()), count, None),
70                None => (
71                    None,
72                    count,
73                    Some(CaptureError {
74                        kind: ErrorKind::ReassembleFailed,
75                        message: "no SSE frames parsed".into(),
76                    }),
77                ),
78            }
79        }
80        ProviderKind::Codex => {
81            let mut reasm = CodexReassembler::new();
82            while let Some(chunk) = rx.recv().await {
83                reasm.feed(&chunk);
84            }
85            let count = reasm.frames_count();
86            match reasm.finish() {
87                Some(msg) => (Some(msg.to_json()), count, None),
88                None => (
89                    None,
90                    count,
91                    Some(CaptureError {
92                        kind: ErrorKind::ReassembleFailed,
93                        message: "no SSE frames parsed".into(),
94                    }),
95                ),
96            }
97        }
98    }
99}