use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

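// On-disk store layout, as written by the writers below and parsed by
// `AsyncStoreReader::open`:
//
//   [compressed blocks][optional dict: u32 len + bytes][block index][footer]
//
// The footer is a fixed 32 bytes, read back from `file_len - 32`:
//   data_end_offset: u64 | dict_offset: u64 | num_docs: u32
//   | has_dict: u32 | version: u32 | magic: u32   (all little-endian)
//
// `STORE_MAGIC` spells "STOR" in ASCII.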
const STORE_MAGIC: u32 = 0x53544F52;
const STORE_VERSION: u32 = 2;
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);

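/// Buffers serialized documents into blocks of roughly `STORE_BLOCK_SIZE`
/// bytes, compressing and writing out each block as it fills.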
pub struct StoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<CompressionDict>,
    compression_level: CompressionLevel,
}

impl<'a> StoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self::with_compression_level(writer, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict) -> Self {
        Self::with_dict_and_level(writer, dict, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(dict),
            compression_level,
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(doc_id)
    }

    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let num_docs = self.next_doc_id - self.block_first_doc;

        let compressed = if let Some(ref dict) = self.dict {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.compression_level)?
        };

        self.index.push(StoreBlockIndex {
            first_doc_id: self.block_first_doc,
            offset: self.current_offset,
            length: compressed.len() as u32,
            num_docs,
        });

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;

        self.block_buffer.clear();
        self.block_first_doc = self.next_doc_id;

        Ok(())
    }

    pub fn finish(mut self) -> io::Result<u32> {
        self.flush_block()?;

        let data_end_offset = self.current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = self.current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            Some(offset)
        } else {
            None
        };

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

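        // Fixed 32-byte footer (see the layout comment at the top of the file).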
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

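// A minimal usage sketch (assuming a `Document` and `Schema` built elsewhere;
// `Vec<u8>` implements `Write`):
//
//     let mut buf = Vec::new();
//     let mut writer = StoreWriter::new(&mut buf);
//     let doc_id = writer.store(&doc, &schema)?;
//     let num_docs = writer.finish()?;

/// Serializes a document as JSON; the `_schema` parameter is currently unused.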
pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

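/// Like `StoreWriter`, but hands each filled block to a freshly spawned
/// thread for compression and stitches the results back into sequence
/// order in `finish`.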
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    compressed_blocks: Vec<CompressedBlock>,
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

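        // Move the filled buffer out (swapping in a fresh one) so the
        // spawned thread owns the data it compresses.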
        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                // A panicked compression thread must surface as an error
                // rather than a silently missing block.
                let block = handle.join().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "compression thread panicked")
                })?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed()?;

        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Empty store: write a zero-length index and a well-formed footer.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

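/// One entry in the block index: where a compressed block lives in the data
/// section and which contiguous doc-id range it covers.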
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

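/// Reads documents back from a store: the footer and block index are parsed
/// eagerly in `open`, while block data is fetched and decompressed lazily
/// through a small LRU cache.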
pub struct AsyncStoreReader {
    data_slice: LazyFileSlice,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

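/// An LRU cache of decompressed blocks keyed by each block's `first_doc_id`.
/// `access_order` is a plain `Vec`, so lookups are O(n) in the number of
/// cached blocks; that is fine for the small capacities used here.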
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Read the dictionary (if any) once, deriving the block-index start
        // from its end.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset as usize;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_start + 4 + dict_len,
            )
        } else {
            (None, data_end_offset as usize)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

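        // Binary-search the index for the block whose doc-id range
        // [first_doc_id, first_doc_id + num_docs) contains `doc_id`.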
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

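        // Documents are laid out back-to-back as (u32 length, bytes); skip
        // past the docs that precede ours in this block.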
        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        // Validate the final length too, so a corrupt block yields an error
        // instead of panicking on the slice below.
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

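    /// Fetches and decompresses the block starting at `entry.first_doc_id`,
    /// consulting the cache first. Concurrent misses for the same block may
    /// decompress it more than once; the later insert simply wins.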
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

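/// Deserializes a JSON document; the `_schema` parameter is currently unused.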
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

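/// A block descriptor exposed for merging: enough to copy a compressed block
/// verbatim into another store.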
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

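/// Concatenates already-compressed blocks from existing stores without
/// recompressing, remapping doc ids to be contiguous in append order. The
/// merged footer declares no dictionary (`dict_offset = 0`, `has_dict = 0`),
/// so inputs whose blocks were dict-compressed should not be appended here
/// (see `AsyncStoreReader::has_dict`).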
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset as usize;
            let end = start + block.length as usize;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        let dict_offset = 0u64;

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}
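
// A minimal merge sketch (assuming readers `a` and `b` were opened elsewhere,
// neither uses a dictionary, and `LazyFileSlice` implements `AsyncFileRead`):
//
//     let mut out = Vec::new();
//     let mut merger = StoreMerger::new(&mut out);
//     merger.append_store(a.data_slice(), &a.raw_blocks()).await?;
//     merger.append_store(b.data_slice(), &b.raw_blocks()).await?;
//     let total_docs = merger.finish()?;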