Skip to main content

crous_io/
lib.rs

1//! # crous-io
2//!
3//! Async IO adapters for Crous, including:
4//! - Framed stream reader/writer for Tokio
5//! - Memory-mapped file reader (feature `mmap`)
6//! - Streaming block reader
7//! - Bytes-based shared buffer API
8//!
9//! ## Feature flags
10//! - `mmap` — enables `MmapReader` for zero-copy file access.
11//!   Citation: https://docs.rs/memmap2 — memmap best practices
12
13use crous_core::block::BlockWriter;
14use crous_core::error::{CrousError, Result};
15use crous_core::header::{FileHeader, HEADER_SIZE};
16use crous_core::wire::BlockType;
17use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
18
19/// Async writer that frames Crous data into blocks over a Tokio stream.
20pub struct FramedWriter<W: AsyncWrite + Unpin> {
21    writer: W,
22    header_written: bool,
23    flags: u8,
24}
25
26impl<W: AsyncWrite + Unpin> FramedWriter<W> {
27    pub fn new(writer: W) -> Self {
28        Self {
29            writer,
30            header_written: false,
31            flags: 0,
32        }
33    }
34
35    pub fn with_flags(writer: W, flags: u8) -> Self {
36        Self {
37            writer,
38            header_written: false,
39            flags,
40        }
41    }
42
43    /// Write the file header if not already written.
44    async fn ensure_header(&mut self) -> Result<()> {
45        if !self.header_written {
46            let header = FileHeader::new(self.flags);
47            self.writer.write_all(&header.encode()).await?;
48            self.header_written = true;
49        }
50        Ok(())
51    }
52
53    /// Write a pre-built block.
54    pub async fn write_block(&mut self, block: &[u8]) -> Result<()> {
55        self.ensure_header().await?;
56        self.writer.write_all(block).await?;
57        Ok(())
58    }
59
60    /// Write a raw data payload as a framed block.
61    pub async fn write_data(&mut self, payload: &[u8]) -> Result<()> {
62        self.ensure_header().await?;
63        let mut bw = BlockWriter::new(BlockType::Data);
64        bw.write(payload);
65        let block_bytes = bw.finish();
66        self.writer.write_all(&block_bytes).await?;
67        Ok(())
68    }
69
70    /// Flush the underlying writer.
71    pub async fn flush(&mut self) -> Result<()> {
72        self.writer.flush().await?;
73        Ok(())
74    }
75
76    /// Consume the writer and return the inner stream.
77    pub fn into_inner(self) -> W {
78        self.writer
79    }
80}
81
82/// Async reader that reads Crous blocks from a Tokio stream.
83pub struct FramedReader<R: AsyncRead + Unpin> {
84    reader: R,
85    header: Option<FileHeader>,
86    #[allow(dead_code)]
87    buf: Vec<u8>,
88}
89
90impl<R: AsyncRead + Unpin> FramedReader<R> {
91    pub fn new(reader: R) -> Self {
92        Self {
93            reader,
94            header: None,
95            buf: Vec::with_capacity(4096),
96        }
97    }
98
99    /// Read and parse the file header.
100    pub async fn read_header(&mut self) -> Result<&FileHeader> {
101        if self.header.is_none() {
102            let mut header_buf = [0u8; HEADER_SIZE];
103            self.reader.read_exact(&mut header_buf).await?;
104            self.header = Some(FileHeader::decode(&header_buf)?);
105        }
106        Ok(self.header.as_ref().unwrap())
107    }
108
109    /// Read the next block's raw bytes. Returns None at EOF.
110    pub async fn read_next_block_raw(&mut self) -> Result<Option<Vec<u8>>> {
111        if self.header.is_none() {
112            self.read_header().await?;
113        }
114
115        // Read block type (1 byte).
116        let mut type_buf = [0u8; 1];
117        match self.reader.read_exact(&mut type_buf).await {
118            Ok(_) => {}
119            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
120            Err(e) => return Err(e.into()),
121        }
122
123        if type_buf[0] == BlockType::Trailer as u8 {
124            return Ok(None);
125        }
126
127        // Read varint length (up to 10 bytes, 1 at a time for streaming).
128        let mut len_bytes = Vec::with_capacity(10);
129        loop {
130            let mut b = [0u8; 1];
131            self.reader.read_exact(&mut b).await?;
132            len_bytes.push(b[0]);
133            if b[0] & 0x80 == 0 {
134                break;
135            }
136            if len_bytes.len() > 10 {
137                return Err(CrousError::VarintOverflow);
138            }
139        }
140
141        let (block_len, _) = crous_core::varint::decode_varint(&len_bytes, 0)?;
142        let block_len = block_len as usize;
143
144        // Read compression type (1 byte) + checksum (8 bytes) + payload.
145        let remaining = 1 + 8 + block_len;
146        let mut payload = vec![0u8; remaining];
147        self.reader.read_exact(&mut payload).await?;
148
149        // Reconstruct the full block bytes.
150        let mut block = Vec::with_capacity(1 + len_bytes.len() + remaining);
151        block.push(type_buf[0]);
152        block.extend_from_slice(&len_bytes);
153        block.extend_from_slice(&payload);
154
155        Ok(Some(block))
156    }
157}
158
159/// Read a complete Crous file from memory-mapped or in-memory bytes.
160///
161/// This is the simplest API for reading a complete file.
162pub fn read_file_bytes(data: &[u8]) -> Result<Vec<crous_core::Value>> {
163    let mut decoder = crous_core::Decoder::new(data);
164    decoder.decode_all_owned()
165}
166
167/// Write values to an in-memory buffer as a complete Crous file.
168pub fn write_values_to_bytes(values: &[crous_core::Value]) -> Result<Vec<u8>> {
169    let mut encoder = crous_core::Encoder::new();
170    for v in values {
171        encoder.encode_value(v)?;
172    }
173    encoder.finish()
174}
175
176// ---------------------------------------------------------------------------
177// Bytes-based shared buffer API
178// ---------------------------------------------------------------------------
179
180/// Read a complete Crous file from a `bytes::Bytes` buffer.
181///
182/// The `Bytes` reference-counted buffer avoids copies when sharing
183/// between threads or network layers.
184/// Citation: https://docs.rs/bytes
185pub fn read_from_shared(data: bytes::Bytes) -> Result<Vec<crous_core::Value>> {
186    let mut decoder = crous_core::Decoder::new(&data);
187    decoder.decode_all_owned()
188}
189
190/// Write values into a `bytes::Bytes` shared buffer.
191pub fn write_to_shared(values: &[crous_core::Value]) -> Result<bytes::Bytes> {
192    let vec = write_values_to_bytes(values)?;
193    Ok(bytes::Bytes::from(vec))
194}
195
196// ---------------------------------------------------------------------------
197// Memory-mapped file reader (feature = "mmap")
198// ---------------------------------------------------------------------------
199
/// Zero-copy memory-mapped file reader for Crous files.
///
/// Maps a file into the process address space and exposes its bytes without
/// copying them. [`MmapReader::decoder`] hands out a `Decoder` that borrows
/// directly from the mapping, so `CrousValue<'_>` results can point straight
/// into mapped memory.
///
/// # Safety considerations
/// The file must not be modified while the mapping is live. The mapping is
/// read-only, and truncating the file underneath it will cause SIGBUS on
/// access. For untrusted files, prefer `read_file_bytes`.
///
/// Citation: memmap best practices — https://docs.rs/memmap2
#[cfg(feature = "mmap")]
pub struct MmapReader {
    mapping: memmap2::Mmap,
}

