1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::{CompressionDict, CompressionLevel};
20use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
21use crate::dsl::{Document, Schema};
22
/// Magic number terminating every store file: the ASCII bytes `"STOR"` read big-endian.
const STORE_MAGIC: u32 = u32::from_be_bytes(*b"STOR");

/// On-disk format version written into, and required by, the store footer.
const STORE_VERSION: u32 = 2;

/// Uncompressed size (256 KiB) at which a buffer of serialized documents is sealed
/// into a block and compressed.
pub const STORE_BLOCK_SIZE: usize = 256 << 10;

/// 4 KiB size constant; presumably the default size for a trained compression
/// dictionary (not referenced in this part of the file) — confirm with callers.
pub const DEFAULT_DICT_SIZE: usize = 4 << 10;

/// Compression level used by the writer constructors that do not take one explicitly.
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
36
37pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
38 serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
39}
40
/// Result of one background compression job of [`EagerParallelStoreWriter`].
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Spawn order of the block; used to restore document order before writing.
    seq: usize,
    /// Id of the first document contained in the block.
    first_doc_id: DocId,
    /// Number of documents contained in the block.
    num_docs: u32,
    /// Compressed bytes of the block.
    compressed: Vec<u8>,
}
49
/// Document-store writer that compresses sealed blocks on background threads.
///
/// Documents are serialized with [`serialize_document`] and appended, each prefixed
/// by its little-endian `u32` length, to an in-memory block buffer. When the buffer
/// reaches [`STORE_BLOCK_SIZE`] bytes it is sealed and compressed on a worker thread.
/// At most `num_threads` compressions are in flight at once (`0` means "use the
/// available parallelism"). Compressed blocks are kept in memory and written out,
/// followed by the optional dictionary, the block index and a 32-byte footer, by
/// [`finish`](Self::finish).
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    /// Length-prefixed serialized documents of the block currently being filled.
    block_buffer: Vec<u8>,
    /// Results of joined compression threads, in spawn order.
    finished_blocks: Vec<io::Result<CompressedBlock>>,
    /// In-flight compression threads, oldest first.
    pending_handles: std::collections::VecDeque<std::thread::JoinHandle<CompressedBlock>>,
    /// Upper bound on `pending_handles.len()`; always at least 1.
    max_in_flight: usize,
    next_seq: usize,
    next_doc_id: DocId,
    /// Id of the first document in `block_buffer`.
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Creates a writer without a dictionary, using `DEFAULT_COMPRESSION_LEVEL`.
    pub fn new(writer: &'a mut dyn Write, num_threads: usize) -> Self {
        Self::with_compression_level(writer, num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer without a dictionary, using `compression_level`.
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::new_inner(writer, None, num_threads, compression_level)
    }

    /// Creates a writer that compresses with `dict`, using `DEFAULT_COMPRESSION_LEVEL`.
    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict, num_threads: usize) -> Self {
        Self::with_dict_and_level(writer, dict, num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Creates a writer that compresses with `dict` at `compression_level`.
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::new_inner(writer, Some(Arc::new(dict)), num_threads, compression_level)
    }

    /// Shared constructor: resolves the concurrency limit and initialises empty state.
    fn new_inner(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        let max_in_flight = if num_threads == 0 {
            std::thread::available_parallelism().map_or(1, |n| n.get())
        } else {
            num_threads
        };
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            finished_blocks: Vec::new(),
            pending_handles: std::collections::VecDeque::new(),
            max_in_flight,
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Appends `doc` to the store and returns its assigned doc id.
    ///
    /// # Errors
    /// Fails if serialization fails or the serialized document exceeds `u32::MAX`
    /// bytes; in both cases no doc id is consumed.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        // Serialize before assigning an id: a failure must not leave a phantom doc
        // counted in the block, which would shift every later doc in that block.
        let doc_bytes = serialize_document(doc, schema)?;
        let doc_len = u32::try_from(doc_bytes.len()).map_err(|_| {
            io::Error::new(io::ErrorKind::InvalidInput, "Document too large for store")
        })?;

        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer.write_u32::<LittleEndian>(doc_len)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Seals the current block buffer (if non-empty) and compresses it on a new thread.
    ///
    /// Finished threads are reaped first. If `max_in_flight` compressions are still
    /// running, this blocks on the oldest one before spawning.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        self.collect_completed();
        while self.pending_handles.len() >= self.max_in_flight {
            self.join_oldest();
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();
        let level = self.compression_level;

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // A compression failure panics the worker; `join_oldest` turns that into an error.
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push_back(handle);
    }

    /// Joins, without blocking, the already-finished threads at the front of the
    /// queue. Only the front is reaped, so results stay in spawn order.
    fn collect_completed(&mut self) {
        while self.pending_handles.front().map_or(false, |h| h.is_finished()) {
            self.join_oldest();
        }
    }

    /// Blocks on the oldest in-flight thread and records its result. A panicking
    /// thread is recorded as an error rather than silently dropped.
    fn join_oldest(&mut self) {
        if let Some(handle) = self.pending_handles.pop_front() {
            let result = handle.join().map_err(|payload| {
                let detail = payload
                    .downcast_ref::<String>()
                    .map(String::as_str)
                    .or_else(|| payload.downcast_ref::<&str>().copied())
                    .unwrap_or("unknown panic");
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("store block compression thread panicked: {}", detail),
                )
            });
            self.finished_blocks.push(result);
        }
    }

    /// Flushes the last block, waits for all compressions and writes the store.
    ///
    /// The layout is:
    /// 1. the compressed blocks;
    /// 2. the dictionary as `u32` length plus bytes, if present;
    /// 3. the block index as a `u32` count followed by 20-byte entries;
    /// 4. the footer: `data_end_offset` u64, `dict_offset` u64, `num_docs` u32,
    ///    `has_dict` u32, version u32, magic u32.
    ///
    /// Returns the number of documents written.
    ///
    /// # Errors
    /// Propagates write errors and any failed (panicked) block compression.
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();
        while !self.pending_handles.is_empty() {
            self.join_oldest();
        }

        // A missing block would leave a hole in the index; fail instead.
        let mut blocks = std::mem::take(&mut self.finished_blocks)
            .into_iter()
            .collect::<io::Result<Vec<CompressedBlock>>>()?;

        if blocks.is_empty() {
            // Empty store: zero-entry index followed by the footer.
            self.writer.write_u32::<LittleEndian>(0)?; // index entry count
            self.writer.write_u64::<LittleEndian>(0)?; // data_end_offset
            self.writer.write_u64::<LittleEndian>(0)?; // dict_offset
            self.writer.write_u32::<LittleEndian>(0)?; // num_docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Already in spawn order; sort defensively so the index is ordered by doc id.
        blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(blocks.len());
        let mut current_offset = 0u64;
        for block in &blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });
            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }
        let data_end_offset = current_offset;

        // The dictionary, if any, is placed right after the data section.
        let dict_offset = match self.dict {
            Some(ref dict) => {
                let dict_bytes = dict.as_bytes();
                self.writer.write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
                self.writer.write_all(dict_bytes)?;
                data_end_offset
            }
            None => 0,
        };

        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}
