/// Uncompressed byte count at which the writer seals the current block and LZ4-compresses it.
const BLOCK_SIZE: usize = 16 << 10;
18
19pub struct DocStoreWriter {
23 current_block: Vec<u8>,
25 compressed_blocks: Vec<CompressedBlock>,
27 doc_entries: Vec<DocEntry>,
29}
30
/// A sealed block: its compressed bytes plus the size of the data they encode.
struct CompressedBlock {
    /// Byte length of the block before compression.
    uncompressed_len: u32,
    /// Output of `lz4_flex::compress_prepend_size`.
    data: Vec<u8>,
}
35
/// Location of one document: which block holds it, where it starts inside the
/// decompressed block, and how many bytes it spans.
#[derive(Copy, Clone)]
struct DocEntry {
    /// Index of the block containing the document.
    block_idx: u32,
    /// Byte offset of the document inside the decompressed block.
    within_block_offset: u32,
    /// Document length in bytes.
    len: u32,
}
42
43impl DocStoreWriter {
44 pub fn new() -> Self {
45 Self {
46 current_block: Vec::new(),
47 compressed_blocks: Vec::new(),
48 doc_entries: Vec::new(),
49 }
50 }
51
52 pub fn add(&mut self, doc_bytes: &[u8]) {
55 let block_idx = self.compressed_blocks.len() as u32;
56 let offset = self.current_block.len() as u32;
57
58 self.doc_entries.push(DocEntry {
59 block_idx,
60 within_block_offset: offset,
61 len: doc_bytes.len() as u32,
62 });
63
64 self.current_block.extend_from_slice(doc_bytes);
65
66 if self.current_block.len() >= BLOCK_SIZE {
68 self.flush_block();
69 }
70 }
71
72 pub fn doc_count(&self) -> u32 {
74 self.doc_entries.len() as u32
75 }
76
77 fn flush_block(&mut self) {
78 if self.current_block.is_empty() {
79 return;
80 }
81 let uncompressed_len = self.current_block.len() as u32;
82 let compressed = lz4_flex::compress_prepend_size(&self.current_block);
83 self.compressed_blocks.push(CompressedBlock {
84 data: compressed,
85 uncompressed_len,
86 });
87 self.current_block.clear();
88 }
89
90 pub fn finish(mut self) -> Vec<u8> {
92 self.flush_block();
94
95 let num_blocks = self.compressed_blocks.len() as u32;
100 let num_docs = self.doc_entries.len() as u32;
101
102 let mut result = Vec::new();
103
104 result.extend_from_slice(&num_blocks.to_le_bytes());
106 result.extend_from_slice(&num_docs.to_le_bytes());
107
108 let block_meta_start = result.len();
110 let block_meta_size = num_blocks as usize * 16;
112 result.resize(result.len() + block_meta_size, 0);
113
114 let doc_meta_start = result.len();
116 let doc_meta_size = num_docs as usize * 12;
118 result.resize(result.len() + doc_meta_size, 0);
119
120 for (i, block) in self.compressed_blocks.iter().enumerate() {
122 let offset = result.len() as u64;
123 let compressed_len = block.data.len() as u32;
124
125 let meta_pos = block_meta_start + i * 16;
127 result[meta_pos..meta_pos + 8].copy_from_slice(&offset.to_le_bytes());
128 result[meta_pos + 8..meta_pos + 12].copy_from_slice(&compressed_len.to_le_bytes());
129 result[meta_pos + 12..meta_pos + 16]
130 .copy_from_slice(&block.uncompressed_len.to_le_bytes());
131
132 result.extend_from_slice(&block.data);
134 }
135
136 for (i, entry) in self.doc_entries.iter().enumerate() {
138 let pos = doc_meta_start + i * 12;
139 result[pos..pos + 4].copy_from_slice(&entry.block_idx.to_le_bytes());
140 result[pos + 4..pos + 8].copy_from_slice(&entry.within_block_offset.to_le_bytes());
141 result[pos + 8..pos + 12].copy_from_slice(&entry.len.to_le_bytes());
142 }
143
144 result
145 }
146}
147
148impl Default for DocStoreWriter {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
/// Random-access reader over a serialized doc store. It borrows the bytes and never copies
/// the whole store.
///
/// The most recently decompressed block is kept in a one-slot cache, so consecutive
/// lookups that land in the same block decompress it only once.
pub struct DocStoreReader<'a> {
    /// The entire serialized store.
    data: &'a [u8],
    /// Block count from the header.
    // The `allow` keeps builds warning-free on code paths that never read this header field.
    #[allow(dead_code)]
    num_blocks: u32,
    /// Document count from the header; valid ids are `0..num_docs`.
    num_docs: u32,
    /// Byte position of the first block record (immediately after the header).
    block_meta_start: usize,
    /// Byte position of the first document record (immediately after the block records).
    doc_meta_start: usize,
    /// `(block index, decompressed bytes)` of the last block decompressed, if any.
    cached_block: std::sync::Mutex<Option<(u32, Vec<u8>)>>,
}
170
171impl<'a> DocStoreReader<'a> {
172 pub fn open(data: &'a [u8]) -> Self {
174 let num_blocks = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
175 let num_docs = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
176 let block_meta_start = 8;
177 let doc_meta_start = block_meta_start + num_blocks as usize * 16;
178
179 Self {
180 data,
181 num_blocks,
182 num_docs,
183 block_meta_start,
184 doc_meta_start,
185 cached_block: std::sync::Mutex::new(None),
186 }
187 }
188
189 pub fn doc_count(&self) -> u32 {
191 self.num_docs
192 }
193
194 pub fn get(&self, doc_id: u32) -> Option<Vec<u8>> {
196 if doc_id >= self.num_docs {
197 return None;
198 }
199
200 let doc_pos = self.doc_meta_start + doc_id as usize * 12;
202 let block_idx = u32::from_le_bytes(self.data[doc_pos..doc_pos + 4].try_into().unwrap());
203 let within_offset =
204 u32::from_le_bytes(self.data[doc_pos + 4..doc_pos + 8].try_into().unwrap());
205 let len = u32::from_le_bytes(self.data[doc_pos + 8..doc_pos + 12].try_into().unwrap());
206
207 let block_data = self.get_block(block_idx);
209 let start = within_offset as usize;
210 let end = start + len as usize;
211 Some(block_data[start..end].to_vec())
212 }
213
214 fn get_block(&self, block_idx: u32) -> Vec<u8> {
215 {
217 let cache = self.cached_block.lock().unwrap();
218 if let Some((cached_idx, ref data)) = *cache {
219 if cached_idx == block_idx {
220 return data.clone();
221 }
222 }
223 }
224
225 let meta_pos = self.block_meta_start + block_idx as usize * 16;
227 let offset =
228 u64::from_le_bytes(self.data[meta_pos..meta_pos + 8].try_into().unwrap()) as usize;
229 let compressed_len =
230 u32::from_le_bytes(self.data[meta_pos + 8..meta_pos + 12].try_into().unwrap()) as usize;
231
232 let compressed = &self.data[offset..offset + compressed_len];
234 let decompressed =
235 lz4_flex::decompress_size_prepended(compressed).expect("LZ4 decompression failed");
236
237 *self.cached_block.lock().unwrap() = Some((block_idx, decompressed.clone()));
239
240 decompressed
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn single_doc() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"hello world");
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 1);
        assert_eq!(reader.get(0).unwrap(), b"hello world");
        assert_eq!(reader.get(1), None);
    }

    #[test]
    fn multiple_docs() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"doc zero");
        writer.add(b"doc one");
        writer.add(b"doc two");
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 3);
        assert_eq!(reader.get(0).unwrap(), b"doc zero");
        assert_eq!(reader.get(1).unwrap(), b"doc one");
        assert_eq!(reader.get(2).unwrap(), b"doc two");
    }

    #[test]
    fn empty_doc() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"");
        writer.add(b"nonempty");
        writer.add(b"");
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(0).unwrap(), b"");
        assert_eq!(reader.get(1).unwrap(), b"nonempty");
        assert_eq!(reader.get(2).unwrap(), b"");
    }

    #[test]
    fn multi_block_spanning() {
        let mut writer = DocStoreWriter::new();
        // 20 x 4 KiB of documents spans several BLOCK_SIZE (16 KiB) blocks.
        let doc = vec![b'x'; 4096];
        for _ in 0..20 {
            writer.add(&doc);
        }
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 20);
        for i in 0..20 {
            assert_eq!(reader.get(i).unwrap(), doc);
        }
    }

    #[test]
    fn large_doc_filling_block() {
        let mut writer = DocStoreWriter::new();
        // Exactly BLOCK_SIZE bytes: this document alone seals block 0.
        let large_doc = vec![b'A'; BLOCK_SIZE];
        writer.add(&large_doc);
        writer.add(b"small");
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(0).unwrap(), large_doc);
        assert_eq!(reader.get(1).unwrap(), b"small");
    }

    #[test]
    fn compression_reduces_size() {
        let mut writer = DocStoreWriter::new();
        let doc = vec![0u8; BLOCK_SIZE * 2];
        writer.add(&doc);
        let data = writer.finish();

        assert!(
            data.len() < doc.len(),
            "compressed size {} should be less than raw {}",
            data.len(),
            doc.len()
        );
    }

    #[test]
    fn all_docs_retrievable() {
        let mut writer = DocStoreWriter::new();
        let docs: Vec<Vec<u8>> = (0..100)
            .map(|i| format!("{{\"id\": {i}, \"text\": \"document number {i}\"}}").into_bytes())
            .collect();

        for doc in &docs {
            writer.add(doc);
        }
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 100);
        for (i, expected) in docs.iter().enumerate() {
            assert_eq!(&reader.get(i as u32).unwrap(), expected);
        }
    }

    #[test]
    fn out_of_range_returns_none() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"only one");
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(1), None);
        assert_eq!(reader.get(100), None);
        assert_eq!(reader.get(u32::MAX), None);
    }

    #[test]
    fn empty_store() {
        let writer = DocStoreWriter::new();
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.doc_count(), 0);
        assert_eq!(reader.get(0), None);
    }

    #[test]
    fn header_layout() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"a");
        writer.add(b"b");
        let data = writer.finish();

        // Header: num_blocks (u32 LE) then num_docs (u32 LE).
        assert_eq!(&data[0..4], &1u32.to_le_bytes());
        assert_eq!(&data[4..8], &2u32.to_le_bytes());
    }

    #[test]
    fn json_docs_round_trip() {
        let mut writer = DocStoreWriter::new();
        let json1 = br#"{"title":"hello","body":"world"}"#;
        let json2 = br#"{"title":"foo","body":"bar baz"}"#;
        writer.add(json1);
        writer.add(json2);
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        let doc1: serde_json::Value = serde_json::from_slice(&reader.get(0).unwrap()).unwrap();
        assert_eq!(doc1["title"], "hello");
        let doc2: serde_json::Value = serde_json::from_slice(&reader.get(1).unwrap()).unwrap();
        assert_eq!(doc2["body"], "bar baz");
    }

    #[test]
    fn block_cache_hit() {
        let mut writer = DocStoreWriter::new();
        // Both documents land in block 0, so lookups after the first reuse the cached block.
        writer.add(b"doc A");
        writer.add(b"doc B");
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(0).unwrap(), b"doc A");
        assert_eq!(reader.get(1).unwrap(), b"doc B");
        assert_eq!(reader.get(0).unwrap(), b"doc A");
        assert_eq!(reader.get(1).unwrap(), b"doc B");
    }

    #[test]
    fn random_access_across_blocks() {
        // Distinct contents per document so a wrong block or offset is detected.
        let docs: Vec<Vec<u8>> = (0..40u8).map(|i| vec![i; 3000]).collect();
        let mut writer = DocStoreWriter::new();
        for doc in &docs {
            writer.add(doc);
        }
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        // Jumping between blocks forces cache misses; repeats exercise re-hits.
        for &i in &[39u32, 0, 20, 0, 39, 5, 5, 33] {
            assert_eq!(reader.get(i).unwrap(), docs[i as usize]);
        }
        for i in (0..40u32).rev() {
            assert_eq!(reader.get(i).unwrap(), docs[i as usize]);
        }
    }

    #[test]
    #[should_panic]
    fn open_rejects_truncated_header() {
        let _reader = DocStoreReader::open(&[0u8; 3]);
    }

    #[test]
    fn varying_doc_sizes() {
        let mut writer = DocStoreWriter::new();
        writer.add(b"x");
        writer.add(&vec![b'y'; 1000]);
        writer.add(b"z");
        writer.add(&vec![b'w'; 8000]);
        let data = writer.finish();

        let reader = DocStoreReader::open(&data);
        assert_eq!(reader.get(0).unwrap(), b"x");
        assert_eq!(reader.get(1).unwrap(), vec![b'y'; 1000]);
        assert_eq!(reader.get(2).unwrap(), b"z");
        assert_eq!(reader.get(3).unwrap(), vec![b'w'; 8000]);
    }
}