lsmlite_rs/
lib.rs

1// Copyright 2023 Helsing GmbH
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14#![doc = include_str!("../README.md")]
15/*!
16# Documentation
17
This documentation contains examples that show how to create a database, as well as how to read from it and write to it. Primary operations (reads and writes) over a database file are done via a lightweight set of methods described by the traits [`Disk`] and [`Cursor`]. These traits are implemented by [`LsmDb`] and [`LsmCursor`]. For database configuration and creation, the relevant information can be found under [`DbConf`] and [`LsmDb`].
19*/
20
21// Private mods.
22mod compression;
23mod lsmdb;
24mod threads;
25
26use crate::compression::lsm_compress;
27use prometheus::Histogram;
28use serde::{Deserialize, Serialize};
29use std::cmp::Ordering;
30use std::ffi::CString;
31use std::marker::{PhantomData, PhantomPinned};
32use std::path::PathBuf;
33use std::sync::mpsc;
34use std::thread;
35
/// This struct contains the configuration of a database.
///
/// Instances are built through [`DbConf::new`] (only path and name are given,
/// every other field takes its default) or through
/// [`DbConf::new_with_parameters`] (every field is given explicitly).
#[derive(Clone, Debug, Default)]
pub struct DbConf {
    /// The path the database file will be put in.
    pub(crate) db_path: PathBuf,
    /// The name of the database file.
    pub(crate) db_base_name: String,
    /// Whether the database is used in read-only mode, or writes are also allowed.
    pub(crate) handle_mode: LsmHandleMode,
    /// The mode the database works on (with or without a background thread).
    pub(crate) mode: LsmMode,
    /// The upper-layer set of (Prometheus) metrics to be updated by the
    /// database, if any.
    pub(crate) metrics: Option<LsmMetrics>,
    /// Whether compression is to be used or not, and what kind.
    pub(crate) compression: LsmCompressionLib,
}
46
47impl DbConf {
48    /// The minimum amount of information a database requires is
49    /// the path the file will be put in as well as the name of it.
50    /// This version of `new` will create a database using
51    /// no background threads, without Prometheus metrics, and using
52    /// no compression.
53    pub fn new(db_path: impl Into<PathBuf>, db_name: String) -> Self {
54        Self {
55            db_path: db_path.into(),
56            db_base_name: db_name,
57            ..Default::default()
58        }
59    }
60
61    /// A full database configuration requires:
62    /// 1. The path the database file will be put in.
63    /// 2. The name of the database file.
64    /// 3. The mode it will work on (single- or multi-threaded).
65    /// 4. Whether the database is to be used in read-only mode, or writes
66    ///    are also allowed.
67    /// 5. The upper-layer set of (Prometheus) metrics to be updated by
68    ///    the database.
69    /// 6. Whether compression is to be used or not, and what kind.
70    ///
71    /// # Example
72    ///
73    /// ```rust
74    /// use prometheus::{Histogram, HistogramOpts};
75    /// use lsmlite_rs::*;
76    ///
77    /// let default_checkpointer_kbs_buckets: Vec::<f64> =
78    /// vec![2048., 4096., 5120., 6144., 7168., 8192., 12288., 16384., 20480.];
79    /// let default_worker_kbs_buckets: Vec::<f64> =
80    /// vec![1024., 2048., 4096., 8192., 16384., 24576., 32768., 49152., 65536.];
81    /// let default_write_times_sec_buckets: Vec::<f64> =
82    /// vec![0.001, 0.0025, 0.005, 0.0075, 0.01, 0.025, 0.05, 0.075, 0.1, 0.5, 1.0, 5.0, 10.];
83    ///
84    /// let opts_1 = HistogramOpts::new(
85    ///                                 "non_typed_write_times_s",
86    ///                                 "non_typed_write_times_s help"
87    ///                                )
88    ///                                .buckets(default_write_times_sec_buckets.clone());
89    /// let opts_2 = HistogramOpts::new(
90    ///                                 "work_kbs",
91    ///                                 "work_kbs help"
92    ///                                )
93    ///                                 .buckets(default_worker_kbs_buckets.clone());
94    /// let opts_3 = HistogramOpts::new(
95    ///                                 "work_times_s",
96    ///                                 "work_times_s help"
97    ///                                 )
98    ///                                 .buckets(default_write_times_sec_buckets.clone());
99    /// let opts_4 = HistogramOpts::new(
100    ///                                 "checkpoint_kbs",
101    ///                                 "checkpoint_kbs help"
102    ///                                 )
103    ///                                 .buckets(default_checkpointer_kbs_buckets.clone());
104    /// let opts_5 = HistogramOpts::new(
105    ///                                 "checkpoint_times_s",
106    ///                                 "checkpoint_times_s help"
107    ///                                )
108    ///                                .buckets(default_write_times_sec_buckets);
109    ///
110    /// let metrics = LsmMetrics {
111    ///     write_times_s: Histogram::with_opts(opts_1).unwrap(),
112    ///     work_kbs: Histogram::with_opts(opts_2).unwrap(),
113    ///     work_times_s: Histogram::with_opts(opts_3).unwrap(),
114    ///     checkpoint_kbs: Histogram::with_opts(opts_4).unwrap(),
115    ///     checkpoint_times_s: Histogram::with_opts(opts_5).unwrap(),
116    /// };
117    ///
118    /// let db_conf = DbConf::new_with_parameters(
119    ///                                           "/tmp/",
120    ///                                           "my_db_z".to_string(),
121    ///                                           LsmMode::LsmNoBackgroundThreads,
122    ///                                           LsmHandleMode::ReadWrite,
123    ///                                           Some(metrics),
124    ///                                           LsmCompressionLib::ZLib,
125    /// );
126    ///
127    /// let mut db: LsmDb = Default::default();
128    /// let rc = db.initialize(db_conf)?;
129    /// # Result::<(), LsmErrorCode>::Ok(())
130    /// ```
131    pub fn new_with_parameters(
132        db_path: impl Into<PathBuf>,
133        db_name: String,
134        mode: LsmMode,
135        handle_mode: LsmHandleMode,
136        metrics: Option<LsmMetrics>,
137        compression: LsmCompressionLib,
138    ) -> Self {
139        Self {
140            db_path: db_path.into(),
141            db_base_name: db_name,
142            handle_mode,
143            mode,
144            metrics,
145            compression,
146        }
147    }
148}
149
/// These are stubs that mirror LSM's types. They are defined like this to
/// provide type safety only on the Rust side. Their body at runtime is
/// the one from LSM.
///
/// This stub stands for a database handle (see `LsmDb::db_handle`).
#[repr(C)]
#[derive(Copy, Clone)]
struct lsm_db {
    // Zero-sized field: the stub carries no data of its own.
    _data: [u8; 0],
    // The raw pointer keeps the type from being `Send`/`Sync`, and
    // `PhantomPinned` keeps it from being `Unpin`.
    _marker: PhantomData<(*mut u8, PhantomPinned)>,
}
159
// Stub that mirrors one of LSM's types, providing type safety on the Rust
// side only. It stands for a database environment (see `LsmDb::db_env`).
#[repr(C)]
#[derive(Copy, Clone)]
struct lsm_env {
    // Zero-sized field: the stub carries no data of its own.
    _data: [u8; 0],
    // The raw pointer keeps the type from being `Send`/`Sync`, and
    // `PhantomPinned` keeps it from being `Unpin`.
    _marker: PhantomData<(*mut u8, PhantomPinned)>,
}
166
// Stub that mirrors one of LSM's types, providing type safety on the Rust
// side only. It stands for a database cursor (see `LsmCursor::db_cursor`).
#[repr(C)]
#[derive(Copy, Clone)]
struct lsm_cursor {
    // Zero-sized field: the stub carries no data of its own.
    _data: [u8; 0],
    // The raw pointer keeps the type from being `Send`/`Sync`, and
    // `PhantomPinned` keeps it from being `Unpin`.
    _marker: PhantomData<(*mut u8, PhantomPinned)>,
}
173
/// These are the different signals we can send to a background thread.
///
/// Only the first discriminant is explicit; the remaining variants take the
/// following integers (1 and 2).
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub(crate) enum LsmBgWorkerMessage {
    /// Presumably asks the background thread to checkpoint the database
    /// file — confirm against the worker loop.
    Checkpoint = 0,
    /// Presumably asks the background thread to merge database segments —
    /// confirm against the worker loop.
    Merge,
    /// Presumably asks the background thread to terminate — confirm against
    /// the worker loop.
    Stop,
}
182
// A single background worker of the thread pool (see `LsmBgWorkers`).
#[derive(Debug)]
pub(crate) struct LsmBgWorker {
    // Join handle of the worker's thread, if there is one.
    // NOTE(review): presumably `None` once the thread has been joined — confirm.
    pub(crate) thread: Option<thread::JoinHandle<()>>,
}
187
// This is the thread pool that will contain our worker threads.
#[derive(Debug, Default)]
pub(crate) struct LsmBgWorkers {
    // The worker threads that belong to this pool.
    pub(crate) bg_threads: Vec<LsmBgWorker>,
    // Sending side of the (bounded) channel over which `LsmBgWorkerMessage`
    // signals are delivered. `None` by default.
    pub(crate) sender: Option<mpsc::SyncSender<LsmBgWorkerMessage>>,
    // NOTE(review): presumably an identifier for this pool or for the next
    // worker — its exact meaning is not visible here; confirm against its users.
    pub(crate) id: usize,
}
195
/// This represents the main-memory handle to a database file. Most operations
/// on the database are performed through the corresponding handle, like writing
/// to it, opening cursors, or handling explicit transactions.
pub struct LsmDb {
    /// Raw pointer to the (opaque) LSM environment.
    pub(crate) db_env: *mut lsm_env,
    /// Raw pointer to the (opaque) LSM database handle.
    pub(crate) db_handle: *mut lsm_db,
    /// Database name as a C string.
    // NOTE(review): presumably the fully-qualified file name (path plus base
    // name) handed to LSM — confirm where it is built.
    pub(crate) db_fq_name: CString,
    /// The configuration this database works with.
    pub(crate) db_conf: DbConf,
    /// Pool of background worker threads of this database.
    pub(crate) db_bg_threads: LsmBgWorkers,
    /// Compression hooks, if any.
    pub(crate) db_compress: Option<lsm_compress>,
    /// Initialization flag.
    // NOTE(review): presumably set by `Disk::initialize` — confirm.
    pub(crate) initialized: bool,
    /// Connection flag.
    // NOTE(review): presumably set by `Disk::connect` and cleared by
    // `Disk::disconnect` — confirm.
    pub(crate) connected: bool,
}
209
/// This is the main cursor structure.
pub struct LsmCursor<'a> {
    /// Raw pointer to the (opaque) LSM cursor.
    pub(crate) db_cursor: *mut lsm_cursor,
    /// Ties the cursor to the lifetime `'a` without storing a reference.
    // NOTE(review): presumably `'a` is the borrow of the database the cursor
    // was opened on — confirm against `Disk::cursor_open`'s implementation.
    _marker: PhantomData<&'a ()>,
}
215
/// These are the metrics exposed by the engine. These metrics are
/// Prometheus histograms, see <https://docs.rs/prometheus/latest/prometheus/struct.Histogram.html>.
#[derive(Clone, Debug)]
pub struct LsmMetrics {
    /// Histogram of the time it takes to write to the database file.
    /// Due to internal database file operations, and depending on the size
    /// of the database, some write operations might have latencies in the seconds.
    /// Since this histogram is produced external to the engine, it is suggested
    /// to capture a well-spaced set of intervals with precision from milliseconds
    /// up to, say, 10 seconds. The storage engine updates this histogram with
    /// precision in seconds.
    pub write_times_s: Histogram,
    /// Histogram of the amount of data (in KBs) written during merge operations
    /// and flushing of in-memory data into the database file. This histogram is
    /// only updated in [`LsmMode::LsmBackgroundMerger`] mode.
    /// As before, a well-spaced set of intervals with precision between, say,
    /// 512 KBs and 32 MBs is recommended.
    pub work_kbs: Histogram,
    /// Histogram of the time it takes to perform database file operations like merging
    /// segments and flushing in-memory data. As before, this histogram is updated with
    /// precision in seconds. This histogram is only updated in
    /// [`LsmMode::LsmBackgroundMerger`] mode.
    pub work_times_s: Histogram,
    /// Histogram of the amount of data (in KBs) written during a checkpoint operation.
    /// This histogram is only updated in [`LsmMode::LsmBackgroundCheckpointer`] mode. As
    /// before, a well-spaced set of intervals with precision between 1 and, say, 32 MBs is
    /// recommended.
    pub checkpoint_kbs: Histogram,
    /// Histogram of the time it takes to perform checkpointing operations on the database
    /// file. This histogram is only updated in [`LsmMode::LsmBackgroundCheckpointer`]
    /// mode and with precision in seconds.
    pub checkpoint_times_s: Histogram,
}
249
/// Whether a database handle operates in read-only mode or not. By default,
/// reads and writes are allowed in a database.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub enum LsmHandleMode {
    /// Attempts to write to the database will be rejected in this mode.
    ReadOnly = 0,
    /// This is the default mode, in which both reads and writes to the
    /// database are allowed.
    #[default]
    ReadWrite,
}
261
/// These are the different seek operations of a cursor.
///
/// Only the first discriminant is explicit; the variants map to the
/// integers `-1`, `0` and `1`, in declaration order.
#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum LsmCursorSeekOp {
    /// When seeking with a [`LsmCursor`] using this mode, the cursor
    /// is positioned on the record indexed by the given key if found, or the
    /// record right before it in the total order of keys (as per `memcmp`).
    /// If the database contains no such record, the cursor is left at EOF.
    LsmCursorSeekLe = -1,
    /// When seeking with a [`LsmCursor`] using this mode, the cursor
    /// is positioned on the record indexed by the given key if found, or
    /// at EOF if such a record does not exist in the database.
    LsmCursorSeekEq,
    /// When seeking with a [`LsmCursor`] using this mode, the cursor
    /// is positioned on the record indexed by the given key if found, or the
    /// record right after it in the total order of keys (as per `memcmp`).
    /// If the database contains no such record, the cursor is left at EOF.
    LsmCursorSeekGe,
}
281
/// These are the different kinds of errors that we can encounter.
///
/// Every variant has an explicit integer code, and an integer can be parsed
/// back into a variant via `LsmErrorCode::try_from`.
// NOTE(review): codes up to 50 presumably mirror the return codes of the
// underlying LSM C library, while codes from 55 on look specific to this
// crate — confirm against LSM's header.
#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum LsmErrorCode {
    // The crate's tests expect this code from cursor operations (compare,
    // valid, get_key, get_value) on a cursor that is not valid.
    LsmError = 1,
    LsmBusy = 5,
    LsmNoMem = 7,
    LsmReadOnly = 8,
    LsmIOErr = 10,
    LsmCorrupt = 11,
    LsmFull = 13,
    LsmCantOpen = 14,
    LsmProtocol = 15,
    // The crate's tests expect this code when a database is initialized or
    // connected twice.
    LsmMisuse = 21,
    // Also the error produced when an integer does not name a known
    // `LsmCompressionLib`.
    LsmMismatch = 50,
    LsmConversionErr = 55,
    LsmMetricCreation = 56,
    LsmMetricRegistration = 57,
    LsmMetricsEmpty = 58,
    LsmBgThreadUnavailable = 59,
    // The error produced by most of the `try_from`s in this file when the
    // given integer does not name any variant.
    LsmUnknownCode = 60,
}
304
/// These are the different levels of safety that the engine can offer.
/// For most cases, [`LsmSafety::Normal`] should be enough. The more
/// restrictive the level, the slower the engine performs.
#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub(crate) enum LsmSafety {
    /// Do not sync to disk at all. This is the fastest mode.
    /// If a power failure occurs while writing to the database,
    /// following recovery the database may be corrupt. All or some
    /// data may be recoverable.
    Off = 0,
    /// Sync only as much as is necessary to prevent database corruption.
    /// This is the default setting. Although slower than [`LsmSafety::Off`],
    /// this mode is still much faster than [`LsmSafety::Full`].
    #[default]
    Normal = 1,
    /// Sync every transaction to disk as part of committing it. This is the
    /// slowest mode. If a power failure occurs while writing to the database,
    /// all successfully committed transactions should be present. The database
    /// file should not be corrupt.
    Full = 2,
}
327
/// These are the different kinds of execution modes that the engine can
/// work on. The default mode is `LsmNoBackgroundThreads` - in which no
/// background thread is scheduled to perform any task. All file operations
/// are performed by the thread that is currently writing to the database.
///
/// The other available modes are `LsmBackgroundMerger` and
/// `LsmBackgroundCheckpointer`. In mode `LsmBackgroundMerger`, a background
/// thread is scheduled to merge database segments towards producing larger
/// segments. This background thread is also in charge of checkpointing the
/// database file - thus making data truly persistent on stable storage - and
/// flushing in-memory data into the database file.
///
/// In mode `LsmBackgroundCheckpointer`, the additional background thread only
/// checkpoints the database file. In this mode, merge operations are performed
/// by the thread that also writes data to the database.
///
/// In any case, file operations (like merging and checkpointing) cannot be
/// scheduled; these operations are triggered depending on the current state
/// of the database.
#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum LsmMode {
    /// Default mode. No additional background thread is scheduled.
    #[default]
    LsmNoBackgroundThreads = 0,
    /// An additional background thread is scheduled to merge segments
    /// and to checkpoint the database file (update file headers and sync
    /// the contents of the database file to disk). This thread is also
    /// in charge of flushing in-memory data to the database file.
    LsmBackgroundMerger,
    /// An additional background thread is scheduled to checkpoint the
    /// database file. Merge operations are not performed by this thread,
    /// but by the thread that also writes data to the database.
    LsmBackgroundCheckpointer,
}
362
/// These are the currently supported compression libraries. A comparison of the
/// performance of different compression libraries (compression ratio and throughput)
/// can be found here: <https://github.com/lz4/lz4>. Support for more compression
/// libraries, like `Snappy`, may be added in the future.
///
/// Variants without an explicit discriminant continue counting from `LZ4`
/// (`ZLib` is 10002 and `ZStd` is 10003).
#[repr(C)]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum LsmCompressionLib {
    /// By default, no compression is performed.
    #[default]
    NoCompression = 1,
    /// Uses `LZ4` to compress data pages.
    LZ4 = 10001,
    /// Uses `Zlib` to compress data pages.
    ZLib,
    /// Uses `ZStd` to compress data pages.
    ZStd,
}
380
/// These are parameters that impact the behaviour of the engine.
///
/// An integer can be parsed into a parameter via `LsmParam::try_from`. Note
/// that no variant carries the value 6.
// NOTE(review): the values presumably mirror the `LSM_CONFIG_*` constants of
// LSM's C header — confirm before adding or renumbering variants.
#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum LsmParam {
    AutoFlush = 1,
    PageSize = 2,
    Safety = 3,
    BlockSize = 4,
    AutoWork = 5,
    Mmap = 7,
    UseLog = 8,
    AutoMerge = 9,
    MaxFreeList = 10,
    MultipleProcesses = 11,
    AutoCheckPoint = 12,
    SetCompression = 13,
    GetCompression = 14,
    SetCompressionFactory = 15,
    ReadOnly = 16,
}
401
// This enum is most probably only relevant in this file. Thus we won't expose it to
// the public for the time being.
//
// An integer can be parsed into a variant via `LsmInfo::try_from`. Note that
// the values are not contiguous (5, 8, 9 and 12 are not used).
// NOTE(review): the values presumably mirror the `LSM_INFO_*` constants of
// LSM's C header — confirm before adding or renumbering variants.
#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum LsmInfo {
    Lsm4KbPagesWritten = 1,
    Lsm4KbPagesRead = 2,
    LsmDbStructure = 3,
    LsmLogStructure = 4,
    LsmPageDumpAscii = 6,
    LsmPageDumpHex = 7,
    LsmCheckpointSize = 10,
    LsmTreeSize = 11,
    LsmCompressionId = 13,
}
417
// This is the simplest implementation of the `std::error::Error` trait
// on `LsmErrorCode`, to ease error handling (e.g. using anyhow). All of the
// trait's methods keep their default implementations.
impl std::error::Error for LsmErrorCode {}
421
422// Display trait is needed when implementing Error
423impl std::fmt::Display for LsmErrorCode {
424    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
425        // Recycle Debug implementation
426        write!(f, "{self:?}")
427    }
428}
429
430/// The following `try_from`s are to be able to parse an integer to the
431/// corresponding enum (one of the above).
432impl TryFrom<i32> for LsmErrorCode {
433    type Error = LsmErrorCode;
434    fn try_from(value: i32) -> Result<Self, Self::Error> {
435        match value {
436            1 => Ok(LsmErrorCode::LsmError),
437            5 => Ok(LsmErrorCode::LsmBusy),
438            7 => Ok(LsmErrorCode::LsmNoMem),
439            8 => Ok(LsmErrorCode::LsmReadOnly),
440            10 => Ok(LsmErrorCode::LsmIOErr),
441            11 => Ok(LsmErrorCode::LsmCorrupt),
442            13 => Ok(LsmErrorCode::LsmFull),
443            14 => Ok(LsmErrorCode::LsmCantOpen),
444            15 => Ok(LsmErrorCode::LsmProtocol),
445            21 => Ok(LsmErrorCode::LsmMisuse),
446            50 => Ok(LsmErrorCode::LsmMismatch),
447            55 => Ok(LsmErrorCode::LsmConversionErr),
448            56 => Ok(LsmErrorCode::LsmMetricCreation),
449            57 => Ok(LsmErrorCode::LsmMetricRegistration),
450            58 => Ok(LsmErrorCode::LsmMetricsEmpty),
451            59 => Ok(LsmErrorCode::LsmBgThreadUnavailable),
452            _ => Err(LsmErrorCode::LsmUnknownCode),
453        }
454    }
455}
456
457impl TryFrom<i32> for LsmCursorSeekOp {
458    type Error = LsmErrorCode;
459    fn try_from(value: i32) -> Result<Self, Self::Error> {
460        match value {
461            -1 => Ok(LsmCursorSeekOp::LsmCursorSeekLe),
462            0 => Ok(LsmCursorSeekOp::LsmCursorSeekEq),
463            1 => Ok(LsmCursorSeekOp::LsmCursorSeekGe),
464            _ => Err(LsmErrorCode::LsmUnknownCode),
465        }
466    }
467}
468
469impl TryFrom<i32> for LsmSafety {
470    type Error = LsmErrorCode;
471    fn try_from(value: i32) -> Result<Self, Self::Error> {
472        match value {
473            0 => Ok(LsmSafety::Off),
474            1 => Ok(LsmSafety::Normal),
475            2 => Ok(LsmSafety::Full),
476            _ => Err(LsmErrorCode::LsmUnknownCode),
477        }
478    }
479}
480
481impl TryFrom<i32> for LsmParam {
482    type Error = LsmErrorCode;
483    fn try_from(value: i32) -> Result<Self, Self::Error> {
484        match value {
485            1 => Ok(LsmParam::AutoFlush),
486            2 => Ok(LsmParam::PageSize),
487            3 => Ok(LsmParam::Safety),
488            4 => Ok(LsmParam::BlockSize),
489            5 => Ok(LsmParam::AutoWork),
490            7 => Ok(LsmParam::Mmap),
491            8 => Ok(LsmParam::UseLog),
492            9 => Ok(LsmParam::AutoMerge),
493            10 => Ok(LsmParam::MaxFreeList),
494            11 => Ok(LsmParam::MultipleProcesses),
495            12 => Ok(LsmParam::AutoCheckPoint),
496            13 => Ok(LsmParam::SetCompression),
497            14 => Ok(LsmParam::GetCompression),
498            15 => Ok(LsmParam::SetCompressionFactory),
499            16 => Ok(LsmParam::ReadOnly),
500            _ => Err(LsmErrorCode::LsmUnknownCode),
501        }
502    }
503}
504
505impl TryFrom<i32> for LsmMode {
506    type Error = LsmErrorCode;
507    fn try_from(value: i32) -> Result<Self, Self::Error> {
508        match value {
509            0 => Ok(LsmMode::LsmNoBackgroundThreads),
510            1 => Ok(LsmMode::LsmBackgroundMerger),
511            2 => Ok(LsmMode::LsmBackgroundCheckpointer),
512            _ => Err(LsmErrorCode::LsmUnknownCode),
513        }
514    }
515}
516
517impl TryFrom<i32> for LsmCompressionLib {
518    type Error = LsmErrorCode;
519
520    fn try_from(value: i32) -> Result<Self, Self::Error> {
521        match value {
522            1 => Ok(LsmCompressionLib::NoCompression),
523            10001 => Ok(LsmCompressionLib::LZ4),
524            10002 => Ok(LsmCompressionLib::ZLib),
525            10003 => Ok(LsmCompressionLib::ZStd),
526            _ => Err(LsmErrorCode::LsmMismatch),
527        }
528    }
529}
530
531impl TryFrom<i32> for LsmInfo {
532    type Error = LsmErrorCode;
533    fn try_from(value: i32) -> Result<Self, Self::Error> {
534        match value {
535            1 => Ok(LsmInfo::Lsm4KbPagesWritten),
536            2 => Ok(LsmInfo::Lsm4KbPagesRead),
537            3 => Ok(LsmInfo::LsmDbStructure),
538            4 => Ok(LsmInfo::LsmLogStructure),
539            6 => Ok(LsmInfo::LsmPageDumpAscii),
540            7 => Ok(LsmInfo::LsmPageDumpHex),
541            10 => Ok(LsmInfo::LsmCheckpointSize),
542            11 => Ok(LsmInfo::LsmTreeSize),
543            13 => Ok(LsmInfo::LsmCompressionId),
544            _ => Err(LsmErrorCode::LsmUnknownCode),
545        }
546    }
547}
548
/// Primary set of methods of a database.
///
/// Every method reports failure through an [`LsmErrorCode`].
pub trait Disk
where
    for<'a> Self: 'a,
{
    /// The cursor type produced by [`Disk::cursor_open`]. Its lifetime
    /// parameter is the one of the borrow of the database.
    type C<'a>: Cursor;
    /// Initializes a database with a given configuration (of type [`DbConf`]).
    fn initialize(&mut self, conf: DbConf) -> Result<(), LsmErrorCode>;
    /// Connects to a database on disk.
    fn connect(&mut self) -> Result<(), LsmErrorCode>;
    /// Disconnects from a database on disk.
    fn disconnect(&mut self) -> Result<(), LsmErrorCode>;
    /// Persists a given `value` under the given `key`. Observe that both fields
    /// are given as a set of bytes.
    fn persist(&mut self, key: &[u8], value: &[u8]) -> Result<(), LsmErrorCode>;
    /// Updates the value stored under `key` to the given `value`.
    fn update(&mut self, key: &[u8], value: &[u8]) -> Result<(), LsmErrorCode>;
    /// Deletes the value stored under `key`.
    fn delete(&mut self, key: &[u8]) -> Result<(), LsmErrorCode>;
    /// Deletes all stored values in the open interval (begin, end).
    fn delete_range(&mut self, begin: &[u8], end: &[u8]) -> Result<(), LsmErrorCode>;
    /// Optimizes the database in a certain way. This is implementation-defined. For
    /// example, currently `lsmlite-rs` optimizes the database for write-once, read-many
    /// workloads (for space and read efficiency). Other underlying implementations
    /// might have different optimization opportunities.
    fn optimize(&mut self) -> Result<(), LsmErrorCode>;
    /// Opens a transaction explicitly.
    fn begin_transaction(&mut self) -> Result<(), LsmErrorCode>;
    /// Commits the operations contained in the current transaction.
    fn commit_transaction(&mut self) -> Result<(), LsmErrorCode>;
    /// Rolls back the operations contained in the current transaction.
    fn rollback_transaction(&mut self) -> Result<(), LsmErrorCode>;
    /// Opens a database cursor. The cursor borrows the database for as
    /// long as it is alive.
    fn cursor_open(&self) -> Result<Self::C<'_>, LsmErrorCode>;
}
584
/// Primary set of methods of database cursors.
///
/// Every method reports failure through an [`LsmErrorCode`].
pub trait Cursor {
    /// Closes a database cursor.
    fn close(&mut self) -> Result<(), LsmErrorCode>;
    /// Moves the cursor to the very first record in the database.
    fn first(&mut self) -> Result<(), LsmErrorCode>;
    /// Moves the cursor to the very last record in the database.
    fn last(&mut self) -> Result<(), LsmErrorCode>;
    /// Moves the cursor in the database to the record pointed to by `key`.
    /// How this operation behaves depends on the given seek mode ([`LsmCursorSeekOp`]).
    fn seek(&mut self, key: &[u8], mode: LsmCursorSeekOp) -> Result<(), LsmErrorCode>;
    /// Moves the cursor to the next record in the database (as seen from the
    /// current value the cursor is pointing to).
    fn next(&mut self) -> Result<(), LsmErrorCode>;
    /// Moves the cursor to the previous record in the database (as seen from the
    /// current value the cursor is pointing to).
    fn prev(&mut self) -> Result<(), LsmErrorCode>;
    /// Tests whether the cursor is currently pointing to a valid database record.
    /// A valid cursor yields `Ok(())`; otherwise an error is returned.
    fn valid(&self) -> Result<(), LsmErrorCode>;
    /// Obtains a copy of the key of the record the cursor is currently pointing
    /// to (if valid).
    fn get_key(&self) -> Result<Vec<u8>, LsmErrorCode>;
    /// Obtains a copy of the value of the record the cursor is currently pointing
    /// to (if valid).
    fn get_value(&self) -> Result<Vec<u8>, LsmErrorCode>;
    /// Compares the key the cursor is currently pointing to (if valid) with the
    /// given `key` (as per `memcmp`). The result of the comparison is returned
    /// as an [`Ordering`].
    fn compare(&self, key: &[u8]) -> Result<Ordering, LsmErrorCode>;
}
615
616#[cfg(test)]
617mod tests {
618    use std::cmp::Ordering;
619    use std::ops::Not;
620    use std::path::Path;
621    use std::thread;
622
623    use crate::{
624        Cursor, DbConf, Disk, LsmCompressionLib, LsmCursorSeekOp, LsmDb, LsmErrorCode,
625        LsmHandleMode, LsmInfo, LsmMetrics, LsmMode, LsmParam, LsmSafety,
626    };
627
628    use chrono::Utc;
629    use prometheus::{Histogram, HistogramOpts, DEFAULT_BUCKETS};
630    use prost::Message;
631    use rand::Rng;
632    use rand::SeedableRng;
633    use rand_mt::{Mt19937GenRand64, Mt64};
634
    // To test the strongly typed api we need a message type. This one is a
    // `prost` message that wraps an opaque payload of bytes.
    #[derive(Message)]
    pub struct TypedBlob {
        // The raw bytes carried by the message (field tag 1).
        #[prost(bytes = "vec", tag = "1")]
        pub(crate) payload: Vec<u8>,
    }
641
642    fn construct_random_blob(bytes: usize, prng: &mut Mt19937GenRand64) -> Vec<u8> {
643        // Produce a random vector of the given number of bytes.
644        let random_vector: Vec<u8> = (0..bytes).map(|_| prng.gen_range(0..=255)).collect();
645
646        random_vector
647    }
648
649    fn construct_compressible_blob(bytes: usize) -> Vec<u8> {
650        vec![0; bytes]
651    }
652
653    fn test_initialize(
654        id: usize,
655        name: String,
656        mode: LsmMode,
657        compression: LsmCompressionLib,
658    ) -> LsmDb {
659        let now = Utc::now();
660        let db_path = "/tmp".to_string();
661        let db_base_name = format!("{}-{}-{}", name, id, now.timestamp_nanos_opt().unwrap());
662
663        let buckets = DEFAULT_BUCKETS.to_vec();
664        let opts_1 = HistogramOpts::new("non_typed_write_times_s", "non_typed_write_times_s help")
665            .buckets(buckets.clone());
666        let opts_2 = HistogramOpts::new("work_kbs", "work_kbs help").buckets(buckets.clone());
667        let opts_3 =
668            HistogramOpts::new("work_times_s", "work_times_s help").buckets(buckets.clone());
669        let opts_4 =
670            HistogramOpts::new("checkpoint_kbs", "checkpoint_kbs help").buckets(buckets.clone());
671        let opts_5 =
672            HistogramOpts::new("checkpoint_times_s", "checkpoint_times_s help").buckets(buckets);
673
674        let metrics = LsmMetrics {
675            write_times_s: Histogram::with_opts(opts_1).unwrap(),
676            work_kbs: Histogram::with_opts(opts_2).unwrap(),
677            work_times_s: Histogram::with_opts(opts_3).unwrap(),
678            checkpoint_kbs: Histogram::with_opts(opts_4).unwrap(),
679            checkpoint_times_s: Histogram::with_opts(opts_5).unwrap(),
680        };
681
682        let db_conf = DbConf::new_with_parameters(
683            db_path,
684            db_base_name,
685            mode,
686            LsmHandleMode::ReadWrite,
687            Some(metrics),
688            compression,
689        );
690
691        let mut db: LsmDb = Default::default();
692
693        assert!(!db.is_initialized());
694
695        let rc = db.initialize(db_conf.clone());
696        assert_eq!(rc, Ok(()));
697
698        assert!(db.is_initialized());
699
700        let rc = db.initialize(db_conf);
701        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
702
703        db
704    }
705
706    fn test_connect(db: &mut LsmDb) {
707        // Produces a new handle.
708        let rc = db.connect();
709        assert_eq!(rc, Ok(()));
710        assert!(db.is_connected());
711
712        let rc = db.connect();
713        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
714        assert!(db.is_connected());
715    }
716
717    fn test_disconnect(db: &mut LsmDb) {
718        // Destroys the handle.
719        let rc = db.disconnect();
720        assert_eq!(rc, Ok(()));
721        assert!(!db.is_connected());
722    }
723
724    fn test_persist_blobs(
725        db: &mut LsmDb,
726        num_blobs: usize,
727        size_blob: usize,
728        prng: Option<Mt19937GenRand64>,
729        id: usize,
730    ) {
731        let master_blob = if let Some(mut prng) = prng {
732            construct_random_blob(size_blob, &mut prng)
733        } else {
734            construct_compressible_blob(size_blob)
735        };
736        // Observe that we assume that a handle for the database has been
737        // produced.
738        for b in 1..=num_blobs {
739            // The keys are tagged. Blobs with smaller key will appear first in the database.
740            // In this manner, having different clients (with different ids) we can detect
741            // which client's blobs are in the database.
742            let current_blob_key = [id.to_be_bytes().as_ref(), b.to_be_bytes().as_ref()].concat();
743            let mut current_blob = master_blob.clone();
744            // Blobs are marked so that we can verify them later.
745            current_blob[0] = (b & 0xFF) as u8;
746            // Update is actually upsert (update if exists, insert otherwise)
747            let rc = Disk::update(db, &current_blob_key, &current_blob);
748            assert_eq!(rc, Ok(()));
749        }
750    }
751
752    fn test_persist_grpc_blobs(
753        db: &mut LsmDb,
754        num_blobs: usize,
755        size_blob: usize,
756        mut prng: Mt19937GenRand64,
757    ) {
758        // One needs to produce a handle before operating on the database.
759        let mut rc;
760        for _ in 1..=num_blobs {
761            let master_blob = construct_random_blob(size_blob, &mut prng);
762            let current_blob = TypedBlob::encode_to_vec(&TypedBlob {
763                payload: master_blob.clone(),
764            });
765            let signature = md5::compute(&current_blob).0;
766            // Observe that what we insert is the grpc serialization of current_master_blob
767            // and it is put under its md5 signature. In this way, when deserializing,
768            // we can again compute the md5 signature of what we get and compare with the key.
769            // In this way we test integrity of the blobs (I know md5 is broken on all sides, but
770            // for this test, this is enough).
771            // Update is actually upsert (update if exists, insert otherwise)
772            rc = Disk::update(db, signature.as_ref(), &current_blob);
773            assert_eq!(rc, Ok(()));
774        }
775    }
776
777    fn test_forward_cursor(db: &mut LsmDb, num_blobs: usize, size_blob: usize, id: usize) {
778        let cursor_res = Disk::cursor_open(db);
779        assert!(cursor_res.is_ok());
780
781        let mut cursor_forward = cursor_res.unwrap();
782        let mut rc = cursor_forward.next();
783        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
784        rc = cursor_forward.prev();
785        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
786
787        // This key is not found.
788        let no_key: usize = 0x7FFFFFFFFFFFFFFF;
789        let no_key_serial = [id.to_be_bytes().as_ref(), no_key.to_be_bytes().as_ref()].concat();
790
791        // Cursor is not valid, thus an error is thrown.
792        let cmp = Cursor::compare(&cursor_forward, &no_key_serial);
793        assert_eq!(cmp, Err(LsmErrorCode::LsmError));
794
795        rc = cursor_forward.valid();
796        assert_eq!(rc, Err(LsmErrorCode::LsmError));
797
798        // Extracting from a non-valid cursor is not allowed.
799        let current_key_res = Cursor::get_key(&cursor_forward);
800        assert_eq!(current_key_res, Err(LsmErrorCode::LsmError));
801        let current_blob_res = Cursor::get_value(&cursor_forward);
802        assert_eq!(current_blob_res, Err(LsmErrorCode::LsmError));
803
804        // We will now traverse the database forward.
805        rc = cursor_forward.first();
806        assert_eq!(rc, Ok(()));
807
808        // Let's check that the database contains what we just persisted.
809        for b in 1..=num_blobs {
810            rc = cursor_forward.valid();
811            assert_eq!(rc, Ok(()));
812            let current_key_res = Cursor::get_key(&cursor_forward);
813            assert!(current_key_res.is_ok());
814            let current_blob_res = Cursor::get_value(&cursor_forward);
815            assert!(current_blob_res.is_ok());
816            let current_key = current_key_res.unwrap();
817            let current_blob = current_blob_res.unwrap();
818            // These were the values originally inserted.
819            let original_blob_key = [id.to_be_bytes().as_ref(), b.to_be_bytes().as_ref()].concat();
820            assert_eq!(current_key, original_blob_key);
821            assert_eq!(current_blob[0], (b & 0xFF) as u8);
822            assert_eq!(current_blob.len(), size_blob);
823
824            rc = cursor_forward.next();
825            assert_eq!(rc, Ok(()));
826
827            // Since we moved the cursor to the first element
828            // we can never move backwards.
829            rc = cursor_forward.prev();
830            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
831        }
832        // The cursor is not valid any more because it's located at EOF.
833        rc = cursor_forward.valid();
834        assert_eq!(rc, Err(LsmErrorCode::LsmError));
835
836        // Moving a not-valid cursor is not considered an error.
837        rc = cursor_forward.next();
838        assert_eq!(rc, Ok(()));
839
840        // Cursor remains invalid.
841        rc = cursor_forward.valid();
842        assert_eq!(rc, Err(LsmErrorCode::LsmError));
843
844        // Extracting from a non-valid cursor is not allowed.
845        let current_key_res = Cursor::get_key(&cursor_forward);
846        assert_eq!(current_key_res, Err(LsmErrorCode::LsmError));
847        let current_blob_res = Cursor::get_value(&cursor_forward);
848        assert_eq!(current_blob_res, Err(LsmErrorCode::LsmError));
849
850        // Cursor is not valid, thus an error is thrown.
851        let cmp = Cursor::compare(&cursor_forward, &no_key_serial);
852        assert_eq!(cmp, Err(LsmErrorCode::LsmError));
853
854        // Cursor can be moved again though.
855        rc = cursor_forward.first();
856        assert_eq!(rc, Ok(()));
857
858        if num_blobs > 0 {
859            // Cursor becomes valid again.
860            rc = cursor_forward.valid();
861            assert_eq!(rc, Ok(()));
862        } else {
863            rc = cursor_forward.valid();
864            assert_eq!(rc, Err(LsmErrorCode::LsmError));
865        }
866
867        // Freeing up resources.
868        rc = cursor_forward.close();
869        assert_eq!(rc, Ok(()));
870    }
871
872    fn test_forward_cursor_grpc(db: &mut LsmDb, num_blobs: usize) {
873        let cursor_res = Disk::cursor_open(db);
874        assert!(cursor_res.is_ok());
875
876        let mut cursor_forward = cursor_res.unwrap();
877        let mut rc = cursor_forward.next();
878        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
879        rc = cursor_forward.prev();
880        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
881
882        // We will now traverse the database forward.
883        rc = cursor_forward.first();
884        assert_eq!(rc, Ok(()));
885
886        // Let's check that the database contains what we just persisted.
887        for _ in 1..=num_blobs {
888            let current_key_res = Cursor::get_key(&cursor_forward);
889            assert!(current_key_res.is_ok());
890            let current_blob_res = Cursor::get_value(&cursor_forward);
891            assert!(current_blob_res.is_ok());
892            let current_key = current_key_res.unwrap();
893            let current_blob = current_blob_res.unwrap();
894            let signature = md5::compute(current_blob).0;
895            // In the signature matches, we assume everything is ok.
896            assert_eq!(current_key, signature);
897
898            rc = cursor_forward.next();
899            assert_eq!(rc, Ok(()));
900
901            // Since we moved the cursor to the first element
902            // we can never move backwards.
903            rc = cursor_forward.prev();
904            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
905        }
906        // The cursor is not valid any more because it's located at EOF.
907        rc = cursor_forward.valid();
908        assert_eq!(rc, Err(LsmErrorCode::LsmError));
909
910        // Freeing up resources.
911        rc = cursor_forward.close();
912        assert_eq!(rc, Ok(()));
913    }
914
915    fn test_backward_cursor(db: &mut LsmDb, num_blobs: usize, size_blob: usize, id: usize) {
916        let cursor_res = Disk::cursor_open(db);
917        assert!(cursor_res.is_ok());
918
919        let mut cursor_backward = cursor_res.unwrap();
920        let mut rc = cursor_backward.next();
921        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
922        rc = cursor_backward.prev();
923        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
924
925        rc = cursor_backward.last();
926        assert_eq!(rc, Ok(()));
927
928        // Let's check that the database contains what we just persisted.
929        // but now traversing backwards.
930        for b in (1..=num_blobs).rev() {
931            let current_key_res = Cursor::get_key(&cursor_backward);
932            assert!(current_key_res.is_ok());
933            let current_blob_res = Cursor::get_value(&cursor_backward);
934            assert!(current_blob_res.is_ok());
935            let current_key = current_key_res.unwrap();
936            let current_blob = current_blob_res.unwrap();
937            // These were the values originally inserted.
938            let original_blob_key = [id.to_be_bytes().as_ref(), b.to_be_bytes().as_ref()].concat();
939            assert_eq!(current_key, original_blob_key);
940            assert_eq!(current_blob[0], (b & 0xFF) as u8);
941            assert_eq!(current_blob.len(), size_blob);
942
943            rc = cursor_backward.prev();
944            assert_eq!(rc, Ok(()));
945
946            // Since we moved the cursor to the last element
947            // we can never move forward.
948            rc = cursor_backward.next();
949            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
950        }
951        // The cursor is not valid any more because it's located at EOF.
952        rc = cursor_backward.valid();
953        assert_eq!(rc, Err(LsmErrorCode::LsmError));
954
955        // Moving a not-valid cursor is not considered an error.
956        rc = cursor_backward.prev();
957        assert_eq!(rc, Ok(()));
958
959        // Cursor remains invalid.
960        rc = cursor_backward.valid();
961        assert_eq!(rc, Err(LsmErrorCode::LsmError));
962
963        // Cursor can be moved again though.
964        rc = cursor_backward.last();
965        assert_eq!(rc, Ok(()));
966
967        if num_blobs > 0 {
968            // Cursor becomes valid again.
969            rc = cursor_backward.valid();
970            assert_eq!(rc, Ok(()));
971        } else {
972            rc = cursor_backward.valid();
973            assert_eq!(rc, Err(LsmErrorCode::LsmError));
974        }
975
976        // Freeing up resources.
977        rc = cursor_backward.close();
978        assert_eq!(rc, Ok(()));
979    }
980
981    fn test_backward_cursor_grpc(db: &mut LsmDb, num_blobs: usize) {
982        let cursor_res = Disk::cursor_open(db);
983        assert!(cursor_res.is_ok());
984
985        let mut cursor_backward = cursor_res.unwrap();
986        let mut rc = cursor_backward.next();
987        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
988        rc = cursor_backward.prev();
989        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
990
991        rc = cursor_backward.last();
992        assert_eq!(rc, Ok(()));
993
994        // Let's check that the database contains what we just persisted.
995        // but now traversing backwards.
996        for _ in (1..=num_blobs).rev() {
997            let current_key_res = Cursor::get_key(&cursor_backward);
998            assert!(current_key_res.is_ok());
999            let current_blob_res = Cursor::get_value(&cursor_backward);
1000            assert!(current_blob_res.is_ok());
1001            let current_key = current_key_res.unwrap();
1002            let current_blob = current_blob_res.unwrap();
1003            let signature = md5::compute(current_blob).0;
1004            // In the signature matches, we assume everything is ok.
1005            assert_eq!(current_key, signature);
1006
1007            rc = cursor_backward.prev();
1008            assert_eq!(rc, Ok(()));
1009
1010            // Since we moved the cursor to the last element
1011            // we can never move forward.
1012            rc = cursor_backward.next();
1013            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1014        }
1015        // The cursor is not valid any more because it's located at EOF.
1016        rc = cursor_backward.valid();
1017        assert_eq!(rc, Err(LsmErrorCode::LsmError));
1018
1019        // Freeing up resources.
1020        rc = cursor_backward.close();
1021        assert_eq!(rc, Ok(()));
1022    }
1023
1024    fn test_seek_cursor_forward_limited(
1025        db: &mut LsmDb,
1026        start_key: usize,
1027        expected_num_blobs: usize,
1028        size_blob: usize,
1029        id: usize,
1030    ) {
1031        let cursor_res = Disk::cursor_open(db);
1032        assert!(cursor_res.is_ok());
1033
1034        let mut cursor_seek_ge = cursor_res.unwrap();
1035        let mut rc = cursor_seek_ge.next();
1036        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1037        rc = cursor_seek_ge.prev();
1038        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1039
1040        // This key is not found.
1041        let no_key: usize = 0x7FFFFFFFFFFFFFFF;
1042        rc = Cursor::seek(
1043            &mut cursor_seek_ge,
1044            &[id.to_be_bytes().as_ref(), no_key.to_be_bytes().as_ref()].concat(),
1045            LsmCursorSeekOp::LsmCursorSeekGe,
1046        );
1047        assert_eq!(rc, Ok(()));
1048        rc = cursor_seek_ge.valid();
1049        assert_eq!(rc, Err(LsmErrorCode::LsmError));
1050
1051        // We only traverse the number of blobs that we are told.
1052        let key = start_key;
1053        let upper_key = key + start_key;
1054        let mut num_traversed_keys = 0;
1055        let mut current_underlying_key = key;
1056        rc = Cursor::seek(
1057            &mut cursor_seek_ge,
1058            &[id.to_be_bytes().as_ref(), key.to_be_bytes().as_ref()].concat(),
1059            LsmCursorSeekOp::LsmCursorSeekGe,
1060        );
1061        assert_eq!(rc, Ok(()));
1062
1063        let mut cmp = Ordering::Less;
1064        // A cursor is valid as long as it's not positioned at the end of the file.
1065        // In which case the cursor has been exhausted.
1066        while cmp < Ordering::Equal {
1067            let current_key_res = Cursor::get_key(&cursor_seek_ge);
1068            assert!(current_key_res.is_ok());
1069            let current_blob_res = Cursor::get_value(&cursor_seek_ge);
1070            assert!(current_blob_res.is_ok());
1071            let current_key = current_key_res.unwrap();
1072            let current_blob = current_blob_res.unwrap();
1073            // These were the values originally inserted.
1074            let original_blob_key = [
1075                id.to_be_bytes().as_ref(),
1076                current_underlying_key.to_be_bytes().as_ref(),
1077            ]
1078            .concat();
1079            assert_eq!(current_key, original_blob_key);
1080            assert_eq!(current_blob[0], (current_underlying_key & 0xFF) as u8);
1081            assert_eq!(current_blob.len(), size_blob);
1082
1083            // We compare the upper bound we got from the user to see if the blob is
1084            // relevant for the result set. The moment the underlying key becomes larger,
1085            // we get out of the loop.
1086            let cmp_res = Cursor::compare(
1087                &cursor_seek_ge,
1088                &[id.to_be_bytes().as_ref(), upper_key.to_be_bytes().as_ref()].concat(),
1089            );
1090            assert!(cmp_res.is_ok());
1091            cmp = cmp_res.unwrap();
1092
1093            num_traversed_keys += 1;
1094            current_underlying_key += 1;
1095
1096            rc = cursor_seek_ge.next();
1097            assert_eq!(rc, Ok(()));
1098
1099            // This one does not work.
1100            rc = cursor_seek_ge.prev();
1101            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1102        }
1103        assert_eq!(num_traversed_keys, expected_num_blobs);
1104
1105        rc = cursor_seek_ge.close();
1106        assert_eq!(rc, Ok(()));
1107    }
1108
1109    fn test_seek_cursor_forward_eof(
1110        db: &mut LsmDb,
1111        start_key: usize,
1112        expected_num_blobs: usize,
1113        size_blob: usize,
1114        id: usize,
1115    ) {
1116        let cursor_res = Disk::cursor_open(db);
1117        assert!(cursor_res.is_ok());
1118
1119        let mut cursor_seek_ge = cursor_res.unwrap();
1120        let mut rc = cursor_seek_ge.next();
1121        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1122        rc = cursor_seek_ge.prev();
1123        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1124
1125        // This key is not found.
1126        let no_key: usize = 0x7FFFFFFFFFFFFFFF;
1127        rc = Cursor::seek(
1128            &mut cursor_seek_ge,
1129            &[id.to_be_bytes().as_ref(), no_key.to_be_bytes().as_ref()].concat(),
1130            LsmCursorSeekOp::LsmCursorSeekGe,
1131        );
1132        assert_eq!(rc, Ok(()));
1133        rc = cursor_seek_ge.valid();
1134        assert_eq!(rc, Err(LsmErrorCode::LsmError));
1135
1136        // We only traverse the number of blobs that we are told.
1137        let mut num_traversed_keys = 0;
1138        let mut current_underlying_key = start_key;
1139        rc = Cursor::seek(
1140            &mut cursor_seek_ge,
1141            &[id.to_be_bytes().as_ref(), start_key.to_be_bytes().as_ref()].concat(),
1142            LsmCursorSeekOp::LsmCursorSeekGe,
1143        );
1144        assert_eq!(rc, Ok(()));
1145
1146        // A cursor is valid as long as it's not positioned at the end of the file.
1147        // In which case the cursor has been exhausted.
1148        while cursor_seek_ge.valid() == Ok(()) {
1149            let current_key_res = Cursor::get_key(&cursor_seek_ge);
1150            assert!(current_key_res.is_ok());
1151            let current_blob_res = Cursor::get_value(&cursor_seek_ge);
1152            assert!(current_blob_res.is_ok());
1153            let current_key = current_key_res.unwrap();
1154            let current_blob = current_blob_res.unwrap();
1155            // These were the values originally inserted.
1156            let original_blob_key = [
1157                id.to_be_bytes().as_ref(),
1158                current_underlying_key.to_be_bytes().as_ref(),
1159            ]
1160            .concat();
1161            assert_eq!(current_key, original_blob_key);
1162            assert_eq!(current_blob[0], (current_underlying_key & 0xFF) as u8);
1163            assert_eq!(current_blob.len(), size_blob);
1164
1165            num_traversed_keys += 1;
1166            current_underlying_key += 1;
1167
1168            rc = cursor_seek_ge.next();
1169            assert_eq!(rc, Ok(()));
1170
1171            // This one does not work.
1172            rc = cursor_seek_ge.prev();
1173            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1174        }
1175        assert_eq!(num_traversed_keys, expected_num_blobs);
1176
1177        rc = cursor_seek_ge.close();
1178        assert_eq!(rc, Ok(()));
1179    }
1180
1181    fn test_seek_cursor_backward_limited(
1182        db: &mut LsmDb,
1183        num_blobs: usize,
1184        size_blob: usize,
1185        id: usize,
1186    ) {
1187        let cursor_res = Disk::cursor_open(db);
1188        assert!(cursor_res.is_ok());
1189
1190        let mut cursor_seek_le = cursor_res.unwrap();
1191        let mut rc = cursor_seek_le.next();
1192        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1193        rc = cursor_seek_le.prev();
1194        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1195
1196        // This key is not found.
1197        let no_key: usize = 0x0;
1198        rc = Cursor::seek(
1199            &mut cursor_seek_le,
1200            &[id.to_be_bytes().as_ref(), no_key.to_be_bytes().as_ref()].concat(),
1201            LsmCursorSeekOp::LsmCursorSeekLe,
1202        );
1203        assert_eq!(rc, Ok(()));
1204        rc = cursor_seek_le.valid();
1205        assert_eq!(rc, Err(LsmErrorCode::LsmError));
1206
1207        // We only traverse the number of blobs that we are told.
1208        let key = num_blobs << 1;
1209        let lower_key = key - num_blobs;
1210        let mut num_traversed_keys = 0;
1211        let mut current_underlying_key = key;
1212        rc = Cursor::seek(
1213            &mut cursor_seek_le,
1214            &[id.to_be_bytes().as_ref(), key.to_be_bytes().as_ref()].concat(),
1215            LsmCursorSeekOp::LsmCursorSeekLe,
1216        );
1217        assert_eq!(rc, Ok(()));
1218
1219        let mut cmp = Ordering::Greater;
1220        // A cursor is valid as long as it's not positioned at the end of the file.
1221        // In which case the cursor has been exhausted.
1222        while cmp > Ordering::Equal {
1223            let current_key_res = Cursor::get_key(&cursor_seek_le);
1224            assert!(current_key_res.is_ok());
1225            let current_blob_res = Cursor::get_value(&cursor_seek_le);
1226            assert!(current_blob_res.is_ok());
1227            let current_key = current_key_res.unwrap();
1228            let current_blob = current_blob_res.unwrap();
1229            // These were the values originally inserted.
1230            let original_blob_key = [
1231                id.to_be_bytes().as_ref(),
1232                current_underlying_key.to_be_bytes().as_ref(),
1233            ]
1234            .concat();
1235            assert_eq!(current_key, original_blob_key);
1236            assert_eq!(current_blob[0], (current_underlying_key & 0xFF) as u8);
1237            assert_eq!(current_blob.len(), size_blob);
1238
1239            // We compare the upper bound we got from the user to see if the blob is
1240            // relevant for the result set. The moment the underlying key becomes larger,
1241            // we get out of the loop.
1242            let cmp_res = Cursor::compare(
1243                &cursor_seek_le,
1244                &[id.to_be_bytes().as_ref(), lower_key.to_be_bytes().as_ref()].concat(),
1245            );
1246            assert!(cmp_res.is_ok());
1247            cmp = cmp_res.unwrap();
1248
1249            num_traversed_keys += 1;
1250            current_underlying_key -= 1;
1251
1252            rc = cursor_seek_le.prev();
1253            assert_eq!(rc, Ok(()));
1254
1255            // This one does not work.
1256            rc = cursor_seek_le.next();
1257            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1258        }
1259        assert_eq!(num_traversed_keys, num_blobs + 1);
1260
1261        rc = cursor_seek_le.close();
1262        assert_eq!(rc, Ok(()));
1263    }
1264
1265    fn test_seek_cursor_backward_eof(
1266        db: &mut LsmDb,
1267        start_key: usize,
1268        expected_num_keys: usize,
1269        size_blob: usize,
1270        id: usize,
1271    ) {
1272        let cursor_res = Disk::cursor_open(db);
1273        assert!(cursor_res.is_ok());
1274
1275        let mut cursor_seek_le = cursor_res.unwrap();
1276        let mut rc = cursor_seek_le.next();
1277        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1278        rc = cursor_seek_le.prev();
1279        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1280
1281        // This key is not found.
1282        let no_key: usize = 0x0;
1283        rc = Cursor::seek(
1284            &mut cursor_seek_le,
1285            &[id.to_be_bytes().as_ref(), no_key.to_be_bytes().as_ref()].concat(),
1286            LsmCursorSeekOp::LsmCursorSeekLe,
1287        );
1288        assert_eq!(rc, Ok(()));
1289        rc = cursor_seek_le.valid();
1290        assert_eq!(rc, Err(LsmErrorCode::LsmError));
1291
1292        // We only traverse the number of blobs that we are told.
1293        let mut num_traversed_keys = 0;
1294        let mut current_underlying_key = start_key;
1295        rc = Cursor::seek(
1296            &mut cursor_seek_le,
1297            &[id.to_be_bytes().as_ref(), start_key.to_be_bytes().as_ref()].concat(),
1298            LsmCursorSeekOp::LsmCursorSeekLe,
1299        );
1300        assert_eq!(rc, Ok(()));
1301
1302        // A cursor is valid as long as it's not positioned at the end of the file.
1303        // In which case the cursor has been exhausted.
1304        while cursor_seek_le.valid() == Ok(()) {
1305            let current_key_res = Cursor::get_key(&cursor_seek_le);
1306            assert!(current_key_res.is_ok());
1307            let current_blob_res = Cursor::get_value(&cursor_seek_le);
1308            assert!(current_blob_res.is_ok());
1309            let current_key = current_key_res.unwrap();
1310            let current_blob = current_blob_res.unwrap();
1311            // These were the values originally inserted.
1312            let original_blob_key = [
1313                id.to_be_bytes().as_ref(),
1314                current_underlying_key.to_be_bytes().as_ref(),
1315            ]
1316            .concat();
1317            assert_eq!(current_key, original_blob_key);
1318            assert_eq!(current_blob[0], (current_underlying_key & 0xFF) as u8);
1319            assert_eq!(current_blob.len(), size_blob);
1320
1321            num_traversed_keys += 1;
1322            current_underlying_key -= 1;
1323
1324            rc = cursor_seek_le.prev();
1325            assert_eq!(rc, Ok(()));
1326
1327            // This one does not work.
1328            rc = cursor_seek_le.next();
1329            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1330        }
1331        assert_eq!(num_traversed_keys, expected_num_keys);
1332
1333        rc = cursor_seek_le.close();
1334        assert_eq!(rc, Ok(()));
1335    }
1336
1337    fn test_seek_cursor_exact(db: &mut LsmDb, start_key: usize, size_blob: usize, id: usize) {
1338        let cursor_res = Disk::cursor_open(db);
1339        assert!(cursor_res.is_ok());
1340
1341        let mut cursor_seek_eq = cursor_res.unwrap();
1342        let mut rc = cursor_seek_eq.next();
1343        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1344        rc = cursor_seek_eq.prev();
1345        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1346
1347        // This key is not found.
1348        let no_key: usize = 0x7FFFFFFFFFFFFFFF;
1349        rc = Cursor::seek(
1350            &mut cursor_seek_eq,
1351            &[id.to_be_bytes().as_ref(), no_key.to_be_bytes().as_ref()].concat(),
1352            LsmCursorSeekOp::LsmCursorSeekEq,
1353        );
1354        assert_eq!(rc, Ok(()));
1355        rc = cursor_seek_eq.valid();
1356        assert_eq!(rc, Err(LsmErrorCode::LsmError));
1357
1358        // We only traverse the number of blobs that we are told.
1359        let key = start_key;
1360        let current_underlying_key = key;
1361        rc = Cursor::seek(
1362            &mut cursor_seek_eq,
1363            &[id.to_be_bytes().as_ref(), key.to_be_bytes().as_ref()].concat(),
1364            LsmCursorSeekOp::LsmCursorSeekEq,
1365        );
1366        assert_eq!(rc, Ok(()));
1367
1368        let current_key_res = Cursor::get_key(&cursor_seek_eq);
1369        assert!(current_key_res.is_ok());
1370        let current_blob_res = Cursor::get_value(&cursor_seek_eq);
1371        assert!(current_blob_res.is_ok());
1372        let current_key = current_key_res.unwrap();
1373        let current_blob = current_blob_res.unwrap();
1374        // These were the values originally inserted.
1375        let original_blob_key = [
1376            id.to_be_bytes().as_ref(),
1377            current_underlying_key.to_be_bytes().as_ref(),
1378        ]
1379        .concat();
1380        assert_eq!(current_key, original_blob_key);
1381        assert_eq!(current_blob[0], (current_underlying_key & 0xFF) as u8);
1382        assert_eq!(current_blob.len(), size_blob);
1383
1384        // We compare the upper bound we got from the user to see if the blob is
1385        // relevant for the result set. The moment the underlying key becomes larger,
1386        // we get out of the loop.
1387        let cmp_res = Cursor::compare(
1388            &cursor_seek_eq,
1389            &[id.to_be_bytes().as_ref(), key.to_be_bytes().as_ref()].concat(),
1390        );
1391        assert_eq!(cmp_res, Ok(Ordering::Equal));
1392
1393        rc = cursor_seek_eq.next();
1394        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1395
1396        // This one does not work.
1397        rc = cursor_seek_eq.prev();
1398        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1399
1400        rc = cursor_seek_eq.close();
1401        assert_eq!(rc, Ok(()));
1402    }
1403
1404    fn test_num_blobs_are_in_file(db: &mut LsmDb, expected_num_blobs: usize) {
1405        let cursor_res = Disk::cursor_open(db);
1406        assert!(cursor_res.is_ok());
1407
1408        let mut cursor_seek_forward = cursor_res.unwrap();
1409        let mut rc = cursor_seek_forward.next();
1410        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1411        rc = cursor_seek_forward.prev();
1412        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1413
1414        // Move the cursor to the very first key of the file.
1415        rc = cursor_seek_forward.first();
1416        assert_eq!(rc, Ok(()));
1417
1418        let mut num_traversed_keys = 0;
1419        // A cursor is valid as long as it's not positioned at the end of the file.
1420        // In which case the cursor has been exhausted.
1421        while cursor_seek_forward.valid() == Ok(()) {
1422            num_traversed_keys += 1;
1423
1424            rc = cursor_seek_forward.next();
1425            assert_eq!(rc, Ok(()));
1426
1427            // This one does not work.
1428            rc = cursor_seek_forward.prev();
1429            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1430        }
1431        assert_eq!(num_traversed_keys, expected_num_blobs);
1432    }
1433
1434    fn test_single_deletion(db: &mut LsmDb, key: usize, total_num_blobs: usize, id: usize) {
1435        let rc = Disk::delete(
1436            db,
1437            &[id.to_be_bytes().as_ref(), key.to_be_bytes().as_ref()].concat(),
1438        );
1439        assert_eq!(rc, Ok(()));
1440
1441        // Now we verify that we have one element less,
1442        // and that we cannot find the one we just deleted.
1443        let cursor_res = Disk::cursor_open(db);
1444        assert!(cursor_res.is_ok());
1445
1446        let mut cursor = cursor_res.unwrap();
1447        let mut rc = cursor.next();
1448        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1449        rc = cursor.prev();
1450        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1451
1452        // We will now traverse the database forward.
1453        rc = cursor.first();
1454        assert_eq!(rc, Ok(()));
1455
1456        let mut num_traversed_keys = 0;
1457        while cursor.valid() == Ok(()) {
1458            num_traversed_keys += 1;
1459
1460            rc = cursor.next();
1461            assert_eq!(rc, Ok(()));
1462
1463            // This one does not work.
1464            rc = cursor.prev();
1465            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1466        }
1467        assert_eq!(num_traversed_keys, total_num_blobs - 1);
1468
1469        // Now let us check that the key cannot be found.
1470        rc = Cursor::seek(
1471            &mut cursor,
1472            &[id.to_be_bytes().as_ref(), key.to_be_bytes().as_ref()].concat(),
1473            LsmCursorSeekOp::LsmCursorSeekEq,
1474        );
1475        assert_eq!(rc, Ok(()));
1476        rc = cursor.valid();
1477        assert_eq!(rc, Err(LsmErrorCode::LsmError));
1478
1479        rc = cursor.close();
1480        assert_eq!(rc, Ok(()));
1481    }
1482
1483    fn test_range_deletion(
1484        db: &mut LsmDb,
1485        starting_key: usize,
1486        ending_key: usize,
1487        total_num_blobs: usize,
1488        id: usize,
1489    ) {
1490        // Ending and starting keys are not deleted.
1491        let num_deleted_keys = ending_key - starting_key - 1;
1492
1493        // If the interval extremes are swapped, nothing happens.
1494        // If something gets deleted here, then the following tests
1495        // will fail because cardinalities won't match any more.
1496        let rc = Disk::delete_range(
1497            db,
1498            &[id.to_be_bytes().as_ref(), ending_key.to_be_bytes().as_ref()].concat(),
1499            &[
1500                id.to_be_bytes().as_ref(),
1501                starting_key.to_be_bytes().as_ref(),
1502            ]
1503            .concat(),
1504        );
1505        assert_eq!(rc, Ok(()));
1506
1507        // Range-delete deletes an open interval.
1508        let rc = Disk::delete_range(
1509            db,
1510            &[
1511                id.to_be_bytes().as_ref(),
1512                starting_key.to_be_bytes().as_ref(),
1513            ]
1514            .concat(),
1515            &[id.to_be_bytes().as_ref(), ending_key.to_be_bytes().as_ref()].concat(),
1516        );
1517        assert_eq!(rc, Ok(()));
1518
1519        // Now we verify that we have much less entries,
1520        // and that we cannot find the ones we just deleted.
1521        let cursor_res = Disk::cursor_open(db);
1522        assert!(cursor_res.is_ok());
1523
1524        let mut cursor = cursor_res.unwrap();
1525        let mut rc = cursor.next();
1526        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1527        rc = cursor.prev();
1528        assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1529
1530        // We will now traverse the database forward.
1531        rc = cursor.first();
1532        assert_eq!(rc, Ok(()));
1533
1534        let mut num_traversed_keys = 0;
1535        while cursor.valid() == Ok(()) {
1536            num_traversed_keys += 1;
1537
1538            rc = cursor.next();
1539            assert_eq!(rc, Ok(()));
1540
1541            // This one does not work.
1542            rc = cursor.prev();
1543            assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
1544        }
1545        // The - 1 it's because in a previous step outside, we deleted a single
1546        // element, and thus it cannot be found any longer.
1547        assert_eq!(num_traversed_keys, total_num_blobs - num_deleted_keys - 1);
1548
1549        //  Remember that starting and ending keys are not deleted.
1550        for key in (starting_key + 1)..ending_key {
1551            // Now let us check that the key cannot be found.
1552            rc = Cursor::seek(
1553                &mut cursor,
1554                &[id.to_be_bytes().as_ref(), key.to_be_bytes().as_ref()].concat(),
1555                LsmCursorSeekOp::LsmCursorSeekEq,
1556            );
1557            assert_eq!(rc, Ok(()));
1558            rc = cursor.valid();
1559            assert_eq!(rc, Err(LsmErrorCode::LsmError));
1560        }
1561
1562        rc = cursor.close();
1563        assert_eq!(rc, Ok(()));
1564    }
1565
1566    #[test]
1567    /// We test the whole api under different circumstances.
1568    fn lsm_whole_rust_bindings_multiple_optimized_dbs() {
1569        let num_dbs = 2;
1570        let num_connection_cycles = 10;
1571        let num_segments = 2;
1572        let mut thread_handles = vec![];
1573        let mut master_prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
1574        for db_idx in 1..=num_dbs {
1575            // Every thread will have a different seed, but each time it will be same,
1576            // as the master prng is initialized with the same seed every time.
1577            let thread_prng: Mt64 = SeedableRng::seed_from_u64(master_prng.next_u64());
1578            let handle = thread::spawn(move || {
1579                // Let's try to initialize a database per thread.
1580                let mut db = test_initialize(
1581                    db_idx,
1582                    "test-insert-get-blobs-optimized".to_string(),
1583                    LsmMode::LsmNoBackgroundThreads,
1584                    LsmCompressionLib::NoCompression,
1585                );
1586
1587                // Let's connect to it via a main memory handle.
1588                test_connect(&mut db);
1589
1590                // We now produce certain amount of blobs and persist them.
1591                let num_blobs = 10000_usize;
1592                let size_blob = 1 << 10; // 1 KB
1593
1594                // At most this many segment are created (depending on checkpointing for example).
1595                for _ in 0..num_segments {
1596                    // Let's persist some blobs.
1597                    test_persist_blobs(
1598                        &mut db,
1599                        num_blobs,
1600                        size_blob,
1601                        Some(thread_prng.clone()),
1602                        db_idx,
1603                    );
1604                }
1605
1606                // Let's optimize the DB. No wasted data pages.
1607                assert!(db.optimize().is_ok());
1608
1609                // Once some blobs have been persisted, we connect
1610                // and disconnect multiple times and then we test that
1611                // what we read is what we expect.
1612                for _ in 0..num_connection_cycles {
1613                    test_disconnect(&mut db);
1614                    test_connect(&mut db);
1615                }
1616
1617                // Let's test forward iterators on the whole database.
1618                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
1619
1620                // Now traverse the database backward.
1621                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
1622
1623                // Now using cursors via seeking a particular blob.
1624                test_seek_cursor_forward_limited(
1625                    &mut db,
1626                    num_blobs >> 2,
1627                    (num_blobs >> 2) + 1,
1628                    size_blob,
1629                    db_idx,
1630                );
1631
1632                // Now until the very end of the database (EOF)
1633                test_seek_cursor_forward_eof(
1634                    &mut db,
1635                    3 * (num_blobs >> 2),
1636                    (num_blobs >> 2) + 1,
1637                    size_blob,
1638                    db_idx,
1639                );
1640
1641                // Now using seeking going backwards.
1642                test_seek_cursor_backward_limited(&mut db, num_blobs >> 2, size_blob, db_idx);
1643
1644                // Now until the beginning of the database (inverse EOF)
1645                test_seek_cursor_backward_eof(
1646                    &mut db,
1647                    num_blobs >> 2,
1648                    num_blobs >> 2,
1649                    size_blob,
1650                    db_idx,
1651                );
1652
1653                // Now seek with exact match.
1654                test_seek_cursor_exact(&mut db, num_blobs >> 2, size_blob, db_idx);
1655
1656                // Now let's test deletions.
1657                test_single_deletion(&mut db, num_blobs >> 2, num_blobs, db_idx);
1658
1659                test_range_deletion(&mut db, num_blobs >> 1, num_blobs, num_blobs, db_idx);
1660
1661                // Good bye
1662                test_disconnect(&mut db);
1663            });
1664            thread_handles.push(handle);
1665        }
1666        for t in thread_handles {
1667            t.join().unwrap();
1668        }
1669    }
1670
1671    #[test]
1672    /// We test the whole api under different circumstances.
1673    fn lsm_whole_rust_bindings_multiple_non_optimized_dbs() {
1674        let num_dbs = 2;
1675        let num_connection_cycles = 10;
1676        let num_segments = 2;
1677        let mut thread_handles = vec![];
1678        let mut master_prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
1679        for db_idx in 1..=num_dbs {
1680            // Every thread will have a different seed, but each time it will be same,
1681            // as the master prng is initialized with the same seed every time.
1682            let thread_prng: Mt64 = SeedableRng::seed_from_u64(master_prng.next_u64());
1683            let handle = thread::spawn(move || {
1684                // Let's try to initialize a database per thread.
1685                let mut db = test_initialize(
1686                    db_idx,
1687                    "test-insert-get-blobs-non-optimized".to_string(),
1688                    LsmMode::LsmNoBackgroundThreads,
1689                    LsmCompressionLib::NoCompression,
1690                );
1691
1692                // Let's connect to it via a main memory handle.
1693                test_connect(&mut db);
1694
1695                // We now produce certain amount of blobs and persist them.
1696                let num_blobs = 10000_usize;
1697                let size_blob = 1 << 10; // 1 KB
1698
1699                // At most this many segment are created (depending on checkpointing for example).
1700                for _ in 0..num_segments {
1701                    // Let's persist some blobs.
1702                    test_persist_blobs(
1703                        &mut db,
1704                        num_blobs,
1705                        size_blob,
1706                        Some(thread_prng.clone()),
1707                        db_idx,
1708                    );
1709                }
1710
1711                // No optimization now. There will be wasted data pages.
1712
1713                // Once some blobs have been persisted, we connect
1714                // and disconnect multiple times and then we test that
1715                // what we read is what we expect.
1716                for _ in 0..num_connection_cycles {
1717                    test_disconnect(&mut db);
1718                    test_connect(&mut db);
1719                }
1720
1721                // Let's test forward iterators on the whole database.
1722                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
1723
1724                // Now traverse the database backward.
1725                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
1726
1727                // Now until the very end of the database (EOF)
1728                test_seek_cursor_forward_eof(
1729                    &mut db,
1730                    3 * (num_blobs >> 2),
1731                    (num_blobs >> 2) + 1,
1732                    size_blob,
1733                    db_idx,
1734                );
1735
1736                // Now using seeking going backwards.
1737                test_seek_cursor_backward_limited(&mut db, num_blobs >> 2, size_blob, db_idx);
1738
1739                // Now until the beginning of the database (inverse EOF)
1740                test_seek_cursor_backward_eof(
1741                    &mut db,
1742                    num_blobs >> 2,
1743                    num_blobs >> 2,
1744                    size_blob,
1745                    db_idx,
1746                );
1747
1748                // Now seek with exact match.
1749                test_seek_cursor_exact(&mut db, num_blobs >> 2, size_blob, db_idx);
1750
1751                // Now let's test deletions.
1752                test_single_deletion(&mut db, num_blobs >> 2, num_blobs, db_idx);
1753
1754                test_range_deletion(&mut db, num_blobs >> 1, num_blobs, num_blobs, db_idx);
1755
1756                // Good bye
1757                test_disconnect(&mut db);
1758            });
1759            thread_handles.push(handle);
1760        }
1761        for t in thread_handles {
1762            t.join().unwrap();
1763        }
1764    }
1765
1766    #[test]
1767    /// We test the whole api under different circumstances.
1768    fn lsm_whole_rust_bindings_multiple_empty_dbs() {
1769        let num_dbs = 2;
1770        let num_connection_cycles = 10;
1771        let mut thread_handles = vec![];
1772        for db_idx in 1..=num_dbs {
1773            let handle = thread::spawn(move || {
1774                // Let's try to initialize a database per thread.
1775                let mut db = test_initialize(
1776                    db_idx,
1777                    "test-get-blobs-optimized-empty".to_string(),
1778                    LsmMode::LsmNoBackgroundThreads,
1779                    LsmCompressionLib::NoCompression,
1780                );
1781
1782                // Let's connect to it via a main memory handle.
1783                test_connect(&mut db);
1784
1785                // We now produce certain amount of blobs and persist them.
1786                let num_blobs = 0;
1787                let size_blob = 0;
1788
1789                // Let's optimize the DB. No wasted data pages.
1790                assert!(db.optimize().is_ok());
1791
1792                // Once some blobs have been persisted, we connect
1793                // and disconnect multiple times and then we test that
1794                // what we read is what we expect.
1795                for _ in 0..num_connection_cycles {
1796                    test_disconnect(&mut db);
1797                    test_connect(&mut db);
1798                }
1799
1800                // Let's test forward iterators on the whole database.
1801                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
1802
1803                // Now traverse the database backward.
1804                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
1805
1806                // Now until the very end of the database (EOF)
1807                test_seek_cursor_forward_eof(&mut db, 3 * (num_blobs >> 2), 0, size_blob, db_idx);
1808
1809                // Now until the beginning of the database (inverse EOF)
1810                test_seek_cursor_backward_eof(
1811                    &mut db,
1812                    num_blobs >> 2,
1813                    num_blobs >> 2,
1814                    size_blob,
1815                    db_idx,
1816                );
1817
1818                // Good bye
1819                test_disconnect(&mut db);
1820            });
1821            thread_handles.push(handle);
1822        }
1823        for t in thread_handles {
1824            t.join().unwrap();
1825        }
1826    }
1827
1828    #[test]
1829    /// We test the whole api under different circumstances.
1830    fn lsm_whole_rust_bindings_multiple_compressed_lz4_optimized_dbs() {
1831        let num_dbs = 2;
1832        let num_connection_cycles = 10;
1833        let num_segments = 2;
1834        let mut thread_handles = vec![];
1835        // Every database will be handled by a thread.
1836        for db_idx in 1..=num_dbs {
1837            let handle = thread::spawn(move || {
1838                // Let's try to initialize a database per thread.
1839                let mut db = test_initialize(
1840                    db_idx,
1841                    "test-insert-get-compressed-lz4-blobs".to_string(),
1842                    LsmMode::LsmNoBackgroundThreads,
1843                    LsmCompressionLib::LZ4,
1844                );
1845
1846                // Let's connect to it via a main memory handle.
1847                test_connect(&mut db);
1848
1849                // We now produce certain amount of blobs and persist them.
1850                let num_blobs = 10000_usize;
1851                let size_blob = 1 << 10; // 1 KB
1852
1853                // At most this many segment are created (depending on checkpointing for example).
1854                for _ in 0..num_segments {
1855                    // Let's persist some blobs.
1856                    test_persist_blobs(&mut db, num_blobs, size_blob, None, db_idx);
1857                }
1858
1859                // Let's optimize the DB. No wasted data pages.
1860                assert!(db.optimize().is_ok());
1861
1862                // Once some blobs have been persisted, we connect
1863                // and disconnect multiple times and then we test that
1864                // what we read is what we expect.
1865                for _ in 0..num_connection_cycles {
1866                    test_disconnect(&mut db);
1867                    test_connect(&mut db);
1868                }
1869
1870                // Let's test forward iterators on the whole database.
1871                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
1872
1873                // Now traverse the database backward.
1874                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
1875
1876                // Now using cursors via seeking a particular blob.
1877                test_seek_cursor_forward_limited(
1878                    &mut db,
1879                    num_blobs >> 2,
1880                    (num_blobs >> 2) + 1,
1881                    size_blob,
1882                    db_idx,
1883                );
1884
1885                // Now until the very end of the database (EOF)
1886                test_seek_cursor_forward_eof(
1887                    &mut db,
1888                    3 * (num_blobs >> 2),
1889                    (num_blobs >> 2) + 1,
1890                    size_blob,
1891                    db_idx,
1892                );
1893
1894                // Now using seeking going backwards.
1895                test_seek_cursor_backward_limited(&mut db, num_blobs >> 2, size_blob, db_idx);
1896
1897                // Now until the beginning of the database (inverse EOF)
1898                test_seek_cursor_backward_eof(
1899                    &mut db,
1900                    num_blobs >> 2,
1901                    num_blobs >> 2,
1902                    size_blob,
1903                    db_idx,
1904                );
1905
1906                // Now seek with exact match.
1907                test_seek_cursor_exact(&mut db, num_blobs >> 2, size_blob, db_idx);
1908
1909                // Now let's test deletions.
1910                test_single_deletion(&mut db, num_blobs >> 2, num_blobs, db_idx);
1911                test_range_deletion(&mut db, num_blobs >> 1, num_blobs, num_blobs, db_idx);
1912
1913                assert_eq!(db.get_compression_id(), Ok(LsmCompressionLib::LZ4));
1914
1915                // Good bye
1916                test_disconnect(&mut db);
1917            });
1918            thread_handles.push(handle);
1919        }
1920        for t in thread_handles {
1921            t.join().unwrap();
1922        }
1923    }
1924
1925    #[test]
1926    /// We test the whole api under different circumstances.
1927    fn lsm_whole_rust_bindings_multiple_compressed_lz4_non_optimized_dbs() {
1928        let num_dbs = 2;
1929        let num_connection_cycles = 10;
1930        let num_segments = 2;
1931        let mut thread_handles = vec![];
1932        // Every database will be handled by a thread.
1933        for db_idx in 1..=num_dbs {
1934            let handle = thread::spawn(move || {
1935                // Let's try to initialize a database per thread.
1936                let mut db = test_initialize(
1937                    db_idx,
1938                    "test-insert-get-compressed-lz4-blobs".to_string(),
1939                    LsmMode::LsmNoBackgroundThreads,
1940                    LsmCompressionLib::LZ4,
1941                );
1942
1943                // Let's connect to it via a main memory handle.
1944                test_connect(&mut db);
1945
1946                // We now produce certain amount of blobs and persist them.
1947                let num_blobs = 10000_usize;
1948                let size_blob = 1 << 10; // 1 KB
1949
1950                // At most this many segment are created (depending on checkpointing for example).
1951                for _ in 0..num_segments {
1952                    // Let's persist some blobs.
1953                    test_persist_blobs(&mut db, num_blobs, size_blob, None, db_idx);
1954                }
1955
1956                // No optimization now. There will be wasted data pages.
1957
1958                // Once some blobs have been persisted, we connect
1959                // and disconnect multiple times and then we test that
1960                // what we read is what we expect.
1961                for _ in 0..num_connection_cycles {
1962                    test_disconnect(&mut db);
1963                    test_connect(&mut db);
1964                }
1965
1966                // Let's test forward iterators on the whole database.
1967                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
1968
1969                // Now traverse the database backward.
1970                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
1971
1972                // Now using cursors via seeking a particular blob.
1973                test_seek_cursor_forward_limited(
1974                    &mut db,
1975                    num_blobs >> 2,
1976                    (num_blobs >> 2) + 1,
1977                    size_blob,
1978                    db_idx,
1979                );
1980
1981                // Now until the very end of the database (EOF)
1982                test_seek_cursor_forward_eof(
1983                    &mut db,
1984                    3 * (num_blobs >> 2),
1985                    (num_blobs >> 2) + 1,
1986                    size_blob,
1987                    db_idx,
1988                );
1989
1990                // Now using seeking going backwards.
1991                test_seek_cursor_backward_limited(&mut db, num_blobs >> 2, size_blob, db_idx);
1992
1993                // Now until the beginning of the database (inverse EOF)
1994                test_seek_cursor_backward_eof(
1995                    &mut db,
1996                    num_blobs >> 2,
1997                    num_blobs >> 2,
1998                    size_blob,
1999                    db_idx,
2000                );
2001
2002                // Now seek with exact match.
2003                test_seek_cursor_exact(&mut db, num_blobs >> 2, size_blob, db_idx);
2004
2005                // Now let's test deletions.
2006                test_single_deletion(&mut db, num_blobs >> 2, num_blobs, db_idx);
2007                test_range_deletion(&mut db, num_blobs >> 1, num_blobs, num_blobs, db_idx);
2008
2009                assert_eq!(db.get_compression_id(), Ok(LsmCompressionLib::LZ4));
2010
2011                // Good bye
2012                test_disconnect(&mut db);
2013            });
2014            thread_handles.push(handle);
2015        }
2016        for t in thread_handles {
2017            t.join().unwrap();
2018        }
2019    }
2020
2021    #[test]
2022    /// We test the whole api under different circumstances.
2023    fn lsm_whole_rust_bindings_multiple_compressed_zlib_optimized_dbs() {
2024        let num_dbs = 2;
2025        let num_connection_cycles = 10;
2026        let num_segments = 2;
2027        let mut thread_handles = vec![];
2028        // Every database will be handled by a thread.
2029        for db_idx in 1..=num_dbs {
2030            let handle = thread::spawn(move || {
2031                // Let's try to initialize a database per thread.
2032                let mut db = test_initialize(
2033                    db_idx,
2034                    "test-insert-get-compressed-zlib-blobs".to_string(),
2035                    LsmMode::LsmNoBackgroundThreads,
2036                    LsmCompressionLib::ZLib,
2037                );
2038
2039                // Let's connect to it via a main memory handle.
2040                test_connect(&mut db);
2041
2042                // We now produce certain amount of blobs and persist them.
2043                let num_blobs = 10000_usize;
2044                let size_blob = 1 << 10; // 1 KB
2045
2046                // At most this many segment are created (depending on checkpointing for example).
2047                for _ in 0..num_segments {
2048                    // Let's persist some blobs.
2049                    test_persist_blobs(&mut db, num_blobs, size_blob, None, db_idx);
2050                }
2051
2052                // Let's optimize the DB. No wasted data pages.
2053                assert!(db.optimize().is_ok());
2054
2055                // Once some blobs have been persisted, we connect
2056                // and disconnect multiple times and then we test that
2057                // what we read is what we expect.
2058                for _ in 0..num_connection_cycles {
2059                    test_disconnect(&mut db);
2060                    test_connect(&mut db);
2061                }
2062
2063                // Let's test forward iterators on the whole database.
2064                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
2065
2066                // Now traverse the database backward.
2067                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
2068
2069                // Now using cursors via seeking a particular blob.
2070                test_seek_cursor_forward_limited(
2071                    &mut db,
2072                    num_blobs >> 2,
2073                    (num_blobs >> 2) + 1,
2074                    size_blob,
2075                    db_idx,
2076                );
2077
2078                // Now until the very end of the database (EOF)
2079                test_seek_cursor_forward_eof(
2080                    &mut db,
2081                    3 * (num_blobs >> 2),
2082                    (num_blobs >> 2) + 1,
2083                    size_blob,
2084                    db_idx,
2085                );
2086
2087                // Now using seeking going backwards.
2088                test_seek_cursor_backward_limited(&mut db, num_blobs >> 2, size_blob, db_idx);
2089
2090                // Now until the beginning of the database (inverse EOF)
2091                test_seek_cursor_backward_eof(
2092                    &mut db,
2093                    num_blobs >> 2,
2094                    num_blobs >> 2,
2095                    size_blob,
2096                    db_idx,
2097                );
2098
2099                // Now seek with exact match.
2100                test_seek_cursor_exact(&mut db, num_blobs >> 2, size_blob, db_idx);
2101
2102                // Now let's test deletions.
2103                test_single_deletion(&mut db, num_blobs >> 2, num_blobs, db_idx);
2104                test_range_deletion(&mut db, num_blobs >> 1, num_blobs, num_blobs, db_idx);
2105
2106                assert_eq!(db.get_compression_id(), Ok(LsmCompressionLib::ZLib));
2107
2108                // Good bye
2109                test_disconnect(&mut db);
2110            });
2111            thread_handles.push(handle);
2112        }
2113        for t in thread_handles {
2114            t.join().unwrap();
2115        }
2116    }
2117
2118    #[test]
2119    /// We test the whole api under different circumstances.
2120    fn lsm_whole_rust_bindings_multiple_compressed_zlib_non_optimized_dbs() {
2121        let num_dbs = 2;
2122        let num_connection_cycles = 10;
2123        let num_segments = 2;
2124        let mut thread_handles = vec![];
2125        // Every database will be handled by a thread.
2126        for db_idx in 1..=num_dbs {
2127            let handle = thread::spawn(move || {
2128                // Let's try to initialize a database per thread.
2129                let mut db = test_initialize(
2130                    db_idx,
2131                    "test-insert-get-compressed-zlib-blobs".to_string(),
2132                    LsmMode::LsmNoBackgroundThreads,
2133                    LsmCompressionLib::ZLib,
2134                );
2135
2136                // Let's connect to it via a main memory handle.
2137                test_connect(&mut db);
2138
2139                // We now produce certain amount of blobs and persist them.
2140                let num_blobs = 10000_usize;
2141                let size_blob = 1 << 10; // 1 KB
2142
2143                // At most this many segment are created (depending on checkpointing for example).
2144                for _ in 0..num_segments {
2145                    // Let's persist some blobs.
2146                    test_persist_blobs(&mut db, num_blobs, size_blob, None, db_idx);
2147                }
2148
2149                // No optimization now. There will be wasted data pages.
2150
2151                // Once some blobs have been persisted, we connect
2152                // and disconnect multiple times and then we test that
2153                // what we read is what we expect.
2154                for _ in 0..num_connection_cycles {
2155                    test_disconnect(&mut db);
2156                    test_connect(&mut db);
2157                }
2158
2159                // Let's test forward iterators on the whole database.
2160                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
2161
2162                // Now traverse the database backward.
2163                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
2164
2165                // Now using cursors via seeking a particular blob.
2166                test_seek_cursor_forward_limited(
2167                    &mut db,
2168                    num_blobs >> 2,
2169                    (num_blobs >> 2) + 1,
2170                    size_blob,
2171                    db_idx,
2172                );
2173
2174                // Now until the very end of the database (EOF)
2175                test_seek_cursor_forward_eof(
2176                    &mut db,
2177                    3 * (num_blobs >> 2),
2178                    (num_blobs >> 2) + 1,
2179                    size_blob,
2180                    db_idx,
2181                );
2182
2183                // Now using seeking going backwards.
2184                test_seek_cursor_backward_limited(&mut db, num_blobs >> 2, size_blob, db_idx);
2185
2186                // Now until the beginning of the database (inverse EOF)
2187                test_seek_cursor_backward_eof(
2188                    &mut db,
2189                    num_blobs >> 2,
2190                    num_blobs >> 2,
2191                    size_blob,
2192                    db_idx,
2193                );
2194
2195                // Now seek with exact match.
2196                test_seek_cursor_exact(&mut db, num_blobs >> 2, size_blob, db_idx);
2197
2198                // Now let's test deletions.
2199                test_single_deletion(&mut db, num_blobs >> 2, num_blobs, db_idx);
2200                test_range_deletion(&mut db, num_blobs >> 1, num_blobs, num_blobs, db_idx);
2201
2202                assert_eq!(db.get_compression_id(), Ok(LsmCompressionLib::ZLib));
2203
2204                // Good bye
2205                test_disconnect(&mut db);
2206            });
2207            thread_handles.push(handle);
2208        }
2209        for t in thread_handles {
2210            t.join().unwrap();
2211        }
2212    }
2213
2214    #[test]
2215    /// We test the whole api under different circumstances.
2216    fn lsm_whole_rust_bindings_multiple_compressed_zstd_optimized_dbs() {
2217        let num_dbs = 2;
2218        let num_connection_cycles = 10;
2219        let num_segments = 2;
2220        let mut thread_handles = vec![];
2221        // Every database will be handled by a thread.
2222        for db_idx in 1..=num_dbs {
2223            let handle = thread::spawn(move || {
2224                // Let's try to initialize a database per thread.
2225                let mut db = test_initialize(
2226                    db_idx,
2227                    "test-insert-get-compressed-zstd-blobs-optimized".to_string(),
2228                    LsmMode::LsmNoBackgroundThreads,
2229                    LsmCompressionLib::ZStd,
2230                );
2231
2232                // Let's connect to it via a main memory handle.
2233                test_connect(&mut db);
2234
2235                // We now produce certain amount of blobs and persist them.
2236                let num_blobs = 10000_usize;
2237                let size_blob = 1 << 10; // 1 KB
2238
2239                // At most this many segment are created (depending on checkpointing for example).
2240                for _ in 0..num_segments {
2241                    // Let's persist some blobs.
2242                    test_persist_blobs(&mut db, num_blobs, size_blob, None, db_idx);
2243                }
2244
2245                // Let's optimize the DB. No wasted data pages.
2246                assert!(db.optimize().is_ok());
2247
2248                // Once some blobs have been persisted, we connect
2249                // and disconnect multiple times and then we test that
2250                // what we read is what we expect.
2251                for _ in 0..num_connection_cycles {
2252                    test_disconnect(&mut db);
2253                    test_connect(&mut db);
2254                }
2255
2256                // Let's test forward iterators on the whole database.
2257                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
2258
2259                // Now traverse the database backward.
2260                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
2261
2262                // Now using cursors via seeking a particular blob.
2263                test_seek_cursor_forward_limited(
2264                    &mut db,
2265                    num_blobs >> 2,
2266                    (num_blobs >> 2) + 1,
2267                    size_blob,
2268                    db_idx,
2269                );
2270
2271                // Now until the very end of the database (EOF)
2272                test_seek_cursor_forward_eof(
2273                    &mut db,
2274                    3 * (num_blobs >> 2),
2275                    (num_blobs >> 2) + 1,
2276                    size_blob,
2277                    db_idx,
2278                );
2279
2280                // Now using seeking going backwards.
2281                test_seek_cursor_backward_limited(&mut db, num_blobs >> 2, size_blob, db_idx);
2282
2283                // Now until the beginning of the database (inverse EOF)
2284                test_seek_cursor_backward_eof(
2285                    &mut db,
2286                    num_blobs >> 2,
2287                    num_blobs >> 2,
2288                    size_blob,
2289                    db_idx,
2290                );
2291
2292                // Now seek with exact match.
2293                test_seek_cursor_exact(&mut db, num_blobs >> 2, size_blob, db_idx);
2294
2295                // Now let's test deletions.
2296                test_single_deletion(&mut db, num_blobs >> 2, num_blobs, db_idx);
2297                test_range_deletion(&mut db, num_blobs >> 1, num_blobs, num_blobs, db_idx);
2298
2299                assert_eq!(db.get_compression_id(), Ok(LsmCompressionLib::ZStd));
2300
2301                // Good bye
2302                test_disconnect(&mut db);
2303            });
2304            thread_handles.push(handle);
2305        }
2306        for t in thread_handles {
2307            t.join().unwrap();
2308        }
2309    }
2310
2311    #[test]
2312    /// We test the whole api under different circumstances.
2313    fn lsm_whole_rust_bindings_multiple_compressed_zstd_non_optimized_dbs() {
2314        let num_dbs = 2;
2315        let num_connection_cycles = 10;
2316        let num_segments = 2;
2317        let mut thread_handles = vec![];
2318        // Every database will be handled by a thread.
2319        for db_idx in 1..=num_dbs {
2320            let handle = thread::spawn(move || {
2321                // Let's try to initialize a database per thread.
2322                let mut db = test_initialize(
2323                    db_idx,
2324                    "test-insert-get-compressed-zstd-blobs-non-optimized".to_string(),
2325                    LsmMode::LsmNoBackgroundThreads,
2326                    LsmCompressionLib::ZStd,
2327                );
2328
2329                // Let's connect to it via a main memory handle.
2330                test_connect(&mut db);
2331
2332                // We now produce certain amount of blobs and persist them.
2333                let num_blobs = 10000_usize;
2334                let size_blob = 1 << 10; // 1 KB
2335
2336                // At most this many segment are created (depending on checkpointing for example).
2337                for _ in 0..num_segments {
2338                    // Let's persist some blobs.
2339                    test_persist_blobs(&mut db, num_blobs, size_blob, None, db_idx);
2340                }
2341
2342                // No optimization now. There will be wasted data pages.
2343
2344                // Once some blobs have been persisted, we connect
2345                // and disconnect multiple times and then we test that
2346                // what we read is what we expect.
2347                for _ in 0..num_connection_cycles {
2348                    test_disconnect(&mut db);
2349                    test_connect(&mut db);
2350                }
2351
2352                // Let's test forward iterators on the whole database.
2353                test_forward_cursor(&mut db, num_blobs, size_blob, db_idx);
2354
2355                // Now traverse the database backward.
2356                test_backward_cursor(&mut db, num_blobs, size_blob, db_idx);
2357
2358                // Now using cursors via seeking a particular blob.
2359                test_seek_cursor_forward_limited(
2360                    &mut db,
2361                    num_blobs >> 2,
2362                    (num_blobs >> 2) + 1,
2363                    size_blob,
2364                    db_idx,
2365                );
2366
2367                // Now until the very end of the database (EOF)
2368                test_seek_cursor_forward_eof(
2369                    &mut db,
2370                    3 * (num_blobs >> 2),
2371                    (num_blobs >> 2) + 1,
2372                    size_blob,
2373                    db_idx,
2374                );
2375
2376                // Now using seeking going backwards.
2377                test_seek_cursor_backward_limited(&mut db, num_blobs >> 2, size_blob, db_idx);
2378
2379                // Now until the beginning of the database (inverse EOF)
2380                test_seek_cursor_backward_eof(
2381                    &mut db,
2382                    num_blobs >> 2,
2383                    num_blobs >> 2,
2384                    size_blob,
2385                    db_idx,
2386                );
2387
2388                // Now seek with exact match.
2389                test_seek_cursor_exact(&mut db, num_blobs >> 2, size_blob, db_idx);
2390
2391                // Now let's test deletions.
2392                test_single_deletion(&mut db, num_blobs >> 2, num_blobs, db_idx);
2393                test_range_deletion(&mut db, num_blobs >> 1, num_blobs, num_blobs, db_idx);
2394
2395                assert_eq!(db.get_compression_id(), Ok(LsmCompressionLib::ZStd));
2396
2397                // Good bye
2398                test_disconnect(&mut db);
2399            });
2400            thread_handles.push(handle);
2401        }
2402        for t in thread_handles {
2403            t.join().unwrap();
2404        }
2405    }
2406
2407    #[test]
2408    /// Let's test that a non-initialized database behaves well.
2409    fn lsm_rust_bindings_with_not_yet_initialized_db() {
2410        let num_dbs = 2;
2411        let mut thread_handles = vec![];
2412        let mut master_prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2413        for _ in 1..=num_dbs {
2414            // Every thread will have a different seed, but each time it will be same,
2415            // as the master prng is initialized with the same seed every time.
2416            let mut thread_prng: Mt64 = SeedableRng::seed_from_u64(master_prng.next_u64());
2417            let handle = thread::spawn(move || {
2418                let mut db: LsmDb = Default::default();
2419
2420                // We now produce certain amount of blobs and persist them.
2421                let size_blob = 1 << 10; // 1 KB
2422                let blob = construct_random_blob(size_blob, &mut thread_prng);
2423
2424                let mut rc = db.connect();
2425                assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
2426                rc = Disk::persist(&mut db, &Vec::from(1usize.to_be_bytes()), &blob);
2427                assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
2428
2429                let key = Vec::from(1usize.to_be_bytes());
2430                rc = Disk::delete(&mut db, &key);
2431                assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
2432
2433                // let cursor_res = Disk::cursor_open(&mut db);
2434                // assert!(cursor_res.is_err());
2435
2436                let starting_key = Vec::from(1usize.to_be_bytes());
2437                let ending_key = Vec::from(0usize.not().to_be_bytes());
2438                rc = Disk::delete_range(&mut db, &starting_key, &ending_key);
2439                assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
2440
2441                rc = db.disconnect();
2442                assert_eq!(rc, Ok(()));
2443            });
2444            thread_handles.push(handle);
2445        }
2446        for t in thread_handles {
2447            t.join().unwrap();
2448        }
2449    }
2450
2451    #[test]
2452    /// To see whether we can open and close the same file over and over again.
2453    fn lsm_open_close_multiple_times() {
2454        let num_connection_cycles = 100;
2455        let mut db = test_initialize(
2456            0,
2457            "test-open-close".to_string(),
2458            LsmMode::LsmNoBackgroundThreads,
2459            LsmCompressionLib::NoCompression,
2460        );
2461        test_connect(&mut db);
2462
2463        for _ in 0..num_connection_cycles {
2464            test_disconnect(&mut db);
2465            test_connect(&mut db);
2466        }
2467
2468        test_disconnect(&mut db);
2469    }
2470
2471    #[test]
2472    fn lsm_rust_bindings_multiple_dbs_grpc_blobs() {
2473        let num_dbs = 2;
2474        let num_connection_cycles = 10;
2475        let mut thread_handles = vec![];
2476        let mut master_prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2477        for db_idx in 1..=num_dbs {
2478            // Every thread will have a different seed, but each time it will be same,
2479            // as the master prng is initialized with the same seed every time.
2480            let thread_prng: Mt64 = SeedableRng::seed_from_u64(master_prng.next_u64());
2481            let handle = thread::spawn(move || {
2482                // Let's try to initialize a database per thread.
2483                let mut db = test_initialize(
2484                    db_idx,
2485                    "test-insert-get-grpc-blobs".to_string(),
2486                    LsmMode::LsmNoBackgroundThreads,
2487                    LsmCompressionLib::NoCompression,
2488                );
2489
2490                // Let's connect to it via a main memory handle.
2491                test_connect(&mut db);
2492
2493                // We now produce certain amount of blobs and persist them.
2494                let num_blobs = 100_usize;
2495                let size_blob = 1 << 10; // 1 KB
2496
2497                // Let's persist some blobs.
2498                test_persist_grpc_blobs(&mut db, num_blobs, size_blob, thread_prng);
2499
2500                // Let's optimize the DB.
2501                assert!(db.optimize().is_ok());
2502
2503                // Once some blobs have been persisted, we connect
2504                // and disconnect multiple times and then we test that
2505                // what we read is what we expect.
2506                for _ in 0..num_connection_cycles {
2507                    test_disconnect(&mut db);
2508                    test_connect(&mut db);
2509                }
2510
2511                // Let's test forward iterators on the whole database.
2512                test_forward_cursor_grpc(&mut db, num_blobs);
2513
2514                // Now traverse the database backward.
2515                test_backward_cursor_grpc(&mut db, num_blobs);
2516
2517                // Good bye
2518                test_disconnect(&mut db);
2519            });
2520            thread_handles.push(handle);
2521        }
2522        for t in thread_handles {
2523            t.join().unwrap();
2524        }
2525    }
2526
2527    #[test]
2528    fn lsm_cursor_automatic_drop() {
2529        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2530        // Let's try to initialize a database per thread.
2531        let mut db = test_initialize(
2532            1,
2533            "test-cursor-automatic-drop".to_string(),
2534            LsmMode::LsmNoBackgroundThreads,
2535            LsmCompressionLib::NoCompression,
2536        );
2537
2538        // Let's connect to it via a main memory handle.
2539        test_connect(&mut db);
2540
2541        // We now produce certain amount of blobs and persist them.
2542        let num_blobs = 100_usize;
2543        let size_blob = 1 << 10; // 1 KB
2544
2545        // Let's persist some blobs.
2546        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2547
2548        {
2549            // Let's open a cursor (which will produce a snapshot) within a scope.
2550            let cursor_res = db.cursor_open();
2551            assert!(cursor_res.is_ok());
2552        }
2553
2554        // We should now be able to close the database cleanly as the cursor has been
2555        // dropped automatically.
2556        test_disconnect(&mut db);
2557    }
2558
2559    #[test]
2560    fn lsm_handle_automatic_drop() {
2561        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2562        let db_fqn: String;
2563
2564        // The database is opened within a scope. Thus the `db` handle will
2565        // be valid inside it. When going out of scope, the handle is freed, and
2566        // thus the database will be cleanly close.
2567        {
2568            // Let's try to initialize a database per thread.
2569            let mut db = test_initialize(
2570                1,
2571                "test-handle-automatic-drop".to_string(),
2572                LsmMode::LsmNoBackgroundThreads,
2573                LsmCompressionLib::NoCompression,
2574            );
2575
2576            // We copy the whole path of the database. Observe that we cannot use db_fq_name
2577            // as that's a c-string (and thus nul-terminated).
2578            db_fqn = format!(
2579                "{}/{}.lsm",
2580                db.db_conf.db_path.display(),
2581                db.db_conf.db_base_name
2582            );
2583
2584            // Let's connect to it via a main memory handle.
2585            test_connect(&mut db);
2586
2587            // We now produce certain amount of blobs and persist them.
2588            let num_blobs = 100_usize;
2589            let size_blob = 1 << 10; // 1 KB
2590
2591            // Let's persist some blobs.
2592            test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2593
2594            // We don't disconnect explicitly from the database. This will be done
2595            // by going out of scope.
2596        }
2597        // The only way we can test whether the database was closed cleanly is that
2598        // no database files other than the main database file exist in the filesystem.
2599        let db_file = Path::new(&db_fqn);
2600        let db_wal_fqn = format!("{}-{}", db_fqn.clone(), "log");
2601        let db_wal = Path::new(&db_wal_fqn);
2602
2603        // The following file should exist.
2604        assert!(db_file.exists());
2605        // The following files shouldn't exist.
2606        assert!(!db_wal.exists());
2607    }
2608
2609    #[test]
2610    fn lsm_multiple_writers_interlaced() {
2611        let num_writers = 5;
2612        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2613        let now = Utc::now();
2614        let db_path: String = "/tmp".to_string();
2615        let db_base_name: String = format!(
2616            "{}-{}",
2617            "test-multiple-writers-interlaced",
2618            now.timestamp_nanos_opt().unwrap()
2619        );
2620        // We now produce certain amount of blobs and persist them.
2621        let num_blobs = 10000_usize;
2622        let size_blob = 1 << 10; // 1 KB
2623
2624        // Let's produce the handles for every writer.
2625        let mut db_handles = vec![];
2626        for _ in 1..=num_writers {
2627            // Let's try to initialize a database per thread.
2628            let db_conf = DbConf {
2629                db_path: db_path.clone().into(),
2630                db_base_name: db_base_name.clone(),
2631                mode: LsmMode::LsmNoBackgroundThreads,
2632                handle_mode: LsmHandleMode::ReadWrite,
2633                metrics: None,
2634                compression: LsmCompressionLib::NoCompression,
2635            };
2636
2637            let mut db: LsmDb = Default::default();
2638
2639            let rc = db.initialize(db_conf);
2640            assert_eq!(rc, Ok(()));
2641
2642            let rc = db.connect();
2643            assert_eq!(rc, Ok(()));
2644
2645            db_handles.push(db);
2646        }
2647
2648        // Now let's use the handles to write to the same file one writer at a time.
2649        // This works because each time the writer lock is obtained by the corresponding
2650        // writer.
2651        for (db_handle_id, db_handle) in db_handles.iter_mut().enumerate().take(num_writers) {
2652            // Let's persist some more blobs (which can only be written on a clean database).
2653            test_persist_blobs(
2654                db_handle,
2655                num_blobs,
2656                size_blob,
2657                Some(prng.clone()),
2658                db_handle_id,
2659            );
2660        }
2661    }
2662
2663    #[test]
2664    fn lsm_bg_checkpointer_and_merger_in_one() {
2665        // Let's try to initialize a database. The connection will issue one background
2666        // thread that will merge and checkpoint the database file at certain intervals.
2667        let mut db = test_initialize(
2668            1,
2669            "test-bg-checkpointer-and-merger-in-one".to_string(),
2670            LsmMode::LsmBackgroundMerger,
2671            LsmCompressionLib::NoCompression,
2672        );
2673
2674        // We now produce certain amount of blobs and persist them.
2675        let num_blobs = 100_usize;
2676        let size_blob = 1 << 20; // 1 MB
2677        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2678
2679        // Let's connect to it via a main memory handle.
2680        test_connect(&mut db);
2681
2682        // Let's persist some blobs.
2683        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2684
2685        // Checkpointing increases the size of the data file. Thus, in the end,
2686        // the file size should be something between 88 and 92 MiBs (for this particular test).
2687        let db_path = db.db_conf.db_path.clone();
2688        let db_base_name = db.db_conf.db_base_name.clone();
2689        let db_fqn = format!("{}/{db_base_name}.lsm", db_path.display());
2690        let db_file = Path::new(&db_fqn);
2691        let db_wal_fqn = format!("{}-{}", db_fqn.clone(), "log");
2692        let db_wal = Path::new(&db_wal_fqn);
2693        assert!(db_file.exists());
2694        assert!(db_wal.exists());
2695
2696        let datafile_size_res = db_file.metadata();
2697        assert!(datafile_size_res.is_ok());
2698
2699        let datafile_size = datafile_size_res.unwrap().len();
2700        let expected_datafile_min_size: u64 = 67_000_000;
2701        let expected_datafile_max_size: u64 = 125_000_000;
2702        // The size of the data file might vary from platform to platform, but for this
2703        // particular test it should be about 68 MiBs
2704        assert!(datafile_size > expected_datafile_min_size);
2705        assert!(datafile_size < expected_datafile_max_size);
2706
2707        // We should now be able to close the database cleanly as the cursor has been
2708        // dropped automatically.
2709        test_disconnect(&mut db)
2710    }
2711
2712    #[test]
2713    fn lsm_bg_checkpointer_only() {
2714        // Let's try to initialize a database. The connection will issue one background
2715        // thread that will checkpoint the database file at certain intervals.
2716        let mut db = test_initialize(
2717            1,
2718            "test-bg-checkpointer-only".to_string(),
2719            LsmMode::LsmBackgroundCheckpointer,
2720            LsmCompressionLib::NoCompression,
2721        );
2722
2723        // We now produce certain amount of blobs and persist them.
2724        let num_blobs = 100_usize;
2725        let size_blob = 1 << 20; // 1 MB
2726        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2727
2728        // Let's connect to it via a main memory handle.
2729        test_connect(&mut db);
2730
2731        // Let's persist some blobs.
2732        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2733
2734        // Checkpointing increases the size of the data file. Thus, in the end,
2735        // the file size should be around 160 MiBs (for this particular test).
2736        let db_path = db.db_conf.db_path.clone();
2737        let db_base_name = db.db_conf.db_base_name.clone();
2738        let db_fqn = format!("{}/{db_base_name}.lsm", db_path.display());
2739        let db_file = Path::new(&db_fqn);
2740        let db_wal_fqn = format!("{}-{}", db_fqn.clone(), "log");
2741        let db_wal = Path::new(&db_wal_fqn);
2742        assert!(db_file.exists());
2743        assert!(db_wal.exists());
2744
2745        let datafile_size_res = db_file.metadata();
2746        assert!(datafile_size_res.is_ok());
2747
2748        let datafile_size = datafile_size_res.unwrap().len();
2749        let expected_datafile_min_size: u64 = 160_000_000;
2750        let expected_datafile_max_size: u64 = 170_000_000;
2751        // The size of the data file might vary from platform to platform, but for this
2752        // particular test it should be about 68 MiBs
2753        assert!(datafile_size > expected_datafile_min_size);
2754        assert!(datafile_size < expected_datafile_max_size);
2755
2756        // We should now be able to close the database cleanly as the cursor has been
2757        // dropped automatically.
2758        test_disconnect(&mut db);
2759    }
2760
2761    #[test]
2762    fn lsm_initialization_fails_with_non_c_string() {
2763        let bad_filename = "test-no-null\0in-the-middle".to_string();
2764        let now = Utc::now();
2765        let db_conf = DbConf {
2766            db_path: "/tmp".into(),
2767            db_base_name: format!("{}-{}", bad_filename, now.timestamp_nanos_opt().unwrap(),),
2768            mode: LsmMode::LsmNoBackgroundThreads,
2769            handle_mode: LsmHandleMode::ReadWrite,
2770            metrics: None,
2771            compression: LsmCompressionLib::NoCompression,
2772        };
2773
2774        let mut db: LsmDb = Default::default();
2775
2776        let rc = db.initialize(db_conf);
2777        assert_eq!(rc, Err(LsmErrorCode::LsmError));
2778    }
2779
2780    #[test]
2781    fn transactions_rollback() {
2782        // Let's initialize a handle.
2783        let mut db = test_initialize(
2784            1,
2785            "test-transactions-rollback".to_string(),
2786            LsmMode::LsmNoBackgroundThreads,
2787            LsmCompressionLib::NoCompression,
2788        );
2789
2790        // We now produce certain amount of blobs and persist them.
2791        let num_blobs = 10000_usize;
2792        let size_blob = 1 << 10; // 1 KB
2793        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2794
2795        // Let's connect to it via a main memory handle.
2796        test_connect(&mut db);
2797
2798        // Let's open a transaction
2799        let mut rc = db.begin_transaction();
2800        assert_eq!(rc, Ok(()));
2801
2802        // Let's persist some blobs within the opened transactions.
2803        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2804
2805        // Let's rollback the transaction (making all persisted blobs disappear).
2806        rc = db.rollback_transaction();
2807        assert_eq!(rc, Ok(()));
2808
2809        // We now test that no blob is found in the database although some were "persisted".
2810        test_num_blobs_are_in_file(&mut db, 0);
2811    }
2812
2813    #[test]
2814    fn transactions_commit() {
2815        // Let's initialize a handle.
2816        let mut db = test_initialize(
2817            1,
2818            "test-transactions-rollback".to_string(),
2819            LsmMode::LsmNoBackgroundThreads,
2820            LsmCompressionLib::NoCompression,
2821        );
2822
2823        // We now produce certain amount of blobs and persist them.
2824        let num_blobs = 10000_usize;
2825        let size_blob = 1 << 10; // 1 KB
2826        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2827
2828        // Let's connect to it via a main memory handle.
2829        test_connect(&mut db);
2830
2831        // Let's open a transaction
2832        let mut rc = db.begin_transaction();
2833        assert_eq!(rc, Ok(()));
2834
2835        // Let's persist some blobs within the opened transactions.
2836        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2837
2838        // Let's commit the transaction; making all blobs truly persisted blobs.
2839        rc = db.commit_transaction();
2840        assert_eq!(rc, Ok(()));
2841
2842        // We now test that no blob is found in the database although some were "persisted".
2843        test_num_blobs_are_in_file(&mut db, num_blobs);
2844    }
2845
2846    #[test]
2847    fn can_work_with_empty_metrics_no_background_threads() {
2848        let mut db = test_initialize(
2849            1,
2850            "test-can-work-with-empty-metrics-no-background-threads".to_string(),
2851            LsmMode::LsmNoBackgroundThreads,
2852            LsmCompressionLib::NoCompression,
2853        );
2854        // This is the interesting bit.
2855        db.db_conf.metrics = None;
2856
2857        // Let's connect to it via a main memory handle.
2858        test_connect(&mut db);
2859
2860        // We now produce certain amount of blobs and persist them.
2861        let num_blobs = 10000_usize;
2862        let size_blob = 1 << 10; // 1 KB
2863
2864        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2865        // Let's persist some blobs.
2866        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2867
2868        // Let's test forward iterators on the whole database.
2869        test_forward_cursor(&mut db, num_blobs, size_blob, 0);
2870
2871        test_disconnect(&mut db);
2872    }
2873
2874    #[test]
2875    fn can_work_with_empty_metrics_with_background_merger() {
2876        let mut db = test_initialize(
2877            1,
2878            "test-can-work-with-empty-metrics-with-background-merger".to_string(),
2879            LsmMode::LsmBackgroundMerger,
2880            LsmCompressionLib::NoCompression,
2881        );
2882        // This is the interesting bit.
2883        db.db_conf.metrics = None;
2884
2885        // Let's connect to it via a main memory handle.
2886        test_connect(&mut db);
2887
2888        // We now produce certain amount of blobs and persist them.
2889        let num_blobs = 10000_usize;
2890        let size_blob = 1 << 10; // 1 KB
2891
2892        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2893        // Let's persist some blobs.
2894        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2895
2896        // We disconnect to be able to finish all background work, before
2897        // continuing.
2898        test_disconnect(&mut db);
2899        test_connect(&mut db);
2900
2901        // Let's test forward iterators on the whole database.
2902        test_forward_cursor(&mut db, num_blobs, size_blob, 0);
2903
2904        test_disconnect(&mut db);
2905    }
2906
2907    #[test]
2908    fn can_work_with_compressed_db_with_background_merger() {
2909        let mut dbs = vec![];
2910        let db_lz4 = test_initialize(
2911            1,
2912            "test-can-work-with-compressed-db-lz4-with-background-merger".to_string(),
2913            LsmMode::LsmBackgroundMerger,
2914            LsmCompressionLib::LZ4,
2915        );
2916        let db_zlib = test_initialize(
2917            1,
2918            "test-can-work-with-compressed-db-zlib-with-background-merger".to_string(),
2919            LsmMode::LsmBackgroundMerger,
2920            LsmCompressionLib::ZLib,
2921        );
2922        let db_zstd = test_initialize(
2923            1,
2924            "test-can-work-with-compressed-db-zstd-with-background-merger".to_string(),
2925            LsmMode::LsmBackgroundMerger,
2926            LsmCompressionLib::ZStd,
2927        );
2928        dbs.push(db_lz4);
2929        dbs.push(db_zstd);
2930        dbs.push(db_zlib);
2931
2932        // We now produce certain amount of blobs and persist them.
2933        let num_blobs = 10000_usize;
2934        let size_blob = 1 << 10; // 1 KB
2935
2936        let mut thread_handles = vec![];
2937        for mut db in dbs {
2938            let handle = thread::spawn(move || {
2939                // Let's connect to it via a main memory handle.
2940                test_connect(&mut db);
2941
2942                // Let's persist some blobs.
2943                test_persist_blobs(&mut db, num_blobs, size_blob, None, 0);
2944
2945                // We disconnect to be able to finish all background work, before
2946                // continuing.
2947                test_disconnect(&mut db);
2948                test_connect(&mut db);
2949
2950                // Let's test forward iterators on the whole database.
2951                test_forward_cursor(&mut db, num_blobs, size_blob, 0);
2952
2953                test_disconnect(&mut db);
2954            });
2955            thread_handles.push(handle);
2956        }
2957        for t in thread_handles {
2958            t.join().unwrap();
2959        }
2960    }
2961
2962    #[test]
2963    fn can_work_with_empty_metrics_with_background_checkpointer() {
2964        let mut db = test_initialize(
2965            1,
2966            "test-can-work-with-empty-metrics-with-background-checkpointer".to_string(),
2967            LsmMode::LsmBackgroundCheckpointer,
2968            LsmCompressionLib::NoCompression,
2969        );
2970        // This is the interesting bit.
2971        db.db_conf.metrics = None;
2972
2973        // Let's connect to it via a main memory handle.
2974        test_connect(&mut db);
2975
2976        // We now produce certain amount of blobs and persist them.
2977        let num_blobs = 10000_usize;
2978        let size_blob = 1 << 10; // 1 KB
2979
2980        let prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
2981        // Let's persist some blobs.
2982        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng), 0);
2983
2984        // Let's test forward iterators on the whole database.
2985        test_forward_cursor(&mut db, num_blobs, size_blob, 0);
2986
2987        test_disconnect(&mut db);
2988    }
2989
2990    #[test]
2991    fn can_work_with_compressed_db_with_background_checkpointer() {
2992        let mut dbs = vec![];
2993        let db_lz4 = test_initialize(
2994            1,
2995            "test-can-work-with-compressed-db-lz4-with-background-checkpointer".to_string(),
2996            LsmMode::LsmBackgroundCheckpointer,
2997            LsmCompressionLib::LZ4,
2998        );
2999        let db_zlib = test_initialize(
3000            1,
3001            "test-can-work-with-compressed-db-zlib-with-background-checkpointer".to_string(),
3002            LsmMode::LsmBackgroundCheckpointer,
3003            LsmCompressionLib::ZLib,
3004        );
3005        let db_zstd = test_initialize(
3006            1,
3007            "test-can-work-with-compressed-db-zstd-with-background-checkpointer".to_string(),
3008            LsmMode::LsmBackgroundCheckpointer,
3009            LsmCompressionLib::ZStd,
3010        );
3011        dbs.push(db_lz4);
3012        dbs.push(db_zstd);
3013        dbs.push(db_zlib);
3014
3015        // We now produce certain amount of blobs and persist them.
3016        let num_blobs = 10000_usize;
3017        let size_blob = 1 << 10; // 1 KB
3018
3019        let mut thread_handles = vec![];
3020        for mut db in dbs {
3021            let handle = thread::spawn(move || {
3022                // Let's connect to it via a main memory handle.
3023                test_connect(&mut db);
3024
3025                // Let's persist some blobs.
3026                test_persist_blobs(&mut db, num_blobs, size_blob, None, 0);
3027                // Let's test forward iterators on the whole database.
3028                test_forward_cursor(&mut db, num_blobs, size_blob, 0);
3029
3030                test_disconnect(&mut db);
3031            });
3032            thread_handles.push(handle);
3033        }
3034        for t in thread_handles {
3035            t.join().unwrap();
3036        }
3037    }
3038
3039    #[test]
3040    fn lsm_no_concurrent_writers_no_background_threads() {
3041        let num_writers = 10;
3042        let mut master_prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
3043        let now = Utc::now();
3044        let db_path: String = "/tmp".to_string();
3045        let db_base_name: String = format!(
3046            "{}-{}",
3047            "test-no-concurrent_writers",
3048            now.timestamp_nanos_opt().unwrap()
3049        );
3050        // We now produce certain amount of blobs and persist them.
3051        let num_blobs = 10000_usize;
3052        let size_blob = 1 << 10; // 1 KB
3053
3054        let mut thread_handles = vec![];
3055        for writer_id in 1..=num_writers {
3056            let thread_db_path = db_path.clone();
3057            let thread_db_base_name = db_base_name.clone();
3058            let thread_prng: Mt64 = SeedableRng::seed_from_u64(master_prng.next_u64());
3059            let handle = thread::spawn(move || {
3060                // Let's try to initialize a database handle per thread.
3061                let db_conf = DbConf {
3062                    db_path: thread_db_path.into(),
3063                    db_base_name: thread_db_base_name,
3064                    mode: LsmMode::LsmNoBackgroundThreads,
3065                    handle_mode: LsmHandleMode::ReadWrite,
3066                    metrics: None,
3067                    compression: LsmCompressionLib::NoCompression,
3068                };
3069
3070                let mut db: LsmDb = Default::default();
3071
3072                let mut rc = db.initialize(db_conf);
3073                assert_eq!(rc, Ok(()));
3074
3075                rc = db.connect();
3076                assert_eq!(rc, Ok(()));
3077
3078                // Let's open a write transaction to write a batch.
3079                rc = db.begin_transaction();
3080                // Who knows which thread will succeed, but it can be only one.
3081                if rc.is_ok() {
3082                    test_persist_blobs(&mut db, num_blobs, size_blob, Some(thread_prng), writer_id);
3083                    rc = db.commit_transaction();
3084
3085                    if rc.is_ok() {
3086                        writer_id
3087                    } else {
3088                        assert_eq!(rc, Err(LsmErrorCode::LsmBusy));
3089                        0
3090                    }
3091                } else {
3092                    assert_eq!(rc, Err(LsmErrorCode::LsmBusy));
3093                    // Unsuccessful thread returns id 0.
3094                    0
3095                }
3096            });
3097
3098            thread_handles.push(handle);
3099        }
3100
3101        let mut successful_thread: usize = 0;
3102        for t in thread_handles {
3103            let result = t.join().unwrap();
3104            successful_thread = if result != 0 {
3105                result
3106            } else {
3107                successful_thread
3108            };
3109        }
3110
3111        // Final connection to the database.
3112        let db_conf = DbConf {
3113            db_path: db_path.into(),
3114            db_base_name,
3115            mode: LsmMode::LsmNoBackgroundThreads,
3116            handle_mode: LsmHandleMode::ReadWrite,
3117            metrics: None,
3118            compression: LsmCompressionLib::NoCompression,
3119        };
3120
3121        let mut db: LsmDb = Default::default();
3122
3123        let rc = db.initialize(db_conf);
3124        assert_eq!(rc, Ok(()));
3125
3126        let rc = db.connect();
3127        assert_eq!(rc, Ok(()));
3128
3129        // After all threads have finished, we traverse the whole database. There can be only
3130        // blobs from a single writers.
3131        test_forward_cursor(&mut db, num_blobs, size_blob, successful_thread);
3132    }
3133
3134    #[test]
3135    fn open_file_in_read_only_mode() {
3136        // We first produce a file we can work on
3137        let now = Utc::now();
3138        let db_path = "/tmp".to_string();
3139        let db_base_name = format!(
3140            "{}-{}-{}",
3141            "test-read-only-mode",
3142            0,
3143            now.timestamp_nanos_opt().unwrap()
3144        );
3145
3146        let mut db_conf = DbConf::new(db_path, db_base_name);
3147
3148        let mut db: LsmDb = Default::default();
3149
3150        let mut rc = db.initialize(db_conf.clone());
3151        assert_eq!(rc, Ok(()));
3152
3153        // Let's connect to it via a main memory handle.
3154        test_connect(&mut db);
3155
3156        // We now produce certain amount of blobs and persist them.
3157        let num_blobs = 10000_usize;
3158        let size_blob = 1 << 10; // 1 KB
3159
3160        let mut prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
3161        // Let's persist some blobs.
3162        test_persist_blobs(&mut db, num_blobs, size_blob, Some(prng.clone()), 0);
3163
3164        test_disconnect(&mut db);
3165
3166        // To open the exact same file, we reuse the the same configuration but
3167        // marked as read-only.
3168        db_conf.handle_mode = LsmHandleMode::ReadOnly;
3169
3170        // Initialize the read-only configuration.
3171        let mut db_ro: LsmDb = Default::default();
3172        rc = db_ro.initialize(db_conf);
3173        assert_eq!(rc, Ok(()));
3174
3175        // Connect to the database.
3176        rc = db_ro.connect();
3177        assert_eq!(rc, Ok(()));
3178
3179        // Let us try to write one more blob...and fail
3180        let extra_blob = construct_random_blob(size_blob, &mut prng);
3181        rc = Disk::persist(
3182            &mut db_ro,
3183            &Vec::from((num_blobs + 1).to_be_bytes()),
3184            &extra_blob,
3185        );
3186        assert_eq!(rc.unwrap_err(), LsmErrorCode::LsmReadOnly);
3187
3188        // Without our fix to lsm1.c this test would fail/crash/forever loop, who knows.
3189        test_forward_cursor(&mut db_ro, num_blobs, size_blob, 0);
3190    }
3191
3192    #[test]
3193    fn open_compressed_file_in_read_only_mode() {
3194        // We first produce a file we can work on
3195        let now = Utc::now();
3196        let db_path = "/tmp".to_string();
3197        let db_base_name = format!(
3198            "{}-{}-{}",
3199            "test-read-only-mode-compressed-db",
3200            0,
3201            now.timestamp_nanos_opt().unwrap()
3202        );
3203
3204        let mut db_conf = DbConf::new_with_parameters(
3205            db_path,
3206            db_base_name,
3207            LsmMode::LsmBackgroundMerger,
3208            LsmHandleMode::ReadWrite,
3209            None,
3210            LsmCompressionLib::ZStd,
3211        );
3212
3213        let mut db: LsmDb = Default::default();
3214
3215        let mut rc = db.initialize(db_conf.clone());
3216        assert_eq!(rc, Ok(()));
3217
3218        // Let's connect to it via a main memory handle.
3219        test_connect(&mut db);
3220
3221        // We now produce certain amount of blobs and persist them.
3222        let num_blobs = 10000_usize;
3223        let size_blob = 1 << 10; // 1 KB
3224
3225        let mut prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
3226        // Let's persist some blobs.
3227        test_persist_blobs(&mut db, num_blobs, size_blob, None, 0);
3228
3229        test_disconnect(&mut db);
3230
3231        // To open the exact same file, we reuse the the same configuration but
3232        // marked as read-only.
3233        db_conf.handle_mode = LsmHandleMode::ReadOnly;
3234
3235        // Initialize the read-only configuration.
3236        let mut db_ro: LsmDb = Default::default();
3237        rc = db_ro.initialize(db_conf);
3238        assert_eq!(rc, Ok(()));
3239
3240        // Connect to the database.
3241        rc = db_ro.connect();
3242        assert_eq!(rc, Ok(()));
3243
3244        // Let us try to write one more blob...and fail
3245        let extra_blob = construct_random_blob(size_blob, &mut prng);
3246        rc = Disk::persist(
3247            &mut db_ro,
3248            &Vec::from((num_blobs + 1).to_be_bytes()),
3249            &extra_blob,
3250        );
3251        assert_eq!(rc.unwrap_err(), LsmErrorCode::LsmReadOnly);
3252
3253        // Asking for compression on a read-only database without any cursor opened
3254        // is considered misused, as db parameters are loaded on the first transaction.
3255        assert_eq!(db.get_compression_id(), Err(LsmErrorCode::LsmMisuse));
3256
3257        // Without our fix to lsm1.c this test would fail/crash/forever loop, who knows.
3258        test_forward_cursor(&mut db_ro, num_blobs, size_blob, 0);
3259    }
3260
3261    #[test]
3262    fn cannot_open_with_different_compression() {
3263        // We first produce a file we can work on
3264        let now = Utc::now();
3265        let db_path = "/tmp".to_string();
3266        let db_base_name = format!(
3267            "{}-{}-{}",
3268            "test-cannot-open-with-different-compression",
3269            0,
3270            now.timestamp_nanos_opt().unwrap()
3271        );
3272
3273        let mut db_conf = DbConf::new_with_parameters(
3274            db_path,
3275            db_base_name,
3276            LsmMode::LsmBackgroundMerger,
3277            LsmHandleMode::ReadWrite,
3278            None,
3279            LsmCompressionLib::ZStd,
3280        );
3281
3282        let mut db: LsmDb = Default::default();
3283
3284        let mut rc = db.initialize(db_conf.clone());
3285        assert_eq!(rc, Ok(()));
3286
3287        // Let's connect to it via a main memory handle.
3288        test_connect(&mut db);
3289
3290        // We now produce certain amount of blobs and persist them.
3291        let num_blobs = 10000_usize;
3292        let size_blob = 1 << 10; // 1 KB
3293
3294        let mut prng: Mt64 = SeedableRng::seed_from_u64(0x41bd56915d5c7804);
3295        // Let's persist some blobs.
3296        test_persist_blobs(&mut db, num_blobs, size_blob, None, 0);
3297
3298        test_disconnect(&mut db);
3299
3300        // Now let's open without compression.
3301        db_conf.compression = LsmCompressionLib::NoCompression;
3302
3303        let mut db: LsmDb = Default::default();
3304
3305        rc = db.initialize(db_conf.clone());
3306        assert_eq!(rc, Ok(()));
3307
3308        // Let's connect to it via a main memory handle.
3309        test_connect(&mut db);
3310
3311        // Let us try to write one more blob...and fail because compression is different.
3312        let extra_blob = construct_random_blob(size_blob, &mut prng);
3313        rc = Disk::persist(
3314            &mut db,
3315            &Vec::from((num_blobs + 1).to_be_bytes()),
3316            &extra_blob,
3317        );
3318        assert_eq!(rc.unwrap_err(), LsmErrorCode::LsmMismatch);
3319
3320        // But this also happens when reading.
3321        let Err(cursor) = Disk::cursor_open(&db) else {
3322            panic!("Unexpected result.")
3323        };
3324        assert_eq!(cursor, LsmErrorCode::LsmMismatch);
3325
3326        test_disconnect(&mut db);
3327
3328        // Now let's open with another compression but also in read-only mode.
3329        db_conf.compression = LsmCompressionLib::ZLib;
3330        db_conf.handle_mode = LsmHandleMode::ReadOnly;
3331
3332        let mut db: LsmDb = Default::default();
3333
3334        rc = db.initialize(db_conf);
3335        assert_eq!(rc, Ok(()));
3336
3337        // Let's connect to it via a main memory handle.
3338        test_connect(&mut db);
3339
3340        // But this also happens when reading.
3341        let Err(cursor) = Disk::cursor_open(&db) else {
3342            panic!("Unexpected result.")
3343        };
3344        assert_eq!(cursor, LsmErrorCode::LsmMismatch);
3345    }
3346
3347    #[test]
3348    fn test_try_from_error_code() {
3349        assert_eq!(LsmErrorCode::LsmError, LsmErrorCode::try_from(1).unwrap());
3350        assert_eq!(LsmErrorCode::LsmBusy, LsmErrorCode::try_from(5).unwrap());
3351        assert_eq!(LsmErrorCode::LsmNoMem, LsmErrorCode::try_from(7).unwrap());
3352        assert_eq!(
3353            LsmErrorCode::LsmReadOnly,
3354            LsmErrorCode::try_from(8).unwrap()
3355        );
3356        assert_eq!(LsmErrorCode::LsmIOErr, LsmErrorCode::try_from(10).unwrap());
3357        assert_eq!(
3358            LsmErrorCode::LsmCorrupt,
3359            LsmErrorCode::try_from(11).unwrap()
3360        );
3361        assert_eq!(LsmErrorCode::LsmFull, LsmErrorCode::try_from(13).unwrap());
3362        assert_eq!(
3363            LsmErrorCode::LsmCantOpen,
3364            LsmErrorCode::try_from(14).unwrap()
3365        );
3366        assert_eq!(
3367            LsmErrorCode::LsmProtocol,
3368            LsmErrorCode::try_from(15).unwrap()
3369        );
3370        assert_eq!(LsmErrorCode::LsmMisuse, LsmErrorCode::try_from(21).unwrap());
3371        assert_eq!(
3372            LsmErrorCode::LsmMismatch,
3373            LsmErrorCode::try_from(50).unwrap()
3374        );
3375        assert_eq!(
3376            LsmErrorCode::LsmConversionErr,
3377            LsmErrorCode::try_from(55).unwrap()
3378        );
3379        assert_eq!(
3380            LsmErrorCode::LsmMetricCreation,
3381            LsmErrorCode::try_from(56).unwrap()
3382        );
3383        assert_eq!(
3384            LsmErrorCode::LsmMetricRegistration,
3385            LsmErrorCode::try_from(57).unwrap()
3386        );
3387        assert_eq!(
3388            LsmErrorCode::LsmMetricsEmpty,
3389            LsmErrorCode::try_from(58).unwrap()
3390        );
3391        assert_eq!(
3392            LsmErrorCode::LsmBgThreadUnavailable,
3393            LsmErrorCode::try_from(59).unwrap(),
3394        );
3395        assert_eq!(
3396            LsmErrorCode::LsmUnknownCode,
3397            LsmErrorCode::try_from(61).unwrap_err(),
3398        );
3399    }
3400
3401    #[test]
3402    fn test_try_from_seek_op() {
3403        assert_eq!(
3404            LsmCursorSeekOp::LsmCursorSeekLe,
3405            LsmCursorSeekOp::try_from(-1).unwrap()
3406        );
3407        assert_eq!(
3408            LsmCursorSeekOp::LsmCursorSeekEq,
3409            LsmCursorSeekOp::try_from(0).unwrap()
3410        );
3411        assert_eq!(
3412            LsmCursorSeekOp::LsmCursorSeekGe,
3413            LsmCursorSeekOp::try_from(1).unwrap()
3414        );
3415        assert_eq!(
3416            LsmCursorSeekOp::try_from(2).unwrap_err(),
3417            LsmErrorCode::LsmUnknownCode
3418        );
3419    }
3420
3421    #[test]
3422    fn test_try_from_safety() {
3423        assert_eq!(LsmSafety::Off, LsmSafety::try_from(0).unwrap());
3424        assert_eq!(LsmSafety::Normal, LsmSafety::try_from(1).unwrap());
3425        assert_eq!(LsmSafety::Full, LsmSafety::try_from(2).unwrap());
3426        assert_eq!(
3427            LsmSafety::try_from(-1).unwrap_err(),
3428            LsmErrorCode::LsmUnknownCode
3429        );
3430    }
3431
3432    #[test]
3433    fn test_try_from_param() {
3434        assert_eq!(LsmParam::AutoFlush, LsmParam::try_from(1).unwrap());
3435        assert_eq!(LsmParam::PageSize, LsmParam::try_from(2).unwrap());
3436        assert_eq!(LsmParam::Safety, LsmParam::try_from(3).unwrap());
3437        assert_eq!(LsmParam::BlockSize, LsmParam::try_from(4).unwrap());
3438        assert_eq!(LsmParam::AutoWork, LsmParam::try_from(5).unwrap());
3439        assert_eq!(LsmParam::Mmap, LsmParam::try_from(7).unwrap());
3440        assert_eq!(LsmParam::UseLog, LsmParam::try_from(8).unwrap());
3441        assert_eq!(LsmParam::AutoMerge, LsmParam::try_from(9).unwrap());
3442        assert_eq!(LsmParam::MaxFreeList, LsmParam::try_from(10).unwrap());
3443        assert_eq!(LsmParam::MultipleProcesses, LsmParam::try_from(11).unwrap());
3444        assert_eq!(LsmParam::AutoCheckPoint, LsmParam::try_from(12).unwrap());
3445        assert_eq!(LsmParam::SetCompression, LsmParam::try_from(13).unwrap());
3446        assert_eq!(LsmParam::GetCompression, LsmParam::try_from(14).unwrap());
3447        assert_eq!(
3448            LsmParam::SetCompressionFactory,
3449            LsmParam::try_from(15).unwrap()
3450        );
3451        assert_eq!(LsmParam::ReadOnly, LsmParam::try_from(16).unwrap());
3452        assert_eq!(
3453            LsmParam::try_from(6).unwrap_err(),
3454            LsmErrorCode::LsmUnknownCode
3455        );
3456    }
3457
3458    #[test]
3459    fn test_try_from_mode() {
3460        assert_eq!(
3461            LsmMode::LsmNoBackgroundThreads,
3462            LsmMode::try_from(0).unwrap()
3463        );
3464        assert_eq!(LsmMode::LsmBackgroundMerger, LsmMode::try_from(1).unwrap());
3465        assert_eq!(
3466            LsmMode::LsmBackgroundCheckpointer,
3467            LsmMode::try_from(2).unwrap()
3468        );
3469        assert_eq!(
3470            LsmMode::try_from(-1).unwrap_err(),
3471            LsmErrorCode::LsmUnknownCode
3472        );
3473    }
3474
3475    #[test]
3476    fn test_try_from_compression_lib() {
3477        assert_eq!(
3478            LsmCompressionLib::NoCompression,
3479            LsmCompressionLib::try_from(1).unwrap()
3480        );
3481        assert_eq!(
3482            LsmCompressionLib::LZ4,
3483            LsmCompressionLib::try_from(10001).unwrap()
3484        );
3485        assert_eq!(
3486            LsmCompressionLib::ZLib,
3487            LsmCompressionLib::try_from(10002).unwrap()
3488        );
3489        assert_eq!(
3490            LsmCompressionLib::ZStd,
3491            LsmCompressionLib::try_from(10003).unwrap()
3492        );
3493        for id in 2..10000 {
3494            assert_eq!(
3495                LsmCompressionLib::try_from(id).unwrap_err(),
3496                LsmErrorCode::LsmMismatch
3497            )
3498        }
3499        assert_eq!(
3500            LsmCompressionLib::try_from(10005).unwrap_err(),
3501            LsmErrorCode::LsmMismatch
3502        );
3503    }
3504
3505    #[test]
3506    fn test_try_from_info() {
3507        assert_eq!(LsmInfo::Lsm4KbPagesWritten, LsmInfo::try_from(1).unwrap());
3508        assert_eq!(LsmInfo::Lsm4KbPagesRead, LsmInfo::try_from(2).unwrap());
3509        assert_eq!(LsmInfo::LsmDbStructure, LsmInfo::try_from(3).unwrap());
3510        assert_eq!(LsmInfo::LsmLogStructure, LsmInfo::try_from(4).unwrap());
3511        assert_eq!(LsmInfo::LsmPageDumpAscii, LsmInfo::try_from(6).unwrap());
3512        assert_eq!(LsmInfo::LsmPageDumpHex, LsmInfo::try_from(7).unwrap());
3513        assert_eq!(LsmInfo::LsmCheckpointSize, LsmInfo::try_from(10).unwrap());
3514        assert_eq!(LsmInfo::LsmTreeSize, LsmInfo::try_from(11).unwrap());
3515        assert_eq!(LsmInfo::LsmCompressionId, LsmInfo::try_from(13).unwrap());
3516        assert_eq!(
3517            LsmInfo::try_from(5).unwrap_err(),
3518            LsmErrorCode::LsmUnknownCode
3519        );
3520    }
3521}