phantom_protocol/api/tcp_transport.rs
1//! Length-prefixed `SessionTransport` over `tokio::net::TcpStream`.
2//!
3//! `SessionTransport` is message-oriented (returns one frame per `recv_bytes`),
4//! while TCP is a stream. This adapter inserts a 4-byte big-endian length prefix
5//! before each frame so the trait contract is preserved.
6//!
7//! Phase 2.1: the receive path keeps a single persistent `BytesMut`
8//! accumulator across `recv_bytes` calls. Each frame is `split_to`-ed off
9//! into an owned `Bytes` which the caller takes — zero-copy from the
10//! accumulator to the returned frame, no per-packet `Vec::new` alloc.
11
12use crate::api::session::{FramePhase, SessionTransport};
13use crate::errors::CoreError;
14use bytes::{Bytes, BytesMut};
15use std::sync::atomic::{AtomicUsize, Ordering};
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio::net::TcpStream;
18use tokio::sync::Mutex;
19
/// Receive frame cap during the unauthenticated handshake (WIRE-001). A
/// `ClientHello` — even carrying a 16 KiB 0-RTT blob — fits well within this,
/// so a frame whose DECLARED length exceeds it is rejected as soon as the
/// 4-byte length prefix has been parsed, before the receive buffer is grown to
/// hold the body. This bounds the memory a single unauthenticated peer can make
/// the server allocate.
const HANDSHAKE_FRAME_CAP: usize = 64 * 1024; // 64 KiB

/// Receive/send frame cap once the session is established — matches the
/// application-layer delivery cap. Lowered from the historical 16 MiB.
///
/// Must fit in a `u32`: the wire length prefix is 4 bytes, and the send path
/// casts the (cap-checked) payload length to `u32`.
const STEADY_STATE_FRAME_CAP: usize = 4 * 1024 * 1024; // 4 MiB

/// Initial (and shrink-target) capacity for the persistent recv accumulator.
/// Sized to a generous MTU so the typical workload never reallocates.
const RECV_BUF_INITIAL_CAPACITY: usize = 64 * 1024;

/// Incremental-read chunk: the accumulator grows by at most this per read, so a
/// peer that DECLARES a large frame but then stalls cannot make us pre-commit
/// the full declared length (WIRE-001 amplification fix). Must be non-zero, or
/// the chunked read loop could never make progress.
const RECV_CHUNK: usize = 64 * 1024;

/// After a frame larger than `RECV_BUF_INITIAL_CAPACITY * SHRINK_SLACK_MULT`, the
/// accumulator is reset to baseline (LEGS-003) so one big frame does not pin a
/// large buffer for the connection's life. Steady-state ~MTU frames stay well
/// under the threshold and never pay a realloc.
const SHRINK_SLACK_MULT: usize = 4;

