use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};
const STORE_MAGIC: u32 = 0x53544F52; // ASCII "STOR"
const STORE_VERSION: u32 = 2;

/// Writes the block index followed by the fixed 32-byte footer.
fn write_store_index_and_footer(
    writer: &mut (impl Write + ?Sized),
    index: &[StoreBlockIndex],
    data_end_offset: u64,
    dict_offset: u64,
    num_docs: u32,
    has_dict: bool,
) -> io::Result<()> {
    writer.write_u32::<LittleEndian>(index.len() as u32)?;
    for entry in index {
        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
        writer.write_u64::<LittleEndian>(entry.offset)?;
        writer.write_u32::<LittleEndian>(entry.length)?;
        writer.write_u32::<LittleEndian>(entry.num_docs)?;
    }
    writer.write_u64::<LittleEndian>(data_end_offset)?;
    writer.write_u64::<LittleEndian>(dict_offset)?;
    writer.write_u32::<LittleEndian>(num_docs)?;
    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
    Ok(())
}

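// On-disk layout produced by the writers in this file (framing integers are
// little-endian; offsets are from the start of the file):
//
//   [compressed blocks ............ 0 .. data_end_offset]
//   [optional dictionary: u32 length + bytes]   (present iff has_dict == 1)
//   [index: u32 block count, then per block:
//        first_doc_id u32, offset u64, length u32, num_docs u32]
//   [footer, 32 bytes: data_end_offset u64, dict_offset u64,
//        num_docs u32, has_dict u32, version u32, magic u32]
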
/// Target uncompressed block size; a block is compressed and flushed once
/// its buffer reaches this size.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default size, in bytes, of a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);

/// Serializes the stored fields of `doc` into a compact binary buffer:
/// a u16 field count, then per field a u16 field id, a one-byte type tag,
/// and the tag-specific payload.
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    use crate::dsl::FieldValue;

    let stored: Vec<_> = doc
        .field_values()
        .iter()
        .filter(|(field, value)| {
            // Dense vectors are filtered out of the stored payload here; the
            // `DenseVector` match arm below remains for exhaustiveness.
            if matches!(value, crate::dsl::FieldValue::DenseVector(_)) {
                return false;
            }
            schema.get_field_entry(*field).is_some_and(|e| e.stored)
        })
        .collect();

    let mut buf = Vec::with_capacity(256);
    buf.write_u16::<LittleEndian>(stored.len() as u16)?;

    for (field, value) in &stored {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                // SAFETY: `f32` is 4 bytes with no invalid bit patterns; this
                // copies the vector's raw (native-endian) bytes.
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(buf)
}

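// Round-trip sketch for the format above. Constructing a `Schema` is
// project-specific and not shown in this file, so the helper below takes one
// as a parameter and assumes field 0 is a stored text field.
#[cfg(test)]
mod serialize_roundtrip_sketch {
    use super::*;
    use crate::dsl::Field;

    #[allow(dead_code)]
    fn roundtrip_one_text_field(schema: &Schema) -> io::Result<()> {
        let mut doc = Document::new();
        doc.add_text(Field(0), "hello store");
        let bytes = serialize_document(&doc, schema)?;
        // The buffer starts with the little-endian u16 stored-field count.
        assert_eq!(bytes[0], 1);
        assert_eq!(bytes[1], 0);
        let _decoded = deserialize_document(&bytes, schema)?;
        Ok(())
    }
}
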
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

/// Store writer that compresses each filled block on a freshly spawned
/// thread while later documents are still being buffered, then writes the
/// blocks out in order in `finish`.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    compressed_blocks: Vec<CompressedBlock>,
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// `_num_threads` is currently unused: a worker thread is spawned per
    /// filled block rather than drawn from a fixed-size pool.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_bytes = serialize_document(doc, schema)?;
        self.store_raw(&doc_bytes)
    }

    /// Appends an already-serialized document and returns its doc id.
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Harvests workers that have already finished, without blocking on the
    /// ones still running.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                // Propagate worker panics instead of silently dropping the
                // block, which would corrupt the store.
                let block = handle.join().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "compression thread panicked")
                })?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed()?;

        // Block on any workers that are still running.
        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Workers may finish out of order; restore sequence order before
        // writing the blocks out.
        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // The dictionary, when present, is written immediately after the data
        // section; its offset is recorded in the footer.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}
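
// Usage sketch (native feature only): write two raw documents into an
// in-memory buffer and finalize. The byte payloads stand in for
// `serialize_document` output.
#[cfg(all(test, feature = "native"))]
mod eager_writer_sketch {
    use super::*;

    #[test]
    fn writes_blocks_index_and_footer() -> io::Result<()> {
        let mut buf: Vec<u8> = Vec::new();
        let mut writer = EagerParallelStoreWriter::new(&mut buf, 4);
        writer.store_raw(b"doc-0")?;
        writer.store_raw(b"doc-1")?;
        assert_eq!(writer.finish()?, 2);
        // At minimum the compressed block, the index, and the fixed 32-byte
        // footer must have been written.
        assert!(buf.len() > 32);
        Ok(())
    }
}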

#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

/// Async reader over a store file: the parsed block index plus a bounded
/// FIFO cache of decompressed blocks.
pub struct AsyncStoreReader {
    data_slice: LazyFileSlice,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

/// Bounded FIFO cache of decompressed blocks, keyed by each block's first
/// doc id.
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    insert_order: std::collections::VecDeque<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            insert_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn get(&self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        if self.blocks.contains_key(&first_doc_id) {
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.insert_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.insert_order.push_back(first_doc_id);
    }
}

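// Behavior sketch for `StoreBlockCache`: insertion beyond `max_blocks`
// evicts the oldest inserted block (FIFO), not the least recently used one.
#[cfg(test)]
mod store_block_cache_sketch {
    use super::*;

    #[test]
    fn evicts_in_insertion_order() {
        let mut cache = StoreBlockCache::new(2);
        cache.insert(0, Arc::new(vec![0u8]));
        cache.insert(10, Arc::new(vec![1u8]));
        // A third insert exceeds `max_blocks` and evicts block 0.
        cache.insert(20, Arc::new(vec![2u8]));
        assert!(cache.get(0).is_none());
        assert!(cache.get(10).is_some());
        assert!(cache.get(20).is_some());
    }
}
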
impl AsyncStoreReader {
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // The footer alone is 32 bytes, so anything shorter is malformed.
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Parse the fixed 32-byte footer from the end of the file.
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // The dictionary (if any) sits between the data section and the index.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        // Binary search for the block whose doc-id range contains `doc_id`.
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        // Skip the documents that precede `doc_id` inside the block.
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        // Fast path: the block is already decompressed and cached.
        {
            if let Some(block) = self.cache.read().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

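// Usage sketch: open a store and fetch a single document. Obtaining a
// `LazyFileHandle` is backend-specific (see the `directories` module), so it
// is taken as a parameter here rather than constructed.
#[allow(dead_code)]
async fn fetch_document_sketch(
    handle: LazyFileHandle,
    schema: &Schema,
    doc_id: DocId,
) -> io::Result<Option<Document>> {
    // Cache up to 32 decompressed blocks (roughly 8 MiB of uncompressed data
    // at the 256 KiB block size).
    let reader = AsyncStoreReader::open(handle, 32).await?;
    reader.get(doc_id, schema).await
}
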
/// Deserializes a document produced by `serialize_document`. Malformed or
/// truncated input yields an error rather than a panic.
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    use crate::dsl::Field;

    // Bounds-check a length prefix before slicing, so a corrupt length
    // cannot cause an out-of-bounds panic.
    fn take<'a>(reader: &mut &'a [u8], len: usize) -> io::Result<&'a [u8]> {
        if reader.len() < len {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "truncated field payload",
            ));
        }
        let (head, tail) = reader.split_at(len);
        *reader = tail;
        Ok(head)
    }

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let field = Field(field_id as u32);
        let type_tag = reader.read_u8()?;

        match type_tag {
            0 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                let bytes = take(&mut reader, len)?;
                let s = std::str::from_utf8(bytes)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                doc.add_text(field, s);
            }
            1 => {
                doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
            }
            2 => {
                doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
            }
            3 => {
                doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
            }
            4 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                doc.add_bytes(field, take(&mut reader, len)?.to_vec());
            }
            5 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let mut entries = Vec::with_capacity(count);
                for _ in 0..count {
                    let idx = reader.read_u32::<LittleEndian>()?;
                    let val = reader.read_f32::<LittleEndian>()?;
                    entries.push((idx, val));
                }
                doc.add_sparse_vector(field, entries);
            }
            6 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if reader.len() < byte_len {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!(
                            "dense vector field {}: need {} bytes but only {} remain",
                            field.0,
                            byte_len,
                            reader.len()
                        ),
                    ));
                }
                let mut values = vec![0.0f32; count];
                // SAFETY: checked above that `byte_len` bytes remain; this
                // copies the raw (native-endian) f32 bytes written by the
                // serializer.
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        reader.as_ptr(),
                        values.as_mut_ptr() as *mut u8,
                        byte_len,
                    );
                }
                reader = &reader[byte_len..];
                doc.add_dense_vector(field, values);
            }
            7 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                let v: serde_json::Value = serde_json::from_slice(take(&mut reader, len)?)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                doc.add_json(field, v);
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}

/// One compressed block of a source store, described so `StoreMerger` can
/// copy it verbatim without decompression.
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

/// Concatenates existing stores into a new one, re-basing doc ids and block
/// offsets as it goes.
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Copies compressed blocks verbatim. Because the merged store is written
    /// without a dictionary (see `finish`), use this only for sources that
    /// were not dictionary-compressed; otherwise use
    /// `append_store_recompressing`.
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            // Re-base doc ids and offsets into the merged store.
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Decompresses each block (with the source's dictionary if it has one)
    /// and recompresses it without a dictionary, so the result stays readable
    /// after `finish`.
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Writes the index and footer. The merged store never carries a
    /// dictionary of its own.
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;
        let dict_offset = 0u64;

        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}

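// Usage sketch: merge two stores block-by-block without recompressing. This
// assumes `LazyFileSlice` implements `AsyncFileRead` (as its use in
// `load_block` suggests) and that neither source used a compression
// dictionary; check `has_dict()` and fall back to
// `append_store_recompressing` otherwise.
#[allow(dead_code)]
async fn merge_stores_sketch(
    a: &AsyncStoreReader,
    b: &AsyncStoreReader,
    out: &mut Vec<u8>,
) -> io::Result<u32> {
    let mut merger = StoreMerger::new(out);
    merger.append_store(a.data_slice(), &a.raw_blocks()).await?;
    merger.append_store(b.data_slice(), &b.raw_blocks()).await?;
    // Returns the total number of documents in the merged store.
    merger.finish()
}
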
impl AsyncStoreReader {
    /// Block descriptors in a form suitable for `StoreMerger::append_store`.
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}