alopex_core/storage/large_value/
chunk.rs

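//! Writer/reader implementation of the chunked large-value format.
//! Backpressure is governed by `chunk_size`: the writer and the reader each handle at most one chunk at a time.
//! The body is protected by a crc32 checksum, and Blob and Typed values share the same on-disk layout.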
4
5use crate::error::{Error, Result};
6use crc32fast::Hasher;
7use std::fs::{remove_file, File, OpenOptions};
8use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
9use std::path::{Path, PathBuf};
10
/// Magic bytes that open every large-value file.
const HEADER_MAGIC: &[u8; 4] = b"LVCH";
/// Magic bytes that open the trailing footer.
const FOOTER_MAGIC: &[u8; 4] = b"LVFT";
/// On-disk format version written to, and required in, the header.
const VERSION: u16 = 1;
/// Fixed header length in bytes (all integers little-endian).
const HEADER_SIZE: u64 = 4 // magic
    + 2 // version
    + 1 // kind
    + 1 // reserved
    + 2 // type_id
    + 8 // total_len
    + 4 // chunk_size
    + 4; // chunk_count
/// Fixed footer length in bytes: magic, chunk_count and crc32 checksum.
const FOOTER_SIZE: u64 = 4 /* magic */ + 4 /* chunk_count */ + 4 /* checksum */;
/// Default chunk size (1 MiB), a reasonable starting point for callers.
pub const DEFAULT_CHUNK_SIZE: u32 = 1 << 20;
18
/// Distinguishes an opaque blob from a payload tagged with a type identifier.
///
/// On disk the variant is stored as a one-byte tag (0 = blob, 1 = typed),
/// followed by a 16-bit type id that is 0 for blobs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LargeValueKind {
    /// Untyped bytes; no type identifier is recorded.
    Blob,
    /// Bytes tagged with a caller-chosen 16-bit type identifier.
    Typed(u16),
}
27
28/// Metadata describing a large value container.
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub struct LargeValueMeta {
31    /// Kind of payload (typed vs blob).
32    pub kind: LargeValueKind,
33    /// Total length of the payload in bytes.
34    pub total_len: u64,
35    /// Maximum chunk size in bytes.
36    pub chunk_size: u32,
37}
38
/// Position information the reader attaches to every chunk it yields.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct LargeValueChunkInfo {
    /// Sequence number of the chunk, starting at 0.
    pub index: u32,
    /// `true` for the final chunk of the payload.
    pub is_last: bool,
}
47
48fn write_header(file: &mut File, meta: &LargeValueMeta, chunk_count: u32) -> Result<()> {
49    let mut buf = [0u8; HEADER_SIZE as usize];
50    buf[0..4].copy_from_slice(HEADER_MAGIC);
51    buf[4..6].copy_from_slice(&VERSION.to_le_bytes());
52    buf[6] = match meta.kind {
53        LargeValueKind::Blob => 0,
54        LargeValueKind::Typed(_) => 1,
55    };
56    buf[7] = 0; // reserved
57    let type_id = match meta.kind {
58        LargeValueKind::Blob => 0,
59        LargeValueKind::Typed(id) => id,
60    };
61    buf[8..10].copy_from_slice(&type_id.to_le_bytes());
62    buf[10..18].copy_from_slice(&meta.total_len.to_le_bytes());
63    buf[18..22].copy_from_slice(&meta.chunk_size.to_le_bytes());
64    buf[22..26].copy_from_slice(&chunk_count.to_le_bytes());
65    file.seek(SeekFrom::Start(0))?;
66    file.write_all(&buf)?;
67    Ok(())
68}
69
70fn read_header(file: &mut File) -> Result<(LargeValueMeta, u32)> {
71    let mut buf = [0u8; HEADER_SIZE as usize];
72    file.seek(SeekFrom::Start(0))?;
73    file.read_exact(&mut buf)?;
74
75    if &buf[0..4] != HEADER_MAGIC {
76        return Err(Error::InvalidFormat(
77            "invalid large_value header magic".into(),
78        ));
79    }
80    let version = u16::from_le_bytes(buf[4..6].try_into().unwrap());
81    if version != VERSION {
82        return Err(Error::InvalidFormat(format!(
83            "unsupported large_value version: {version}"
84        )));
85    }
86    let kind = match buf[6] {
87        0 => LargeValueKind::Blob,
88        1 => {
89            let id = u16::from_le_bytes(buf[8..10].try_into().unwrap());
90            LargeValueKind::Typed(id)
91        }
92        other => {
93            return Err(Error::InvalidFormat(format!(
94                "unknown large_value kind: {other}"
95            )))
96        }
97    };
98
99    let total_len = u64::from_le_bytes(buf[10..18].try_into().unwrap());
100    let chunk_size = u32::from_le_bytes(buf[18..22].try_into().unwrap());
101    if chunk_size == 0 {
102        return Err(Error::InvalidFormat("chunk_size must be > 0".into()));
103    }
104    let chunk_count = u32::from_le_bytes(buf[22..26].try_into().unwrap());
105
106    Ok((
107        LargeValueMeta {
108            kind,
109            total_len,
110            chunk_size,
111        },
112        chunk_count,
113    ))
114}
115
116fn read_footer(file: &mut File, footer_start: u64) -> Result<(u32, u32)> {
117    file.seek(SeekFrom::Start(footer_start))?;
118    let mut buf = [0u8; FOOTER_SIZE as usize];
119    file.read_exact(&mut buf)?;
120    if &buf[0..4] != FOOTER_MAGIC {
121        return Err(Error::InvalidFormat(
122            "invalid large_value footer magic".into(),
123        ));
124    }
125    let chunk_count = u32::from_le_bytes(buf[4..8].try_into().unwrap());
126    let checksum = u32::from_le_bytes(buf[8..12].try_into().unwrap());
127    Ok((chunk_count, checksum))
128}
129
130fn build_footer(chunk_count: u32, checksum: u32) -> [u8; FOOTER_SIZE as usize] {
131    let mut buf = [0u8; FOOTER_SIZE as usize];
132    buf[0..4].copy_from_slice(FOOTER_MAGIC);
133    buf[4..8].copy_from_slice(&chunk_count.to_le_bytes());
134    buf[8..12].copy_from_slice(&checksum.to_le_bytes());
135    buf
136}
137
138/// Writer for chunked large values.
139pub struct LargeValueWriter {
140    path: PathBuf,
141    writer: BufWriter<File>,
142    meta: LargeValueMeta,
143    chunk_index: u32,
144    written: u64,
145    hasher: Hasher,
146    finished: bool,
147}
148
149impl LargeValueWriter {
150    /// Creates a new chunked writer at the given path.
151    pub fn create(path: &Path, meta: LargeValueMeta) -> Result<Self> {
152        if meta.chunk_size == 0 {
153            return Err(Error::InvalidFormat("chunk_size must be > 0".into()));
154        }
155        if let Some(parent) = path.parent() {
156            std::fs::create_dir_all(parent)?;
157        }
158        let mut file = OpenOptions::new()
159            .write(true)
160            .read(true)
161            .create(true)
162            .truncate(true)
163            .open(path)?;
164        write_header(&mut file, &meta, 0)?;
165
166        Ok(Self {
167            path: path.to_path_buf(),
168            writer: BufWriter::new(file),
169            meta,
170            chunk_index: 0,
171            written: 0,
172            hasher: Hasher::new(),
173            finished: false,
174        })
175    }
176
177    /// Returns the metadata describing this large value.
178    pub fn meta(&self) -> LargeValueMeta {
179        self.meta
180    }
181
182    /// Returns remaining bytes expected before the writer reaches total_len.
183    pub fn remaining(&self) -> u64 {
184        self.meta
185            .total_len
186            .saturating_sub(self.written.min(self.meta.total_len))
187    }
188
189    /// Maximum chunk size permitted for this writer.
190    pub fn chunk_size(&self) -> u32 {
191        self.meta.chunk_size
192    }
193
194    /// Writes a single chunk. Chunks must respect the configured chunk_size and total_len.
195    pub fn write_chunk(&mut self, chunk: &[u8]) -> Result<()> {
196        if self.finished {
197            return Err(Error::InvalidFormat("writer already finished".into()));
198        }
199        if chunk.is_empty() {
200            return Err(Error::InvalidFormat("chunk must not be empty".into()));
201        }
202        if chunk.len() as u32 > self.meta.chunk_size {
203            return Err(Error::InvalidFormat("chunk exceeds chunk_size".into()));
204        }
205        if self.written + chunk.len() as u64 > self.meta.total_len {
206            return Err(Error::InvalidFormat(
207                "chunk writes exceed declared total_len".into(),
208            ));
209        }
210
211        let mut len_buf = [0u8; 8];
212        len_buf[..4].copy_from_slice(&self.chunk_index.to_le_bytes());
213        len_buf[4..].copy_from_slice(&(chunk.len() as u32).to_le_bytes());
214
215        self.writer.write_all(&len_buf)?;
216        self.writer.write_all(chunk)?;
217
218        self.hasher.update(&len_buf);
219        self.hasher.update(chunk);
220        self.written += chunk.len() as u64;
221        self.chunk_index += 1;
222        Ok(())
223    }
224
225    /// Finalizes the writer, writing footer and updating header metadata.
226    pub fn finish(mut self) -> Result<()> {
227        if self.finished {
228            return Err(Error::InvalidFormat("writer already finished".into()));
229        }
230        if self.written != self.meta.total_len {
231            return Err(Error::InvalidFormat(
232                "written length does not match total_len".into(),
233            ));
234        }
235
236        let checksum = self.hasher.finalize();
237        let footer = build_footer(self.chunk_index, checksum);
238        self.writer.write_all(&footer)?;
239        self.writer.flush()?;
240
241        {
242            let file = self.writer.get_mut();
243            file.sync_all()?;
244            // Rewrite header chunk_count with the final count.
245            write_header(file, &self.meta, self.chunk_index)?;
246            file.sync_all()?;
247        }
248
249        self.finished = true;
250        Ok(())
251    }
252
253    /// Cancels the writer and removes the partially written file.
254    pub fn cancel(self) -> Result<()> {
255        // Drop the writer to close the file handle.
256        drop(self.writer);
257        let _ = remove_file(&self.path);
258        Ok(())
259    }
260}
261
262/// Reader for chunked large values. Maintains O(chunk_size) memory.
263pub struct LargeValueReader {
264    reader: BufReader<File>,
265    meta: LargeValueMeta,
266    footer_chunk_count: u32,
267    footer_checksum: u32,
268    hasher: Option<Hasher>,
269    next_index: u32,
270    remaining: u64,
271    footer_start: u64,
272    done: bool,
273}
274
275impl LargeValueReader {
276    /// Opens a chunked large value file and validates header/footer.
277    pub fn open(path: &Path) -> Result<Self> {
278        let mut file = OpenOptions::new().read(true).open(path)?;
279        let file_len = file.metadata()?.len();
280        if file_len < HEADER_SIZE + FOOTER_SIZE {
281            return Err(Error::InvalidFormat(
282                "large_value file too small for header/footer".into(),
283            ));
284        }
285
286        let (meta, header_chunk_count) = read_header(&mut file)?;
287        let footer_start = file_len
288            .checked_sub(FOOTER_SIZE)
289            .ok_or_else(|| Error::InvalidFormat("file shorter than footer".into()))?;
290        let (footer_chunk_count, footer_checksum) = read_footer(&mut file, footer_start)?;
291        if header_chunk_count != 0 && header_chunk_count != footer_chunk_count {
292            return Err(Error::InvalidFormat(
293                "header/footer chunk counts do not match".into(),
294            ));
295        }
296
297        file.seek(SeekFrom::Start(HEADER_SIZE))?;
298
299        Ok(Self {
300            reader: BufReader::new(file),
301            meta,
302            footer_chunk_count,
303            footer_checksum,
304            hasher: Some(Hasher::new()),
305            next_index: 0,
306            remaining: meta.total_len,
307            footer_start,
308            done: false,
309        })
310    }
311
312    /// Returns the metadata of the large value.
313    pub fn meta(&self) -> LargeValueMeta {
314        self.meta
315    }
316
317    fn finalize_checksum(&mut self) -> Result<()> {
318        if self.done {
319            return Ok(());
320        }
321        let hasher = self
322            .hasher
323            .take()
324            .ok_or_else(|| Error::InvalidFormat("reader checksum already finalized".into()))?;
325        let computed = hasher.finalize();
326        if computed != self.footer_checksum {
327            return Err(Error::ChecksumMismatch);
328        }
329        if self.next_index != self.footer_chunk_count {
330            return Err(Error::InvalidFormat(
331                "chunk count mismatch at end of stream".into(),
332            ));
333        }
334        self.done = true;
335        Ok(())
336    }
337
338    /// Reads the next chunk, returning `Ok(None)` when the stream ends.
339    pub fn next_chunk(&mut self) -> Result<Option<(LargeValueChunkInfo, Vec<u8>)>> {
340        if self.done {
341            return Ok(None);
342        }
343        if self.remaining == 0 {
344            self.finalize_checksum()?;
345            return Ok(None);
346        }
347
348        let pos = self.reader.stream_position()?;
349        if pos + 8 > self.footer_start {
350            return Err(Error::InvalidFormat(
351                "unexpected end before footer while reading chunk header".into(),
352            ));
353        }
354
355        let mut len_buf = [0u8; 8];
356        self.reader.read_exact(&mut len_buf)?;
357        let chunk_index = u32::from_le_bytes(len_buf[..4].try_into().unwrap());
358        let chunk_len = u32::from_le_bytes(len_buf[4..].try_into().unwrap());
359
360        if chunk_index != self.next_index {
361            return Err(Error::InvalidFormat(
362                "chunk index out of sequence in large_value".into(),
363            ));
364        }
365        if chunk_len as u64 > self.remaining {
366            return Err(Error::InvalidFormat(
367                "chunk length exceeds remaining payload".into(),
368            ));
369        }
370        if chunk_len > self.meta.chunk_size {
371            return Err(Error::InvalidFormat(
372                "chunk length exceeds declared chunk_size".into(),
373            ));
374        }
375        let after_chunk = self.reader.stream_position()? + chunk_len as u64;
376        if after_chunk > self.footer_start {
377            return Err(Error::InvalidFormat(
378                "chunk overruns footer boundary".into(),
379            ));
380        }
381
382        let mut data = vec![0u8; chunk_len as usize];
383        self.reader.read_exact(&mut data)?;
384
385        if let Some(hasher) = &mut self.hasher {
386            hasher.update(&len_buf);
387            hasher.update(&data);
388        }
389
390        self.remaining -= chunk_len as u64;
391        self.next_index += 1;
392        let is_last = self.remaining == 0;
393        if is_last {
394            self.finalize_checksum()?;
395        }
396
397        Ok(Some((
398            LargeValueChunkInfo {
399                index: chunk_index,
400                is_last,
401            },
402            data,
403        )))
404    }
405}
406
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds Blob metadata for the given payload length and chunk size.
    fn blob_meta(total: u64, chunk_size: u32) -> LargeValueMeta {
        LargeValueMeta {
            kind: LargeValueKind::Blob,
            total_len: total,
            chunk_size,
        }
    }

    #[test]
    fn writes_and_reads_blob_chunks() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        let data = b"abcdefghi";

        {
            let mut writer =
                LargeValueWriter::create(&path, blob_meta(data.len() as u64, 4)).unwrap();
            writer.write_chunk(&data[..4]).unwrap();
            writer.write_chunk(&data[4..8]).unwrap();
            writer.write_chunk(&data[8..]).unwrap();
            writer.finish().unwrap();
        }

        let mut reader = LargeValueReader::open(&path).unwrap();
        let mut collected = Vec::new();
        let mut indices = Vec::new();
        let mut saw_last = false;
        while let Some((info, chunk)) = reader.next_chunk().unwrap() {
            assert!(!saw_last, "no chunk may follow the last one");
            collected.extend_from_slice(&chunk);
            indices.push(info.index);
            saw_last = info.is_last;
        }
        assert!(saw_last);
        assert_eq!(indices, vec![0, 1, 2]);
        assert_eq!(collected, data);
        assert_eq!(reader.meta().total_len, data.len() as u64);
    }

    #[test]
    fn typed_payload_roundtrip_and_partial_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("typed.lv");
        let data = b"012345";
        let meta = LargeValueMeta {
            kind: LargeValueKind::Typed(42),
            total_len: data.len() as u64,
            chunk_size: 4,
        };

        {
            let mut writer = LargeValueWriter::create(&path, meta).unwrap();
            writer.write_chunk(&data[..4]).unwrap();
            writer.write_chunk(&data[4..]).unwrap();
            assert_eq!(writer.remaining(), 0);
            writer.finish().unwrap();
        }

        let mut reader = LargeValueReader::open(&path).unwrap();
        assert!(matches!(reader.meta().kind, LargeValueKind::Typed(42)));

        // Partial read: consume only the first chunk, then make sure the reader can continue.
        let first = reader.next_chunk().unwrap().unwrap();
        assert_eq!(first.0.index, 0);
        assert!(!first.0.is_last);
        assert_eq!(first.1, b"0123");

        let second = reader.next_chunk().unwrap().unwrap();
        assert_eq!(second.0.index, 1);
        assert!(second.0.is_last);
        assert_eq!(second.1, b"45");
        assert!(reader.next_chunk().unwrap().is_none());
    }

    #[test]
    fn empty_value_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("empty.lv");
        LargeValueWriter::create(&path, blob_meta(0, 4))
            .unwrap()
            .finish()
            .unwrap();

        let mut reader = LargeValueReader::open(&path).unwrap();
        assert_eq!(reader.meta().total_len, 0);
        assert!(reader.next_chunk().unwrap().is_none());
        // Further calls keep reporting end-of-stream.
        assert!(reader.next_chunk().unwrap().is_none());
    }

    #[test]
    fn writer_rejects_oversized_chunk() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        let mut writer = LargeValueWriter::create(&path, blob_meta(8, 4)).unwrap();
        assert!(matches!(
            writer.write_chunk(b"abcde"),
            Err(Error::InvalidFormat(_))
        ));
        // A rejected chunk must not consume any of the payload budget.
        assert_eq!(writer.remaining(), 8);
    }

    #[test]
    fn writer_rejects_empty_chunk_and_total_len_overrun() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        let mut writer = LargeValueWriter::create(&path, blob_meta(3, 4)).unwrap();
        assert!(matches!(writer.write_chunk(b""), Err(Error::InvalidFormat(_))));
        assert!(matches!(
            writer.write_chunk(b"abcd"),
            Err(Error::InvalidFormat(_))
        ));
        writer.write_chunk(b"abc").unwrap();
        assert_eq!(writer.remaining(), 0);
    }

    #[test]
    fn finish_rejects_short_write() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        let mut writer = LargeValueWriter::create(&path, blob_meta(4, 4)).unwrap();
        writer.write_chunk(b"ab").unwrap();
        assert!(matches!(writer.finish(), Err(Error::InvalidFormat(_))));
    }

    #[test]
    fn detects_checksum_mismatch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        {
            let mut writer = LargeValueWriter::create(&path, blob_meta(3, 4)).unwrap();
            writer.write_chunk(b"abc").unwrap();
            writer.finish().unwrap();
        }

        // Corrupt one byte in the body.
        {
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .open(&path)
                .unwrap();
            file.seek(SeekFrom::Start(HEADER_SIZE + 8 + 1)).unwrap(); // header + chunk prefix + 1 byte into payload
            let mut b = [0u8; 1];
            file.read_exact(&mut b).unwrap();
            file.seek(SeekFrom::Current(-1)).unwrap();
            file.write_all(&[b[0] ^ 0xAA]).unwrap();
            file.sync_all().unwrap();
        }

        let mut reader = LargeValueReader::open(&path).unwrap();
        let err = reader.next_chunk().unwrap_err();
        assert!(matches!(err, Error::ChecksumMismatch));
    }

    #[test]
    fn open_rejects_truncated_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        {
            let mut writer = LargeValueWriter::create(&path, blob_meta(3, 4)).unwrap();
            writer.write_chunk(b"abc").unwrap();
            writer.finish().unwrap();
        }
        {
            let file = OpenOptions::new().write(true).open(&path).unwrap();
            let len = file.metadata().unwrap().len();
            file.set_len(len - 1).unwrap();
        }
        assert!(LargeValueReader::open(&path).is_err());
    }

    #[test]
    fn cancel_removes_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        {
            let writer = LargeValueWriter::create(&path, blob_meta(3, 4)).unwrap();
            writer.cancel().unwrap();
        }
        assert!(!path.exists());
    }
}
522}