use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

/// Magic number identifying a store file ("STOR" in ASCII).
const STORE_MAGIC: u32 = 0x53544F52;
/// On-disk format version, written to and checked against the footer.
const STORE_VERSION: u32 = 2;
/// Uncompressed size threshold at which a block is compressed and flushed.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default size of a compression dictionary, in bytes.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);

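/// Serializes a document to JSON bytes. The schema is accepted for API
/// symmetry with `deserialize_document` but is currently unused.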
pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

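/// A block of length-prefixed documents after compression, tagged with the
/// sequence number used to restore write order when blocks finish out of order.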
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

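/// Store writer that buffers documents into blocks and eagerly compresses
/// each full block on its own thread, overlapping compression with ingestion.
/// `finish` joins all threads, reorders the blocks, and writes the block
/// index and footer.
///
/// A minimal usage sketch (document and schema construction assumed to
/// happen elsewhere):
///
/// ```ignore
/// let mut out: Vec<u8> = Vec::new();
/// let mut writer = EagerParallelStoreWriter::new(&mut out, 4);
/// for doc in &docs {
///     let doc_id = writer.store(doc, &schema)?;
/// }
/// let num_docs = writer.finish()?;
/// ```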
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    /// Uncompressed, length-prefixed documents for the block being built.
    block_buffer: Vec<u8>,
    /// Blocks whose compression has finished, awaiting the final write.
    compressed_blocks: Vec<CompressedBlock>,
    /// Compression threads that are still running.
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    /// Doc id of the first document in the block currently being built.
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

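    /// Creates a writer that compresses every block with the given dictionary.
    /// The dictionary is persisted after the data section so readers can
    /// recover it via the footer's `dict_offset`.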
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

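    /// Stores a document and returns its assigned doc id. The document is
    /// written length-prefixed (u32, little-endian) into the current block;
    /// a full block is handed off to a background compression thread.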
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

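    /// Takes ownership of the current block buffer and compresses it on a
    /// freshly spawned thread. A compression failure panics the worker; the
    /// panic is surfaced as an `io::Error` when the handle is joined.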
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

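    /// Harvests every compression thread that has already finished, keeping
    /// still-running threads pending. A panicked worker is reported as an
    /// `io::Error` rather than being silently dropped.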
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                let block = handle.join().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "compression thread panicked")
                })?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

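    /// Flushes the final partial block, joins all compression threads, and
    /// writes the store tail. Layout after the compressed blocks:
    ///
    /// ```text
    /// [optional dict: u32 len | dict bytes]
    /// [index: u32 count | per block: u32 first_doc_id, u64 offset,
    ///                                u32 length, u32 num_docs]
    /// [footer, 32 bytes: u64 data_end, u64 dict_offset, u32 num_docs,
    ///                    u32 has_dict, u32 version, u32 magic]
    /// ```
    ///
    /// Returns the total number of documents written.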
    pub fn finish(mut self) -> io::Result<u32> {
        // Flush whatever is left in the block buffer.
        self.spawn_compression();

        self.collect_completed()?;

        // Block on the threads that are still running.
        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Empty store: a zero-entry index followed by the footer.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(0)?;
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Threads may finish out of order; restore write order.
        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Persist the dictionary (length-prefixed) right after the data section.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Fixed-size footer; the reader locates everything from these 32 bytes.
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

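/// One entry in the store's block index.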
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

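/// Async reader over a store file: resolves a doc id to its block via the
/// index, reads and decompresses the block on demand, and keeps recently
/// used blocks in a small LRU cache.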
pub struct AsyncStoreReader {
    /// Slice covering only the compressed data section of the file.
    data_slice: LazyFileSlice,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

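/// LRU cache of decompressed blocks, keyed by each block's first doc id.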
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            // Move the hit to the back of the access order (most recent).
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        // Evict from the front (least recently used) until there is room.
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
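    /// Opens a store: reads the fixed 32-byte footer, validates magic and
    /// version, loads the optional dictionary, then parses the block index.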
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // The block index begins right after the dictionary (when present)
        // or right after the data section.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset as usize;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_start + 4 + dict_len,
            )
        } else {
            (None, data_end_offset as usize)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

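    /// Fetches a document by id. Binary-searches the block index, loads the
    /// containing block (decompressing on a cache miss), then skips over the
    /// preceding length-prefixed documents within the block.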
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        // Skip the documents that precede the requested one in this block.
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

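    /// Returns the decompressed block for `entry`, serving from the cache
    /// when possible. `StoreBlockCache::get` reorders its LRU list, so even
    /// a cache hit takes the write lock.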
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

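/// Deserializes a document from JSON bytes. The schema is accepted for API
/// symmetry with `serialize_document` but is currently unused.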
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

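/// Raw block metadata exposed so a `StoreMerger` can copy a compressed block
/// verbatim, without decompressing it.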
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

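/// Merges stores by copying their compressed blocks byte-for-byte and
/// reassigning doc ids sequentially, so no block is ever decompressed or
/// recompressed. The merged output is always written without a dictionary
/// (`has_dict = 0`), so only stores written without one can be merged this way.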
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

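    /// Appends every block of one source store. The compressed bytes are
    /// copied as-is; only the index entries (doc ids and offsets) are
    /// rewritten relative to the merged output.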
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset as usize;
            let end = start + block.length as usize;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

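    /// Writes the block index and footer, mirroring the layout produced by
    /// `EagerParallelStoreWriter::finish` for the dictionary-less case.
    /// Returns the total number of documents in the merged store.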
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // Merged stores never carry a dictionary.
        let dict_offset = 0u64;

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    /// Exposes this store's block metadata so it can be fed to a `StoreMerger`.
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// The slice covering the compressed data section, for use when merging.
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}