use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::FileHandle;
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52;
const STORE_VERSION: u32 = 2;
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);

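/// Writes the block index followed by the fixed 32-byte footer. The index is
/// a u32 entry count, then for each block its `first_doc_id` (u32), `offset`
/// (u64), `length` (u32) and `num_docs` (u32). The footer holds
/// `data_end_offset` (u64), `dict_offset` (u64), `num_docs` (u32), a
/// `has_dict` flag (u32), `STORE_VERSION` (u32) and `STORE_MAGIC` (u32). All
/// values are little-endian.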
fn write_store_index_and_footer(
    writer: &mut (impl Write + ?Sized),
    index: &[StoreBlockIndex],
    data_end_offset: u64,
    dict_offset: u64,
    num_docs: u32,
    has_dict: bool,
) -> io::Result<()> {
    writer.write_u32::<LittleEndian>(index.len() as u32)?;
    for entry in index {
        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
        writer.write_u64::<LittleEndian>(entry.offset)?;
        writer.write_u32::<LittleEndian>(entry.length)?;
        writer.write_u32::<LittleEndian>(entry.num_docs)?;
    }
    writer.write_u64::<LittleEndian>(data_end_offset)?;
    writer.write_u64::<LittleEndian>(dict_offset)?;
    writer.write_u32::<LittleEndian>(num_docs)?;
    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
    Ok(())
}

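/// Serializes a document into a freshly allocated buffer; see
/// [`serialize_document_into`] for the layout.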
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    let mut buf = Vec::with_capacity(256);
    serialize_document_into(doc, schema, &mut buf)?;
    Ok(buf)
}

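/// Serializes the stored fields of `doc` into `buf`, clearing it first.
/// Layout: a u16 count of stored fields, then for each field a u16 field id,
/// a one-byte type tag (0 = text, 1 = u64, 2 = i64, 3 = f64, 4 = bytes,
/// 5 = sparse vector, 6 = dense vector, 7 = JSON) and a tag-specific payload.
/// Dense vectors are skipped even when the field is marked as stored.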
pub fn serialize_document_into(
    doc: &Document,
    schema: &Schema,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    use crate::dsl::FieldValue;

    buf.clear();

    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
        if matches!(value, FieldValue::DenseVector(_)) {
            return false;
        }
        schema.get_field_entry(*field).is_some_and(|e| e.stored)
    };

    let stored_count = doc
        .field_values()
        .iter()
        .filter(|(field, value)| is_stored(field, value))
        .count();

    buf.write_u16::<LittleEndian>(stored_count as u16)?;

    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(())
}

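/// A block compressed on a worker thread, tagged with its sequence number so
/// blocks can be written out in submission order.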
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

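/// Store writer that compresses blocks eagerly on background threads.
///
/// Documents accumulate in an in-memory block buffer; once the buffer reaches
/// `STORE_BLOCK_SIZE`, compression of that block is spawned on a new thread
/// while the writer keeps accepting documents. [`finish`](Self::finish) joins
/// the remaining threads, writes the blocks in sequence order and appends the
/// dictionary (if any), index and footer.
///
/// A minimal usage sketch (assumes a `Schema` value `schema` is in scope; not
/// compiled as a doctest):
///
/// ```ignore
/// let mut out: Vec<u8> = Vec::new();
/// let mut store_writer = EagerParallelStoreWriter::new(&mut out, 4);
/// let mut doc = Document::new();
/// doc.add_text(Field(0), "hello");
/// let _doc_id = store_writer.store(&doc, &schema)?;
/// let num_docs = store_writer.finish()?;
/// ```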
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    serialize_buf: Vec<u8>,
    compressed_blocks: Vec<CompressedBlock>,
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

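    /// Serializes `doc`, appends it to the current block and returns its doc
    /// id. Once the block buffer reaches `STORE_BLOCK_SIZE`, compression is
    /// spawned in the background.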
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(self.serialize_buf.len() as u32)?;
        self.block_buffer.extend_from_slice(&self.serialize_buf);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

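    /// Appends an already-serialized document (as produced by
    /// [`serialize_document`]) without re-serializing it.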
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

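    /// Hands the current block buffer to a freshly spawned thread for
    /// compression; the finished block is collected later and reordered by
    /// its sequence number.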
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                match handle.join() {
                    Ok(block) => self.compressed_blocks.push(block),
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

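    /// Flushes the last partial block, joins every compression thread, writes
    /// the blocks in sequence order followed by the optional dictionary, the
    /// block index and the footer. Returns the total number of documents
    /// written.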
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed();

        for handle in self.pending_handles.drain(..) {
            match handle.join() {
                Ok(block) => self.compressed_blocks.push(block),
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}

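/// Index entry for one compressed block: the first doc id it contains, its
/// byte offset and length within the data section, and its document count.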
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

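/// Asynchronous reader over a store file. The block index and optional
/// dictionary are kept in memory; blocks are read and decompressed on demand
/// and retained in a small LRU cache.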
pub struct AsyncStoreReader {
    data_slice: FileHandle,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

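/// A decompressed block together with a per-document offset table, so a
/// single document can be sliced out without rescanning the block.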
struct CachedBlock {
    data: Vec<u8>,
    offsets: Vec<u32>,
}

impl CachedBlock {
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut pos = 0usize;
        for _ in 0..num_docs {
            if pos + 4 > data.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "truncated block while building offset table",
                ));
            }
            offsets.push(pos as u32);
            let doc_len =
                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
                    as usize;
            pos += 4 + doc_len;
        }
        Ok(Self { data, offsets })
    }

    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let idx = doc_offset_in_block as usize;
        if idx >= self.offsets.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc offset out of range",
            ));
        }
        let start = self.offsets[idx] as usize;
        if start + 4 > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "truncated doc length",
            ));
        }
        let doc_len = u32::from_le_bytes([
            self.data[start],
            self.data[start + 1],
            self.data[start + 2],
            self.data[start + 3],
        ]) as usize;
        let data_start = start + 4;
        if data_start + doc_len > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc data overflow",
            ));
        }
        Ok(&self.data[data_start..data_start + doc_len])
    }
}

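/// Small LRU cache of decompressed blocks, keyed by the first doc id of each
/// block.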
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
    lru_order: std::collections::VecDeque<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        let block = self.blocks.get(&first_doc_id).map(Arc::clone)?;
        self.promote(first_doc_id);
        Some(block)
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
        if self.blocks.contains_key(&first_doc_id) {
            self.promote(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.lru_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.lru_order.push_back(first_doc_id);
    }

    fn promote(&mut self, key: DocId) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(key);
        }
    }
}

impl AsyncStoreReader {
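    /// Opens a store by reading the footer, the optional compression
    /// dictionary and the block index from `file_handle`. At most
    /// `cache_blocks` decompressed blocks are kept in the cache.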
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

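    /// Loads (or fetches from cache) the block containing `doc_id` and
    /// deserializes the full document. Returns `Ok(None)` when `doc_id` is
    /// out of range.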
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }

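    /// Like [`get`](Self::get), but only materializes the fields whose ids
    /// appear in `field_ids`; all other fields are skipped while decoding.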
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }

    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

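/// Deserializes only the fields whose ids are listed in `field_ids`; all
/// other fields are skipped without being materialized.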
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}

/// Deserializes every field of a document written by [`serialize_document`].
pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}

fn deserialize_document_inner(
    data: &[u8],
    _schema: &Schema,
    field_filter: Option<&[u32]>,
) -> io::Result<Document> {
    use crate::dsl::Field;

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let type_tag = reader.read_u8()?;

        let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));

        match type_tag {
            0 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "text field truncated",
                    ));
                }
                if wanted {
                    let s = std::str::from_utf8(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_text(Field(field_id as u32), s);
                }
                reader = &reader[len..];
            }
            1 => {
                let v = reader.read_u64::<LittleEndian>()?;
                if wanted {
                    doc.add_u64(Field(field_id as u32), v);
                }
            }
            2 => {
                let v = reader.read_i64::<LittleEndian>()?;
                if wanted {
                    doc.add_i64(Field(field_id as u32), v);
                }
            }
            3 => {
                let v = reader.read_f64::<LittleEndian>()?;
                if wanted {
                    doc.add_f64(Field(field_id as u32), v);
                }
            }
            4 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "bytes field truncated",
                    ));
                }
                if wanted {
                    doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
                }
                reader = &reader[len..];
            }
            5 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                if wanted {
                    let mut entries = Vec::with_capacity(count);
                    for _ in 0..count {
                        let idx = reader.read_u32::<LittleEndian>()?;
                        let val = reader.read_f32::<LittleEndian>()?;
                        entries.push((idx, val));
                    }
                    doc.add_sparse_vector(Field(field_id as u32), entries);
                } else {
                    let skip = count * 8;
                    if skip > reader.len() {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "sparse vector skip overflow",
                        ));
                    }
                    reader = &reader[skip..];
                }
            }
            6 => {
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if byte_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "dense vector truncated",
                    ));
                }
                if wanted {
                    let mut values = vec![0.0f32; count];
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            reader.as_ptr(),
                            values.as_mut_ptr() as *mut u8,
                            byte_len,
                        );
                    }
                    doc.add_dense_vector(Field(field_id as u32), values);
                }
                reader = &reader[byte_len..];
            }
            7 => {
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "json field truncated",
                    ));
                }
                if wanted {
                    let v: serde_json::Value = serde_json::from_slice(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_json(Field(field_id as u32), v);
                }
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}

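/// Raw location of a compressed block inside an existing store, used to copy
/// blocks verbatim during merges.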
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

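/// Concatenates blocks from existing stores into a new store file, rewriting
/// the block index with re-based doc ids and offsets.
///
/// A minimal merge sketch (assumes two opened readers and a writable sink are
/// in scope; not compiled as a doctest):
///
/// ```ignore
/// let mut out: Vec<u8> = Vec::new();
/// let mut merger = StoreMerger::new(&mut out);
/// // Copy compressed blocks verbatim from the first store.
/// merger.append_store(reader_a.data_slice(), &reader_a.raw_blocks()).await?;
/// // Recompress the second store's blocks (e.g. because it used a dictionary).
/// merger.append_store_recompressing(&reader_b).await?;
/// let total_docs = merger.finish()?;
/// ```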
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

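    /// Copies compressed blocks verbatim from another store's data section,
    /// re-basing doc ids and offsets. The source blocks must be readable
    /// without a dictionary, since the merged store does not write one.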
    pub async fn append_store(
        &mut self,
        data_slice: &FileHandle,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

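    /// Appends blocks from `store` by decompressing each one (with the
    /// store's dictionary, if any) and recompressing it at the default level
    /// without a dictionary, so the merged store can be read standalone.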
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

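    /// Writes the rebuilt block index and footer; merged stores never carry a
    /// dictionary, so `has_dict` is always written as false. Returns the
    /// total number of documents in the merged store.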
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        let dict_offset = 0u64;

        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    pub fn data_slice(&self) -> &FileHandle {
        &self.data_slice
    }

    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}