alopex_core/storage/large_value/
chunk.rs1use crate::error::{Error, Result};
6use crc32fast::Hasher;
7use std::fs::{remove_file, File, OpenOptions};
8use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
9use std::path::{Path, PathBuf};
10
/// Magic bytes that open the fixed-size file header.
const HEADER_MAGIC: &[u8; 4] = b"LVCH";
/// Magic bytes that open the footer stored at the very end of the file.
const FOOTER_MAGIC: &[u8; 4] = b"LVFT";
/// Format version written into the header; readers reject any other value.
const VERSION: u16 = 1;
/// Header size in bytes: magic(4) + version(2) + kind(1) + zero byte(1) + type_id(2)
/// + total_len(8) + chunk_size(4) + chunk_count(4).
const HEADER_SIZE: u64 = 4 + 2 + 1 + 1 + 2 + 8 + 4 + 4;
/// Footer size in bytes: magic(4) + chunk_count(4) + checksum(4).
const FOOTER_SIZE: u64 = 4 + 4 + 4;
/// Default chunk size offered to callers: 1 MiB.
pub const DEFAULT_CHUNK_SIZE: u32 = 1 << 20;

/// Discriminates the two payload flavours a large-value file can hold.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LargeValueKind {
    /// Untyped payload; serialised with kind byte `0` and a zero type id.
    Blob,
    /// Typed payload; serialised with kind byte `1` followed by this 16-bit type id.
    Typed(u16),
}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub struct LargeValueMeta {
31 pub kind: LargeValueKind,
33 pub total_len: u64,
35 pub chunk_size: u32,
37}
38
/// Position information handed back together with each chunk payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LargeValueChunkInfo {
    /// Zero-based sequence number of the chunk within the file.
    pub index: u32,
    /// `true` when no payload bytes remain after this chunk.
    pub is_last: bool,
}
47
48fn write_header(file: &mut File, meta: &LargeValueMeta, chunk_count: u32) -> Result<()> {
49 let mut buf = [0u8; HEADER_SIZE as usize];
50 buf[0..4].copy_from_slice(HEADER_MAGIC);
51 buf[4..6].copy_from_slice(&VERSION.to_le_bytes());
52 buf[6] = match meta.kind {
53 LargeValueKind::Blob => 0,
54 LargeValueKind::Typed(_) => 1,
55 };
56 buf[7] = 0; let type_id = match meta.kind {
58 LargeValueKind::Blob => 0,
59 LargeValueKind::Typed(id) => id,
60 };
61 buf[8..10].copy_from_slice(&type_id.to_le_bytes());
62 buf[10..18].copy_from_slice(&meta.total_len.to_le_bytes());
63 buf[18..22].copy_from_slice(&meta.chunk_size.to_le_bytes());
64 buf[22..26].copy_from_slice(&chunk_count.to_le_bytes());
65 file.seek(SeekFrom::Start(0))?;
66 file.write_all(&buf)?;
67 Ok(())
68}
69
70fn read_header(file: &mut File) -> Result<(LargeValueMeta, u32)> {
71 let mut buf = [0u8; HEADER_SIZE as usize];
72 file.seek(SeekFrom::Start(0))?;
73 file.read_exact(&mut buf)?;
74
75 if &buf[0..4] != HEADER_MAGIC {
76 return Err(Error::InvalidFormat(
77 "invalid large_value header magic".into(),
78 ));
79 }
80 let version = u16::from_le_bytes(buf[4..6].try_into().unwrap());
81 if version != VERSION {
82 return Err(Error::InvalidFormat(format!(
83 "unsupported large_value version: {version}"
84 )));
85 }
86 let kind = match buf[6] {
87 0 => LargeValueKind::Blob,
88 1 => {
89 let id = u16::from_le_bytes(buf[8..10].try_into().unwrap());
90 LargeValueKind::Typed(id)
91 }
92 other => {
93 return Err(Error::InvalidFormat(format!(
94 "unknown large_value kind: {other}"
95 )))
96 }
97 };
98
99 let total_len = u64::from_le_bytes(buf[10..18].try_into().unwrap());
100 let chunk_size = u32::from_le_bytes(buf[18..22].try_into().unwrap());
101 if chunk_size == 0 {
102 return Err(Error::InvalidFormat("chunk_size must be > 0".into()));
103 }
104 let chunk_count = u32::from_le_bytes(buf[22..26].try_into().unwrap());
105
106 Ok((
107 LargeValueMeta {
108 kind,
109 total_len,
110 chunk_size,
111 },
112 chunk_count,
113 ))
114}
115
116fn read_footer(file: &mut File, footer_start: u64) -> Result<(u32, u32)> {
117 file.seek(SeekFrom::Start(footer_start))?;
118 let mut buf = [0u8; FOOTER_SIZE as usize];
119 file.read_exact(&mut buf)?;
120 if &buf[0..4] != FOOTER_MAGIC {
121 return Err(Error::InvalidFormat(
122 "invalid large_value footer magic".into(),
123 ));
124 }
125 let chunk_count = u32::from_le_bytes(buf[4..8].try_into().unwrap());
126 let checksum = u32::from_le_bytes(buf[8..12].try_into().unwrap());
127 Ok((chunk_count, checksum))
128}
129
130fn build_footer(chunk_count: u32, checksum: u32) -> [u8; FOOTER_SIZE as usize] {
131 let mut buf = [0u8; FOOTER_SIZE as usize];
132 buf[0..4].copy_from_slice(FOOTER_MAGIC);
133 buf[4..8].copy_from_slice(&chunk_count.to_le_bytes());
134 buf[8..12].copy_from_slice(&checksum.to_le_bytes());
135 buf
136}
137
138pub struct LargeValueWriter {
140 path: PathBuf,
141 writer: BufWriter<File>,
142 meta: LargeValueMeta,
143 chunk_index: u32,
144 written: u64,
145 hasher: Hasher,
146 finished: bool,
147}
148
149impl LargeValueWriter {
150 pub fn create(path: &Path, meta: LargeValueMeta) -> Result<Self> {
152 if meta.chunk_size == 0 {
153 return Err(Error::InvalidFormat("chunk_size must be > 0".into()));
154 }
155 if let Some(parent) = path.parent() {
156 std::fs::create_dir_all(parent)?;
157 }
158 let mut file = OpenOptions::new()
159 .write(true)
160 .read(true)
161 .create(true)
162 .truncate(true)
163 .open(path)?;
164 write_header(&mut file, &meta, 0)?;
165
166 Ok(Self {
167 path: path.to_path_buf(),
168 writer: BufWriter::new(file),
169 meta,
170 chunk_index: 0,
171 written: 0,
172 hasher: Hasher::new(),
173 finished: false,
174 })
175 }
176
177 pub fn meta(&self) -> LargeValueMeta {
179 self.meta
180 }
181
182 pub fn remaining(&self) -> u64 {
184 self.meta
185 .total_len
186 .saturating_sub(self.written.min(self.meta.total_len))
187 }
188
189 pub fn chunk_size(&self) -> u32 {
191 self.meta.chunk_size
192 }
193
194 pub fn write_chunk(&mut self, chunk: &[u8]) -> Result<()> {
196 if self.finished {
197 return Err(Error::InvalidFormat("writer already finished".into()));
198 }
199 if chunk.is_empty() {
200 return Err(Error::InvalidFormat("chunk must not be empty".into()));
201 }
202 if chunk.len() as u32 > self.meta.chunk_size {
203 return Err(Error::InvalidFormat("chunk exceeds chunk_size".into()));
204 }
205 if self.written + chunk.len() as u64 > self.meta.total_len {
206 return Err(Error::InvalidFormat(
207 "chunk writes exceed declared total_len".into(),
208 ));
209 }
210
211 let mut len_buf = [0u8; 8];
212 len_buf[..4].copy_from_slice(&self.chunk_index.to_le_bytes());
213 len_buf[4..].copy_from_slice(&(chunk.len() as u32).to_le_bytes());
214
215 self.writer.write_all(&len_buf)?;
216 self.writer.write_all(chunk)?;
217
218 self.hasher.update(&len_buf);
219 self.hasher.update(chunk);
220 self.written += chunk.len() as u64;
221 self.chunk_index += 1;
222 Ok(())
223 }
224
225 pub fn finish(mut self) -> Result<()> {
227 if self.finished {
228 return Err(Error::InvalidFormat("writer already finished".into()));
229 }
230 if self.written != self.meta.total_len {
231 return Err(Error::InvalidFormat(
232 "written length does not match total_len".into(),
233 ));
234 }
235
236 let checksum = self.hasher.finalize();
237 let footer = build_footer(self.chunk_index, checksum);
238 self.writer.write_all(&footer)?;
239 self.writer.flush()?;
240
241 {
242 let file = self.writer.get_mut();
243 file.sync_all()?;
244 write_header(file, &self.meta, self.chunk_index)?;
246 file.sync_all()?;
247 }
248
249 self.finished = true;
250 Ok(())
251 }
252
253 pub fn cancel(self) -> Result<()> {
255 drop(self.writer);
257 let _ = remove_file(&self.path);
258 Ok(())
259 }
260}
261
262pub struct LargeValueReader {
264 reader: BufReader<File>,
265 meta: LargeValueMeta,
266 footer_chunk_count: u32,
267 footer_checksum: u32,
268 hasher: Option<Hasher>,
269 next_index: u32,
270 remaining: u64,
271 footer_start: u64,
272 done: bool,
273}
274
275impl LargeValueReader {
276 pub fn open(path: &Path) -> Result<Self> {
278 let mut file = OpenOptions::new().read(true).open(path)?;
279 let file_len = file.metadata()?.len();
280 if file_len < HEADER_SIZE + FOOTER_SIZE {
281 return Err(Error::InvalidFormat(
282 "large_value file too small for header/footer".into(),
283 ));
284 }
285
286 let (meta, header_chunk_count) = read_header(&mut file)?;
287 let footer_start = file_len
288 .checked_sub(FOOTER_SIZE)
289 .ok_or_else(|| Error::InvalidFormat("file shorter than footer".into()))?;
290 let (footer_chunk_count, footer_checksum) = read_footer(&mut file, footer_start)?;
291 if header_chunk_count != 0 && header_chunk_count != footer_chunk_count {
292 return Err(Error::InvalidFormat(
293 "header/footer chunk counts do not match".into(),
294 ));
295 }
296
297 file.seek(SeekFrom::Start(HEADER_SIZE))?;
298
299 Ok(Self {
300 reader: BufReader::new(file),
301 meta,
302 footer_chunk_count,
303 footer_checksum,
304 hasher: Some(Hasher::new()),
305 next_index: 0,
306 remaining: meta.total_len,
307 footer_start,
308 done: false,
309 })
310 }
311
312 pub fn meta(&self) -> LargeValueMeta {
314 self.meta
315 }
316
317 fn finalize_checksum(&mut self) -> Result<()> {
318 if self.done {
319 return Ok(());
320 }
321 let hasher = self
322 .hasher
323 .take()
324 .ok_or_else(|| Error::InvalidFormat("reader checksum already finalized".into()))?;
325 let computed = hasher.finalize();
326 if computed != self.footer_checksum {
327 return Err(Error::ChecksumMismatch);
328 }
329 if self.next_index != self.footer_chunk_count {
330 return Err(Error::InvalidFormat(
331 "chunk count mismatch at end of stream".into(),
332 ));
333 }
334 self.done = true;
335 Ok(())
336 }
337
338 pub fn next_chunk(&mut self) -> Result<Option<(LargeValueChunkInfo, Vec<u8>)>> {
340 if self.done {
341 return Ok(None);
342 }
343 if self.remaining == 0 {
344 self.finalize_checksum()?;
345 return Ok(None);
346 }
347
348 let pos = self.reader.stream_position()?;
349 if pos + 8 > self.footer_start {
350 return Err(Error::InvalidFormat(
351 "unexpected end before footer while reading chunk header".into(),
352 ));
353 }
354
355 let mut len_buf = [0u8; 8];
356 self.reader.read_exact(&mut len_buf)?;
357 let chunk_index = u32::from_le_bytes(len_buf[..4].try_into().unwrap());
358 let chunk_len = u32::from_le_bytes(len_buf[4..].try_into().unwrap());
359
360 if chunk_index != self.next_index {
361 return Err(Error::InvalidFormat(
362 "chunk index out of sequence in large_value".into(),
363 ));
364 }
365 if chunk_len as u64 > self.remaining {
366 return Err(Error::InvalidFormat(
367 "chunk length exceeds remaining payload".into(),
368 ));
369 }
370 if chunk_len > self.meta.chunk_size {
371 return Err(Error::InvalidFormat(
372 "chunk length exceeds declared chunk_size".into(),
373 ));
374 }
375 let after_chunk = self.reader.stream_position()? + chunk_len as u64;
376 if after_chunk > self.footer_start {
377 return Err(Error::InvalidFormat(
378 "chunk overruns footer boundary".into(),
379 ));
380 }
381
382 let mut data = vec![0u8; chunk_len as usize];
383 self.reader.read_exact(&mut data)?;
384
385 if let Some(hasher) = &mut self.hasher {
386 hasher.update(&len_buf);
387 hasher.update(&data);
388 }
389
390 self.remaining -= chunk_len as u64;
391 self.next_index += 1;
392 let is_last = self.remaining == 0;
393 if is_last {
394 self.finalize_checksum()?;
395 }
396
397 Ok(Some((
398 LargeValueChunkInfo {
399 index: chunk_index,
400 is_last,
401 },
402 data,
403 )))
404 }
405}
406
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds blob metadata with the given total length and chunk size.
    fn blob_meta(total: u64, chunk_size: u32) -> LargeValueMeta {
        LargeValueMeta {
            kind: LargeValueKind::Blob,
            total_len: total,
            chunk_size,
        }
    }

    /// Nine bytes in chunks of four round-trip as three chunks, the last with index 2.
    #[test]
    fn writes_and_reads_blob_chunks() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        let data = b"abcdefghi";

        let mut writer =
            LargeValueWriter::create(&path, blob_meta(data.len() as u64, 4)).unwrap();
        // Yields "abcd", "efgh" and "i".
        for piece in data.chunks(4) {
            writer.write_chunk(piece).unwrap();
        }
        writer.finish().unwrap();

        let mut reader = LargeValueReader::open(&path).unwrap();
        let mut collected = Vec::new();
        while let Some((info, chunk)) = reader.next_chunk().unwrap() {
            if info.is_last {
                assert_eq!(info.index, 2);
            }
            collected.extend_from_slice(&chunk);
        }
        assert_eq!(collected, data);
        assert_eq!(reader.meta().total_len, data.len() as u64);
    }

    /// A typed payload keeps its type id and is returned chunk by chunk.
    #[test]
    fn typed_payload_roundtrip_and_partial_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("typed.lv");
        let data = b"012345";
        let meta = LargeValueMeta {
            kind: LargeValueKind::Typed(42),
            total_len: data.len() as u64,
            chunk_size: 4,
        };

        let mut writer = LargeValueWriter::create(&path, meta).unwrap();
        let (head, tail) = data.split_at(4);
        writer.write_chunk(head).unwrap();
        writer.write_chunk(tail).unwrap();
        assert_eq!(writer.remaining(), 0);
        writer.finish().unwrap();

        let mut reader = LargeValueReader::open(&path).unwrap();
        assert!(matches!(reader.meta().kind, LargeValueKind::Typed(42)));

        let (first_info, first_data) = reader.next_chunk().unwrap().unwrap();
        assert_eq!(first_info.index, 0);
        assert!(!first_info.is_last);
        assert_eq!(first_data, b"0123");

        let (second_info, second_data) = reader.next_chunk().unwrap().unwrap();
        assert_eq!(second_info.index, 1);
        assert!(second_info.is_last);
        assert_eq!(second_data, b"45");

        assert!(reader.next_chunk().unwrap().is_none());
    }

    /// Flipping bits in a payload byte makes the reader report a checksum mismatch.
    #[test]
    fn detects_checksum_mismatch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");

        let mut writer = LargeValueWriter::create(&path, blob_meta(3, 4)).unwrap();
        writer.write_chunk(b"abc").unwrap();
        writer.finish().unwrap();

        // Skip the header and the 8-byte chunk prefix, landing on the second payload byte.
        let target = HEADER_SIZE + 8 + 1;
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .unwrap();
        file.seek(SeekFrom::Start(target)).unwrap();
        let mut byte = [0u8; 1];
        file.read_exact(&mut byte).unwrap();
        file.seek(SeekFrom::Start(target)).unwrap();
        file.write_all(&[byte[0] ^ 0xAA]).unwrap();
        file.sync_all().unwrap();
        drop(file);

        let mut reader = LargeValueReader::open(&path).unwrap();
        let err = reader.next_chunk().unwrap_err();
        assert!(matches!(err, Error::ChecksumMismatch));
    }

    /// Cancelling a freshly created writer deletes its file.
    #[test]
    fn cancel_removes_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");

        let writer = LargeValueWriter::create(&path, blob_meta(3, 4)).unwrap();
        writer.cancel().unwrap();

        assert!(!path.exists());
    }
}