use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::FileHandle;
use crate::dsl::{Document, Schema};

/// Magic number identifying the store file format ("STOR" in ASCII).
const STORE_MAGIC: u32 = 0x53544F52;
/// On-disk format version written to and validated against the footer.
const STORE_VERSION: u32 = 2;
/// Target uncompressed size of a single store block.
pub const STORE_BLOCK_SIZE: usize = 16 * 1024;

/// Default compression dictionary size (4 KiB).
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);

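/// Writes the block index followed by the fixed 32-byte footer
/// (data end offset, dict offset, doc count, dict flag, version, magic).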
fn write_store_index_and_footer(
    writer: &mut (impl Write + ?Sized),
    index: &[StoreBlockIndex],
    data_end_offset: u64,
    dict_offset: u64,
    num_docs: u32,
    has_dict: bool,
) -> io::Result<()> {
    writer.write_u32::<LittleEndian>(index.len() as u32)?;
    for entry in index {
        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
        writer.write_u64::<LittleEndian>(entry.offset)?;
        writer.write_u32::<LittleEndian>(entry.length)?;
        writer.write_u32::<LittleEndian>(entry.num_docs)?;
    }
    writer.write_u64::<LittleEndian>(data_end_offset)?;
    writer.write_u64::<LittleEndian>(dict_offset)?;
    writer.write_u32::<LittleEndian>(num_docs)?;
    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
    Ok(())
}

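/// Serializes the stored fields of `doc` into a newly allocated buffer.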
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    let mut buf = Vec::with_capacity(256);
    serialize_document_into(doc, schema, &mut buf)?;
    Ok(buf)
}

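/// Serializes the stored fields of `doc` into `buf`, clearing it first.
/// Each field is encoded as a u16 field id, a u8 type tag, and the payload.
/// Dense vector fields are never written to the document store.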
pub fn serialize_document_into(
    doc: &Document,
    schema: &Schema,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    use crate::dsl::FieldValue;

    buf.clear();

    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
        if matches!(value, FieldValue::DenseVector(_)) {
            return false;
        }
        schema.get_field_entry(*field).is_some_and(|e| e.stored)
    };

    let stored_count = doc
        .field_values()
        .iter()
        .filter(|(field, value)| is_stored(field, value))
        .count();

    buf.write_u16::<LittleEndian>(stored_count as u16)?;

    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(())
}

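/// A block compressed on a worker thread, tagged with its sequence number so
/// that blocks can be written out in submission order.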
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

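/// Store writer that buffers serialized documents into blocks and spawns a
/// compression thread as soon as a block reaches `STORE_BLOCK_SIZE`, so
/// compression overlaps with further document ingestion.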
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    serialize_buf: Vec<u8>,
    compressed_blocks: Vec<CompressedBlock>,
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

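    /// Serializes `doc` and appends it to the current block, spawning a
    /// compression task once the block reaches `STORE_BLOCK_SIZE`.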
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(self.serialize_buf.len() as u32)?;
        self.block_buffer.extend_from_slice(&self.serialize_buf);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

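    /// Appends an already-serialized document to the current block.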
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

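    /// Moves the current block buffer onto a new thread for compression and
    /// starts a fresh block. Does nothing if the buffer is empty.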
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

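    /// Joins any compression threads that have already finished, keeping the
    /// rest pending.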
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                match handle.join() {
                    Ok(block) => self.compressed_blocks.push(block),
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

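    /// Flushes the last partial block, joins all compression threads, writes
    /// the blocks in sequence order followed by the optional dictionary and
    /// the index/footer, and returns the total number of documents written.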
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed();

        for handle in self.pending_handles.drain(..) {
            match handle.join() {
                Ok(block) => self.compressed_blocks.push(block),
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}

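/// Index entry describing one compressed block: the first doc id it contains,
/// its byte offset and length within the data section, and its doc count.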
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

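/// Async reader over a store file: looks up blocks via the in-memory index,
/// decompresses them on demand, and keeps recently used blocks in an LRU cache.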
pub struct AsyncStoreReader {
    data_slice: FileHandle,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

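/// A decompressed block together with a precomputed offset table, so that
/// individual documents can be sliced out without rescanning the block.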
struct CachedBlock {
    data: Vec<u8>,
    offsets: Vec<u32>,
}

impl CachedBlock {
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut pos = 0usize;
        for _ in 0..num_docs {
            if pos + 4 > data.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "truncated block while building offset table",
                ));
            }
            offsets.push(pos as u32);
            let doc_len =
                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
                    as usize;
            pos += 4 + doc_len;
        }
        Ok(Self { data, offsets })
    }

    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let idx = doc_offset_in_block as usize;
        if idx >= self.offsets.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc offset out of range",
            ));
        }
        let start = self.offsets[idx] as usize;
        if start + 4 > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "truncated doc length",
            ));
        }
        let doc_len = u32::from_le_bytes([
            self.data[start],
            self.data[start + 1],
            self.data[start + 2],
            self.data[start + 3],
        ]) as usize;
        let data_start = start + 4;
        if data_start + doc_len > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc data overflow",
            ));
        }
        Ok(&self.data[data_start..data_start + doc_len])
    }
}

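/// Simple LRU cache of decompressed blocks, keyed by the first doc id of each
/// block and bounded to `max_blocks` entries.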
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
    lru_order: std::collections::VecDeque<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        let block = self.blocks.get(&first_doc_id).map(Arc::clone)?;
        self.promote(first_doc_id);
        Some(block)
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
        if self.blocks.contains_key(&first_doc_id) {
            self.promote(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.lru_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.lru_order.push_back(first_doc_id);
    }

    fn promote(&mut self, key: DocId) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(key);
        }
    }
}

impl AsyncStoreReader {
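    /// Opens a store from `file_handle`: validates the footer, loads the
    /// optional compression dictionary and the block index, and sets up an
    /// LRU cache holding up to `cache_blocks` decompressed blocks.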
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

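    /// Fetches the document with `doc_id`, returning `Ok(None)` if the id is
    /// out of range.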
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }

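    /// Like `get`, but deserializes only the fields whose ids appear in
    /// `field_ids`.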
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }

    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }

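    /// Returns the decompressed block for `entry`, serving it from the cache
    /// when possible and otherwise reading, decompressing, and caching it.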
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

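/// Deserializes only the fields whose ids appear in `field_ids` from an
/// encoded document; all other fields are skipped without being materialized.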
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}

pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}

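/// Shared decoding loop: walks the field entries and either materializes a
/// field or skips over its payload, depending on the optional field filter.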
fn deserialize_document_inner(
    data: &[u8],
    _schema: &Schema,
    field_filter: Option<&[u32]>,
) -> io::Result<Document> {
    use crate::dsl::Field;

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let type_tag = reader.read_u8()?;

        let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));

        match type_tag {
            0 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "text field truncated",
                    ));
                }
                if wanted {
                    let s = std::str::from_utf8(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_text(Field(field_id as u32), s);
                }
                reader = &reader[len..];
            }
            1 => {
                let v = reader.read_u64::<LittleEndian>()?;
                if wanted {
                    doc.add_u64(Field(field_id as u32), v);
                }
            }
            2 => {
                let v = reader.read_i64::<LittleEndian>()?;
                if wanted {
                    doc.add_i64(Field(field_id as u32), v);
                }
            }
            3 => {
                let v = reader.read_f64::<LittleEndian>()?;
                if wanted {
                    doc.add_f64(Field(field_id as u32), v);
                }
            }
            4 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "bytes field truncated",
                    ));
                }
                if wanted {
                    doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
                }
                reader = &reader[len..];
            }
            5 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                if wanted {
                    let mut entries = Vec::with_capacity(count);
                    for _ in 0..count {
                        let idx = reader.read_u32::<LittleEndian>()?;
                        let val = reader.read_f32::<LittleEndian>()?;
                        entries.push((idx, val));
                    }
                    doc.add_sparse_vector(Field(field_id as u32), entries);
                } else {
                    let skip = count * 8;
                    if skip > reader.len() {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "sparse vector skip overflow",
                        ));
                    }
                    reader = &reader[skip..];
                }
            }
            6 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if byte_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "dense vector truncated",
                    ));
                }
                if wanted {
                    let mut values = vec![0.0f32; count];
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            reader.as_ptr(),
                            values.as_mut_ptr() as *mut u8,
                            byte_len,
                        );
                    }
                    doc.add_dense_vector(Field(field_id as u32), values);
                }
                reader = &reader[byte_len..];
            }
            7 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "json field truncated",
                    ));
                }
                if wanted {
                    let v: serde_json::Value = serde_json::from_slice(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_json(Field(field_id as u32), v);
                }
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}

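/// Description of a compressed block as it exists on disk, used when merging
/// stores without decompressing their blocks.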
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

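/// Merges several stores into one by appending their blocks and rebuilding
/// the block index with remapped doc ids and offsets.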
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

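    /// Appends already-compressed blocks from another store verbatim,
    /// remapping their doc ids and byte offsets into the merged store.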
    pub async fn append_store(
        &mut self,
        data_slice: &FileHandle,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

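    /// Appends blocks from `store` by decompressing each one (using the
    /// source dictionary if present) and recompressing it without a
    /// dictionary, so the merged store stays dictionary-free.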
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

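    /// Writes the rebuilt block index and footer (without a dictionary) and
    /// returns the total number of documents in the merged store.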
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        let dict_offset = 0u64;

        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    pub fn data_slice(&self) -> &FileHandle {
        &self.data_slice
    }

    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}