// Compile-time guards for the invariants the transport code relies on, so a
// future edit to the constants above fails the build instead of misbehaving.
const _: () = assert!(
    HANDSHAKE_FRAME_CAP <= STEADY_STATE_FRAME_CAP,
    "handshake cap must not exceed the steady-state cap"
);
const _: () = assert!(
    STEADY_STATE_FRAME_CAP <= u32::MAX as usize,
    "steady-state cap must fit in the 4-byte length prefix"
);
const _: () = assert!(RECV_CHUNK > 0, "RECV_CHUNK must be non-zero");
45
46pub struct TcpSessionTransport {
47 write_half: Mutex<tokio::net::tcp::OwnedWriteHalf>,
48 /// Read half + the per-direction accumulator. Held together under
49 /// one mutex so the buffer lifetime tracks the reader's exactly
50 /// (Phase 2.1).
51 read_half: Mutex<(tokio::net::tcp::OwnedReadHalf, BytesMut)>,
52 /// Current receive frame-size cap (WIRE-001). Starts at the tight handshake
53 /// cap and rises to the steady-state cap via [`set_frame_phase`].
54 frame_cap: AtomicUsize,
55}
56
57impl TcpSessionTransport {
58 pub fn new(stream: TcpStream) -> Self {
59 let _ = stream.set_nodelay(true);
60 let (r, w) = stream.into_split();
61 Self {
62 write_half: Mutex::new(w),
63 read_half: Mutex::new((r, BytesMut::with_capacity(RECV_BUF_INITIAL_CAPACITY))),
64 frame_cap: AtomicUsize::new(HANDSHAKE_FRAME_CAP),
65 }
66 }
67
68 /// Current receive accumulator capacity — test-only accessor for the
69 /// LEGS-003 shrink behavior.
70 #[cfg(test)]
71 pub(crate) async fn accum_capacity(&self) -> usize {
72 self.read_half.lock().await.1.capacity()
73 }
74}
75
76impl SessionTransport for TcpSessionTransport {
77 async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
78 if data.len() > STEADY_STATE_FRAME_CAP {
79 return Err(CoreError::NetworkError(format!(
80 "frame too large: {} > {}",
81 data.len(),
82 STEADY_STATE_FRAME_CAP
83 )));
84 }
85 let mut w = self.write_half.lock().await;
86 let len = (data.len() as u32).to_be_bytes();
87 w.write_all(&len)
88 .await
89 .map_err(|e| CoreError::NetworkError(e.to_string()))?;
90 w.write_all(data)
91 .await
92 .map_err(|e| CoreError::NetworkError(e.to_string()))?;
93 w.flush()
94 .await
95 .map_err(|e| CoreError::NetworkError(e.to_string()))?;
96 Ok(())
97 }
98
99 async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
100 let cap = self.frame_cap.load(Ordering::Relaxed);
101 let mut guard = self.read_half.lock().await;
102 let (r, buf) = &mut *guard;
103
104 // Read the 4-byte big-endian length prefix.
105 let mut len_buf = [0u8; 4];
106 r.read_exact(&mut len_buf)
107 .await
108 .map_err(|e| CoreError::NetworkError(e.to_string()))?;
109 let len = u32::from_be_bytes(len_buf) as usize;
110 // WIRE-001: reject an oversized DECLARED length up front (phase-gated
111 // cap), BEFORE buffering any body bytes.
112 if len > cap {
113 return Err(CoreError::NetworkError(format!(
114 "oversized frame from peer: {} > {}",
115 len, cap
116 )));
117 }
118
119 // Incremental read (WIRE-001): grow the accumulator by at most
120 // `RECV_CHUNK` per read, so a peer that declares a large frame but then
121 // stalls makes us commit at most one chunk — not the full declared
122 // length (no 4-byte → big-alloc amplification). `read_exact` reads
123 // exactly the requested bytes, so no subsequent frame leaks into `buf`.
124 buf.clear();
125 let mut filled = 0usize;
126 while filled < len {
127 let chunk = (len - filled).min(RECV_CHUNK);
128 buf.resize(filled + chunk, 0);
129 r.read_exact(&mut buf[filled..filled + chunk])
130 .await
131 .map_err(|e| CoreError::NetworkError(e.to_string()))?;
132 filled += chunk;
133 }
134
135 // `split_to(len)` hands the caller an owned `BytesMut` view over the
136 // frame; `freeze` makes it an immutable refcounted `Bytes`.
137 let frame = buf.split_to(len).freeze();
138
139 // LEGS-003: a single large frame must not pin a large accumulator for
140 // the connection's life. After one, reset to baseline (the old large
141 // allocation is reclaimed once the frozen frame is also dropped).
142 if len > RECV_BUF_INITIAL_CAPACITY * SHRINK_SLACK_MULT {
143 *buf = BytesMut::with_capacity(RECV_BUF_INITIAL_CAPACITY);
144 }
145 Ok(frame)
146 }
147
148 fn set_frame_phase(&self, phase: FramePhase) {
149 let cap = match phase {
150 FramePhase::Handshake => HANDSHAKE_FRAME_CAP,
151 FramePhase::Established => STEADY_STATE_FRAME_CAP,
152 };
153 self.frame_cap.store(cap, Ordering::Relaxed);
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::net::{TcpListener, TcpStream};

    /// Connects a client to a fresh loopback listener and wraps both ends,
    /// returning `(client, server)`.
    async fn tcp_pair() -> (TcpSessionTransport, TcpSessionTransport) {
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let addr = listener.local_addr().expect("addr");
        let (connected, accepted) = tokio::join!(TcpStream::connect(addr), listener.accept());
        let client_stream = connected.expect("connect");
        let server_stream = accepted.expect("accept").0;
        (
            TcpSessionTransport::new(client_stream),
            TcpSessionTransport::new(server_stream),
        )
    }

    /// **WIRE-001.** While still in the unauthenticated handshake phase the
    /// receive cap is 64 KiB. A frame that DECLARES more is rejected from its
    /// 4-byte prefix, with no 4-byte → big-alloc amplification.
    #[tokio::test]
    async fn handshake_phase_rejects_oversized_frame() {
        // `new` leaves the server in the handshake phase.
        let (client, server) = tcp_pair().await;
        let oversized = vec![0u8; 100 * 1024]; // 100 KiB > 64 KiB handshake cap
        client
            .send_bytes(&oversized)
            .await
            .expect("send is within the 4 MiB send cap");
        let rejection = server
            .recv_bytes()
            .await
            .expect_err("oversized handshake-phase frame must be rejected");
        assert!(matches!(rejection, CoreError::NetworkError(_)));
    }

    /// Once established, the cap is 4 MiB, so a large frame round-trips.
    /// Afterwards (LEGS-003) the accumulator is back near its baseline size.
    #[tokio::test]
    async fn established_phase_accepts_large_frame_and_resets_accumulator() {
        let (client, server) = tcp_pair().await;
        server.set_frame_phase(FramePhase::Established);
        let payload = vec![7u8; 1024 * 1024]; // 1 MiB, within the 4 MiB cap

        // Send and receive must run concurrently. The 1 MiB write cannot
        // finish until the peer drains it, since socket buffers can be far
        // smaller once loopback is contended by the parallel suite. Doing
        // send-then-recv on one task would deadlock on TCP flow control.
        let (sent, received) = tokio::join!(client.send_bytes(&payload), server.recv_bytes());
        sent.expect("send 1 MiB");
        let frame = received.expect("recv 1 MiB");
        assert_eq!(frame.len(), payload.len());
        assert_eq!(&frame[..8], &payload[..8]);

        let cap = server.accum_capacity().await;
        let baseline_ceiling = RECV_BUF_INITIAL_CAPACITY * SHRINK_SLACK_MULT;
        assert!(
            cap <= baseline_ceiling,
            "accumulator must reset to baseline after a large frame (LEGS-003); capacity = {cap}"
        );

        // The connection remains usable for an ordinary small frame.
        client.send_bytes(b"small").await.expect("send small");
        let small = server.recv_bytes().await.expect("recv small");
        assert_eq!(&small[..], b"small");
    }

    /// Sending is refused above the 4 MiB steady-state cap (down from 16 MiB).
    #[tokio::test]
    async fn send_rejects_over_steady_state_cap() {
        // `_server` (not `_`) keeps the peer alive for the duration of the test.
        let (client, _server) = tcp_pair().await;
        let one_past_cap = vec![0u8; STEADY_STATE_FRAME_CAP + 1];
        let outcome = client.send_bytes(&one_past_cap).await;
        assert!(outcome.is_err());
    }
}