#[cfg(feature = "mmap")]
impl MmapReader {
    /// Open `path` and map it read-only for zero-copy reading.
    ///
    /// ```rust,ignore
    /// let reader = MmapReader::open("data.crous")?;
    /// let values = reader.decode_all()?;
    /// ```
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or mapped.
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        let file = std::fs::File::open(path)?;
        // SAFETY: `Mmap::map` is unsafe because another process modifying or
        // truncating the file would change or invalidate memory we expose as
        // `&[u8]`. We require the file not to be modified while mapped.
        // NOTE(review): this is a safe fn, so the requirement is documented
        // but not enforced by the type system — callers must uphold it.
        let mapping = unsafe { memmap2::Mmap::map(&file)? };
        Ok(MmapReader { mapping })
    }

    /// The mapped file contents.
    pub fn as_bytes(&self) -> &[u8] {
        &self.mapping[..]
    }

    /// Size of the mapped file in bytes.
    pub fn len(&self) -> usize {
        self.as_bytes().len()
    }

    /// `true` when the mapped file has no bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// A decoder that reads directly from the mapped memory.
    ///
    /// Strings and byte payloads decoded as `CrousValue<'_>` borrow from the
    /// mapping instead of being copied.
    pub fn decoder(&self) -> crous_core::Decoder<'_> {
        crous_core::Decoder::new(self.as_bytes())
    }

    /// Like [`MmapReader::decoder`], but with caller-supplied decoding limits.
    pub fn decoder_with_limits(&self, limits: crous_core::Limits) -> crous_core::Decoder<'_> {
        crous_core::Decoder::with_limits(self.as_bytes(), limits)
    }

    /// Decode every value in the file into owned `Value`s.
    pub fn decode_all(&self) -> Result<Vec<crous_core::Value>> {
        self.decoder().decode_all_owned()
    }

    /// Decode every value in the file as zero-copy `CrousValue`s that borrow
    /// from this mapping.
    pub fn decode_all_borrowed(&self) -> Result<Vec<crous_core::CrousValue<'_>>> {
        self.decoder().decode_all()
    }
}
272
#[cfg(test)]
mod tests {
    use super::*;
    use crous_core::Value;

    #[tokio::test]
    async fn framed_writer_basic() {
        let mut out = Vec::new();
        let mut framed = FramedWriter::new(&mut out);
        framed.write_data(b"hello").await.unwrap();
        framed.flush().await.unwrap();
        // Release the mutable borrow of `out` before inspecting it.
        drop(framed);

        // The stream must open with the file magic.
        assert_eq!(&out[..7], b"CROUSv1");
    }

    #[test]
    fn read_write_bytes() {
        let original = vec![Value::Str("hello".into()), Value::UInt(42)];
        let encoded = write_values_to_bytes(&original).unwrap();
        assert_eq!(read_file_bytes(&encoded).unwrap(), original);
    }
}