1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::CompressionDict;
20#[cfg(feature = "native")]
21use crate::compression::CompressionLevel;
22use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
23use crate::dsl::{Document, Schema};
24
// "STOR" in ASCII, written as the last footer word so readers can validate
// the file; STORE_VERSION guards against incompatible layout changes;
// STORE_BLOCK_SIZE is the uncompressed threshold at which a block is flushed.
const STORE_MAGIC: u32 = 0x53544F52; const STORE_VERSION: u32 = 2; pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

// Default size (bytes) for a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

// Compression level used when callers don't specify one explicitly.
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
38
39pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
51 use crate::dsl::FieldValue;
52
53 let stored: Vec<_> = doc
54 .field_values()
55 .iter()
56 .filter(|(field, _)| schema.get_field_entry(*field).is_some_and(|e| e.stored))
57 .collect();
58
59 let mut buf = Vec::with_capacity(256);
60 buf.write_u16::<LittleEndian>(stored.len() as u16)?;
61
62 for (field, value) in &stored {
63 buf.write_u16::<LittleEndian>(field.0 as u16)?;
64 match value {
65 FieldValue::Text(s) => {
66 buf.push(0);
67 let bytes = s.as_bytes();
68 buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
69 buf.extend_from_slice(bytes);
70 }
71 FieldValue::U64(v) => {
72 buf.push(1);
73 buf.write_u64::<LittleEndian>(*v)?;
74 }
75 FieldValue::I64(v) => {
76 buf.push(2);
77 buf.write_i64::<LittleEndian>(*v)?;
78 }
79 FieldValue::F64(v) => {
80 buf.push(3);
81 buf.write_f64::<LittleEndian>(*v)?;
82 }
83 FieldValue::Bytes(b) => {
84 buf.push(4);
85 buf.write_u32::<LittleEndian>(b.len() as u32)?;
86 buf.extend_from_slice(b);
87 }
88 FieldValue::SparseVector(entries) => {
89 buf.push(5);
90 buf.write_u32::<LittleEndian>(entries.len() as u32)?;
91 for (idx, val) in entries {
92 buf.write_u32::<LittleEndian>(*idx)?;
93 buf.write_f32::<LittleEndian>(*val)?;
94 }
95 }
96 FieldValue::DenseVector(values) => {
97 buf.push(6);
98 buf.write_u32::<LittleEndian>(values.len() as u32)?;
99 let byte_slice = unsafe {
101 std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
102 };
103 buf.extend_from_slice(byte_slice);
104 }
105 FieldValue::Json(v) => {
106 buf.push(7);
107 let json_bytes = serde_json::to_vec(v)
108 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
109 buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
110 buf.extend_from_slice(&json_bytes);
111 }
112 }
113 }
114
115 Ok(buf)
116}
117
#[cfg(feature = "native")]
/// Output of one background compression job.
///
/// `seq` records submission order so blocks can be restored to doc-id order
/// after parallel compression; the doc-id fields become this block's entry
/// in the store index.
struct CompressedBlock {
    /// Position in submission order; blocks are sorted by this before writing.
    seq: usize,
    /// Doc id of the first document serialized into this block.
    first_doc_id: DocId,
    /// Number of documents in this block.
    num_docs: u32,
    /// Compressed block payload.
    compressed: Vec<u8>,
}
126
#[cfg(feature = "native")]
/// Store writer that compresses finished blocks eagerly on background
/// threads while documents continue to be appended.
///
/// NOTE(review): the `_num_threads` constructor arguments are currently
/// unused — one thread is spawned per full block with no cap; confirm
/// whether a bounded pool was intended.
pub struct EagerParallelStoreWriter<'a> {
    /// Destination for compressed blocks, optional dictionary, index, footer.
    writer: &'a mut dyn Write,
    /// Uncompressed, length-prefixed serialized docs for the block being filled.
    block_buffer: Vec<u8>,
    /// Finished blocks waiting to be written out in `finish()`.
    compressed_blocks: Vec<CompressedBlock>,
    /// In-flight compression threads.
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    /// Submission sequence number assigned to the next block.
    next_seq: usize,
    /// Doc id to assign to the next stored document.
    next_doc_id: DocId,
    /// Doc id of the first document in the current block buffer.
    block_first_doc: DocId,
    /// Optional shared compression dictionary.
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}
148
#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Creates a writer with the default compression level and no dictionary.
    ///
    /// NOTE(review): `_num_threads` is currently unused — one thread is
    /// spawned per full block; confirm whether a bounded pool was intended.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer with an explicit compression level and no dictionary.
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    /// Creates a writer that compresses with `dict` at the default level.
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer that compresses with `dict` at `compression_level`.
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

    /// Serializes `doc` into the current block and returns its doc id.
    /// Kicks off background compression once the block buffer reaches
    /// `STORE_BLOCK_SIZE` uncompressed bytes.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        // Each document is stored as a u32 length prefix followed by its bytes.
        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Moves the current block buffer onto a background thread for compression.
    /// No-op if the buffer is empty.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Joins compression threads that have already finished.
    ///
    /// A panicked worker is surfaced as an `io::Error`. Previously a panic
    /// was silently swallowed (`if let Ok(..)`), which dropped the block and
    /// corrupted the store with no indication of failure.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                let block = handle.join().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "compression thread panicked")
                })?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

    /// Flushes the final partial block, waits for every compression thread,
    /// then writes the data region, optional dictionary, block index, and
    /// the fixed footer. Returns the total number of documents stored.
    ///
    /// # Errors
    /// Fails on I/O errors or if any compression worker panicked (previously
    /// a panicked worker's block was silently omitted from the store).
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed()?;

        // Block on the remaining workers; a panic must become an error
        // rather than a silently missing block.
        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Empty store: zero-length index plus a footer that readers can
            // still parse (data_end=0, no dict, zero docs).
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Threads may finish out of order; restore submission order so the
        // on-disk blocks are contiguous in doc-id space.
        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Dictionary (if any) follows the data region: u32 length + bytes.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Block index: count, then one fixed-size entry per block.
        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Fixed 32-byte footer, parsed back by AsyncStoreReader::open.
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}
353
/// One entry of the on-disk block index: where a compressed block lives in
/// the data region and which doc-id range it covers.
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    /// Doc id of the first document in the block.
    first_doc_id: DocId,
    /// Byte offset of the compressed block within the data region.
    offset: u64,
    /// Compressed length of the block in bytes.
    length: u32,
    /// Number of documents in the block.
    num_docs: u32,
}
362
/// Async reader over a document store: the footer and block index are parsed
/// once at open time; compressed blocks are fetched lazily and cached after
/// decompression.
pub struct AsyncStoreReader {
    /// File slice covering only the compressed data region.
    data_slice: LazyFileSlice,
    /// In-memory block index, ordered by `first_doc_id`.
    index: Vec<StoreBlockIndex>,
    /// Total number of documents in the store.
    num_docs: u32,
    /// Dictionary for decompressing blocks, if the store was built with one.
    dict: Option<CompressionDict>,
    /// LRU cache of decompressed blocks, keyed by the block's first doc id.
    cache: RwLock<StoreBlockCache>,
}
375
376struct StoreBlockCache {
377 blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
378 access_order: Vec<DocId>,
379 max_blocks: usize,
380}
381
382impl StoreBlockCache {
383 fn new(max_blocks: usize) -> Self {
384 Self {
385 blocks: FxHashMap::default(),
386 access_order: Vec::new(),
387 max_blocks,
388 }
389 }
390
391 fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
392 if let Some(block) = self.blocks.get(&first_doc_id) {
393 if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
394 self.access_order.remove(pos);
395 self.access_order.push(first_doc_id);
396 }
397 Some(Arc::clone(block))
398 } else {
399 None
400 }
401 }
402
403 fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
404 while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
405 let evict = self.access_order.remove(0);
406 self.blocks.remove(&evict);
407 }
408 self.blocks.insert(first_doc_id, block);
409 self.access_order.push(first_doc_id);
410 }
411}
412
413impl AsyncStoreReader {
414 pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
417 let file_len = file_handle.len();
418 if file_len < 32 {
420 return Err(io::Error::new(
421 io::ErrorKind::InvalidData,
422 "Store too small",
423 ));
424 }
425
426 let footer = file_handle
428 .read_bytes_range(file_len - 32..file_len)
429 .await?;
430 let mut reader = footer.as_slice();
431 let data_end_offset = reader.read_u64::<LittleEndian>()?;
432 let dict_offset = reader.read_u64::<LittleEndian>()?;
433 let num_docs = reader.read_u32::<LittleEndian>()?;
434 let has_dict = reader.read_u32::<LittleEndian>()? != 0;
435 let version = reader.read_u32::<LittleEndian>()?;
436 let magic = reader.read_u32::<LittleEndian>()?;
437
438 if magic != STORE_MAGIC {
439 return Err(io::Error::new(
440 io::ErrorKind::InvalidData,
441 "Invalid store magic",
442 ));
443 }
444 if version != STORE_VERSION {
445 return Err(io::Error::new(
446 io::ErrorKind::InvalidData,
447 format!("Unsupported store version: {}", version),
448 ));
449 }
450
451 let dict = if has_dict && dict_offset > 0 {
453 let dict_start = dict_offset;
454 let dict_len_bytes = file_handle
455 .read_bytes_range(dict_start..dict_start + 4)
456 .await?;
457 let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
458 let dict_bytes = file_handle
459 .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
460 .await?;
461 Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
462 } else {
463 None
464 };
465
466 let index_start = if has_dict && dict_offset > 0 {
468 let dict_start = dict_offset;
469 let dict_len_bytes = file_handle
470 .read_bytes_range(dict_start..dict_start + 4)
471 .await?;
472 let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
473 dict_start + 4 + dict_len
474 } else {
475 data_end_offset
476 };
477 let index_end = file_len - 32;
478
479 let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
480 let mut reader = index_bytes.as_slice();
481
482 let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
483 let mut index = Vec::with_capacity(num_blocks);
484
485 for _ in 0..num_blocks {
486 let first_doc_id = reader.read_u32::<LittleEndian>()?;
487 let offset = reader.read_u64::<LittleEndian>()?;
488 let length = reader.read_u32::<LittleEndian>()?;
489 let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
490
491 index.push(StoreBlockIndex {
492 first_doc_id,
493 offset,
494 length,
495 num_docs: num_docs_in_block,
496 });
497 }
498
499 let data_slice = file_handle.slice(0..data_end_offset);
501
502 Ok(Self {
503 data_slice,
504 index,
505 num_docs,
506 dict,
507 cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
508 })
509 }
510
511 pub fn num_docs(&self) -> u32 {
513 self.num_docs
514 }
515
516 pub fn cached_blocks(&self) -> usize {
518 self.cache.read().blocks.len()
519 }
520
521 pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
523 if doc_id >= self.num_docs {
524 return Ok(None);
525 }
526
527 let block_idx = self
529 .index
530 .binary_search_by(|entry| {
531 if doc_id < entry.first_doc_id {
532 std::cmp::Ordering::Greater
533 } else if doc_id >= entry.first_doc_id + entry.num_docs {
534 std::cmp::Ordering::Less
535 } else {
536 std::cmp::Ordering::Equal
537 }
538 })
539 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
540
541 let entry = &self.index[block_idx];
542 let block_data = self.load_block(entry).await?;
543
544 let doc_offset_in_block = doc_id - entry.first_doc_id;
546 let mut reader = &block_data[..];
547
548 for _ in 0..doc_offset_in_block {
549 let doc_len = reader.read_u32::<LittleEndian>()? as usize;
550 if doc_len > reader.len() {
551 return Err(io::Error::new(
552 io::ErrorKind::InvalidData,
553 "Invalid doc length",
554 ));
555 }
556 reader = &reader[doc_len..];
557 }
558
559 let doc_len = reader.read_u32::<LittleEndian>()? as usize;
560 let doc_bytes = &reader[..doc_len];
561
562 deserialize_document(doc_bytes, schema).map(Some)
563 }
564
565 async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
566 {
568 let mut cache = self.cache.write();
569 if let Some(block) = cache.get(entry.first_doc_id) {
570 return Ok(block);
571 }
572 }
573
574 let start = entry.offset;
576 let end = start + entry.length as u64;
577 let compressed = self.data_slice.read_bytes_range(start..end).await?;
578
579 let decompressed = if let Some(ref dict) = self.dict {
581 crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
582 } else {
583 crate::compression::decompress(compressed.as_slice())?
584 };
585
586 let block = Arc::new(decompressed);
587
588 {
590 let mut cache = self.cache.write();
591 cache.insert(entry.first_doc_id, Arc::clone(&block));
592 }
593
594 Ok(block)
595 }
596}
597
598pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
599 use crate::dsl::Field;
600
601 let mut reader = data;
602 let num_fields = reader.read_u16::<LittleEndian>()? as usize;
603 let mut doc = Document::new();
604
605 for _ in 0..num_fields {
606 let field_id = reader.read_u16::<LittleEndian>()?;
607 let field = Field(field_id as u32);
608 let type_tag = reader.read_u8()?;
609
610 match type_tag {
611 0 => {
612 let len = reader.read_u32::<LittleEndian>()? as usize;
614 let s = std::str::from_utf8(&reader[..len])
615 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
616 doc.add_text(field, s);
617 reader = &reader[len..];
618 }
619 1 => {
620 doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
622 }
623 2 => {
624 doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
626 }
627 3 => {
628 doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
630 }
631 4 => {
632 let len = reader.read_u32::<LittleEndian>()? as usize;
634 doc.add_bytes(field, reader[..len].to_vec());
635 reader = &reader[len..];
636 }
637 5 => {
638 let count = reader.read_u32::<LittleEndian>()? as usize;
640 let mut entries = Vec::with_capacity(count);
641 for _ in 0..count {
642 let idx = reader.read_u32::<LittleEndian>()?;
643 let val = reader.read_f32::<LittleEndian>()?;
644 entries.push((idx, val));
645 }
646 doc.add_sparse_vector(field, entries);
647 }
648 6 => {
649 let count = reader.read_u32::<LittleEndian>()? as usize;
651 let byte_len = count * 4;
652 let mut values = vec![0.0f32; count];
653 unsafe {
655 std::ptr::copy_nonoverlapping(
656 reader.as_ptr(),
657 values.as_mut_ptr() as *mut u8,
658 byte_len,
659 );
660 }
661 reader = &reader[byte_len..];
662 doc.add_dense_vector(field, values);
663 }
664 7 => {
665 let len = reader.read_u32::<LittleEndian>()? as usize;
667 let v: serde_json::Value = serde_json::from_slice(&reader[..len])
668 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
669 doc.add_json(field, v);
670 reader = &reader[len..];
671 }
672 _ => {
673 return Err(io::Error::new(
674 io::ErrorKind::InvalidData,
675 format!("Unknown field type tag: {}", type_tag),
676 ));
677 }
678 }
679 }
680
681 Ok(doc)
682}
683
/// Descriptor of a compressed block (offset/length within a store's data
/// region) used to copy blocks verbatim — without recompression — during
/// merges.
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    /// Doc id of the first document in the block (in the source store).
    pub first_doc_id: DocId,
    /// Number of documents in the block.
    pub num_docs: u32,
    /// Byte offset of the block within the source data region.
    pub offset: u64,
    /// Compressed length in bytes.
    pub length: u32,
}
692
/// Concatenates compressed blocks from multiple source stores into one new
/// store without recompressing, rewriting doc ids and offsets as it goes.
pub struct StoreMerger<'a, W: Write> {
    /// Destination for copied blocks, index, and footer.
    writer: &'a mut W,
    /// Index entries for the merged store, built as blocks are appended.
    index: Vec<StoreBlockIndex>,
    /// Byte offset where the next copied block will land.
    current_offset: u64,
    /// Next doc id in the merged numbering (doc ids are renumbered densely).
    next_doc_id: DocId,
}
709
710impl<'a, W: Write> StoreMerger<'a, W> {
711 pub fn new(writer: &'a mut W) -> Self {
712 Self {
713 writer,
714 index: Vec::new(),
715 current_offset: 0,
716 next_doc_id: 0,
717 }
718 }
719
720 pub async fn append_store<F: AsyncFileRead>(
725 &mut self,
726 data_slice: &F,
727 blocks: &[RawStoreBlock],
728 ) -> io::Result<()> {
729 for block in blocks {
730 let start = block.offset;
732 let end = start + block.length as u64;
733 let compressed_data = data_slice.read_bytes_range(start..end).await?;
734
735 self.writer.write_all(compressed_data.as_slice())?;
737
738 self.index.push(StoreBlockIndex {
740 first_doc_id: self.next_doc_id,
741 offset: self.current_offset,
742 length: block.length,
743 num_docs: block.num_docs,
744 });
745
746 self.current_offset += block.length as u64;
747 self.next_doc_id += block.num_docs;
748 }
749
750 Ok(())
751 }
752
753 pub fn finish(self) -> io::Result<u32> {
755 let data_end_offset = self.current_offset;
756
757 let dict_offset = 0u64;
759
760 self.writer
762 .write_u32::<LittleEndian>(self.index.len() as u32)?;
763 for entry in &self.index {
764 self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
765 self.writer.write_u64::<LittleEndian>(entry.offset)?;
766 self.writer.write_u32::<LittleEndian>(entry.length)?;
767 self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
768 }
769
770 self.writer.write_u64::<LittleEndian>(data_end_offset)?;
772 self.writer.write_u64::<LittleEndian>(dict_offset)?;
773 self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
774 self.writer.write_u32::<LittleEndian>(0)?; self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
776 self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
777
778 Ok(self.next_doc_id)
779 }
780}
781
782impl AsyncStoreReader {
783 pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
785 self.index
786 .iter()
787 .map(|entry| RawStoreBlock {
788 first_doc_id: entry.first_doc_id,
789 num_docs: entry.num_docs,
790 offset: entry.offset,
791 length: entry.length,
792 })
793 .collect()
794 }
795
796 pub fn data_slice(&self) -> &LazyFileSlice {
798 &self.data_slice
799 }
800
801 pub fn has_dict(&self) -> bool {
803 self.dict.is_some()
804 }
805}