1use crous_core::block::BlockWriter;
14use crous_core::error::{CrousError, Result};
15use crous_core::header::{FileHeader, HEADER_SIZE};
16use crous_core::wire::BlockType;
17use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
18
19pub struct FramedWriter<W: AsyncWrite + Unpin> {
21 writer: W,
22 header_written: bool,
23 flags: u8,
24}
25
26impl<W: AsyncWrite + Unpin> FramedWriter<W> {
27 pub fn new(writer: W) -> Self {
28 Self {
29 writer,
30 header_written: false,
31 flags: 0,
32 }
33 }
34
35 pub fn with_flags(writer: W, flags: u8) -> Self {
36 Self {
37 writer,
38 header_written: false,
39 flags,
40 }
41 }
42
43 async fn ensure_header(&mut self) -> Result<()> {
45 if !self.header_written {
46 let header = FileHeader::new(self.flags);
47 self.writer.write_all(&header.encode()).await?;
48 self.header_written = true;
49 }
50 Ok(())
51 }
52
53 pub async fn write_block(&mut self, block: &[u8]) -> Result<()> {
55 self.ensure_header().await?;
56 self.writer.write_all(block).await?;
57 Ok(())
58 }
59
60 pub async fn write_data(&mut self, payload: &[u8]) -> Result<()> {
62 self.ensure_header().await?;
63 let mut bw = BlockWriter::new(BlockType::Data);
64 bw.write(payload);
65 let block_bytes = bw.finish();
66 self.writer.write_all(&block_bytes).await?;
67 Ok(())
68 }
69
70 pub async fn flush(&mut self) -> Result<()> {
72 self.writer.flush().await?;
73 Ok(())
74 }
75
76 pub fn into_inner(self) -> W {
78 self.writer
79 }
80}
81
82pub struct FramedReader<R: AsyncRead + Unpin> {
84 reader: R,
85 header: Option<FileHeader>,
86 #[allow(dead_code)]
87 buf: Vec<u8>,
88}
89
90impl<R: AsyncRead + Unpin> FramedReader<R> {
91 pub fn new(reader: R) -> Self {
92 Self {
93 reader,
94 header: None,
95 buf: Vec::with_capacity(4096),
96 }
97 }
98
99 pub async fn read_header(&mut self) -> Result<&FileHeader> {
101 if self.header.is_none() {
102 let mut header_buf = [0u8; HEADER_SIZE];
103 self.reader.read_exact(&mut header_buf).await?;
104 self.header = Some(FileHeader::decode(&header_buf)?);
105 }
106 Ok(self.header.as_ref().unwrap())
107 }
108
109 pub async fn read_next_block_raw(&mut self) -> Result<Option<Vec<u8>>> {
111 if self.header.is_none() {
112 self.read_header().await?;
113 }
114
115 let mut type_buf = [0u8; 1];
117 match self.reader.read_exact(&mut type_buf).await {
118 Ok(_) => {}
119 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
120 Err(e) => return Err(e.into()),
121 }
122
123 if type_buf[0] == BlockType::Trailer as u8 {
124 return Ok(None);
125 }
126
127 let mut len_bytes = Vec::with_capacity(10);
129 loop {
130 let mut b = [0u8; 1];
131 self.reader.read_exact(&mut b).await?;
132 len_bytes.push(b[0]);
133 if b[0] & 0x80 == 0 {
134 break;
135 }
136 if len_bytes.len() > 10 {
137 return Err(CrousError::VarintOverflow);
138 }
139 }
140
141 let (block_len, _) = crous_core::varint::decode_varint(&len_bytes, 0)?;
142 let block_len = block_len as usize;
143
144 let remaining = 1 + 8 + block_len;
146 let mut payload = vec![0u8; remaining];
147 self.reader.read_exact(&mut payload).await?;
148
149 let mut block = Vec::with_capacity(1 + len_bytes.len() + remaining);
151 block.push(type_buf[0]);
152 block.extend_from_slice(&len_bytes);
153 block.extend_from_slice(&payload);
154
155 Ok(Some(block))
156 }
157}
158
159pub fn read_file_bytes(data: &[u8]) -> Result<Vec<crous_core::Value>> {
163 let mut decoder = crous_core::Decoder::new(data);
164 decoder.decode_all_owned()
165}
166
167pub fn write_values_to_bytes(values: &[crous_core::Value]) -> Result<Vec<u8>> {
169 let mut encoder = crous_core::Encoder::new();
170 for v in values {
171 encoder.encode_value(v)?;
172 }
173 encoder.finish()
174}
175
176pub fn read_from_shared(data: bytes::Bytes) -> Result<Vec<crous_core::Value>> {
186 let mut decoder = crous_core::Decoder::new(&data);
187 decoder.decode_all_owned()
188}
189
190pub fn write_to_shared(values: &[crous_core::Value]) -> Result<bytes::Bytes> {
192 let vec = write_values_to_bytes(values)?;
193 Ok(bytes::Bytes::from(vec))
194}
195
196#[cfg(feature = "mmap")]
213pub struct MmapReader {
214 _mmap: memmap2::Mmap,
215}
216
217#[cfg(feature = "mmap")]
218impl MmapReader {
219 pub fn open<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
226 let file = std::fs::File::open(path)?;
227 let mmap = unsafe { memmap2::Mmap::map(&file)? };
229 Ok(Self { _mmap: mmap })
230 }
231
232 pub fn as_bytes(&self) -> &[u8] {
234 &self._mmap
235 }
236
237 pub fn len(&self) -> usize {
239 self._mmap.len()
240 }
241
242 pub fn is_empty(&self) -> bool {
244 self._mmap.is_empty()
245 }
246
247 pub fn decoder(&self) -> crous_core::Decoder<'_> {
252 crous_core::Decoder::new(&self._mmap)
253 }
254
255 pub fn decoder_with_limits(&self, limits: crous_core::Limits) -> crous_core::Decoder<'_> {
257 crous_core::Decoder::with_limits(&self._mmap, limits)
258 }
259
260 pub fn decode_all(&self) -> Result<Vec<crous_core::Value>> {
262 let mut dec = self.decoder();
263 dec.decode_all_owned()
264 }
265
266 pub fn decode_all_borrowed(&self) -> Result<Vec<crous_core::CrousValue<'_>>> {
268 let mut dec = self.decoder();
269 dec.decode_all()
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276 use crous_core::Value;
277
278 #[tokio::test]
279 async fn framed_writer_basic() {
280 let mut buf = Vec::new();
281 {
282 let mut writer = FramedWriter::new(&mut buf);
283 writer.write_data(b"hello").await.unwrap();
284 writer.flush().await.unwrap();
285 }
286 assert_eq!(&buf[..7], b"CROUSv1");
288 }
289
290 #[test]
291 fn read_write_bytes() {
292 let values = vec![Value::Str("hello".into()), Value::UInt(42)];
293 let bytes = write_values_to_bytes(&values).unwrap();
294 let decoded = read_file_bytes(&bytes).unwrap();
295 assert_eq!(decoded, values);
296 }
297}