1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::CompressionDict;
20#[cfg(feature = "native")]
21use crate::compression::CompressionLevel;
22use crate::directories::FileHandle;
23use crate::dsl::{Document, Schema};
24
/// Magic number identifying a store file ("STOR" in ASCII).
const STORE_MAGIC: u32 = 0x53544F52;

/// On-disk format version; readers reject any other value.
const STORE_VERSION: u32 = 2;

/// Uncompressed size threshold at which a block is flushed for compression.
pub const STORE_BLOCK_SIZE: usize = 16 * 1024;

/// Default size budget for a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Compression level used when the caller does not pick one explicitly.
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);
40
41fn write_store_index_and_footer(
45 writer: &mut (impl Write + ?Sized),
46 index: &[StoreBlockIndex],
47 data_end_offset: u64,
48 dict_offset: u64,
49 num_docs: u32,
50 has_dict: bool,
51) -> io::Result<()> {
52 writer.write_u32::<LittleEndian>(index.len() as u32)?;
53 for entry in index {
54 writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
55 writer.write_u64::<LittleEndian>(entry.offset)?;
56 writer.write_u32::<LittleEndian>(entry.length)?;
57 writer.write_u32::<LittleEndian>(entry.num_docs)?;
58 }
59 writer.write_u64::<LittleEndian>(data_end_offset)?;
60 writer.write_u64::<LittleEndian>(dict_offset)?;
61 writer.write_u32::<LittleEndian>(num_docs)?;
62 writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
63 writer.write_u32::<LittleEndian>(STORE_VERSION)?;
64 writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
65 Ok(())
66}
67
68pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
80 let mut buf = Vec::with_capacity(256);
81 serialize_document_into(doc, schema, &mut buf)?;
82 Ok(buf)
83}
84
85pub fn serialize_document_into(
88 doc: &Document,
89 schema: &Schema,
90 buf: &mut Vec<u8>,
91) -> io::Result<()> {
92 use crate::dsl::FieldValue;
93
94 buf.clear();
95
96 let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
98 if matches!(
100 value,
101 FieldValue::DenseVector(_) | FieldValue::BinaryDenseVector(_)
102 ) {
103 return false;
104 }
105 schema.get_field_entry(*field).is_some_and(|e| e.stored)
106 };
107
108 let stored_count = doc
109 .field_values()
110 .iter()
111 .filter(|(field, value)| is_stored(field, value))
112 .count();
113
114 buf.write_u16::<LittleEndian>(stored_count as u16)?;
115
116 for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
117 buf.write_u16::<LittleEndian>(field.0 as u16)?;
118 match value {
119 FieldValue::Text(s) => {
120 buf.push(0);
121 let bytes = s.as_bytes();
122 buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
123 buf.extend_from_slice(bytes);
124 }
125 FieldValue::U64(v) => {
126 buf.push(1);
127 buf.write_u64::<LittleEndian>(*v)?;
128 }
129 FieldValue::I64(v) => {
130 buf.push(2);
131 buf.write_i64::<LittleEndian>(*v)?;
132 }
133 FieldValue::F64(v) => {
134 buf.push(3);
135 buf.write_f64::<LittleEndian>(*v)?;
136 }
137 FieldValue::Bytes(b) => {
138 buf.push(4);
139 buf.write_u32::<LittleEndian>(b.len() as u32)?;
140 buf.extend_from_slice(b);
141 }
142 FieldValue::SparseVector(entries) => {
143 buf.push(5);
144 buf.write_u32::<LittleEndian>(entries.len() as u32)?;
145 for (idx, val) in entries {
146 buf.write_u32::<LittleEndian>(*idx)?;
147 buf.write_f32::<LittleEndian>(*val)?;
148 }
149 }
150 FieldValue::DenseVector(values) => {
151 buf.push(6);
152 buf.write_u32::<LittleEndian>(values.len() as u32)?;
153 let byte_slice = unsafe {
155 std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
156 };
157 buf.extend_from_slice(byte_slice);
158 }
159 FieldValue::Json(v) => {
160 buf.push(7);
161 let json_bytes = serde_json::to_vec(v)
162 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
163 buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
164 buf.extend_from_slice(&json_bytes);
165 }
166 FieldValue::BinaryDenseVector(b) => {
167 buf.push(8);
168 buf.write_u32::<LittleEndian>(b.len() as u32)?;
169 buf.extend_from_slice(b);
170 }
171 }
172 }
173
174 Ok(())
175}
176
/// Result of one background compression job, tagged with its submission
/// sequence number so blocks can be written out in creation order.
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Order in which the block was submitted for compression.
    seq: usize,
    /// Doc id of the first document in the block.
    first_doc_id: DocId,
    /// Number of documents packed into the block.
    num_docs: u32,
    /// Compressed block bytes.
    compressed: Vec<u8>,
}
185
/// Document store writer that compresses each filled block on its own
/// background thread while the caller keeps appending documents.
///
/// Blocks are buffered uncompressed until they reach [`STORE_BLOCK_SIZE`],
/// then handed to a spawned thread; `finish` joins all workers and writes
/// the blocks, the optional dictionary, the index and the footer.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    /// Destination for the finished store.
    writer: &'a mut dyn Write,
    /// Uncompressed documents accumulated for the current block.
    block_buffer: Vec<u8>,
    /// Scratch buffer reused when serializing each document.
    serialize_buf: Vec<u8>,
    /// Compressed blocks already joined from worker threads.
    compressed_blocks: Vec<CompressedBlock>,
    /// Still-running compression threads.
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    /// Sequence number for the next flushed block (write-order key).
    next_seq: usize,
    /// Doc id to assign to the next stored document.
    next_doc_id: DocId,
    /// Doc id of the first document in the current (unflushed) block.
    block_first_doc: DocId,
    /// Optional compression dictionary shared with worker threads.
    dict: Option<Arc<CompressionDict>>,
    /// Compression level applied to every block.
    compression_level: CompressionLevel,
}
209
#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Creates a writer with no dictionary and the default compression
    /// level. `_num_threads` is currently unused: each flushed block spawns
    /// its own compression thread (see `spawn_compression`).
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer with no dictionary and an explicit compression level.
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::make(writer, None, compression_level)
    }

    /// Creates a writer that compresses every block with `dict` at the
    /// default compression level.
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer that compresses every block with `dict` at an
    /// explicit compression level.
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::make(writer, Some(Arc::new(dict)), compression_level)
    }

    /// Shared constructor body: the public constructors differ only in the
    /// dictionary and level, so the field initialization lives here once.
    fn make(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Serializes `doc`, appends it to the current block and returns the
    /// doc id assigned to it. Flushes the block to a background compression
    /// thread once it reaches [`STORE_BLOCK_SIZE`].
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(self.serialize_buf.len() as u32)?;
        self.block_buffer.extend_from_slice(&self.serialize_buf);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

    /// Appends an already-serialized document (bytes as produced by
    /// [`serialize_document`]) and returns its doc id.
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Hands the current block to a freshly spawned compression thread;
    /// no-op when the block is empty.
    ///
    /// NOTE(review): threads are unbounded — one per flushed block until
    /// `finish` (or `collect_completed`) joins them; `_num_threads` is not
    /// consulted. Verify this is acceptable for large ingests.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        // Swap the full buffer out so the caller can keep appending while
        // the worker compresses.
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Joins every compression thread that has already finished, keeping
    /// still-running ones pending. Worker panics are propagated.
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                match handle.join() {
                    Ok(block) => self.compressed_blocks.push(block),
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

    /// Flushes the final partial block, joins all workers, then writes the
    /// compressed blocks in submission order followed by the optional
    /// dictionary, the block index and the footer.
    ///
    /// Returns the total number of documents written.
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed();

        // Block on whatever is still compressing.
        for handle in self.pending_handles.drain(..) {
            match handle.join() {
                Ok(block) => self.compressed_blocks.push(block),
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Workers finish out of order; restore submission order for the file.
        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Dictionary (when present) is written right after the data region,
        // as a u32 length prefix followed by the dictionary bytes.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}
413
/// One entry of the on-disk block index: where a compressed block lives
/// and which doc ids it covers.
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    /// Doc id of the first document in the block.
    pub(crate) first_doc_id: DocId,
    /// Byte offset of the compressed block within the data region.
    pub(crate) offset: u64,
    /// Compressed length in bytes.
    pub(crate) length: u32,
    /// Number of documents in the block.
    pub(crate) num_docs: u32,
}
422
/// Async reader for a document store: the block index is held in memory,
/// block data is fetched on demand, decompressed and kept in a bounded
/// LRU cache.
pub struct AsyncStoreReader {
    /// File handle sliced to the compressed-data region only.
    data_slice: FileHandle,
    /// In-memory copy of the block index.
    index: Vec<StoreBlockIndex>,
    /// Total number of documents in the store.
    num_docs: u32,
    /// Compression dictionary, when the store was written with one.
    dict: Option<CompressionDict>,
    /// LRU cache of decompressed blocks.
    cache: RwLock<StoreBlockCache>,
}
435
/// A decompressed store block plus a per-document offset table, giving
/// O(1) access to any document in the block.
struct CachedBlock {
    /// Decompressed block bytes: a sequence of (u32 LE length, payload)
    /// records, one per document.
    data: Vec<u8>,
    /// Byte offset of each document's length prefix within `data`.
    offsets: Vec<u32>,
}

impl CachedBlock {
    /// Scans `data` once and records where each of the `num_docs` documents
    /// starts, validating that every length prefix lies inside the buffer.
    ///
    /// # Errors
    /// `InvalidData` when the block is truncated or a length prefix would
    /// overflow the running position.
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut pos = 0usize;
        for _ in 0..num_docs {
            // Checked arithmetic: a corrupt length prefix must not wrap
            // `pos` around and defeat the bounds check.
            let prefix_end = pos
                .checked_add(4)
                .filter(|&end| end <= data.len())
                .ok_or_else(|| {
                    io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated block while building offset table",
                    )
                })?;
            offsets.push(pos as u32);
            let doc_len =
                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
                    as usize;
            pos = prefix_end.checked_add(doc_len).ok_or_else(|| {
                io::Error::new(io::ErrorKind::InvalidData, "document length overflow")
            })?;
        }
        Ok(Self { data, offsets })
    }

    /// Returns the serialized bytes of the `doc_offset_in_block`-th
    /// document of this block.
    ///
    /// # Errors
    /// `InvalidData` when the index is out of range or the recorded length
    /// runs past the end of the block.
    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let idx = doc_offset_in_block as usize;
        if idx >= self.offsets.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc offset out of range",
            ));
        }
        let start = self.offsets[idx] as usize;
        if start + 4 > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "truncated doc length",
            ));
        }
        let doc_len = u32::from_le_bytes([
            self.data[start],
            self.data[start + 1],
            self.data[start + 2],
            self.data[start + 3],
        ]) as usize;
        let data_start = start + 4;
        // Checked add: a corrupt doc_len near usize::MAX must not wrap the
        // end offset below `data_start`.
        let data_end = data_start
            .checked_add(doc_len)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "doc data overflow"))?;
        if data_end > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc data overflow",
            ));
        }
        Ok(&self.data[data_start..data_end])
    }
}
500
/// Bounded LRU cache of decompressed store blocks, keyed by each block's
/// first doc id.
struct StoreBlockCache {
    /// Cached blocks by first_doc_id.
    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
    /// Keys ordered least- to most-recently used. Promotion is an O(n)
    /// scan of this deque; acceptable while `max_blocks` stays small.
    lru_order: std::collections::VecDeque<DocId>,
    /// Maximum number of blocks retained before eviction.
    max_blocks: usize,
}

