Skip to main content

phantom_protocol/api/
tcp_transport.rs

1//! Length-prefixed `SessionTransport` over `tokio::net::TcpStream`.
2//!
3//! `SessionTransport` is message-oriented (returns one frame per `recv_bytes`),
4//! while TCP is a stream. This adapter inserts a 4-byte big-endian length prefix
5//! before each frame so the trait contract is preserved.
6//!
7//! Phase 2.1: the receive path keeps a single persistent `BytesMut`
8//! accumulator across `recv_bytes` calls. Each frame is `split_to`-ed off
9//! into an owned `Bytes` which the caller takes — zero-copy from the
10//! accumulator to the returned frame, no per-packet `Vec::new` alloc.
11
12use crate::api::session::{FramePhase, SessionTransport};
13use crate::errors::CoreError;
14use bytes::{Bytes, BytesMut};
15use std::sync::atomic::{AtomicUsize, Ordering};
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio::net::TcpStream;
18use tokio::sync::Mutex;
19
20/// Receive frame cap during the unauthenticated handshake (WIRE-001). A
21/// `ClientHello` — even carrying a 16 KiB 0-RTT blob — is well under this, so an
22/// oversized DECLARED frame is rejected right after the 4-byte length prefix,
23/// before any body is buffered. This bounds the memory a single unauthenticated
24/// peer can make the server allocate.
25const HANDSHAKE_FRAME_CAP: usize = 64 * 1024; // 64 KiB
26
27/// Receive/send frame cap once the session is established — matches the
28/// application-layer delivery cap. Lowered from the historical 16 MiB.
29const STEADY_STATE_FRAME_CAP: usize = 4 * 1024 * 1024; // 4 MiB
30
31/// Initial (and shrink-target) capacity for the persistent recv accumulator.
32/// Sized to a generous MTU so the typical workload never reallocates.
33const RECV_BUF_INITIAL_CAPACITY: usize = 64 * 1024;
34
35/// Incremental-read chunk: the accumulator grows by at most this per read, so a
36/// peer that DECLARES a large frame but then stalls cannot make us pre-commit
37/// the full declared length (WIRE-001 amplification fix).
38const RECV_CHUNK: usize = 64 * 1024;
39
40/// After a frame larger than `RECV_BUF_INITIAL_CAPACITY * SHRINK_SLACK_MULT`, the
41/// accumulator is reset to baseline (LEGS-003) so one big frame does not pin a
42/// large buffer for the connection's life. Steady-state ~MTU frames stay well
43/// under the threshold and never pay a realloc.
44const SHRINK_SLACK_MULT: usize = 4;
45
46pub struct TcpSessionTransport {
47    write_half: Mutex<tokio::net::tcp::OwnedWriteHalf>,
48    /// Read half + the per-direction accumulator. Held together under
49    /// one mutex so the buffer lifetime tracks the reader's exactly
50    /// (Phase 2.1).
51    read_half: Mutex<(tokio::net::tcp::OwnedReadHalf, BytesMut)>,
52    /// Current receive frame-size cap (WIRE-001). Starts at the tight handshake
53    /// cap and rises to the steady-state cap via [`set_frame_phase`].
54    frame_cap: AtomicUsize,
55}
56
57impl TcpSessionTransport {
58    pub fn new(stream: TcpStream) -> Self {
59        let _ = stream.set_nodelay(true);
60        let (r, w) = stream.into_split();
61        Self {
62            write_half: Mutex::new(w),
63            read_half: Mutex::new((r, BytesMut::with_capacity(RECV_BUF_INITIAL_CAPACITY))),
64            frame_cap: AtomicUsize::new(HANDSHAKE_FRAME_CAP),
65        }
66    }
67
68    /// Current receive accumulator capacity — test-only accessor for the
69    /// LEGS-003 shrink behavior.
70    #[cfg(test)]
71    pub(crate) async fn accum_capacity(&self) -> usize {
72        self.read_half.lock().await.1.capacity()
73    }
74}
75
76impl SessionTransport for TcpSessionTransport {
77    async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
78        if data.len() > STEADY_STATE_FRAME_CAP {
79            return Err(CoreError::NetworkError(format!(
80                "frame too large: {} > {}",
81                data.len(),
82                STEADY_STATE_FRAME_CAP
83            )));
84        }
85        let mut w = self.write_half.lock().await;
86        let len = (data.len() as u32).to_be_bytes();
87        w.write_all(&len)
88            .await
89            .map_err(|e| CoreError::NetworkError(e.to_string()))?;
90        w.write_all(data)
91            .await
92            .map_err(|e| CoreError::NetworkError(e.to_string()))?;
93        w.flush()
94            .await
95            .map_err(|e| CoreError::NetworkError(e.to_string()))?;
96        Ok(())
97    }
98
99    async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
100        let cap = self.frame_cap.load(Ordering::Relaxed);
101        let mut guard = self.read_half.lock().await;
102        let (r, buf) = &mut *guard;
103
104        // Read the 4-byte big-endian length prefix.
105        let mut len_buf = [0u8; 4];
106        r.read_exact(&mut len_buf)
107            .await
108            .map_err(|e| CoreError::NetworkError(e.to_string()))?;
109        let len = u32::from_be_bytes(len_buf) as usize;
110        // WIRE-001: reject an oversized DECLARED length up front (phase-gated
111        // cap), BEFORE buffering any body bytes.
112        if len > cap {
113            return Err(CoreError::NetworkError(format!(
114                "oversized frame from peer: {} > {}",
115                len, cap
116            )));
117        }
118
119        // Incremental read (WIRE-001): grow the accumulator by at most
120        // `RECV_CHUNK` per read, so a peer that declares a large frame but then
121        // stalls makes us commit at most one chunk — not the full declared
122        // length (no 4-byte → big-alloc amplification). `read_exact` reads
123        // exactly the requested bytes, so no subsequent frame leaks into `buf`.
124        buf.clear();
125        let mut filled = 0usize;
126        while filled < len {
127            let chunk = (len - filled).min(RECV_CHUNK);
128            buf.resize(filled + chunk, 0);
129            r.read_exact(&mut buf[filled..filled + chunk])
130                .await
131                .map_err(|e| CoreError::NetworkError(e.to_string()))?;
132            filled += chunk;
133        }
134
135        // `split_to(len)` hands the caller an owned `BytesMut` view over the
136        // frame; `freeze` makes it an immutable refcounted `Bytes`.
137        let frame = buf.split_to(len).freeze();
138
139        // LEGS-003: a single large frame must not pin a large accumulator for
140        // the connection's life. After one, reset to baseline (the old large
141        // allocation is reclaimed once the frozen frame is also dropped).
142        if len > RECV_BUF_INITIAL_CAPACITY * SHRINK_SLACK_MULT {
143            *buf = BytesMut::with_capacity(RECV_BUF_INITIAL_CAPACITY);
144        }
145        Ok(frame)
146    }
147
148    fn set_frame_phase(&self, phase: FramePhase) {
149        let cap = match phase {
150            FramePhase::Handshake => HANDSHAKE_FRAME_CAP,
151            FramePhase::Established => STEADY_STATE_FRAME_CAP,
152        };
153        self.frame_cap.store(cap, Ordering::Relaxed);
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160    use tokio::net::{TcpListener, TcpStream};
161
162    async fn tcp_pair() -> (TcpSessionTransport, TcpSessionTransport) {
163        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
164        let addr = listener.local_addr().expect("addr");
165        let (client, accepted) = tokio::join!(TcpStream::connect(addr), listener.accept());
166        let client = client.expect("connect");
167        let (server, _) = accepted.expect("accept");
168        (
169            TcpSessionTransport::new(client),
170            TcpSessionTransport::new(server),
171        )
172    }
173
174    /// **WIRE-001.** During the unauthenticated handshake phase the recv cap is
175    /// tight (64 KiB): an oversized DECLARED frame is rejected right after the
176    /// 4-byte prefix, before any body is buffered — no 4-byte → big-alloc
177    /// amplification.
178    #[tokio::test]
179    async fn handshake_phase_rejects_oversized_frame() {
180        let (client, server) = tcp_pair().await; // server defaults to handshake phase
181        let big = vec![0u8; 100 * 1024]; // 100 KiB > 64 KiB handshake cap
182        client
183            .send_bytes(&big)
184            .await
185            .expect("send is within the 4 MiB send cap");
186        let err = server
187            .recv_bytes()
188            .await
189            .expect_err("oversized handshake-phase frame must be rejected");
190        assert!(matches!(err, CoreError::NetworkError(_)));
191    }
192
193    /// After establishment the cap rises to 4 MiB, a large frame round-trips, and
194    /// (LEGS-003) the accumulator is reset to baseline afterward.
195    #[tokio::test]
196    async fn established_phase_accepts_large_frame_and_resets_accumulator() {
197        let (client, server) = tcp_pair().await;
198        server.set_frame_phase(FramePhase::Established);
199        let payload = vec![7u8; 1024 * 1024]; // 1 MiB, within the 4 MiB cap
200
201        // Drive send and recv CONCURRENTLY. A 1 MiB `write_all` does not complete
202        // until the peer starts draining (the kernel socket buffer is far
203        // smaller than 1 MiB once it is contended — e.g. when the parallel test
204        // suite saturates loopback), so doing send-then-recv sequentially on one
205        // task deadlocks on TCP flow control. `join!` lets the reader drain while
206        // the writer fills.
207        let (send_res, recv_res) = tokio::join!(client.send_bytes(&payload), server.recv_bytes());
208        send_res.expect("send 1 MiB");
209        let got = recv_res.expect("recv 1 MiB");
210        assert_eq!(got.len(), payload.len());
211        assert_eq!(&got[..8], &payload[..8]);
212        let cap = server.accum_capacity().await;
213        assert!(
214            cap <= RECV_BUF_INITIAL_CAPACITY * SHRINK_SLACK_MULT,
215            "accumulator must reset to baseline after a large frame (LEGS-003); capacity = {cap}"
216        );
217        // A small follow-up frame still works.
218        client.send_bytes(b"small").await.expect("send small");
219        let got = server.recv_bytes().await.expect("recv small");
220        assert_eq!(&got[..], b"small");
221    }
222
223    /// The send path enforces the 4 MiB steady-state cap (down from 16 MiB).
224    #[tokio::test]
225    async fn send_rejects_over_steady_state_cap() {
226        let (client, _server) = tcp_pair().await;
227        let too_big = vec![0u8; STEADY_STATE_FRAME_CAP + 1];
228        assert!(client.send_bytes(&too_big).await.is_err());
229    }
230}