use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

/// Magic number identifying a store file ("STOR" in ASCII).
const STORE_MAGIC: u32 = 0x53544F52;
const STORE_VERSION: u32 = 2;
/// Target uncompressed size of one store block.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default compression dictionary size, in bytes.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);

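/// Writes the block index followed by the fixed-size footer.
///
/// Layout (all integers little-endian):
///   - u32 block count, then per block: u32 first_doc_id, u64 offset,
///     u32 compressed length, u32 num_docs
///   - footer: u64 data_end_offset, u64 dict_offset, u32 num_docs,
///     u32 has_dict flag, u32 version, u32 magic (32 bytes total)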
fn write_store_index_and_footer(
    writer: &mut (impl Write + ?Sized),
    index: &[StoreBlockIndex],
    data_end_offset: u64,
    dict_offset: u64,
    num_docs: u32,
    has_dict: bool,
) -> io::Result<()> {
    writer.write_u32::<LittleEndian>(index.len() as u32)?;
    for entry in index {
        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
        writer.write_u64::<LittleEndian>(entry.offset)?;
        writer.write_u32::<LittleEndian>(entry.length)?;
        writer.write_u32::<LittleEndian>(entry.num_docs)?;
    }
    writer.write_u64::<LittleEndian>(data_end_offset)?;
    writer.write_u64::<LittleEndian>(dict_offset)?;
    writer.write_u32::<LittleEndian>(num_docs)?;
    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
    Ok(())
}

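/// Serializes the stored fields of `doc` into a freshly allocated buffer.
///
/// Convenience wrapper around [`serialize_document_into`].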
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    let mut buf = Vec::with_capacity(256);
    serialize_document_into(doc, schema, &mut buf)?;
    Ok(buf)
}

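/// Serializes the stored fields of `doc` into `buf`, clearing it first.
///
/// Record layout: a u16 count of stored fields, then for each field a u16
/// field id, a one-byte type tag (0 = text, 1 = u64, 2 = i64, 3 = f64,
/// 4 = bytes, 5 = sparse vector, 6 = dense vector, 7 = JSON) and the
/// type-specific payload. Fields not marked as stored in the schema are
/// skipped, as are dense vectors regardless of the schema flag.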
pub fn serialize_document_into(
    doc: &Document,
    schema: &Schema,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    use crate::dsl::FieldValue;

    buf.clear();

    // Dense vectors are never written to the store, even if the field is
    // marked as stored in the schema.
    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
        if matches!(value, FieldValue::DenseVector(_)) {
            return false;
        }
        schema.get_field_entry(*field).is_some_and(|e| e.stored)
    };

    let stored_count = doc
        .field_values()
        .iter()
        .filter(|(field, value)| is_stored(field, value))
        .count();

    buf.write_u16::<LittleEndian>(stored_count as u16)?;

    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                // Write the f32 payload as raw native-endian bytes in one copy.
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(())
}

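/// A block compressed on a background thread, tagged with the metadata
/// needed to order it and build its index entry.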
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

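/// Store writer that compresses blocks eagerly on background threads.
///
/// Documents are serialized into an in-memory block buffer; once the buffer
/// reaches [`STORE_BLOCK_SIZE`] it is handed to a freshly spawned thread for
/// compression. [`finish`](Self::finish) joins the outstanding threads,
/// writes the blocks in submission order, then the optional dictionary, the
/// block index, and the footer. The `_num_threads` constructor argument is
/// currently unused.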
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    serialize_buf: Vec<u8>,
    compressed_blocks: Vec<CompressedBlock>,
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

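    /// Serializes `doc` and appends it to the current block, returning its
    /// doc id. A full block is handed off to a compression thread.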
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(self.serialize_buf.len() as u32)?;
        self.block_buffer.extend_from_slice(&self.serialize_buf);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

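    /// Appends an already-serialized document record (as produced by
    /// [`serialize_document`]) without re-serializing it.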
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

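    /// Moves the current block buffer onto a new thread for compression.
    /// The resulting [`CompressedBlock`]s are joined and ordered in `finish`.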
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                if let Ok(block) = handle.join() {
                    self.compressed_blocks.push(block);
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

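    /// Flushes the final partial block, waits for all compression threads,
    /// writes the blocks in sequence order followed by the optional
    /// dictionary, the index, and the footer, and returns the number of
    /// documents written.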
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed();

        for handle in self.pending_handles.drain(..) {
            if let Ok(block) = handle.join() {
                self.compressed_blocks.push(block);
            }
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}

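/// Index entry describing one compressed block: its doc id range and its
/// position within the data section.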
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

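/// Asynchronous reader for a document store.
///
/// Holds the block index and optional compression dictionary in memory,
/// reads compressed blocks on demand through a [`LazyFileSlice`], and keeps
/// recently used decompressed blocks in a small LRU cache.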
pub struct AsyncStoreReader {
    data_slice: LazyFileSlice,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

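/// A decompressed block together with a per-document offset table, so that
/// documents within the block can be located without rescanning it.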
struct CachedBlock {
    data: Vec<u8>,
    offsets: Vec<u32>,
}

impl CachedBlock {
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut pos = 0usize;
        for _ in 0..num_docs {
            if pos + 4 > data.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "truncated block while building offset table",
                ));
            }
            offsets.push(pos as u32);
            let doc_len =
                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
                    as usize;
            pos += 4 + doc_len;
        }
        Ok(Self { data, offsets })
    }

    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let idx = doc_offset_in_block as usize;
        if idx >= self.offsets.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc offset out of range",
            ));
        }
        let start = self.offsets[idx] as usize;
        if start + 4 > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "truncated doc length",
            ));
        }
        let doc_len = u32::from_le_bytes([
            self.data[start],
            self.data[start + 1],
            self.data[start + 2],
            self.data[start + 3],
        ]) as usize;
        let data_start = start + 4;
        if data_start + doc_len > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc data overflow",
            ));
        }
        Ok(&self.data[data_start..data_start + doc_len])
    }
}

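/// LRU cache of decompressed blocks, keyed by each block's first doc id and
/// bounded by `max_blocks`.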
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
    lru_order: std::collections::VecDeque<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        let block = self.blocks.get(&first_doc_id).map(Arc::clone)?;
        self.promote(first_doc_id);
        Some(block)
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
        if self.blocks.contains_key(&first_doc_id) {
            self.promote(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.lru_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.lru_order.push_back(first_doc_id);
    }

    fn promote(&mut self, key: DocId) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(key);
        }
    }
}

impl AsyncStoreReader {
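    /// Opens a store from `file_handle`: reads and validates the 32-byte
    /// footer, loads the optional dictionary and the block index, and sets up
    /// an LRU cache holding up to `cache_blocks` decompressed blocks.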
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Fixed 32-byte footer: data_end(8) + dict_offset(8) + num_docs(4)
        // + has_dict(4) + version(4) + magic(4).
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // The dictionary (a u32 length prefix plus its bytes) sits between
        // the data section and the block index when present.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

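    /// Fetches and deserializes the document with `doc_id`, loading (or
    /// reusing from cache) the block that contains it. Returns `Ok(None)` if
    /// `doc_id` is out of range.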
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }

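    /// Like [`get`](Self::get), but decodes only the fields whose ids appear
    /// in `field_ids`; the remaining fields are skipped while parsing.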
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }

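    /// Binary-searches the block index for the block covering `doc_id` and
    /// loads it (from cache if possible).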
    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        // Fast path: look up the block under the shared read lock without
        // touching the LRU order.
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        // Re-check under the write lock (promoting the entry) before paying
        // for the read and decompression.
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

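/// Deserializes a document record, keeping only the fields whose ids appear
/// in `field_ids`.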
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}

/// Deserializes a full document record produced by [`serialize_document_into`].
pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}

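/// Core decoder shared by [`deserialize_document`] and
/// [`deserialize_document_fields`]; a `field_filter` of `None` keeps every
/// field. Unknown type tags are reported as `InvalidData`.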
fn deserialize_document_inner(
    data: &[u8],
    _schema: &Schema,
    field_filter: Option<&[u32]>,
) -> io::Result<Document> {
    use crate::dsl::Field;

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let type_tag = reader.read_u8()?;

        let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));

        match type_tag {
            0 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "text field truncated",
                    ));
                }
                if wanted {
                    let s = std::str::from_utf8(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_text(Field(field_id as u32), s);
                }
                reader = &reader[len..];
            }
            1 => {
                let v = reader.read_u64::<LittleEndian>()?;
                if wanted {
                    doc.add_u64(Field(field_id as u32), v);
                }
            }
            2 => {
                let v = reader.read_i64::<LittleEndian>()?;
                if wanted {
                    doc.add_i64(Field(field_id as u32), v);
                }
            }
            3 => {
                let v = reader.read_f64::<LittleEndian>()?;
                if wanted {
                    doc.add_f64(Field(field_id as u32), v);
                }
            }
            4 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "bytes field truncated",
                    ));
                }
                if wanted {
                    doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
                }
                reader = &reader[len..];
            }
            5 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                if wanted {
                    let mut entries = Vec::with_capacity(count);
                    for _ in 0..count {
                        let idx = reader.read_u32::<LittleEndian>()?;
                        let val = reader.read_f32::<LittleEndian>()?;
                        entries.push((idx, val));
                    }
                    doc.add_sparse_vector(Field(field_id as u32), entries);
                } else {
                    // Skip count * (u32 index + f32 value) bytes.
                    let skip = count * 8;
                    if skip > reader.len() {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "sparse vector skip overflow",
                        ));
                    }
                    reader = &reader[skip..];
                }
            }
            6 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if byte_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "dense vector truncated",
                    ));
                }
                if wanted {
                    // Copy the raw f32 payload back out (native-endian,
                    // mirroring serialization).
                    let mut values = vec![0.0f32; count];
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            reader.as_ptr(),
                            values.as_mut_ptr() as *mut u8,
                            byte_len,
                        );
                    }
                    doc.add_dense_vector(Field(field_id as u32), values);
                }
                reader = &reader[byte_len..];
            }
            7 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "json field truncated",
                    ));
                }
                if wanted {
                    let v: serde_json::Value = serde_json::from_slice(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_json(Field(field_id as u32), v);
                }
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}

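/// Metadata for one compressed block as laid out on disk, used when copying
/// blocks between stores without decompressing them.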
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

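/// Appends blocks from existing stores into a new store file, renumbering
/// doc ids and rebuilding the index and footer. Blocks can be copied
/// verbatim ([`append_store`](Self::append_store)) or decompressed and
/// recompressed ([`append_store_recompressing`](Self::append_store_recompressing)).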
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

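    /// Copies already-compressed blocks verbatim from a source store and adds
    /// matching index entries. The merged store is written without a
    /// dictionary (see [`finish`](Self::finish)), so this should only be used
    /// for sources that were not dictionary-compressed; use
    /// [`append_store_recompressing`](Self::append_store_recompressing)
    /// otherwise.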
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

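    /// Decompresses each block of `store` (with its dictionary if present)
    /// and recompresses it without a dictionary, so the block remains
    /// readable in the dictionary-less merged store.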
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

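    /// Writes the block index and footer (with no dictionary) and returns the
    /// total number of documents in the merged store.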
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        let dict_offset = 0u64;

        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}

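/// Accessors used by [`StoreMerger`] to copy or recompress existing blocks.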
impl AsyncStoreReader {
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}