impl StoreBlockCache {
    /// Creates an empty cache holding at most `max_blocks` blocks.
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    /// Lookup that does NOT update the LRU order; takes `&self` so it can
    /// run under a shared (read) lock.
    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    /// Lookup that promotes the entry to most-recently-used.
    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        let block = self.blocks.get(&first_doc_id).map(Arc::clone)?;
        self.promote(first_doc_id);
        Some(block)
    }

    /// Inserts a block, evicting least-recently-used entries until the cache
    /// fits. Re-inserting an existing key only promotes it.
    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
        if self.blocks.contains_key(&first_doc_id) {
            self.promote(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.lru_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.lru_order.push_back(first_doc_id);
    }

    /// Moves `key` to the most-recently-used end of the order (O(n) scan).
    fn promote(&mut self, key: DocId) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(key);
        }
    }
}
554
impl AsyncStoreReader {
    /// Opens a store file: reads the 32-byte footer, the optional
    /// compression dictionary and the block index, then keeps a handle
    /// sliced to the compressed-data region.
    ///
    /// `cache_blocks` bounds the number of decompressed blocks kept in the
    /// LRU cache.
    ///
    /// # Errors
    /// `InvalidData` when the file is shorter than the footer, the magic
    /// does not match, or the version is unsupported; any underlying I/O
    /// error is propagated.
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // The footer alone is 32 bytes (u64 + u64 + 4 * u32); anything
        // smaller cannot be a valid store.
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // The dictionary (when present) sits between the data region and
        // the index: a u32 length prefix followed by the dictionary bytes.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Restrict the handle to the compressed-data region so block reads
        // can never stray into the dictionary/index/footer.
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Total number of documents in the store.
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Number of decompressed blocks currently held in the cache.
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Fetches and deserializes the document with `doc_id`, or `None` when
    /// the id is out of range.
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }

    /// Like [`Self::get`], but materializes only the fields whose ids
    /// appear in `field_ids`.
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }

    /// Binary-searches the block index for the block containing `doc_id`
    /// and returns it together with its (possibly cached) decompressed
    /// contents.
    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }

    /// Returns the decompressed block for `entry`, via the cache when
    /// possible.
    ///
    /// Fast path: read lock + `peek` (no LRU promotion). Slow path: write
    /// lock + `get` (promotes). On a miss the block is read, decompressed
    /// and inserted; concurrent misses may decompress the same block twice,
    /// but `insert` keeps only one copy.
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}
749
/// Deserializes only the fields of `data` whose ids appear in `field_ids`;
/// all other fields are skipped while scanning, not materialized.
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}
763
/// Deserializes every stored field of a document from `data`.
pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}
770
771fn deserialize_document_inner(
773 data: &[u8],
774 _schema: &Schema,
775 field_filter: Option<&[u32]>,
776) -> io::Result<Document> {
777 use crate::dsl::Field;
778
779 let mut reader = data;
780 let num_fields = reader.read_u16::<LittleEndian>()? as usize;
781 let mut doc = Document::new();
782
783 for _ in 0..num_fields {
784 let field_id = reader.read_u16::<LittleEndian>()?;
785 let type_tag = reader.read_u8()?;
786
787 let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));
788
789 match type_tag {
790 0 => {
791 let len = reader.read_u32::<LittleEndian>()? as usize;
793 if wanted {
794 let s = std::str::from_utf8(&reader[..len])
795 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
796 doc.add_text(Field(field_id as u32), s);
797 }
798 reader = &reader[len..];
799 }
800 1 => {
801 let v = reader.read_u64::<LittleEndian>()?;
803 if wanted {
804 doc.add_u64(Field(field_id as u32), v);
805 }
806 }
807 2 => {
808 let v = reader.read_i64::<LittleEndian>()?;
810 if wanted {
811 doc.add_i64(Field(field_id as u32), v);
812 }
813 }
814 3 => {
815 let v = reader.read_f64::<LittleEndian>()?;
817 if wanted {
818 doc.add_f64(Field(field_id as u32), v);
819 }
820 }
821 4 => {
822 let len = reader.read_u32::<LittleEndian>()? as usize;
824 if wanted {
825 doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
826 }
827 reader = &reader[len..];
828 }
829 5 => {
830 let count = reader.read_u32::<LittleEndian>()? as usize;
832 if wanted {
833 let mut entries = Vec::with_capacity(count);
834 for _ in 0..count {
835 let idx = reader.read_u32::<LittleEndian>()?;
836 let val = reader.read_f32::<LittleEndian>()?;
837 entries.push((idx, val));
838 }
839 doc.add_sparse_vector(Field(field_id as u32), entries);
840 } else {
841 let skip = count * 8;
842 if skip > reader.len() {
843 return Err(io::Error::new(
844 io::ErrorKind::UnexpectedEof,
845 "sparse vector skip overflow",
846 ));
847 }
848 reader = &reader[skip..];
849 }
850 }
851 6 => {
852 let count = reader.read_u32::<LittleEndian>()? as usize;
854 let byte_len = count * 4;
855 if byte_len > reader.len() {
856 return Err(io::Error::new(
857 io::ErrorKind::UnexpectedEof,
858 "dense vector truncated",
859 ));
860 }
861 if wanted {
862 let mut values = vec![0.0f32; count];
863 unsafe {
864 std::ptr::copy_nonoverlapping(
865 reader.as_ptr(),
866 values.as_mut_ptr() as *mut u8,
867 byte_len,
868 );
869 }
870 doc.add_dense_vector(Field(field_id as u32), values);
871 }
872 reader = &reader[byte_len..];
873 }
874 7 => {
875 let len = reader.read_u32::<LittleEndian>()? as usize;
877 if len > reader.len() {
878 return Err(io::Error::new(
879 io::ErrorKind::UnexpectedEof,
880 "json field truncated",
881 ));
882 }
883 if wanted {
884 let v: serde_json::Value = serde_json::from_slice(&reader[..len])
885 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
886 doc.add_json(Field(field_id as u32), v);
887 }
888 reader = &reader[len..];
889 }
890 8 => {
891 let len = reader.read_u32::<LittleEndian>()? as usize;
893 if len > reader.len() {
894 return Err(io::Error::new(
895 io::ErrorKind::UnexpectedEof,
896 "binary dense vector truncated",
897 ));
898 }
899 if wanted {
900 doc.add_binary_dense_vector(Field(field_id as u32), reader[..len].to_vec());
901 }
902 reader = &reader[len..];
903 }
904 _ => {
905 return Err(io::Error::new(
906 io::ErrorKind::InvalidData,
907 format!("Unknown field type tag: {}", type_tag),
908 ));
909 }
910 }
911 }
912
913 Ok(doc)
914}
915
/// Location and shape of one compressed block, as exposed for store
/// merging.
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    /// Doc id of the first document in the block (within its source store).
    pub first_doc_id: DocId,
    /// Number of documents in the block.
    pub num_docs: u32,
    /// Byte offset of the compressed block within the source data region.
    pub offset: u64,
    /// Compressed length in bytes.
    pub length: u32,
}
924
/// Merges the compressed blocks of several stores into one output store,
/// re-basing doc ids and block offsets as it goes.
pub struct StoreMerger<'a, W: Write> {
    /// Destination for block data, index and footer.
    writer: &'a mut W,
    /// Index entries accumulated for the merged store.
    index: Vec<StoreBlockIndex>,
    /// Bytes of block data written so far (offset of the next block).
    current_offset: u64,
    /// Next doc id to assign in the merged store.
    next_doc_id: DocId,
}
941
impl<'a, W: Write> StoreMerger<'a, W> {
    /// Creates a merger that writes the merged store to `writer`.
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Copies the given compressed blocks verbatim from `data_slice`,
    /// re-basing doc ids and offsets into the merged store.
    ///
    /// NOTE(review): blocks are appended without recompression, yet
    /// `finish` always writes `has_dict = false`. If a source store was
    /// compressed with a dictionary, its blocks would be unreadable in the
    /// merged store — presumably callers use `append_store_recompressing`
    /// in that case; verify at the call sites.
    pub async fn append_store(
        &mut self,
        data_slice: &FileHandle,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Decompresses every block of `store` (using its dictionary when
    /// present) and recompresses it without a dictionary at the default
    /// level, so the merged store is fully self-contained.
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Writes the merged index and footer (never a dictionary) and returns
    /// the total number of documents in the merged store.
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // Merged stores are always written without a dictionary.
        let dict_offset = 0u64;

        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}
1050
impl AsyncStoreReader {
    /// Returns the block index as raw block descriptors, for use with
    /// [`StoreMerger::append_store`].
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Handle restricted to the compressed-data region of the store file.
    pub fn data_slice(&self) -> &FileHandle {
        &self.data_slice
    }

    /// Whether this store was written with a compression dictionary.
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    /// The compression dictionary, if any.
    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    /// Internal view of the block index.
    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}