276
277#[derive(Debug, Clone)]
279struct StoreBlockIndex {
280 first_doc_id: DocId,
281 offset: u64,
282 length: u32,
283 num_docs: u32,
284}
285
286pub struct AsyncStoreReader {
288 data_slice: LazyFileSlice,
290 index: Vec<StoreBlockIndex>,
292 num_docs: u32,
293 dict: Option<CompressionDict>,
295 cache: RwLock<StoreBlockCache>,
297}
298
299struct StoreBlockCache {
300 blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
301 access_order: Vec<DocId>,
302 max_blocks: usize,
303}
304
305impl StoreBlockCache {
306 fn new(max_blocks: usize) -> Self {
307 Self {
308 blocks: FxHashMap::default(),
309 access_order: Vec::new(),
310 max_blocks,
311 }
312 }
313
314 fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
315 if let Some(block) = self.blocks.get(&first_doc_id) {
316 if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
317 self.access_order.remove(pos);
318 self.access_order.push(first_doc_id);
319 }
320 Some(Arc::clone(block))
321 } else {
322 None
323 }
324 }
325
326 fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
327 while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
328 let evict = self.access_order.remove(0);
329 self.blocks.remove(&evict);
330 }
331 self.blocks.insert(first_doc_id, block);
332 self.access_order.push(first_doc_id);
333 }
334}
335
336impl AsyncStoreReader {
337 pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
340 let file_len = file_handle.len();
341 if file_len < 32 {
343 return Err(io::Error::new(
344 io::ErrorKind::InvalidData,
345 "Store too small",
346 ));
347 }
348
349 let footer = file_handle
351 .read_bytes_range(file_len - 32..file_len)
352 .await?;
353 let mut reader = footer.as_slice();
354 let data_end_offset = reader.read_u64::<LittleEndian>()?;
355 let dict_offset = reader.read_u64::<LittleEndian>()?;
356 let num_docs = reader.read_u32::<LittleEndian>()?;
357 let has_dict = reader.read_u32::<LittleEndian>()? != 0;
358 let version = reader.read_u32::<LittleEndian>()?;
359 let magic = reader.read_u32::<LittleEndian>()?;
360
361 if magic != STORE_MAGIC {
362 return Err(io::Error::new(
363 io::ErrorKind::InvalidData,
364 "Invalid store magic",
365 ));
366 }
367 if version != STORE_VERSION {
368 return Err(io::Error::new(
369 io::ErrorKind::InvalidData,
370 format!("Unsupported store version: {}", version),
371 ));
372 }
373
374 let dict = if has_dict && dict_offset > 0 {
376 let dict_start = dict_offset as usize;
377 let dict_len_bytes = file_handle
378 .read_bytes_range(dict_start..dict_start + 4)
379 .await?;
380 let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
381 let dict_bytes = file_handle
382 .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
383 .await?;
384 Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
385 } else {
386 None
387 };
388
389 let index_start = if has_dict && dict_offset > 0 {
391 let dict_start = dict_offset as usize;
392 let dict_len_bytes = file_handle
393 .read_bytes_range(dict_start..dict_start + 4)
394 .await?;
395 let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
396 dict_start + 4 + dict_len
397 } else {
398 data_end_offset as usize
399 };
400 let index_end = file_len - 32;
401
402 let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
403 let mut reader = index_bytes.as_slice();
404
405 let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
406 let mut index = Vec::with_capacity(num_blocks);
407
408 for _ in 0..num_blocks {
409 let first_doc_id = reader.read_u32::<LittleEndian>()?;
410 let offset = reader.read_u64::<LittleEndian>()?;
411 let length = reader.read_u32::<LittleEndian>()?;
412 let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
413
414 index.push(StoreBlockIndex {
415 first_doc_id,
416 offset,
417 length,
418 num_docs: num_docs_in_block,
419 });
420 }
421
422 let data_slice = file_handle.slice(0..data_end_offset as usize);
424
425 Ok(Self {
426 data_slice,
427 index,
428 num_docs,
429 dict,
430 cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
431 })
432 }
433
434 pub fn num_docs(&self) -> u32 {
436 self.num_docs
437 }
438
439 pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
441 if doc_id >= self.num_docs {
442 return Ok(None);
443 }
444
445 let block_idx = self
447 .index
448 .binary_search_by(|entry| {
449 if doc_id < entry.first_doc_id {
450 std::cmp::Ordering::Greater
451 } else if doc_id >= entry.first_doc_id + entry.num_docs {
452 std::cmp::Ordering::Less
453 } else {
454 std::cmp::Ordering::Equal
455 }
456 })
457 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
458
459 let entry = &self.index[block_idx];
460 let block_data = self.load_block(entry).await?;
461
462 let doc_offset_in_block = doc_id - entry.first_doc_id;
464 let mut reader = &block_data[..];
465
466 for _ in 0..doc_offset_in_block {
467 let doc_len = reader.read_u32::<LittleEndian>()? as usize;
468 if doc_len > reader.len() {
469 return Err(io::Error::new(
470 io::ErrorKind::InvalidData,
471 "Invalid doc length",
472 ));
473 }
474 reader = &reader[doc_len..];
475 }
476
477 let doc_len = reader.read_u32::<LittleEndian>()? as usize;
478 let doc_bytes = &reader[..doc_len];
479
480 deserialize_document(doc_bytes, schema).map(Some)
481 }
482
483 async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
484 {
486 let mut cache = self.cache.write();
487 if let Some(block) = cache.get(entry.first_doc_id) {
488 return Ok(block);
489 }
490 }
491
492 let start = entry.offset as usize;
494 let end = start + entry.length as usize;
495 let compressed = self.data_slice.read_bytes_range(start..end).await?;
496
497 let decompressed = if let Some(ref dict) = self.dict {
499 crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
500 } else {
501 crate::compression::decompress(compressed.as_slice())?
502 };
503
504 let block = Arc::new(decompressed);
505
506 {
508 let mut cache = self.cache.write();
509 cache.insert(entry.first_doc_id, Arc::clone(&block));
510 }
511
512 Ok(block)
513 }
514}
515
516pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
517 serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
518}
519
520#[derive(Debug, Clone)]
522pub struct RawStoreBlock {
523 pub first_doc_id: DocId,
524 pub num_docs: u32,
525 pub offset: u64,
526 pub length: u32,
527}
528
529pub struct StoreMerger<'a, W: Write> {
540 writer: &'a mut W,
541 index: Vec<StoreBlockIndex>,
542 current_offset: u64,
543 next_doc_id: DocId,
544}
545
546impl<'a, W: Write> StoreMerger<'a, W> {
547 pub fn new(writer: &'a mut W) -> Self {
548 Self {
549 writer,
550 index: Vec::new(),
551 current_offset: 0,
552 next_doc_id: 0,
553 }
554 }
555
556 pub async fn append_store<F: AsyncFileRead>(
561 &mut self,
562 data_slice: &F,
563 blocks: &[RawStoreBlock],
564 ) -> io::Result<()> {
565 for block in blocks {
566 let start = block.offset as usize;
568 let end = start + block.length as usize;
569 let compressed_data = data_slice.read_bytes_range(start..end).await?;
570
571 self.writer.write_all(compressed_data.as_slice())?;
573
574 self.index.push(StoreBlockIndex {
576 first_doc_id: self.next_doc_id,
577 offset: self.current_offset,
578 length: block.length,
579 num_docs: block.num_docs,
580 });
581
582 self.current_offset += block.length as u64;
583 self.next_doc_id += block.num_docs;
584 }
585
586 Ok(())
587 }
588
589 pub fn finish(self) -> io::Result<u32> {
591 let data_end_offset = self.current_offset;
592
593 let dict_offset = 0u64;
595
596 self.writer
598 .write_u32::<LittleEndian>(self.index.len() as u32)?;
599 for entry in &self.index {
600 self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
601 self.writer.write_u64::<LittleEndian>(entry.offset)?;
602 self.writer.write_u32::<LittleEndian>(entry.length)?;
603 self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
604 }
605
606 self.writer.write_u64::<LittleEndian>(data_end_offset)?;
608 self.writer.write_u64::<LittleEndian>(dict_offset)?;
609 self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
610 self.writer.write_u32::<LittleEndian>(0)?; self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
612 self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
613
614 Ok(self.next_doc_id)
615 }
616}
617
618impl AsyncStoreReader {
619 pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
621 self.index
622 .iter()
623 .map(|entry| RawStoreBlock {
624 first_doc_id: entry.first_doc_id,
625 num_docs: entry.num_docs,
626 offset: entry.offset,
627 length: entry.length,
628 })
629 .collect()
630 }
631
632 pub fn data_slice(&self) -> &LazyFileSlice {
634 &self.data_slice
635 }
636
637 pub fn has_dict(&self) -> bool {
639 self.dict.is_some()
640 }
641}