use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

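// On-disk format constants. `STORE_MAGIC` is "STOR" in ASCII; the magic and
// version are the last fields written, so a reader validates the footer
// before trusting any offsets. Documents are buffered into blocks of up to
// `STORE_BLOCK_SIZE` bytes before compression.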
const STORE_MAGIC: u32 = 0x53544F52;
const STORE_VERSION: u32 = 2;
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

const COMPRESSION_LEVEL: CompressionLevel = CompressionLevel::MAX;

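/// Single-threaded store writer: serialized documents are length-prefixed
/// into a block buffer, each full block is compressed and written out, and
/// [`finish`](Self::finish) appends the optional dictionary, the block index,
/// and a fixed-size footer.
///
/// A minimal usage sketch (assumes a `Document` and `Schema` are in scope):
///
/// ```ignore
/// let mut out = Vec::new();
/// let mut writer = StoreWriter::new(&mut out);
/// let doc_id = writer.store(&doc, &schema)?;
/// let num_docs = writer.finish()?;
/// ```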
pub struct StoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<CompressionDict>,
}

impl<'a> StoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
        }
    }

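    /// Like [`Self::new`], but every block is compressed with `dict`, which
    /// `finish` also embeds in the file so readers can decompress.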
    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(dict),
        }
    }

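    /// Serializes `doc`, appends it (length-prefixed) to the current block,
    /// and flushes the block once it reaches `STORE_BLOCK_SIZE`. Returns the
    /// assigned `DocId`.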
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(doc_id)
    }

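    /// Compresses the buffered block, records its index entry, and writes the
    /// compressed bytes to the underlying writer.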
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let num_docs = self.next_doc_id - self.block_first_doc;

        let compressed = if let Some(ref dict) = self.dict {
            crate::compression::compress_with_dict(&self.block_buffer, COMPRESSION_LEVEL, dict)?
        } else {
            crate::compression::compress(&self.block_buffer, COMPRESSION_LEVEL)?
        };

        self.index.push(StoreBlockIndex {
            first_doc_id: self.block_first_doc,
            offset: self.current_offset,
            length: compressed.len() as u32,
            num_docs,
        });

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;

        self.block_buffer.clear();
        self.block_first_doc = self.next_doc_id;

        Ok(())
    }

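    /// Flushes the final partial block, then writes the optional dictionary,
    /// the block index, and the fixed-size footer. Returns the number of
    /// documents stored.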
    pub fn finish(mut self) -> io::Result<u32> {
        self.flush_block()?;

        let data_end_offset = self.current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = self.current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            Some(offset)
        } else {
            None
        };

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

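/// Serializes a document to JSON bytes; the schema parameter is currently
/// unused by this encoding.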
fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

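/// A block that has been (or is being) compressed on a background thread,
/// tagged with its sequence number so output order can be restored.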
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

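/// Store writer that compresses full blocks eagerly on background threads
/// while documents are still being added, then writes the blocks back out in
/// sequence order at `finish` time. The `_num_threads` parameters are
/// currently unused: one thread is spawned per block.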
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    compressed_blocks: Vec<CompressedBlock>,
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
        }
    }

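    /// Like [`Self::new`], but the dictionary is shared with compression
    /// threads through an `Arc`.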
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
        }
    }

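    /// Same framing as [`StoreWriter::store`], except that a full block is
    /// handed to a background thread instead of being compressed inline.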
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

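    /// Moves the current block onto a freshly spawned thread for compression,
    /// tagging it with a sequence number so `finish` can restore write order.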
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data =
            std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, COMPRESSION_LEVEL, d)
                    .expect("compression failed")
            } else {
                crate::compression::compress(&data, COMPRESSION_LEVEL).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

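    /// Reaps compression threads that have already finished without blocking
    /// on the ones still running.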
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                // A panicked compression thread would otherwise drop its block
                // silently and corrupt the store, so surface the panic here.
                let block = handle.join().expect("compression thread panicked");
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

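    /// Waits for all in-flight compression, reorders blocks by sequence
    /// number, and writes data, optional dictionary, index, and footer.
    /// Returns the number of documents stored.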
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed();

        for handle in self.pending_handles.drain(..) {
            // Block on the remaining threads; a panic here means a block was
            // lost, so report an error instead of writing a corrupt store.
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

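/// One entry in the block index: the doc-id range a compressed block covers
/// and where its bytes live in the data section.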
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

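/// Async reader for a document store: loads the block index and optional
/// dictionary up front, then fetches and decompresses blocks on demand,
/// keeping recently used blocks in a small LRU cache.
///
/// A minimal read sketch (assumes an open `LazyFileHandle`; the cache size
/// of 32 blocks is arbitrary):
///
/// ```ignore
/// let reader = AsyncStoreReader::open(handle, 32).await?;
/// if let Some(doc) = reader.get(doc_id, &schema).await? {
///     // use doc
/// }
/// ```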
pub struct AsyncStoreReader {
    data_slice: LazyFileSlice,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

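/// Small LRU cache of decompressed blocks, keyed by each block's first doc id.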
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

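    /// Returns the cached block, if present, and marks it most recently used.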
    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

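    /// Inserts a block, evicting least recently used entries once the cache
    /// is full.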
    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
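    /// Opens a store by reading the fixed 32-byte footer, then the optional
    /// dictionary and the block index that the footer points at.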
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // The dictionary (when present) sits between the data section and the
        // index, so reading it also yields the index start offset.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset as usize;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_start + 4 + dict_len,
            )
        } else {
            (None, data_end_offset as usize)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

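    /// Total number of documents in the store.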
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

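    /// Fetches a document by id: binary-searches the block index, loads (or
    /// reuses) the decompressed block, then skips length-prefixed records up
    /// to the requested document. Returns `Ok(None)` for out-of-range ids.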
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        // Validate the final length too, so a corrupt block returns an error
        // instead of panicking on the slice below.
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

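    /// Returns the decompressed block for an index entry, consulting the LRU
    /// cache before touching the file.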
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

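/// Parses a JSON-serialized document produced by `serialize_document`.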
fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

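/// Location and doc range of a compressed block, exposed so a merger can copy
/// blocks between stores without recompressing them.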
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

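/// Merges stores by copying their compressed blocks verbatim and rebuilding
/// only the index and footer. Note that the merged footer records no
/// dictionary, so stores written with one cannot be merged this way.
///
/// A minimal merge sketch (assumes two open readers `a` and `b` and a
/// writable `out`):
///
/// ```ignore
/// let mut merger = StoreMerger::new(&mut out);
/// merger.append_store(a.data_slice(), &a.raw_blocks()).await?;
/// merger.append_store(b.data_slice(), &b.raw_blocks()).await?;
/// let total_docs = merger.finish()?;
/// ```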
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

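    /// Copies each compressed block verbatim from `data_slice` into the
    /// output, remapping doc ids so they remain contiguous across all
    /// appended stores.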
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset as usize;
            let end = start + block.length as usize;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

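    /// Writes the rebuilt index and a dictionary-less footer. Returns the
    /// total number of documents across all appended stores.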
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        let dict_offset = 0u64;

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
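    /// Returns the block index in the form consumed by
    /// [`StoreMerger::append_store`].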
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

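    /// The slice of the file covering only the compressed block data.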
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

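    /// True if this store embeds a compression dictionary.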
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}