use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

/// Magic number identifying a store file ("STOR" in ASCII).
const STORE_MAGIC: u32 = 0x53544F52;
/// On-disk format version, checked on open.
const STORE_VERSION: u32 = 2;
/// Uncompressed block size threshold; a block is flushed once it grows past this.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default size of a trained compression dictionary, in bytes.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

const COMPRESSION_LEVEL: CompressionLevel = CompressionLevel::MAX;

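// On-disk layout, as written by `StoreWriter::finish` and read back by
// `AsyncStoreReader::open`:
//
//   [compressed blocks][optional dict: u32 len + bytes][index]
//   [footer: data_end u64 | dict_offset u64 | num_docs u32 |
//            has_dict u32 | version u32 | magic u32]   (32 bytes)
//
// The index is a u32 block count followed by one entry per block:
// first_doc_id u32, offset u64, length u32, num_docs u32.

/// Writes documents into compressed blocks of roughly `STORE_BLOCK_SIZE`
/// uncompressed bytes each.
///
/// A minimal usage sketch (assumes a `Document` and `Schema` built elsewhere;
/// `doc` and `schema` are placeholders):
///
/// ```rust,ignore
/// let mut buf: Vec<u8> = Vec::new();
/// let mut store_writer = StoreWriter::new(&mut buf);
/// let doc_id = store_writer.store(&doc, &schema)?;
/// let num_docs = store_writer.finish()?;
/// ```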
pub struct StoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<CompressionDict>,
}

impl<'a> StoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
        }
    }

    /// Creates a writer that compresses every block with `dict`.
    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(dict),
        }
    }

    /// Serializes `doc` into the current block and returns its assigned id.
    /// The block is flushed once it exceeds `STORE_BLOCK_SIZE`.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        // Each document is length-prefixed inside the block.
        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(doc_id)
    }

    /// Compresses the buffered block, writes it out, and records an index entry.
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let num_docs = self.next_doc_id - self.block_first_doc;

        let compressed = if let Some(ref dict) = self.dict {
            crate::compression::compress_with_dict(&self.block_buffer, COMPRESSION_LEVEL, dict)?
        } else {
            crate::compression::compress(&self.block_buffer, COMPRESSION_LEVEL)?
        };

        self.index.push(StoreBlockIndex {
            first_doc_id: self.block_first_doc,
            offset: self.current_offset,
            length: compressed.len() as u32,
            num_docs,
        });

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;

        self.block_buffer.clear();
        self.block_first_doc = self.next_doc_id;

        Ok(())
    }

    /// Flushes the final block, then writes the dictionary (if any), the
    /// block index, and the fixed-size footer. Returns the document count.
    pub fn finish(mut self) -> io::Result<u32> {
        self.flush_block()?;

        let data_end_offset = self.current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = self.current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            Some(offset)
        } else {
            None
        };

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

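/// An uncompressed block queued for parallel compression, tagged with a
/// sequence number so output order can be restored after the parallel pass.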
#[cfg(feature = "native")]
struct PendingBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    data: Vec<u8>,
}

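/// The result of compressing a `PendingBlock`; `seq` preserves write order.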
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

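/// Like [`StoreWriter`], but buffers whole blocks and compresses them all in
/// parallel with rayon when `finish` is called. Block data stays in memory
/// until then, so peak memory is proportional to the total uncompressed size.
///
/// A minimal usage sketch (placeholder `doc`/`schema`, 4 worker threads):
///
/// ```rust,ignore
/// let mut buf: Vec<u8> = Vec::new();
/// let mut store_writer = ParallelStoreWriter::new(&mut buf, 4);
/// store_writer.store(&doc, &schema)?;
/// let num_docs = store_writer.finish()?;
/// ```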
#[cfg(feature = "native")]
pub struct ParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    pending_blocks: Vec<PendingBlock>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<CompressionDict>,
    #[allow(dead_code)]
    num_threads: usize,
}

#[cfg(feature = "native")]
impl<'a> ParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, num_threads: usize) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            pending_blocks: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            num_threads: num_threads.max(1),
        }
    }

    /// Creates a parallel writer that compresses every block with `dict`.
    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict, num_threads: usize) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            pending_blocks: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(dict),
            num_threads: num_threads.max(1),
        }
    }

    /// Serializes `doc` into the current block and returns its assigned id.
    /// Full blocks are queued for compression rather than flushed inline.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.queue_block();
        }

        Ok(doc_id)
    }

    /// Moves the buffered block onto the pending queue, tagged with the next
    /// sequence number.
    fn queue_block(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));

        self.pending_blocks.push(PendingBlock {
            seq: self.next_seq,
            first_doc_id: self.block_first_doc,
            num_docs,
            data,
        });

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;
    }

    /// Compresses all queued blocks in parallel, writes them in sequence
    /// order, then writes the dictionary (if any), the index, and the footer.
    pub fn finish(mut self) -> io::Result<u32> {
        self.queue_block();

        if self.pending_blocks.is_empty() {
            // Empty store: write an empty index and a footer describing it.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        let dict = self.dict.clone();
        let compressed_blocks: Vec<CompressedBlock> = {
            use rayon::prelude::*;

            self.pending_blocks
                .into_par_iter()
                .map(|block| {
                    let compressed = if let Some(ref d) = dict {
                        crate::compression::compress_with_dict(&block.data, COMPRESSION_LEVEL, d)
                            .expect("compression failed")
                    } else {
                        crate::compression::compress(&block.data, COMPRESSION_LEVEL)
                            .expect("compression failed")
                    };

                    CompressedBlock {
                        seq: block.seq,
                        first_doc_id: block.first_doc_id,
                        num_docs: block.num_docs,
                        compressed,
                    }
                })
                .collect()
        };

        // Parallel collection does not guarantee order; restore write order.
        let mut sorted_blocks = compressed_blocks;
        sorted_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(sorted_blocks.len());
        let mut current_offset = 0u64;

        for block in sorted_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            // No later write depends on the running offset, so it is not advanced.
            Some(offset)
        } else {
            None
        };

        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

/// Index entry describing one compressed block: its first document, its byte
/// range in the data section, and how many documents it holds.
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

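/// Random-access reader over a store file: resolves a `DocId` to its block
/// via binary search on the index, decompresses the block (through a small
/// LRU cache), then walks the length-prefixed documents inside it.
///
/// A minimal usage sketch (assumes a `LazyFileHandle` from the directory
/// layer; names are placeholders):
///
/// ```rust,ignore
/// let reader = AsyncStoreReader::open(file_handle, 32).await?;
/// if let Some(doc) = reader.get(42, &schema).await? {
///     // use `doc`
/// }
/// ```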
pub struct AsyncStoreReader {
    /// Slice covering the compressed data section only.
    data_slice: LazyFileSlice,
    /// One entry per block, sorted by `first_doc_id`.
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

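/// A small LRU cache of decompressed blocks, keyed by each block's first
/// `DocId`. Recency is tracked with a `Vec`, which is adequate for the small
/// `max_blocks` values used here.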
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            // Move the hit to the back of the recency list.
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        // Evict least-recently-used blocks until there is room.
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
    /// Opens a store file: validates the footer, loads the optional
    /// dictionary, and reads the block index. `cache_blocks` bounds the
    /// number of decompressed blocks kept in memory.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // The fixed footer alone is 32 bytes.
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // The index begins right after the dictionary when one is present,
        // otherwise right after the data section.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset as usize;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_start + 4 + dict_len,
            )
        } else {
            (None, data_end_offset as usize)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Fetches document `doc_id`, or `None` if the id is out of range.
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        // Locate the block whose doc-id range contains `doc_id`.
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        // Skip the preceding length-prefixed documents within the block.
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

    /// Returns the decompressed block for `entry`, from cache when possible.
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        {
            // `get` updates LRU order, so a write lock is needed even on a hit.
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

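/// Block metadata exposed for merging: enough to copy a compressed block
/// verbatim from one store to another without decompressing it.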
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

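/// Concatenates existing stores by copying their compressed blocks verbatim
/// and rebuilding the index with remapped doc ids. The merged footer records
/// no dictionary, so inputs are presumably expected to be dictionary-free
/// (see [`AsyncStoreReader::has_dict`]).
///
/// A minimal usage sketch (`readers` is a placeholder collection of
/// [`AsyncStoreReader`]s; real callers would check `has_dict` first):
///
/// ```rust,ignore
/// let mut out: Vec<u8> = Vec::new();
/// let mut merger = StoreMerger::new(&mut out);
/// for reader in &readers {
///     merger.append_store(reader.data_slice(), &reader.raw_blocks()).await?;
/// }
/// let total_docs = merger.finish()?;
/// ```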
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Copies `blocks` verbatim from `data_slice` into the merged output,
    /// assigning each block a fresh contiguous doc-id range.
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset as usize;
            let end = start + block.length as usize;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Writes the merged index and footer (with no dictionary) and returns
    /// the total document count.
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        let dict_offset = 0u64;

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    /// Returns the block metadata needed to merge this store (see [`StoreMerger`]).
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Slice over the compressed data section, for raw block copies.
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    /// True if this store was written with a compression dictionary.
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}