Skip to main content

zamsync_network/protocol/
frame_buf.rs

1use super::frame::MAX_FRAME_SIZE;
2use std::io::{ErrorKind, Read};
3use zstd;
4use zamsync_core::{ZamError, ZamResult};
5
/// Per-connection receive buffer.
///
/// TCP sockets use a 50ms `read_timeout` so that several peers can be polled
/// without blocking forever. The cost is that a `read_exact` inside
/// `read_frame` can be interrupted mid-frame on very slow links (e.g. 3 KB/s).
/// When that happens the partial bytes already pulled from the kernel buffer
/// are lost, every subsequent frame is shifted by some number of bytes, and
/// the length-prefix framing breaks entirely.
///
/// `FrameBuffer` avoids this by accumulating received bytes in `buf` and only
/// handing out a frame once all of its bytes are present. A read that ends
/// early because of a timeout simply leaves the bytes in the buffer for the
/// next poll cycle.
///
/// `Default` produces the same empty buffer as `FrameBuffer::new`.
#[derive(Default)]
pub struct FrameBuffer {
    /// Bytes received from the peer that have not yet been returned as part
    /// of a decoded frame.
    buf: Vec<u8>,
}
21
22impl FrameBuffer {
23    pub fn new() -> Self {
24        Self { buf: Vec::new() }
25    }
26
27    /// Try to return one complete decoded frame.
28    ///
29    /// Reads as many bytes as the stream offers (stopping on `WouldBlock` or
30    /// `TimedOut`), then checks whether the accumulated buffer contains a full
31    /// length-prefixed frame.
32    ///
33    /// Returns:
34    /// * `Ok(Some(payload))` -- a complete, decompressed frame is ready.
35    /// * `Ok(None)`          -- not enough bytes yet; call again after the next
36    ///                          read opportunity.
37    /// * `Err(_)`            -- a real I/O or protocol error occurred.
38    pub fn try_read_frame(&mut self, stream: &mut impl Read) -> ZamResult<Option<Vec<u8>>> {
39        // Fast path: if the buffer already holds a complete frame, return it
40        // without touching the stream at all. This handles the case where a
41        // previous read delivered two frames in one syscall.
42        if let Some(frame) = self.try_consume_frame()? {
43            return Ok(Some(frame));
44        }
45
46        // Drain whatever bytes the stream has right now into our buffer.
47        let mut tmp = [0u8; 8192];
48        let mut got_new_bytes = false;
49        loop {
50            match stream.read(&mut tmp) {
51                Ok(0) => {
52                    // The peer sent us EOF (connection closed cleanly).
53                    // Only treat it as a real EOF if we received no new bytes
54                    // in this call. If we did receive some bytes and *then*
55                    // got Ok(0), the OS drained the kernel buffer -- we process
56                    // what we have and let the next poll see another Ok(0).
57                    if !got_new_bytes {
58                        return Err(ZamError::Io(std::io::Error::new(
59                            ErrorKind::UnexpectedEof,
60                            "connection closed by peer",
61                        )));
62                    }
63                    break;
64                }
65                Ok(n) => {
66                    self.buf.extend_from_slice(&tmp[..n]);
67                    got_new_bytes = true;
68                }
69                Err(e) if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut => {
70                    // Nothing more to read right now -- stop draining.
71                    break;
72                }
73                Err(e) => return Err(ZamError::Io(e)),
74            }
75        }
76
77        // Check the buffer again after draining.
78        self.try_consume_frame()
79    }
80
81    /// Attempt to extract and decode one complete frame from `self.buf`.
82    /// Returns `Ok(None)` if fewer than `4 + total_len` bytes are present.
83    fn try_consume_frame(&mut self) -> ZamResult<Option<Vec<u8>>> {
84        // Check whether we have accumulated a full frame.
85        // Wire format: [4 bytes big-endian u32 = total_len] [1 byte flag] [body]
86        if self.buf.len() < 4 {
87            return Ok(None);
88        }
89        let total_len =
90            u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]) as usize;
91
92        if total_len == 0 {
93            // Empty frame -- consume the 4-byte header and return an empty payload.
94            self.buf.drain(..4);
95            return Ok(Some(vec![]));
96        }
97
98        if total_len as u64 > MAX_FRAME_SIZE as u64 {
99            return Err(ZamError::Protocol(format!(
100                "received frame too large: {} bytes (max {})",
101                total_len, MAX_FRAME_SIZE
102            )));
103        }
104
105        let frame_end = 4 + total_len;
106        if self.buf.len() < frame_end {
107            // Not enough bytes yet -- wait for more.
108            return Ok(None);
109        }
110
111        // We have a complete frame; decode it.
112        let flag = self.buf[4];
113        let body = self.buf[5..frame_end].to_vec();
114        self.buf.drain(..frame_end);
115
116        const FLAG_RAW: u8 = 0x00;
117        const FLAG_ZSTD: u8 = 0x01;
118
119        let payload = match flag {
120            FLAG_RAW => body,
121            FLAG_ZSTD => zstd::decode_all(body.as_slice())
122                .map_err(|e| ZamError::Protocol(format!("zstd decompress: {e}")))?,
123            other => {
124                return Err(ZamError::Protocol(format!(
125                    "unknown frame flag: 0x{other:02x}"
126                )))
127            }
128        };
129
130        Ok(Some(payload))
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::frame::write_frame;
    use std::io::Cursor;

    /// Input for a stream that has nothing to deliver: the first `read`
    /// already reports `Ok(0)`.
    const NO_BYTES: &[u8] = &[];

    /// Encodes `payload` into its on-the-wire form with `write_frame`.
    fn make_frame(payload: &[u8]) -> Vec<u8> {
        let mut wire = Vec::new();
        write_frame(&mut wire, payload).unwrap();
        wire
    }

    /// Performs a single `try_read_frame` call against a stream that delivers
    /// exactly `bytes` and then reports end-of-file.
    fn feed(fb: &mut FrameBuffer, bytes: &[u8]) -> ZamResult<Option<Vec<u8>>> {
        fb.try_read_frame(&mut Cursor::new(bytes))
    }

    #[test]
    fn test_complete_frame_at_once() {
        let payload = b"hello from bhutan";
        let wire = make_frame(payload);
        let mut fb = FrameBuffer::new();

        // One read delivers the whole frame.
        let frame = feed(&mut fb, &wire).unwrap();

        assert_eq!(frame, Some(payload.to_vec()));
        // Nothing may be left behind once the frame has been handed out.
        assert!(fb.buf.is_empty());
    }

    #[test]
    fn test_two_frames_back_to_back() {
        let mut combined = make_frame(b"frame-one");
        combined.extend_from_slice(&make_frame(b"frame-two"));
        let mut fb = FrameBuffer::new();

        let first = feed(&mut fb, &combined).unwrap();
        assert_eq!(first, Some(b"frame-one".to_vec()));

        // The second frame is already sitting in fb.buf, so it must come out
        // even though the stream has nothing more to offer.
        let second = feed(&mut fb, NO_BYTES).unwrap();
        assert_eq!(second, Some(b"frame-two".to_vec()));
    }

    #[test]
    fn test_partial_header_returns_none() {
        let wire = make_frame(b"some data");
        let mut fb = FrameBuffer::new();

        // Deliver only 2 of the 4 header bytes.
        let outcome = feed(&mut fb, &wire[..2]).unwrap();

        assert!(outcome.is_none());
        assert_eq!(fb.buf.len(), 2);
    }

    #[test]
    fn test_partial_body_returns_none() {
        let wire = make_frame(b"some longer payload that has many bytes");
        // Hold back the final 5 bytes of the frame.
        let truncated = &wire[..wire.len() - 5];
        let mut fb = FrameBuffer::new();

        let outcome = feed(&mut fb, truncated).unwrap();

        assert!(outcome.is_none());
        // Everything delivered so far is kept for the next call.
        assert_eq!(fb.buf.len(), truncated.len());
    }

    #[test]
    fn test_split_delivery_reassembles_frame() {
        let payload = b"patient-record-from-rural-bhutan";
        let wire = make_frame(payload);
        let (head, tail) = wire.split_at(wire.len() / 2);
        let mut fb = FrameBuffer::new();

        // The first half alone is not a frame yet.
        assert!(feed(&mut fb, head).unwrap().is_none());

        // The second half completes it.
        assert_eq!(feed(&mut fb, tail).unwrap(), Some(payload.to_vec()));
    }

    #[test]
    fn test_empty_reader_on_empty_buffer_is_eof() {
        let mut fb = FrameBuffer::new();

        // Empty buffer + reader returning Ok(0) = peer closed the connection.
        // This is how the sync session's graceful-close loop detects the end.
        let outcome = feed(&mut fb, NO_BYTES);

        assert!(matches!(outcome, Err(ZamError::Io(_))));
    }
}