//! Compressed/slow/row-oriented storage for documents.
//!
//! A field needs to be marked as stored in the schema in
//! order to be handled in the `Store`.
//!
//! Internally, documents (or rather their stored fields) are serialized to a buffer.
//! When the buffer exceeds `block_size` (defaults to 16K), the buffer is compressed
//! using LZ4 or Zstd and the resulting block is written to disk.
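//!
//! A minimal sketch of the write path, using the crate-internal
//! `StoreWriter` the way the tests below do (argument meanings are
//! assumed from those tests; this is not a public API):
//!
//! ```ignore
//! let mut store_writer = StoreWriter::new(
//!     write_ptr,        // a `WritePtr` obtained from a `Directory`
//!     Compressor::Lz4,  // assuming the `lz4-compression` feature is enabled
//!     16_384,           // block size in bytes
//!     true,             // run compression on a separate thread
//! )?;
//! store_writer.store(&doc, &schema)?;
//! store_writer.close()?;
//! ```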
//!
//! One can then request a specific `DocId`.
//! A skip list helps locate the right block,
//! which is decompressed entirely so that the document can be read from it.
//!
//! If a requested document lives in a recently decompressed block,
//! the reader avoids decompressing it a second time:
//! decompressed blocks are kept in a small LRU cache whose capacity
//! is passed to `StoreReader::open` (see `CacheStats`).
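//!
//! The matching read path, again mirroring the tests below:
//!
//! ```ignore
//! let store = StoreReader::open(store_file, 10)?; // 10 = block-cache capacity
//! let doc: TantivyDocument = store.get(doc_id)?;
//! ```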
//!
//! A typical use case for the store is, once
//! the search result page has been computed, returning
//! the actual content of the 10 best documents.
//!
//! # Usage
//!
//! Most users should not access the `StoreReader` directly
//! and should rely on either
//!
//! - at the segment level, the
//! [`SegmentReader`'s `doc` method](../struct.SegmentReader.html#method.doc)
//! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method
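//!
//! A minimal end-to-end sketch of that public access path
//! (`index` and the `title` field handle are assumed to exist,
//! with `title` marked `STORED` and the index already committed):
//!
//! ```ignore
//! use tantivy::collector::TopDocs;
//! use tantivy::query::QueryParser;
//! use tantivy::TantivyDocument;
//!
//! let searcher = index.reader()?.searcher();
//! let query = QueryParser::for_index(&index, vec![title]).parse_query("lorem")?;
//! for (_score, doc_address) in searcher.search(&query, &TopDocs::with_limit(10))? {
//!     // `Searcher::doc` fetches the stored fields through the `StoreReader`.
//!     let doc: TantivyDocument = searcher.doc(doc_address)?;
//! }
//! ```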

mod compressors;
mod decompressors;
mod footer;
mod index;
mod reader;
mod writer;
pub use self::compressors::{Compressor, ZstdCompressor};
pub use self::decompressors::Decompressor;
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
pub use self::reader::{CacheStats, StoreReader};
pub use self::writer::StoreWriter;
mod store_compressor;

/// Doc store version in footer to handle format changes.
pub(crate) const DOC_STORE_VERSION: u32 = 1;

#[cfg(feature = "lz4-compression")]
mod compression_lz4_block;

#[cfg(feature = "zstd-compression")]
mod compression_zstd_block;

#[cfg(test)]
pub mod tests {

    use std::path::Path;

    use super::*;
    use crate::directory::{Directory, RamDirectory, WritePtr};
    use crate::fastfield::AliveBitSet;
    use crate::schema::document::Value;
    use crate::schema::{
        self, Schema, TantivyDocument, TextFieldIndexing, TextOptions, STORED, TEXT,
    };
    use crate::{Index, IndexWriter, Term};

    const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \
                         eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad \
                         minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip \
                         ex ea commodo consequat. Duis aute irure dolor in reprehenderit in \
                         voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur \
                         sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
                         mollit anim id est laborum.";

    const BLOCK_SIZE: usize = 16_384;

    pub fn write_lorem_ipsum_store(
        writer: WritePtr,
        num_docs: usize,
        compressor: Compressor,
        blocksize: usize,
        separate_thread: bool,
    ) -> Schema {
        let mut schema_builder = Schema::builder();
        let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
        let field_title =
            schema_builder.add_text_field("title", TextOptions::default().set_stored());
        let schema = schema_builder.build();
        {
            let mut store_writer =
                StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap();
            for i in 0..num_docs {
                let mut doc = TantivyDocument::default();
                doc.add_field_value(field_body, LOREM.to_string());
                doc.add_field_value(field_title, format!("Doc {i}"));
                store_writer.store(&doc, &schema).unwrap();
            }
            store_writer.close().unwrap();
        }
        schema
    }

    const NUM_DOCS: usize = 1_000;
    #[test]
    fn test_doc_store_iter_with_delete_bug_1077() -> crate::Result<()> {
        // This covers deletion of the first element in a checkpoint.
        let deleted_doc_ids = (200..300).collect::<Vec<_>>();
        let alive_bitset =
            AliveBitSet::for_test_from_deleted_docs(&deleted_doc_ids, NUM_DOCS as u32);

        let path = Path::new("store");
        let directory = RamDirectory::create();
        let store_wrt = directory.open_write(path)?;
        let schema =
            write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true);
        let field_title = schema.get_field("title").unwrap();
        let store_file = directory.open_read(path)?;
        let store = StoreReader::open(store_file, 10)?;
        for i in 0..NUM_DOCS as u32 {
            assert_eq!(
                *store
                    .get::<TantivyDocument>(i)?
                    .get_first(field_title)
                    .unwrap()
                    .as_str()
                    .unwrap(),
                format!("Doc {i}")
            );
        }

        for doc in store.iter::<TantivyDocument>(Some(&alive_bitset)) {
            let doc = doc?;
            let title_content = doc.get_first(field_title).unwrap().as_str().unwrap();
            if !title_content.starts_with("Doc ") {
                panic!("unexpected title_content {title_content}");
            }

            let id = title_content
                .strip_prefix("Doc ")
                .unwrap()
                .parse::<u32>()
                .unwrap();
            if alive_bitset.is_deleted(id) {
                panic!("unexpected deleted document {id}");
            }
        }

        Ok(())
    }

    fn test_store(
        compressor: Compressor,
        blocksize: usize,
        separate_thread: bool,
    ) -> crate::Result<()> {
        let path = Path::new("store");
        let directory = RamDirectory::create();
        let store_wrt = directory.open_write(path)?;
        let schema =
            write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread);
        let field_title = schema.get_field("title").unwrap();
        let store_file = directory.open_read(path)?;
        let store = StoreReader::open(store_file, 10)?;
        for i in 0..NUM_DOCS as u32 {
            assert_eq!(
                *store
                    .get::<TantivyDocument>(i)?
                    .get_first(field_title)
                    .unwrap()
                    .as_str()
                    .unwrap(),
                format!("Doc {i}")
            );
        }
        for (i, doc) in store.iter::<TantivyDocument>(None).enumerate() {
            assert_eq!(
                *doc?.get_first(field_title).unwrap().as_str().unwrap(),
                format!("Doc {i}")
            );
        }
        Ok(())
    }

    #[test]
    fn test_store_no_compression_same_thread() -> crate::Result<()> {
        test_store(Compressor::None, BLOCK_SIZE, false)
    }

    #[test]
    fn test_store_no_compression() -> crate::Result<()> {
        test_store(Compressor::None, BLOCK_SIZE, true)
    }

    #[cfg(feature = "lz4-compression")]
    #[test]
    fn test_store_lz4_block() -> crate::Result<()> {
        test_store(Compressor::Lz4, BLOCK_SIZE, true)
    }

    #[cfg(feature = "zstd-compression")]
    #[test]
    fn test_store_zstd() -> crate::Result<()> {
        test_store(
            Compressor::Zstd(ZstdCompressor::default()),
            BLOCK_SIZE,
            true,
        )
    }

    #[test]
    fn test_store_with_delete() -> crate::Result<()> {
        let mut schema_builder = schema::Schema::builder();

        let text_field_options = TextOptions::default()
            .set_indexing_options(
                TextFieldIndexing::default()
                    .set_index_option(schema::IndexRecordOption::WithFreqsAndPositions),
            )
            .set_stored();
        let text_field = schema_builder.add_text_field("text_field", text_field_options);
        let schema = schema_builder.build();
        let index_builder = Index::builder().schema(schema);

        let index = index_builder.create_in_ram()?;

        {
            let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
            index_writer.add_document(doc!(text_field=> "deleteme"))?;
            index_writer.add_document(doc!(text_field=> "deletemenot"))?;
            index_writer.add_document(doc!(text_field=> "deleteme"))?;
            index_writer.add_document(doc!(text_field=> "deletemenot"))?;
            index_writer.add_document(doc!(text_field=> "deleteme"))?;

            index_writer.delete_term(Term::from_field_text(text_field, "deleteme"));
            index_writer.commit()?;
        }

        let searcher = index.reader()?.searcher();
        let reader = searcher.segment_reader(0);
        let store = reader.get_store_reader(10)?;
        for doc in store.iter::<TantivyDocument>(reader.alive_bitset()) {
            assert_eq!(
                *doc?.get_first(text_field).unwrap().as_str().unwrap(),
                "deletemenot".to_string()
            );
        }
        Ok(())
    }

    #[cfg(feature = "lz4-compression")]
    #[cfg(feature = "zstd-compression")]
    #[test]
    fn test_merge_with_changed_compressor() -> crate::Result<()> {
        let mut schema_builder = schema::Schema::builder();

        let text_field = schema_builder.add_text_field("text_field", TEXT | STORED);
        let schema = schema_builder.build();
        let index_builder = Index::builder().schema(schema);

        let mut index = index_builder.create_in_ram().unwrap();
        index.settings_mut().docstore_compression = Compressor::Lz4;
        {
            let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
            // Put enough data to create enough blocks in the doc store for it to be
            // considered for stacking.
            for _ in 0..200 {
                index_writer.add_document(doc!(text_field=> LOREM))?;
            }
            assert!(index_writer.commit().is_ok());
            for _ in 0..200 {
                index_writer.add_document(doc!(text_field=> LOREM))?;
            }
            assert!(index_writer.commit().is_ok());
        }
        assert_eq!(
            index.reader().unwrap().searcher().segment_readers()[0]
                .get_store_reader(10)
                .unwrap()
                .decompressor(),
            Decompressor::Lz4
        );
        // Change the compressor; this disables stacking on merging.
        let index_settings = index.settings_mut();
        index_settings.docstore_compression = Compressor::Zstd(Default::default());
        // Merging the segments
        {
            let segment_ids = index
                .searchable_segment_ids()
                .expect("Searchable segments failed.");
            let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
            assert!(index_writer.merge(&segment_ids).wait().is_ok());
            assert!(index_writer.wait_merging_threads().is_ok());
        }

        let searcher = index.reader().unwrap().searcher();
        assert_eq!(searcher.segment_readers().len(), 1);
        let reader = searcher.segment_readers().iter().last().unwrap();
        let store = reader.get_store_reader(10).unwrap();

        for doc in store
            .iter::<TantivyDocument>(reader.alive_bitset())
            .take(50)
        {
            assert_eq!(
                *doc?.get_first(text_field).and_then(|v| v.as_str()).unwrap(),
                LOREM.to_string()
            );
        }
        assert_eq!(store.decompressor(), Decompressor::Zstd);

        Ok(())
    }

    #[test]
    fn test_merge_of_small_segments() -> crate::Result<()> {
        let mut schema_builder = schema::Schema::builder();

        let text_field = schema_builder.add_text_field("text_field", TEXT | STORED);
        let schema = schema_builder.build();
        let index_builder = Index::builder().schema(schema);

        let index = index_builder.create_in_ram().unwrap();

        {
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=> "1"))?;
            index_writer.commit()?;
            index_writer.add_document(doc!(text_field=> "2"))?;
            index_writer.commit()?;
            index_writer.add_document(doc!(text_field=> "3"))?;
            index_writer.commit()?;
            index_writer.add_document(doc!(text_field=> "4"))?;
            index_writer.commit()?;
            index_writer.add_document(doc!(text_field=> "5"))?;
            index_writer.commit()?;
        }
        // Merging the segments
        {
            let segment_ids = index.searchable_segment_ids()?;
            let mut index_writer: IndexWriter = index.writer_for_tests()?;
            index_writer.merge(&segment_ids).wait()?;
            index_writer.wait_merging_threads()?;
        }

        let searcher = index.reader()?.searcher();
        assert_eq!(searcher.segment_readers().len(), 1);
        let reader = searcher.segment_readers().iter().last().unwrap();
        let store = reader.get_store_reader(10)?;
        assert_eq!(store.block_checkpoints().count(), 1);
        Ok(())
    }
}

#[cfg(all(test, feature = "unstable"))]
mod bench {

    use std::path::Path;

    use test::Bencher;

    use super::tests::write_lorem_ipsum_store;
    use crate::directory::{Directory, RamDirectory};
    use crate::store::{Compressor, StoreReader};
    use crate::TantivyDocument;

    #[bench]
    #[cfg(feature = "mmap")]
    fn bench_store_encode(b: &mut Bencher) {
        let directory = RamDirectory::create();
        let path = Path::new("store");
        b.iter(|| {
            write_lorem_ipsum_store(
                directory.open_write(path).unwrap(),
                1_000,
                Compressor::default(),
                16_384,
                true,
            );
            directory.delete(path).unwrap();
        });
    }

    #[bench]
    fn bench_store_decode(b: &mut Bencher) {
        let directory = RamDirectory::create();
        let path = Path::new("store");
        write_lorem_ipsum_store(
            directory.open_write(path).unwrap(),
            1_000,
            Compressor::default(),
            16_384,
            true,
        );
        let store_file = directory.open_read(path).unwrap();
        let store = StoreReader::open(store_file, 10).unwrap();
        b.iter(|| store.iter::<TantivyDocument>(None).collect::<Vec<_>>());
    }
}