use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

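// On-disk store layout, as written by the writers below and parsed by
// `AsyncStoreReader::open`:
//
//   [compressed blocks ...............] data_end_offset bytes
//   [optional dict: u32 len + bytes ..] present when the footer flag is set
//   [index: u32 block count, then per block:
//       first_doc_id: u32, offset: u64, length: u32, num_docs: u32]
//   [footer, fixed 32 bytes:
//       data_end_offset: u64, dict_offset: u64, num_docs: u32,
//       has_dict: u32, version: u32, magic: u32]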
const STORE_MAGIC: u32 = 0x53544F52; // "STOR" in ASCII
const STORE_VERSION: u32 = 2;

/// Uncompressed block size threshold: once the write buffer reaches this
/// many bytes, the block is sealed and handed off for compression.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default compression dictionary size (4 KiB).
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);

/// Serializes a document for storage. Documents are currently encoded as
/// JSON; the schema parameter is unused but kept for symmetry with
/// `deserialize_document`.
pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// A sealed block compressed (or being compressed) on a worker thread,
/// tagged with its sequence number so blocks can be written back in order.
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

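/// Store writer that compresses blocks eagerly in the background: whenever
/// the in-memory buffer reaches `STORE_BLOCK_SIZE`, it is sealed and handed
/// to a newly spawned thread while the caller keeps appending documents.
/// `finish` joins all workers and writes the blocks in sequence order. The
/// `_num_threads` parameter is currently unused: one thread is spawned per
/// sealed block.
///
/// Write-path sketch (`docs` and `schema` are caller-provided; constructing
/// `Document`/`Schema` values is the concern of `crate::dsl`):
///
/// ```ignore
/// let mut buf = Vec::new();
/// let mut writer = EagerParallelStoreWriter::new(&mut buf, 4);
/// for doc in &docs {
///     let _doc_id = writer.store(doc, &schema)?;
/// }
/// let num_docs = writer.finish()?;
/// ```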
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    compressed_blocks: Vec<CompressedBlock>,
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

    /// Appends a document to the current block and returns its assigned id.
    /// Seals the block and spawns a compression thread once the buffer
    /// reaches `STORE_BLOCK_SIZE`.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        // Documents are length-prefixed within the block.
        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Seals the current block buffer and compresses it on a new thread.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Reaps compression threads that have already finished, keeping the
    /// still-running ones pending. A panicked worker surfaces as an error
    /// instead of silently dropping its block.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                let block = handle
                    .join()
                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "compression thread panicked"))?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

    /// Flushes the final partial block, joins all compression threads, and
    /// writes the data section, optional dictionary, index, and footer.
    /// Returns the total number of documents stored.
    pub fn finish(mut self) -> io::Result<u32> {
        self.spawn_compression();

        self.collect_completed()?;

        for handle in self.pending_handles.drain(..) {
            let block = handle
                .join()
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "compression thread panicked"))?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Empty store: write a footer describing zero blocks and zero docs.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // index block count
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict flag
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Workers may finish out of order; restore logical block order.
        self.compressed_blocks.sort_by_key(|b| b.seq);

        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // The dictionary, when present, sits between the data section and
        // the index, length-prefixed.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

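/// Index entry locating one compressed block: the id of its first document,
/// the block's byte offset and length within the data section, and the
/// number of documents it contains.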
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

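/// Async reader over a store file. The footer, optional dictionary, and
/// block index are loaded once at open; data blocks are then fetched and
/// decompressed lazily on demand, with recently used blocks kept in a small
/// LRU cache.
///
/// Read-path sketch (`handle` is a `LazyFileHandle` and `schema` a
/// caller-provided `Schema`):
///
/// ```ignore
/// let reader = AsyncStoreReader::open(handle, 32).await?;
/// if let Some(doc) = reader.get(7, &schema).await? {
///     // ... use the document
/// }
/// ```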
pub struct AsyncStoreReader {
    data_slice: LazyFileSlice,
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    dict: Option<CompressionDict>,
    cache: RwLock<StoreBlockCache>,
}

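/// Minimal LRU cache of decompressed blocks keyed by each block's first doc
/// id. Recency is tracked in a `Vec` with O(n) updates, which is acceptable
/// for the small block counts this cache holds.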
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            // Move the entry to the most-recently-used position.
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        // Evict least-recently-used entries until there is room.
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
    /// Opens a store: validates the footer, then loads the optional
    /// dictionary and the block index.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Fixed-size footer; see the layout comment at the top of this file.
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Load the optional dictionary; the index starts right after it
        // (or right after the data section when there is no dictionary).
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_offset..dict_offset + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_offset + 4..dict_offset + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_offset + 4 + dict_len,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Fetches a document by id, returning `None` when the id is out of
    /// range. Locates the containing block via binary search, then walks
    /// the length-prefixed documents within the decompressed block.
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        // Skip over the documents that precede the requested one.
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

    /// Returns the decompressed block for an index entry, serving it from
    /// the LRU cache when possible.
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        {
            // `get` mutates LRU order, so take the write lock even for a hit.
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

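/// Deserializes a document written by `serialize_document` (currently JSON;
/// the schema parameter is unused).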
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

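/// A block descriptor exported for merging: enough information to copy the
/// compressed bytes verbatim and renumber the block's documents.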
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

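/// Merges stores by copying their already-compressed blocks verbatim (no
/// decompression or recompression) and renumbering doc ids sequentially.
/// `finish` always writes a no-dictionary footer, so only blocks from
/// stores written without a dictionary should be appended.
///
/// Two-store merge sketch (`reader_a`/`reader_b` are assumed to be open
/// `AsyncStoreReader`s over dictionary-free stores):
///
/// ```ignore
/// let mut out = Vec::new();
/// let mut merger = StoreMerger::new(&mut out);
/// merger.append_store(reader_a.data_slice(), &reader_a.raw_blocks()).await?;
/// merger.append_store(reader_b.data_slice(), &reader_b.raw_blocks()).await?;
/// let total_docs = merger.finish()?;
/// ```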
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Appends all blocks of one source store, copying the compressed bytes
    /// as-is and assigning fresh doc ids and offsets.
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            self.writer.write_all(compressed_data.as_slice())?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Writes the merged index and a no-dictionary footer. Returns the
    /// total number of documents across all appended stores.
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // Merged stores never carry a dictionary.
        let dict_offset = 0u64;

        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?; // has_dict flag
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

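// Accessors that expose a store's raw blocks and data section so a
// `StoreMerger` can copy them into a merged store.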
impl AsyncStoreReader {
    /// Returns the index entries for all blocks, in a form consumable by
    /// `StoreMerger::append_store`.
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// The slice covering this store's compressed data section.
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    /// Whether this store was written with a compression dictionary.
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}