use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

/// Magic number identifying a store file ("STOR" in ASCII).
const STORE_MAGIC: u32 = 0x53544F52;
/// On-disk format version, written to and validated against the footer.
const STORE_VERSION: u32 = 2;
/// Target uncompressed size of a store block.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default size of a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Compression level applied to every block.
const COMPRESSION_LEVEL: CompressionLevel = CompressionLevel::MAX;

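/// Sequential writer for the document store. Documents are length-prefixed
/// into a block buffer; each full block is compressed (with the dictionary,
/// if one was provided) and appended to the underlying writer.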
pub struct StoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<CompressionDict>,
}

impl<'a> StoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
        }
    }

    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(dict),
        }
    }

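    /// Serializes `doc` and appends it to the current block, flushing the
    /// block once it reaches `STORE_BLOCK_SIZE`. Returns the assigned doc id.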
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(doc_id)
    }

    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let num_docs = self.next_doc_id - self.block_first_doc;

        let compressed = if let Some(ref dict) = self.dict {
            crate::compression::compress_with_dict(&self.block_buffer, COMPRESSION_LEVEL, dict)?
        } else {
            crate::compression::compress(&self.block_buffer, COMPRESSION_LEVEL)?
        };

        self.index.push(StoreBlockIndex {
            first_doc_id: self.block_first_doc,
            offset: self.current_offset,
            length: compressed.len() as u32,
            num_docs,
        });

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;

        self.block_buffer.clear();
        self.block_first_doc = self.next_doc_id;

        Ok(())
    }

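    /// Flushes the last block, then writes the trailing sections in order:
    /// optional dictionary, block index, and the fixed 32-byte footer ending
    /// in the magic number. Returns the number of documents written.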
    pub fn finish(mut self) -> io::Result<u32> {
        self.flush_block()?;

        let data_end_offset = self.current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = self.current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            Some(offset)
        } else {
            None
        };

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

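/// Serializes a document as JSON. The schema is currently unused here.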
fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

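/// An uncompressed block queued for parallel compression.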
struct PendingBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    data: Vec<u8>,
}

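/// A compressed block along with the sequence number used to restore write
/// order after parallel compression.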
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

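/// Store writer that buffers whole blocks in memory and compresses them in
/// parallel with rayon when `finish` is called. Only available with the
/// `native` feature.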
#[cfg(feature = "native")]
pub struct ParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    pending_blocks: Vec<PendingBlock>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<CompressionDict>,
    #[allow(dead_code)]
    num_threads: usize,
}

#[cfg(feature = "native")]
impl<'a> ParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, num_threads: usize) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            pending_blocks: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            num_threads: num_threads.max(1),
        }
    }

    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict, num_threads: usize) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            pending_blocks: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(dict),
            num_threads: num_threads.max(1),
        }
    }

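    /// Same contract as `StoreWriter::store`, except a full block is queued
    /// for deferred parallel compression instead of being flushed inline.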
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.queue_block();
        }

        Ok(doc_id)
    }

    fn queue_block(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));

        self.pending_blocks.push(PendingBlock {
            seq: self.next_seq,
            first_doc_id: self.block_first_doc,
            num_docs,
            data,
        });

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;
    }

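    /// Compresses all queued blocks in parallel, restores write order via
    /// `seq`, then writes data, optional dictionary, index, and footer.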
    pub fn finish(mut self) -> io::Result<u32> {
        self.queue_block();

        if self.pending_blocks.is_empty() {
            // Empty store: write a zero-length index and a footer that
            // reports no data, no dictionary, and zero documents.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        let dict = self.dict.clone();
        let compressed_blocks: Vec<CompressedBlock> = {
            use rayon::prelude::*;

            // Propagate compression errors instead of panicking inside the
            // worker threads.
            self.pending_blocks
                .into_par_iter()
                .map(|block| {
                    let compressed = if let Some(ref d) = dict {
                        crate::compression::compress_with_dict(&block.data, COMPRESSION_LEVEL, d)?
                    } else {
                        crate::compression::compress(&block.data, COMPRESSION_LEVEL)?
                    };

                    Ok(CompressedBlock {
                        seq: block.seq,
                        first_doc_id: block.first_doc_id,
                        num_docs: block.num_docs,
                        compressed,
                    })
                })
                .collect::<io::Result<Vec<_>>>()?
        };

        let mut sorted_blocks = compressed_blocks;
        sorted_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(sorted_blocks.len());
        let mut current_offset = 0u64;

        for block in sorted_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            // `current_offset` is not needed beyond this point; the footer
            // records `data_end_offset` and `dict_offset` explicitly.
            Some(offset)
        } else {
            None
        };

        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

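/// Index entry locating one compressed block inside the data section.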
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

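/// Asynchronous reader over a store file. The footer and block index are
/// parsed once on `open`; decompressed blocks are kept in a small LRU cache.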
pub struct AsyncStoreReader {
    data_slice: LazyFileSlice,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

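/// LRU cache of decompressed blocks, keyed by each block's first doc id.
/// `access_order` runs from least to most recently used.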
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            // Move the entry to the back of the access list (most recent).
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        // Evict from the front (least recently used) until there is room.
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
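    /// Opens a store by reading its 32-byte footer, then the optional
    /// dictionary and block index. `cache_blocks` bounds how many
    /// decompressed blocks are cached in memory.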
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Footer layout (32 bytes, little-endian): data_end_offset u64,
        // dict_offset u64, num_docs u32, has_dict u32, version u32, magic u32.
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Read the dictionary (if present) and compute where the block index
        // starts; this avoids fetching the dictionary length twice.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset as usize;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_start + 4 + dict_len,
            )
        } else {
            (None, data_end_offset as usize)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

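    /// Total number of documents stored in this file.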
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

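    /// Fetches and deserializes document `doc_id`, returning `Ok(None)` when
    /// the id is out of range. The block index is binary-searched, the block
    /// is loaded (or served from cache), and the length-prefixed records are
    /// skipped up to the target document.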
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            // Guard the final record too, so corrupt data errors instead of
            // panicking on the slice below.
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

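    /// Returns the decompressed block for `entry`, consulting the LRU cache
    /// before reading and decompressing from disk.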
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        {
            // Scope the write lock so it is released before the await below.
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

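/// Deserializes a JSON-encoded document; the schema is currently unused,
/// mirroring `serialize_document`.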
fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

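/// Metadata for one compressed block, exposed so `StoreMerger` can copy
/// blocks between stores without recompressing them.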
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

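/// Concatenates pre-compressed blocks from existing stores into a new store,
/// remapping doc ids and rebuilding the index. The merged store is written
/// without a dictionary, so source blocks that rely on one cannot be copied
/// verbatim (see `AsyncStoreReader::has_dict`).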
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

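    /// Copies the given compressed blocks verbatim from `data_slice` into
    /// this store, assigning contiguous doc ids starting at `next_doc_id`.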
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset as usize;
            let end = start + block.length as usize;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

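    /// Writes the block index and footer. No dictionary is written, so the
    /// footer's dictionary offset and flag are both zero.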
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        let dict_offset = 0u64;

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
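    /// Returns the block metadata required by `StoreMerger::append_store`.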
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// The file slice covering the compressed data section.
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    /// Whether this store was written with a compression dictionary.
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}