// luci/store/doc_store.rs

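//! LZ4-compressed document store.
//!
//! Stores original JSON source documents in blocks compressed with `lz4_flex`.
//! Documents are appended to a block until it reaches 16 KB (uncompressed). Documents
//! are never split across blocks, so a block can exceed 16 KB. Each document can be
//! retrieved by doc ID.
//!
//! On-disk format (all integers little-endian):
//! ```text
//! [num_blocks: u32] [num_docs: u32]
//! Per-block: [offset: u64] [compressed_len: u32] [uncompressed_len: u32]
//! Per-doc:   [block_idx: u32] [within_block_offset: u32] [len: u32]
//! Compressed block data
//! ```
//!
//! `offset` is the absolute byte position of a block's data from the start of the store.
//! Each block's data is the output of `lz4_flex::compress_prepend_size`: a 4-byte
//! uncompressed-size prefix followed by the LZ4 block.
//!
//! See [[document-store]] and [[architecture-overview#Step 4]].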
15
/// Target uncompressed block size in bytes (16 KiB). A block is closed as soon as it
/// reaches this size; a single large document can push it past the target.
const BLOCK_SIZE: usize = 1 << 14;
18
19// --- DocStoreWriter ---
20
21/// Builds a compressed document store from raw document bytes.
22pub struct DocStoreWriter {
23    /// Current uncompressed block being filled.
24    current_block: Vec<u8>,
25    /// Compressed blocks accumulated so far.
26    compressed_blocks: Vec<CompressedBlock>,
27    /// Per-document metadata: (block_idx, offset_in_block, length).
28    doc_entries: Vec<DocEntry>,
29}
30
/// One finished block: its LZ4 bytes plus the length it had before compression.
struct CompressedBlock {
    /// Output of `lz4_flex::compress_prepend_size` for this block.
    data: Vec<u8>,
    /// Number of bytes in the block before compression.
    uncompressed_len: u32,
}
35
/// Where a single document lives: which block, where inside it, and how many bytes.
#[derive(Clone, Copy)]
struct DocEntry {
    /// Index of the block holding the document.
    block_idx: u32,
    /// Byte offset of the document within the uncompressed block.
    within_block_offset: u32,
    /// Document length in bytes.
    len: u32,
}
42
43impl DocStoreWriter {
44    pub fn new() -> Self {
45        Self {
46            current_block: Vec::new(),
47            compressed_blocks: Vec::new(),
48            doc_entries: Vec::new(),
49        }
50    }
51
52    /// Add a document's raw bytes (typically JSON). Documents are assigned
53    /// sequential IDs starting from 0.
54    pub fn add(&mut self, doc_bytes: &[u8]) {
55        let block_idx = self.compressed_blocks.len() as u32;
56        let offset = self.current_block.len() as u32;
57
58        self.doc_entries.push(DocEntry {
59            block_idx,
60            within_block_offset: offset,
61            len: doc_bytes.len() as u32,
62        });
63
64        self.current_block.extend_from_slice(doc_bytes);
65
66        // Flush if block is full
67        if self.current_block.len() >= BLOCK_SIZE {
68            self.flush_block();
69        }
70    }
71
72    /// Number of documents stored so far.
73    pub fn doc_count(&self) -> u32 {
74        self.doc_entries.len() as u32
75    }
76
77    fn flush_block(&mut self) {
78        if self.current_block.is_empty() {
79            return;
80        }
81        let uncompressed_len = self.current_block.len() as u32;
82        let compressed = lz4_flex::compress_prepend_size(&self.current_block);
83        self.compressed_blocks.push(CompressedBlock {
84            data: compressed,
85            uncompressed_len,
86        });
87        self.current_block.clear();
88    }
89
90    /// Finalize and return the encoded doc store bytes.
91    pub fn finish(mut self) -> Vec<u8> {
92        // Flush any remaining data in the current block
93        self.flush_block();
94
95        // Fix up doc entries for docs that were in the last block
96        // (their block_idx was set to compressed_blocks.len() before flush)
97        // This is already correct because flush_block pushes to compressed_blocks.
98
99        let num_blocks = self.compressed_blocks.len() as u32;
100        let num_docs = self.doc_entries.len() as u32;
101
102        let mut result = Vec::new();
103
104        // Header
105        result.extend_from_slice(&num_blocks.to_le_bytes());
106        result.extend_from_slice(&num_docs.to_le_bytes());
107
108        // Block metadata (offsets filled in after we know where blocks start)
109        let block_meta_start = result.len();
110        // Reserve space: per block = 8 (offset) + 4 (compressed_len) + 4 (uncompressed_len) = 16
111        let block_meta_size = num_blocks as usize * 16;
112        result.resize(result.len() + block_meta_size, 0);
113
114        // Doc metadata
115        let doc_meta_start = result.len();
116        // Per doc = 4 (block_idx) + 4 (within_block_offset) + 4 (len) = 12
117        let doc_meta_size = num_docs as usize * 12;
118        result.resize(result.len() + doc_meta_size, 0);
119
120        // Write compressed blocks and record offsets
121        for (i, block) in self.compressed_blocks.iter().enumerate() {
122            let offset = result.len() as u64;
123            let compressed_len = block.data.len() as u32;
124
125            // Write block metadata
126            let meta_pos = block_meta_start + i * 16;
127            result[meta_pos..meta_pos + 8].copy_from_slice(&offset.to_le_bytes());
128            result[meta_pos + 8..meta_pos + 12].copy_from_slice(&compressed_len.to_le_bytes());
129            result[meta_pos + 12..meta_pos + 16]
130                .copy_from_slice(&block.uncompressed_len.to_le_bytes());
131
132            // Write compressed data
133            result.extend_from_slice(&block.data);
134        }
135
136        // Write doc metadata
137        for (i, entry) in self.doc_entries.iter().enumerate() {
138            let pos = doc_meta_start + i * 12;
139            result[pos..pos + 4].copy_from_slice(&entry.block_idx.to_le_bytes());
140            result[pos + 4..pos + 8].copy_from_slice(&entry.within_block_offset.to_le_bytes());
141            result[pos + 8..pos + 12].copy_from_slice(&entry.len.to_le_bytes());
142        }
143
144        result
145    }
146}
147
148impl Default for DocStoreWriter {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154// --- DocStoreReader ---
155
156/// Reads documents from a compressed doc store.
157///
158/// Caches the most recently decompressed block to avoid redundant
159/// decompression when reading sequential documents.
160pub struct DocStoreReader<'a> {
161    data: &'a [u8],
162    #[allow(dead_code)]
163    num_blocks: u32,
164    num_docs: u32,
165    block_meta_start: usize,
166    doc_meta_start: usize,
167    /// Cached last decompressed block index and data.
168    cached_block: std::sync::Mutex<Option<(u32, Vec<u8>)>>,
169}
170
171impl<'a> DocStoreReader<'a> {
172    /// Open a doc store from encoded bytes.
173    pub fn open(data: &'a [u8]) -> Self {
174        let num_blocks = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
175        let num_docs = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
176        let block_meta_start = 8;
177        let doc_meta_start = block_meta_start + num_blocks as usize * 16;
178
179        Self {
180            data,
181            num_blocks,
182            num_docs,
183            block_meta_start,
184            doc_meta_start,
185            cached_block: std::sync::Mutex::new(None),
186        }
187    }
188
189    /// Number of documents in the store.
190    pub fn doc_count(&self) -> u32 {
191        self.num_docs
192    }
193
194    /// Retrieve a document by doc ID. Returns None if out of range.
195    pub fn get(&self, doc_id: u32) -> Option<Vec<u8>> {
196        if doc_id >= self.num_docs {
197            return None;
198        }
199
200        // Read doc metadata
201        let doc_pos = self.doc_meta_start + doc_id as usize * 12;
202        let block_idx = u32::from_le_bytes(self.data[doc_pos..doc_pos + 4].try_into().unwrap());
203        let within_offset =
204            u32::from_le_bytes(self.data[doc_pos + 4..doc_pos + 8].try_into().unwrap());
205        let len = u32::from_le_bytes(self.data[doc_pos + 8..doc_pos + 12].try_into().unwrap());
206
207        // Decompress block (with caching)
208        let block_data = self.get_block(block_idx);
209        let start = within_offset as usize;
210        let end = start + len as usize;
211        Some(block_data[start..end].to_vec())
212    }
213
214    fn get_block(&self, block_idx: u32) -> Vec<u8> {
215        // Check cache
216        {
217            let cache = self.cached_block.lock().unwrap();
218            if let Some((cached_idx, ref data)) = *cache {
219                if cached_idx == block_idx {
220                    return data.clone();
221                }
222            }
223        }
224
225        // Read block metadata
226        let meta_pos = self.block_meta_start + block_idx as usize * 16;
227        let offset =
228            u64::from_le_bytes(self.data[meta_pos..meta_pos + 8].try_into().unwrap()) as usize;
229        let compressed_len =
230            u32::from_le_bytes(self.data[meta_pos + 8..meta_pos + 12].try_into().unwrap()) as usize;
231
232        // Decompress
233        let compressed = &self.data[offset..offset + compressed_len];
234        let decompressed =
235            lz4_flex::decompress_size_prepended(compressed).expect("LZ4 decompression failed");
236
237        // Update cache
238        *self.cached_block.lock().unwrap() = Some((block_idx, decompressed.clone()));
239
240        decompressed
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `docs` in order (doc IDs 0, 1, ...) and return the encoded store.
    fn store_of(docs: &[&[u8]]) -> Vec<u8> {
        let mut writer = DocStoreWriter::new();
        for doc in docs {
            writer.add(doc);
        }
        writer.finish()
    }

    /// Check that every doc ID `i` reads back exactly as `docs[i]`.
    fn assert_round_trip(reader: &DocStoreReader<'_>, docs: &[&[u8]]) {
        for (doc_id, expected) in (0u32..).zip(docs) {
            assert_eq!(reader.get(doc_id).unwrap(), *expected);
        }
    }

    #[test]
    fn single_doc() {
        let docs: [&[u8]; 1] = [b"hello world"];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        assert_eq!(reader.doc_count(), 1);
        assert_round_trip(&reader, &docs);
        assert_eq!(reader.get(1), None);
    }

    #[test]
    fn multiple_docs() {
        let docs: [&[u8]; 3] = [b"doc zero", b"doc one", b"doc two"];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        assert_eq!(reader.doc_count(), 3);
        assert_round_trip(&reader, &docs);
    }

    #[test]
    fn empty_doc() {
        let docs: [&[u8]; 3] = [b"", b"nonempty", b""];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        assert_round_trip(&reader, &docs);
    }

    #[test]
    fn multi_block_spanning() {
        // 4 KB documents: four of them fill one 16 KB block, so 20 span several blocks.
        let doc = vec![b'x'; 4096];
        let docs = vec![doc.as_slice(); 20];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        assert_eq!(reader.doc_count(), 20);
        assert_round_trip(&reader, &docs);
    }

    #[test]
    fn large_doc_filling_block() {
        let large_doc = vec![b'A'; BLOCK_SIZE]; // exactly one block
        let docs: [&[u8]; 2] = [&large_doc, b"small"];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        assert_round_trip(&reader, &docs);
    }

    #[test]
    fn compression_reduces_size() {
        // All zeros: highly compressible.
        let raw = vec![0u8; BLOCK_SIZE * 2];
        let docs: [&[u8]; 1] = [&raw];
        let data = store_of(&docs);

        // The encoded store should be significantly smaller than the raw bytes.
        assert!(
            data.len() < raw.len(),
            "compressed size {} should be less than raw {}",
            data.len(),
            raw.len()
        );
    }

    #[test]
    fn all_docs_retrievable() {
        let owned: Vec<Vec<u8>> = (0..100)
            .map(|i| format!("{{\"id\": {i}, \"text\": \"document number {i}\"}}").into_bytes())
            .collect();
        let docs: Vec<&[u8]> = owned.iter().map(Vec::as_slice).collect();
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        assert_eq!(reader.doc_count(), 100);
        assert_round_trip(&reader, &docs);
    }

    #[test]
    fn out_of_range_returns_none() {
        let docs: [&[u8]; 1] = [b"only one"];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        for doc_id in [1, 100, u32::MAX] {
            assert_eq!(reader.get(doc_id), None);
        }
    }

    #[test]
    fn empty_store() {
        let data = store_of(&[]);
        let reader = DocStoreReader::open(&data);

        assert_eq!(reader.doc_count(), 0);
        assert_eq!(reader.get(0), None);
    }

    #[test]
    fn json_docs_round_trip() {
        let json1 = br#"{"title":"hello","body":"world"}"#;
        let json2 = br#"{"title":"foo","body":"bar baz"}"#;
        let docs: [&[u8]; 2] = [json1, json2];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        let parse = |doc_id: u32| -> serde_json::Value {
            serde_json::from_slice(&reader.get(doc_id).unwrap()).unwrap()
        };
        assert_eq!(parse(0)["title"], "hello");
        assert_eq!(parse(1)["body"], "bar baz");
    }

    #[test]
    fn block_cache_hit() {
        // Both docs share a block, so the second read is served from the cache.
        let docs: [&[u8]; 2] = [b"doc A", b"doc B"];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        assert_round_trip(&reader, &docs);
    }

    #[test]
    fn varying_doc_sizes() {
        let ys = vec![b'y'; 1000];
        let ws = vec![b'w'; 8000];
        let docs: [&[u8]; 4] = [b"x", &ys, b"z", &ws];
        let data = store_of(&docs);
        let reader = DocStoreReader::open(&data);

        assert_round_trip(&reader, &docs);
    }
}