use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52; // "STOR" in ASCII
const STORE_VERSION: u32 = 2;

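/// Writes the block index followed by a fixed 32-byte footer. All integers
/// are little-endian. The footer tail is, in order: `data_end_offset: u64`,
/// `dict_offset: u64`, `num_docs: u32`, `has_dict: u32`, `STORE_VERSION: u32`,
/// `STORE_MAGIC: u32`, so a reader can locate everything from the last
/// 32 bytes of the file.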
fn write_store_index_and_footer(
    writer: &mut (impl Write + ?Sized),
    index: &[StoreBlockIndex],
    data_end_offset: u64,
    dict_offset: u64,
    num_docs: u32,
    has_dict: bool,
) -> io::Result<()> {
    writer.write_u32::<LittleEndian>(index.len() as u32)?;
    for entry in index {
        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
        writer.write_u64::<LittleEndian>(entry.offset)?;
        writer.write_u32::<LittleEndian>(entry.length)?;
        writer.write_u32::<LittleEndian>(entry.num_docs)?;
    }
    writer.write_u64::<LittleEndian>(data_end_offset)?;
    writer.write_u64::<LittleEndian>(dict_offset)?;
    writer.write_u32::<LittleEndian>(num_docs)?;
    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
    Ok(())
}

/// Target uncompressed size of a store block; a block is compressed and
/// flushed once its buffer reaches this size.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default size, in bytes, of a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);

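/// Serializes the stored fields of a document into a length-prefixed record.
///
/// Layout: a `u16` field count, then for each stored field a `u16` field id,
/// a one-byte type tag (0 = text, 1 = u64, 2 = i64, 3 = f64, 4 = bytes,
/// 5 = sparse vector, 6 = dense vector, 7 = JSON), and a tag-specific payload.
/// Dense vectors are filtered out and never stored here.
///
/// A minimal round-trip sketch (the field handle and schema are assumed to
/// be built elsewhere; this is illustrative, not the crate's exact setup):
///
/// ```ignore
/// let mut doc = Document::new();
/// doc.add_text(title_field, "hello");
/// let bytes = serialize_document(&doc, &schema)?;
/// let restored = deserialize_document(&bytes, &schema)?;
/// ```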
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    use crate::dsl::FieldValue;

    let stored: Vec<_> = doc
        .field_values()
        .iter()
        .filter(|(field, value)| {
            // Dense vectors are intentionally excluded from the document store.
            if matches!(value, FieldValue::DenseVector(_)) {
                return false;
            }
            schema.get_field_entry(*field).is_some_and(|e| e.stored)
        })
        .collect();

    let mut buf = Vec::with_capacity(256);
    buf.write_u16::<LittleEndian>(stored.len() as u16)?;

    for (field, value) in &stored {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            // Unreachable while dense vectors are filtered out above; kept so
            // the match stays exhaustive if the filter ever changes.
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                // SAFETY: any initialized `[f32]` may be viewed as bytes, and
                // `u8` has alignment 1, so this reinterpretation is sound.
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(buf)
}

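/// Output of one compression task, tagged with its sequence number so blocks
/// can be written back in submission order.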
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

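/// Store writer that compresses blocks eagerly on background threads.
///
/// Documents are serialized into an in-memory block buffer; whenever the
/// buffer reaches [`STORE_BLOCK_SIZE`], it is handed to a freshly spawned
/// thread for compression while ingestion continues. `finish` joins all
/// workers and writes the blocks out in their original sequence order.
///
/// A minimal usage sketch (`file`, `doc`, and `schema` are assumed to exist):
///
/// ```ignore
/// let mut writer = EagerParallelStoreWriter::new(&mut file, 4);
/// let doc_id = writer.store(&doc, &schema)?;
/// let num_docs = writer.finish()?;
/// ```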
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    compressed_blocks: Vec<CompressedBlock>,
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    // `_num_threads` is currently unused: a thread is spawned per full block.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

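    /// Serializes `doc`, appends it (length-prefixed) to the current block,
    /// and returns its assigned `DocId`. Spawns a compression task when the
    /// block buffer reaches `STORE_BLOCK_SIZE`.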
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

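    /// Hands the current block buffer off to a new worker thread for
    /// compression. Blocks are cut at document boundaries, so a document is
    /// never split across blocks.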
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

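    /// Joins any worker threads that have already finished, keeping the rest
    /// pending. Propagates a panicked compression task as an I/O error.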
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                // A panicked worker means a lost block; surface it as an
                // error instead of silently dropping the block.
                let block = handle.join().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "store compression thread panicked")
                })?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

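    /// Flushes the final partial block, joins all compression workers, and
    /// writes the blocks in sequence order followed by the optional
    /// dictionary, the block index, and the footer. Returns the total number
    /// of documents written.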
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed()?;

        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "store compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}

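/// Index entry describing one compressed block: the first doc id it contains,
/// its byte range within the data section, and its document count.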
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

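/// Async reader for a store file. Holds the block index and optional
/// dictionary in memory, and keeps a bounded cache of decompressed blocks.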
pub struct AsyncStoreReader {
    data_slice: LazyFileSlice,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

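/// Bounded cache of decompressed blocks, keyed by each block's first doc id.
/// Eviction is in insertion order (FIFO), not LRU.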
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    insert_order: std::collections::VecDeque<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            insert_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn get(&self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        if self.blocks.contains_key(&first_doc_id) {
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.insert_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.insert_order.push_back(first_doc_id);
    }
}

impl AsyncStoreReader {
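    /// Opens a store by reading the 32-byte footer, then the optional
    /// dictionary and the block index. Block data itself is read lazily.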
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Read the dictionary (if any) once, and derive the index start from
        // its extent instead of re-reading the length field.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_offset..dict_offset + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_offset + 4..dict_offset + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_offset + 4 + dict_len,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

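    /// Fetches a document by id. Binary-searches the block index, loads and
    /// decompresses the containing block (through the cache), then skips the
    /// preceding length-prefixed documents within the block.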
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

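    /// Returns the decompressed block for `entry`, consulting the cache first
    /// and inserting on a miss.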
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        {
            if let Some(block) = self.cache.read().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

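/// Decodes a record produced by [`serialize_document`]. The `_schema`
/// parameter is currently unused; field types are recovered from the
/// per-field type tags.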
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    use crate::dsl::Field;

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let field = Field(field_id as u32);
        let type_tag = reader.read_u8()?;

        match type_tag {
            0 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                let bytes = reader.get(..len).ok_or_else(|| {
                    io::Error::new(io::ErrorKind::UnexpectedEof, "text field out of bounds")
                })?;
                let s = std::str::from_utf8(bytes)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                doc.add_text(field, s);
                reader = &reader[len..];
            }
            1 => {
                doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
            }
            2 => {
                doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
            }
            3 => {
                doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
            }
            4 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                let bytes = reader.get(..len).ok_or_else(|| {
                    io::Error::new(io::ErrorKind::UnexpectedEof, "bytes field out of bounds")
                })?;
                doc.add_bytes(field, bytes.to_vec());
                reader = &reader[len..];
            }
            5 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let mut entries = Vec::with_capacity(count);
                for _ in 0..count {
                    let idx = reader.read_u32::<LittleEndian>()?;
                    let val = reader.read_f32::<LittleEndian>()?;
                    entries.push((idx, val));
                }
                doc.add_sparse_vector(field, entries);
            }
            6 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if reader.len() < byte_len {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!(
                            "dense vector field {}: need {} bytes but only {} remain",
                            field.0,
                            byte_len,
                            reader.len()
                        ),
                    ));
                }
                let mut values = vec![0.0f32; count];
                // SAFETY: `byte_len` bytes were bounds-checked above, and the
                // destination is a freshly allocated Vec<f32> of `count`
                // elements, so source and destination cannot overlap.
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        reader.as_ptr(),
                        values.as_mut_ptr() as *mut u8,
                        byte_len,
                    );
                }
                reader = &reader[byte_len..];
                doc.add_dense_vector(field, values);
            }
            7 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                let bytes = reader.get(..len).ok_or_else(|| {
                    io::Error::new(io::ErrorKind::UnexpectedEof, "JSON field out of bounds")
                })?;
                let v: serde_json::Value = serde_json::from_slice(bytes)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                doc.add_json(field, v);
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}

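/// Metadata for one compressed block, used to copy blocks between stores
/// without decompressing them.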
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

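/// Merges existing stores into a single new store by concatenating their
/// compressed blocks and rebuilding the index with remapped doc ids.
///
/// A minimal sketch of merging two stores (`out`, `a`, and `b` are assumed
/// to exist; both sources here are dictionary-free):
///
/// ```ignore
/// let mut merger = StoreMerger::new(&mut out);
/// merger.append_store(a.data_slice(), &a.raw_blocks()).await?;
/// merger.append_store(b.data_slice(), &b.raw_blocks()).await?;
/// let total_docs = merger.finish()?;
/// ```
///
/// Stores written with a dictionary must go through
/// [`StoreMerger::append_store_recompressing`] instead, since the merged
/// output is written without a dictionary.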
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

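    /// Appends blocks from another store by copying their compressed bytes
    /// verbatim. Only valid for source stores written without a compression
    /// dictionary; dictionary-compressed blocks would be unreadable in the
    /// merged output.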
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

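    /// Appends blocks from `store` by decompressing each one (with the
    /// store's dictionary, if any) and recompressing it without a dictionary,
    /// so the result stays readable in the dictionary-free merged output.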
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

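    /// Writes the rebuilt index and footer (always dictionary-free) and
    /// returns the total number of documents in the merged store.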
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // Merged stores are always written without a dictionary.
        let dict_offset = 0u64;

        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
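    /// Returns the block metadata needed to copy this store's compressed
    /// blocks into another store via [`StoreMerger::append_store`].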
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}