lsmlite_rs/
lsmdb.rs

1// Copyright 2023 Helsing GmbH
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::cmp::Ordering;
15use std::convert::TryFrom;
16use std::ffi::CString;
17use std::os::raw::c_char;
18use std::ptr::null_mut;
19use std::thread::park_timeout;
20use std::time::{Duration, Instant};
21
22use crate::compression::lz4::LsmLz4;
23use crate::compression::zlib::LsmZLib;
24use crate::compression::zstd::LsmZStd;
25use crate::compression::Compression;
26use crate::threads::NUM_MERGE_SEGMENTS;
27use crate::{
28    lsm_cursor, lsm_db, lsm_env, Cursor, DbConf, Disk, LsmBgWorkerMessage, LsmBgWorkers,
29    LsmCompressionLib, LsmCursor, LsmCursorSeekOp, LsmDb, LsmErrorCode, LsmHandleMode, LsmInfo,
30    LsmMode, LsmParam, LsmSafety,
31};
32
// How long a writer parks while a background worker does some work. This only
// matters when background threads have been spawned.
const WRITER_PARK_TIME_MS: u64 = 1; // Milliseconds.

// Hard limit on the main-memory component of the LSM (an in-memory b-tree), in KiB:
// 16 * 1024 KiB = 16 MiB.
pub(crate) const SIZE_MAIN_MEMORY_TREE_KB: i32 = 16 << 10;

// Lower and upper bounds (in KiB: 2 MiB and 4 MiB) on the amount of data not yet
// covered by a checkpoint that we can afford to lose. Observe that the main-memory
// component also has to be flushed. Thus, at any point in time, at most
// (2 * SIZE_MAIN_MEMORY_TREE_KB) + MAX_CHECKPOINT_SIZE_KB worth of data is at risk.
pub(crate) const MIN_CHECKPOINT_SIZE_KB: i32 = 2 << 10;
pub(crate) const MAX_CHECKPOINT_SIZE_KB: i32 = 2 * MIN_CHECKPOINT_SIZE_KB;

// Block size of the database in KiB, i.e. the amount of data written as a unit:
// 8 * 1024 KiB = 8 MiB.
pub(crate) const BLOCK_SIZE_KB: i32 = 8 << 10;
// Page size of the database in bytes, i.e. the unit into which blocks are divided:
// 4 * 1024 B = 4 KiB.
pub(crate) const PAGE_SIZE_B: i32 = 4 << 10;
52
53// These functions translate to internal LSM functions. Thus the signatures have
54// to match. Observe that we treat LSM's types as opaque, and thus they are passed
55// around as memory references that are fully visible inside LSM, but not so
56// outside of it.
57extern "C" {
58    // These functions are the ones we need in other files (like threads.rs).
59    pub(crate) fn lsm_info(db: *mut lsm_db, e_conf: i32, ...) -> i32;
60    pub(crate) fn lsm_config(db: *mut lsm_db, e_param: i32, ...) -> i32;
61    pub(crate) fn lsm_work(db: *mut lsm_db, n_segments: i32, n_kb: i32, p_nwrite: *mut i32) -> i32;
62    pub(crate) fn lsm_checkpoint(db: *mut lsm_db, p_n_kb: *mut i32) -> i32;
63    pub(crate) fn lsm_new(env: *mut lsm_env, db: *mut *mut lsm_db) -> i32;
64    pub(crate) fn lsm_open(db: *mut lsm_db, file_name: *const c_char) -> i32;
65    pub(crate) fn lsm_close(db: *mut lsm_db) -> i32;
66
67    // These functions are private to this file.
68    fn lsm_insert(
69        db: *mut lsm_db,
70        p_key: *const u8,
71        n_key: i32,
72        p_val: *const u8,
73        n_val: i32,
74    ) -> i32;
75    fn lsm_delete(db: *mut lsm_db, p_key: *const u8, n_key: i32) -> i32;
76    fn lsm_delete_range(
77        db: *mut lsm_db,
78        p_key1: *const u8,
79        n_key1: i32,
80        p_key2: *const u8,
81        n_key2: i32,
82    ) -> i32;
83    fn lsm_begin(db: *mut lsm_db, level: i32) -> i32;
84    fn lsm_commit(db: *mut lsm_db, level: i32) -> i32;
85    fn lsm_rollback(db: *mut lsm_db, level: i32) -> i32;
86    fn lsm_csr_open(db: *mut lsm_db, cursor: *const *mut lsm_cursor) -> i32;
87    fn lsm_csr_close(cursor: *mut lsm_cursor) -> i32;
88    fn lsm_csr_first(cursor: *mut lsm_cursor) -> i32;
89    fn lsm_csr_seek(cursor: *mut lsm_cursor, p_key: *const u8, n_key: i32, e_seek: i32) -> i32;
90    fn lsm_csr_last(cursor: *mut lsm_cursor) -> i32;
91    fn lsm_csr_next(cursor: *mut lsm_cursor) -> i32;
92    fn lsm_csr_prev(cursor: *mut lsm_cursor) -> i32;
93    fn lsm_csr_valid(cursor: *mut lsm_cursor) -> i32;
94    fn lsm_csr_key(cursor: *mut lsm_cursor, pp_key: *const *mut u8, pn_key: *mut i32) -> i32; // # spellchecker:disable-line
95    fn lsm_csr_value(cursor: *mut lsm_cursor, pp_val: *const *mut u8, pn_val: *mut i32) -> i32; // # spellchecker:disable-line
96    fn lsm_csr_cmp(cursor: *mut lsm_cursor, p_key: *const u8, n_key: i32, pi_res: *mut i32) -> i32;
97    #[allow(dead_code)]
98    fn lsm_free(env: *mut lsm_env, ptr: *mut c_char);
99}
100
101/// Custom implementation of [`Disk`] for [`LsmDb`].
102impl Disk for LsmDb {
103    type C<'a> = LsmCursor<'a>;
104    /// This function sets up general variables about the database. Initializing
105    /// a handle more than once is considered [`LsmErrorCode::LsmMisuse`].
106    ///
107    /// # Example
108    ///
109    /// ```rust
110    /// use lsmlite_rs::*;
111    ///
112    /// let db_conf = DbConf::new(
113    ///                           "/tmp/",
114    ///                           "my_db_b".to_string(),
115    /// );
116    ///
117    /// let mut db: LsmDb = Default::default();
118    /// assert!(!db.is_initialized());
119    ///
120    /// let rc = db.initialize(db_conf.clone());
121    /// assert_eq!(rc, Ok(()));
122    /// assert!(db.is_initialized());
123    ///
124    /// let rc = db.initialize(db_conf);
125    /// assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
126    /// ```
127    fn initialize(&mut self, conf: DbConf) -> Result<(), LsmErrorCode> {
128        if self.initialized {
129            // If the database handle has already been initialized, we signal
130            // trying to initialize it again as an error.
131            return Err(LsmErrorCode::LsmMisuse);
132        }
133        self.db_conf = conf;
134
135        self.db_env = null_mut();
136        self.db_handle = null_mut();
137
138        // This is the fully-qualified name of the database.
139        let db_fq_name = format!(
140            "{}/{}.lsm",
141            self.db_conf.db_path.display(),
142            self.db_conf.db_base_name
143        );
144        // This is the c-string version of the name of the database.
145        self.db_fq_name = CString::new(db_fq_name).map_err(|e| {
146            tracing::error!(?e, "Name of the data base is not a valid c-string.");
147            LsmErrorCode::LsmError
148        })?;
149
150        self.initialized = true;
151
152        Ok(())
153    }
154
155    /// This method produces a main-memory handle to connect to the database. At this point
156    /// the database file is created at the given path, and upon success, the database can be
157    /// operated using any other available method for it.
158    ///
159    /// Connecting to a database using the same handle more than once, or connecting using
160    /// an uninitialized handle, is considered [`LsmErrorCode::LsmMisuse`].
161    ///
162    /// # Example
163    ///
164    /// ```rust
165    /// use lsmlite_rs::*;
166    ///
167    /// let db_conf = DbConf::new(
168    ///                           "/tmp/",
169    ///                           "my_db_c".to_string(),
170    /// );
171    ///
172    /// let mut db: LsmDb = Default::default();
173    ///
174    /// let rc = db.connect();
175    /// assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
176    ///
177    /// let rc = db.initialize(db_conf);
178    /// let rc = db.connect();
179    /// assert_eq!(rc, Ok(()));
180    /// assert!(db.is_connected());
181    ///
182    /// let rc = db.connect();
183    /// assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
184    /// ```
185    fn connect(&mut self) -> Result<(), LsmErrorCode> {
186        if !self.initialized || self.connected {
187            return Err(LsmErrorCode::LsmMisuse);
188        }
189
190        // Database has been initialized, thus we can proceed.
191        let mut rc: i32;
192        let mut db_handle = null_mut();
193        unsafe {
194            // Get a new handle to connect to the database on disk.
195            rc = lsm_new(null_mut(), &mut db_handle);
196        }
197
198        if rc != 0 {
199            // The only error that can occur is memory allocation. Thus,
200            // if we fail while allocating, we do not have to deallocate anything.
201            return Err(LsmErrorCode::try_from(rc)?);
202        }
203
204        self.db_handle = db_handle;
205
206        // In here we configure parameters of the database. These parameters
207        // are a good approximation in general. On bigger machines we would
208        // consume more main-memory. We differentiate between opening in read-only mode
209        // or not, as in read-only we need no extra threads or write-oriented parameters.
210        unsafe {
211            // These parameters are independent of the mode we open the file in.
212
213            // Disable multi-process support to improve performance
214            // (no OS advisory locks are used to synchronize access
215            // to the database file).
216            let multi_process: i32 = 0;
217            rc = lsm_config(
218                self.db_handle,
219                LsmParam::MultipleProcesses as i32,
220                &multi_process,
221            );
222
223            if rc != 0 {
224                self.disconnect()?;
225                return Err(LsmErrorCode::try_from(rc)?);
226            }
227
228            // These are our default parameters of any handle (whether it writes
229            // to the database or not).
230
231            // Maximum size of a main-memory tree before it can be marked as old.
232            let autoflush: i32 = SIZE_MAIN_MEMORY_TREE_KB;
233            rc = lsm_config(self.db_handle, LsmParam::AutoFlush as i32, &autoflush);
234
235            if rc != 0 {
236                self.disconnect()?;
237                return Err(LsmErrorCode::try_from(rc)?);
238            }
239
240            rc = lsm_config(self.db_handle, LsmParam::PageSize as i32, &PAGE_SIZE_B);
241
242            if rc != 0 {
243                self.disconnect()?;
244                return Err(LsmErrorCode::try_from(rc)?);
245            }
246
247            rc = lsm_config(self.db_handle, LsmParam::BlockSize as i32, &BLOCK_SIZE_KB);
248
249            if rc != 0 {
250                self.disconnect()?;
251                return Err(LsmErrorCode::try_from(rc)?);
252            }
253
254            // How much of the file is kept in memory.
255            let mmap_size: i32 = 0;
256            rc = lsm_config(self.db_handle, LsmParam::Mmap as i32, &mmap_size);
257
258            if rc != 0 {
259                self.disconnect()?;
260                return Err(LsmErrorCode::try_from(rc)?);
261            }
262
263            let safety: i32 = LsmSafety::Normal as i32;
264            rc = lsm_config(self.db_handle, LsmParam::Safety as i32, &safety);
265
266            if rc != 0 {
267                self.disconnect()?;
268                return Err(LsmErrorCode::try_from(rc)?);
269            }
270
271            if self.db_conf.handle_mode == LsmHandleMode::ReadOnly {
272                // Here are parameters set that are only relevant in read-only mode.
273                // Observe that this overwrites the mode the handle operates in,
274                // as only reads are performed.
275                let read_only_int = 1;
276                rc = lsm_config(self.db_handle, LsmParam::ReadOnly as i32, &read_only_int);
277
278                if rc != 0 {
279                    self.disconnect()?;
280                    return Err(LsmErrorCode::try_from(rc)?);
281                }
282
283                // When opening in read-only mode, we ignore the mode provided by the user
284                // as no background threads (operating on the file) are needed.
285                self.db_conf.mode = LsmMode::LsmNoBackgroundThreads;
286            }
287
288            // If we are instructed to configure compression, we do it now. Setting the
289            // compression can be done only when the database was created, once set,
290            // trying to do it again or not setting it to the same compression scheme
291            // will be considered an error (LsmErrorCode::LsmMismatch).
292            match self.db_conf.compression {
293                LsmCompressionLib::NoCompression => {}
294                LsmCompressionLib::LZ4 => {
295                    let lz4 = LsmLz4::new();
296                    self.db_compress = lz4.get_compression_methods().ok();
297                }
298                LsmCompressionLib::ZLib => {
299                    let zlib = LsmZLib::new();
300                    self.db_compress = zlib.get_compression_methods().ok();
301                }
302                LsmCompressionLib::ZStd => {
303                    let zstd = LsmZStd::new();
304                    self.db_compress = zstd.get_compression_methods().ok();
305                }
306            };
307
308            // Only if the compression library is defined we pass it onto
309            // the engine. Otherwise no compression whatsoever.
310            if let Some(lsm_compress) = self.db_compress.as_ref() {
311                rc = lsm_config(
312                    self.db_handle,
313                    LsmParam::SetCompression as i32,
314                    lsm_compress,
315                );
316
317                if rc != 0 {
318                    self.disconnect()?;
319                    return Err(LsmErrorCode::try_from(rc)?);
320                }
321            }
322
323            rc = lsm_open(self.db_handle, self.db_fq_name.as_ptr());
324
325            if rc != 0 {
326                self.disconnect()?;
327                return Err(LsmErrorCode::try_from(rc)?);
328            }
329
330            self.connected = true;
331
332            // Whether we spawn background threads is at this point properly set.
333            // Currently we spawn only one background thread at most, and thus its
334            // id is set to 0. This has to be executed after we have connected
335            // to the database.
336            self.configure_bg_threads(self.db_conf.mode, 0)?;
337        }
338
339        // We output the current parameters of the writer.
340        unsafe {
341            let auto_flush: i32 = -1;
342            let _ = lsm_config(self.db_handle, LsmParam::AutoFlush as i32, &auto_flush);
343
344            let page_size_b: i32 = -1;
345            let _ = lsm_config(self.db_handle, LsmParam::PageSize as i32, &page_size_b);
346
347            let block_size_kb: i32 = -1;
348            let _ = lsm_config(self.db_handle, LsmParam::BlockSize as i32, &block_size_kb);
349
350            let auto_checkpoint_kb: i32 = -1;
351            let _ = lsm_config(
352                self.db_handle,
353                LsmParam::AutoCheckPoint as i32,
354                &auto_checkpoint_kb,
355            );
356
357            let auto_work: i32 = -1;
358            let _ = lsm_config(self.db_handle, LsmParam::AutoWork as i32, &auto_work);
359
360            let multi_process: i32 = -1;
361            let _ = lsm_config(
362                self.db_handle,
363                LsmParam::MultipleProcesses as i32,
364                &multi_process,
365            );
366
367            let read_only: i32 = -1;
368            let _ = lsm_config(self.db_handle, LsmParam::ReadOnly as i32, &read_only);
369
370            let mmap_size: i32 = -1;
371            let _ = lsm_config(self.db_handle, LsmParam::Mmap as i32, &mmap_size);
372
373            let safety: i32 = -1;
374            let _ = lsm_config(self.db_handle, LsmParam::Safety as i32, &safety);
375
376            tracing::info!(
377                auto_flush = format!("{auto_flush} KBs"),
378                page_size = format!("{page_size_b} Bs"),
379                block_size = format!("{block_size_kb} KBs"),
380                auto_checkpoint = format!("{auto_checkpoint_kb} KBs"),
381                auto_work = if auto_work != 0 { "yes" } else { "no" },
382                multi_process = if multi_process != 0 { "yes" } else { "no" },
383                read_only = if read_only != 0 { "yes" } else { "no" },
384                background_threads = if self.db_conf.mode != LsmMode::LsmNoBackgroundThreads {
385                    "yes"
386                } else {
387                    "no"
388                },
389                mmap_overhead = format!("{mmap_size} KBs"),
390                compression = ?self.db_conf.compression,
391                safety = if safety == 0 { "None" } else if safety == 1 { "Normal" } else { "Full" },
392                "lsmlite-rs parameters.",
393            );
394        }
395
396        // If we get through, then everything is fine.
397        Ok(())
398    }
399
400    /// This method frees up all the resources used by the main-memory handle. A call
401    /// to `connect` has to have the corresponding call to `disconnect`, otherwise:
402    /// 1. The resources that belong to the memory handle will be leaked.
403    /// 2. The database (file) won't be closed and the next time we open it the recovery
404    ///    process will kick-in (which can take considerable time. Depending on the
405    ///    size of the log).
406    ///
407    /// For completeness, [`LsmDb`] also implements [`Drop`] so that a handle
408    /// gets automatically released once it goes out of scope.
409    ///
410    /// Disconnecting using an uninitialized handle, or a handle that is not yet connected
411    /// is not considered an error.
412    ///
413    /// # Example
414    ///
415    /// ```rust
416    /// use lsmlite_rs::*;
417    ///
418    /// let db_conf = DbConf::new(
419    ///                           "/tmp/",
420    ///                           "my_db_d".to_string(),
421    /// );
422    ///
423    /// let mut db: LsmDb = Default::default();
424    /// let rc = db.disconnect();
425    /// assert_eq!(rc, Ok(()));
426    ///
427    /// let rc = db.initialize(db_conf);
428    /// let rc = db.connect();
429    /// let rc = db.disconnect();
430    /// assert_eq!(rc, Ok(()));
431    /// assert!(!db.is_connected());
432    /// ```
433    fn disconnect(&mut self) -> Result<(), LsmErrorCode> {
434        if !self.initialized || !self.connected {
435            return Ok(());
436        }
437
438        // First, we explicitly shutdown all background threads (it might take some time
439        // depending on what they are doing). In this manner, they will all flush their
440        // data to disk.
441        if self.db_conf.mode != LsmMode::LsmNoBackgroundThreads {
442            self.db_bg_threads.shutdown();
443        }
444
445        // We now proceed to close the database and destroy all allocated resources of the handle.
446        let rc: i32;
447        unsafe {
448            rc = lsm_close(self.db_handle);
449        }
450        if rc != 0 {
451            return Err(LsmErrorCode::try_from(rc)?);
452        }
453
454        // We reset the pointer once we know we were able to cleanly close the database.
455        self.db_handle = null_mut();
456        self.connected = false;
457
458        // If we get through, then everything is fine.
459        Ok(())
460    }
461
462    /// This function writes the given entry on the database file in a transactional
463    /// manner. That is, it either writes it completely, or not, but it leaves the
464    /// database in no inconsistent state.
465    ///
466    /// Trying to persist data using an uninitialized handle, or one that is not yet
467    /// connected to a database, is considered [`LsmErrorCode::LsmMisuse`].
468    ///
469    /// # Example
470    ///
471    /// ```rust
472    /// use lsmlite_rs::*;
473    /// use rand::{thread_rng, Rng};
474    ///
475    /// let db_conf = DbConf::new(
476    ///                           "/tmp/",
477    ///                           "my_db_e".to_string(),
478    /// );
479    ///
480    /// let mut db: LsmDb = Default::default();
481    /// let rc = db.initialize(db_conf);
482    /// let rc = db.connect();
483    ///
484    /// let mut prng = thread_rng();
485    /// // 16-byte random key (not very useful in practice).
486    /// let key: Vec<u8> = (0..16).map(|_| prng.gen_range(0..=255)).collect();
487    /// // 1 KB zeroed payload.
488    /// let value = vec![0; 1024];
489    ///
490    /// let rc = db.persist(&key, &value);
491    /// assert_eq!(rc, Ok(()));
492    ///
493    /// // This is also possible (would overwrite the entry)
494    /// let rc = Disk::persist(&mut db, &key, &value);
495    /// assert_eq!(rc, Ok(()));
496    ///
497    /// let rc = db.disconnect();
498    /// ```
499    fn persist(&mut self, key: &[u8], value: &[u8]) -> Result<(), LsmErrorCode> {
500        if !self.initialized || !self.connected {
501            return Err(LsmErrorCode::LsmMisuse);
502        }
503
504        let start = Instant::now();
505        let serial_key = key;
506        let serial_key_len = serial_key.len() as i32;
507        let serial_blob = value;
508        let serial_blob_len = serial_blob.len() as i32;
509        let rc: i32;
510
511        unsafe {
512            // If we have background threads, we have to synchronize to avoid running
513            // out of memory. That is, we cannot write to the database until we make sure that
514            // we haven't exceeded the resources we are told (main memory for example).
515            self.deal_with_bg_threads()?;
516
517            rc = lsm_insert(
518                self.db_handle,
519                serial_key.as_ptr(),
520                serial_key_len,
521                serial_blob.as_ptr(),
522                serial_blob_len,
523            );
524            if rc != 0 {
525                return Err(LsmErrorCode::try_from(rc)?);
526            }
527        }
528
529        let current_request_duration = Instant::now()
530            .checked_duration_since(start)
531            .unwrap_or_default();
532        match &self.db_conf.metrics {
533            None => {}
534            Some(metrics) => metrics
535                .write_times_s
536                .observe(current_request_duration.as_secs_f64()),
537        }
538        Ok(())
539    }
540
541    /// This function is just sugar. The database file can be considered a primary
542    /// index in which only one entry under a given key can exist. If another entry
543    /// with an existing key is persisted, it overwrites the existing one.
544    fn update(&mut self, key: &[u8], value: &[u8]) -> Result<(), LsmErrorCode> {
545        Disk::persist(self, key, value)
546    }
547
548    /// This deletes the entry under the given key (in a transactional manner).
549    ///
550    /// Trying to delete data using an uninitialized handle, or one that is not yet
551    /// connected to a database, is considered [`LsmErrorCode::LsmMisuse`].
552    ///
553    /// # Example
554    ///
555    /// ```rust
556    /// use lsmlite_rs::*;
557    /// use rand::{thread_rng, Rng};
558    ///
559    /// let db_conf = DbConf::new(
560    ///                           "/tmp/",
561    ///                           "my_db_f".to_string(),
562    /// );
563    ///
564    /// let mut db: LsmDb = Default::default();
565    /// let rc = db.initialize(db_conf);
566    /// let rc = db.connect();
567    ///
568    /// let mut prng = thread_rng();
569    /// // 16-byte random key (not very useful in practice).
570    /// let key: Vec<u8> = (0..16).map(|_| prng.gen_range(0..=255)).collect();
571    /// // 1 KB zeroed payload.
572    /// let value = vec![0; 1024];
573    ///
574    /// let rc = db.persist(&key, &value);
575    /// assert_eq!(rc, Ok(()));
576    ///
577    /// // Entry under `key` will disappear.
578    /// let rc = db.delete(&key);
579    /// assert_eq!(rc, Ok(()));
580    ///
581    /// let rc = db.disconnect();
582    /// ```
583    fn delete(&mut self, key: &[u8]) -> Result<(), LsmErrorCode> {
584        if !self.initialized || !self.connected {
585            return Err(LsmErrorCode::LsmMisuse);
586        }
587
588        let rc: i32;
589        let key_len = key.len();
590        let key_ptr = key.as_ptr();
591        unsafe { rc = lsm_delete(self.db_handle, key_ptr, key_len as i32) }
592        if rc != 0 {
593            return Err(LsmErrorCode::try_from(rc)?);
594        }
595
596        Ok(())
597    }
598
599    /// This function deletes the open interval of keys (being, end) (in a transactional
600    /// manner as well).
601    fn delete_range(&mut self, begin: &[u8], end: &[u8]) -> Result<(), LsmErrorCode> {
602        if !self.initialized || !self.connected {
603            return Err(LsmErrorCode::LsmMisuse);
604        }
605
606        let rc: i32;
607        let starting_key_len = begin.len();
608        let starting_key_ptr = begin.as_ptr();
609        let ending_key_len = end.len();
610        let ending_key_ptr = end.as_ptr();
611        unsafe {
612            rc = lsm_delete_range(
613                self.db_handle,
614                starting_key_ptr,
615                starting_key_len as i32,
616                ending_key_ptr,
617                ending_key_len as i32,
618            )
619        }
620        if rc != 0 {
621            return Err(LsmErrorCode::try_from(rc)?);
622        }
623
624        Ok(())
625    }
626
627    /// This function optimizes a database to make it occupy as little space as possible.
628    /// Essentially, this function compacts the whole database into a single tightly-packed
629    /// B-tree: Thus, read I/O is optimized.
630    /// This function is thought to be used in an offline fashion - once
631    /// all writers have finished with the database.
632    fn optimize(&mut self) -> Result<(), LsmErrorCode> {
633        if !self.initialized || !self.connected {
634            return Err(LsmErrorCode::LsmMisuse);
635        }
636
637        let rc: i32;
638        unsafe {
639            // Let's work all the way through.
640            rc = lsm_work(self.db_handle, 1, -1, null_mut());
641
642            // Anything different than ok (0), or busy (5) is wrong!
643            if rc != 0 && rc != 5 {
644                lsm_close(self.db_handle);
645                let ec = LsmErrorCode::try_from(rc)?;
646                tracing::error!(
647                    datafile = self.get_full_db_path()?,
648                    rc  = ?ec,
649                    "Error occurred while working on the datafile. Exiting background thread.",
650                );
651            }
652        }
653
654        Ok(())
655    }
656
657    /// This function opens a transaction explicitly. All operations contained between
658    /// opening a transaction and committing it using [`Disk::commit_transaction`] will be
659    /// performed atomically. Similarly, if the transaction is explicitly rolled back using
660    /// [`Disk::rollback_transaction`], all enclosed operations will not be persistent.
661    /// Observe that every database operation is contained in an implicit transaction. This
662    /// function is thought to encapsulate multiple operations into a single transaction.
663    fn begin_transaction(&mut self) -> Result<(), LsmErrorCode> {
664        if !self.initialized || !self.connected {
665            return Err(LsmErrorCode::LsmMisuse);
666        }
667
668        // TODO: For the time being we accept only a single
669        // transaction, no nested ones.
670        let rc: i32;
671        unsafe {
672            rc = lsm_begin(self.db_handle, 1);
673        }
674
675        // A transaction is easy to deal with, it's either successfully
676        // opened or not.
677        if rc != 0 {
678            return Err(LsmErrorCode::try_from(rc)?);
679        }
680
681        Ok(())
682    }
683
684    /// This function commits an opened transaction. Without committing a transaction,
685    /// all enclosed operations will remain hidden from the consistent state of the database.
686    fn commit_transaction(&mut self) -> Result<(), LsmErrorCode> {
687        if !self.initialized || !self.connected {
688            return Err(LsmErrorCode::LsmMisuse);
689        }
690
691        // TODO: For the time being we do not support nested
692        // transactions, but we try to commit all open ones anyway.
693        let rc: i32;
694        unsafe {
695            rc = lsm_commit(self.db_handle, 0);
696        }
697
698        // The commit either succeeds or not.
699        if rc != 0 {
700            return Err(LsmErrorCode::try_from(rc)?);
701        }
702
703        Ok(())
704    }
705
706    /// This function rollbacks an opened transaction explicitly. All enclosed
707    /// operations will remain hidden from the consistent state of the database.
708    fn rollback_transaction(&mut self) -> Result<(), LsmErrorCode> {
709        if !self.initialized || !self.connected {
710            return Err(LsmErrorCode::LsmMisuse);
711        }
712
713        // TODO: For the time being we do not support nested
714        // transactions, thus we simply rollback and close the top-level
715        // transaction.
716        let rc: i32;
717        unsafe {
718            rc = lsm_rollback(self.db_handle, 0);
719        }
720
721        // The rollback should succeed, otherwise there are pretty bad
722        // issues down the pipeline.
723        if rc != 0 {
724            return Err(LsmErrorCode::try_from(rc)?);
725        }
726
727        Ok(())
728    }
729
730    /// This function returns a cursor to the underlying database.
731    /// This cursor can be operated by the methods provided by the [`Cursor`] trait.
732    /// When opening a cursor, a snapshot of the database will be created for it. No
733    /// new data arriving after the cursor has been created will be visible to the
734    /// cursor. A cursor is used to performed read-only operations over the database.
735    /// That is, no data of the database can be modified through a cursor.
736    ///
737    /// Trying to open a cursor using a uninitialized handle, or one that is not yet
738    /// connected to a database, is considered [`LsmErrorCode::LsmMisuse`].
739    ///
740    /// # Example
741    ///
742    /// ```rust
743    /// use lsmlite_rs::*;
744    ///
745    /// let db_conf = DbConf::new(
746    ///                           "/tmp/",
747    ///                           "my_db_g".to_string(),
748    /// );
749    ///
750    /// let mut db: LsmDb = Default::default();
751    /// let rc = db.initialize(db_conf);
752    /// let rc = db.connect();
753    ///
754    /// // Opening a cursor for `db`. This cursor is currently
755    /// // not positioned anywhere, and thus no data can be extracted
756    /// // from it.
757    /// let cursor = db.cursor_open();
758    /// assert!(cursor.is_ok());
759    /// ```
760    fn cursor_open(&self) -> Result<LsmCursor, LsmErrorCode> {
761        if !self.initialized || !self.connected {
762            return Err(LsmErrorCode::LsmMisuse);
763        }
764
765        let cursor: *mut lsm_cursor = null_mut();
766        let rc: i32;
767        unsafe {
768            rc = lsm_csr_open(self.db_handle, &cursor);
769        }
770        if rc != 0 {
771            return Err(LsmErrorCode::try_from(rc)?);
772        }
773        Ok(LsmCursor {
774            db_cursor: cursor,
775            _marker: Default::default(),
776        })
777    }
778}
779
780/// Custom implementation of [`Cursor`] for [`LsmCursor`].
781impl Cursor for LsmCursor<'_> {
782    /// This function closes an existing cursor over the underlying
783    /// database. A call to [`Disk::cursor_open`] must be paired up (in the end)
784    /// with a call to [`Cursor::close`]. Otherwise the database won't be cleanly
785    /// closed (due to the snapshot that belongs to the cursor), and
786    /// a recovery process will be spawn the next time the database
787    /// file is opened.
788    ///
789    /// For completeness, [`LsmCursor`] also implements [`Drop`] so that a cursor
790    /// gets automatically released once it goes out of scope (thus releasing
791    /// resources).
792    ///
793    /// Closing an uninitialized [`LsmCursor`] is considered [`LsmErrorCode::LsmMisuse`].
794    fn close(&mut self) -> Result<(), LsmErrorCode> {
795        if self.db_cursor.is_null() {
796            return Err(LsmErrorCode::LsmMisuse);
797        }
798
799        let rc: i32;
800        // Free resources.
801        unsafe {
802            // This call is infallible, it always returns Ok.
803            rc = lsm_csr_close(self.db_cursor);
804            // From here on, this cursor is not useful any longer.
805            self.db_cursor = null_mut();
806        }
807        if rc != 0 {
808            return Err(LsmErrorCode::try_from(rc)?);
809        }
810
811        Ok(())
812    }
813
814    /// Tests whether the cursor is currently pointing to a valid database record.
815    /// When operating a cursor, this function has to be called before extracting
816    /// records from it (key and/or value) to make sure that the values can be
817    /// trusted. That is, a cursor might internally retain the last output value
818    /// for a while after it has become invalid (say moved past the end of the
819    /// database), or empty values can be extracted from it before positioning
820    /// the cursor on a valid record.
821    ///
822    /// Testing for validity of an uninitialized uninitialized [`LsmCursor`] is
823    /// considered [`LsmErrorCode::LsmMisuse`].
824    fn valid(&self) -> Result<(), LsmErrorCode> {
825        if self.db_cursor.is_null() {
826            return Err(LsmErrorCode::LsmMisuse);
827        }
828
829        let mut rc: i32;
830        unsafe {
831            rc = lsm_csr_valid(self.db_cursor);
832            // Internally, lsm_csr_valid returns true (== 1) when valid,
833            // but LsmOk == 0, thus we exchange the value to represent
834            // true as LsmOk, and false as LsmError.
835            rc = 1 - rc;
836        }
837        if rc != 0 {
838            return Err(LsmErrorCode::try_from(rc)?);
839        }
840
841        Ok(())
842    }
843
844    /// Moves the cursor to the very first record in the database.
845    /// Positioning an uninitialized [`LsmCursor`] is considered [`LsmErrorCode::LsmMisuse`].
846    ///
847    /// # Example
848    ///
849    /// ```rust
850    /// use lsmlite_rs::*;
851    ///
852    /// let db_conf = DbConf::new(
853    ///                           "/tmp/",
854    ///                           "my_db_h".to_string(),
855    /// );
856    ///
857    /// let mut db: LsmDb = Default::default();
858    /// let rc = db.initialize(db_conf);
859    /// let rc = db.connect();
860    ///
861    /// // Insert data into the database, so that something gets traversed.
862    /// let key: usize = 1;
863    /// let key_serial = key.to_be_bytes();
864    /// // 1 KB zeroed payload.
865    /// let value = vec![0; 1024];
866    /// let rc = db.persist(&key_serial, &value)?;
867    ///
868    /// let key: usize = 2;
869    /// let key_serial = key.to_be_bytes();
870    /// let rc = db.persist(&key_serial, &value)?;
871    ///
872    /// let mut cursor = db.cursor_open()?;
873    ///
874    /// let rc = cursor.first();
875    /// assert!(rc.is_ok());
876    ///
877    /// let mut num_records = 0;
878    /// while cursor.valid().is_ok() {
879    ///     num_records += 1;
880    ///     let current_key = Cursor::get_key(&cursor)?;
881    ///     let current_value = Cursor::get_value(&cursor)?;
882    ///     cursor.next()?;
883    /// }
884    /// assert_eq!(num_records, 2);
885    ///
886    /// // EOF
887    /// assert!(cursor.valid().is_err());
888    ///
889    /// # Result::<(), LsmErrorCode>::Ok(())
890    /// ```
891    fn first(&mut self) -> Result<(), LsmErrorCode> {
892        if self.db_cursor.is_null() {
893            return Err(LsmErrorCode::LsmMisuse);
894        }
895
896        let rc: i32;
897        unsafe {
898            rc = lsm_csr_first(self.db_cursor);
899        }
900        if rc != 0 {
901            return Err(LsmErrorCode::try_from(rc)?);
902        }
903
904        Ok(())
905    }
906
907    /// Moves the cursor to the very last record in the database.
908    /// Positioning an uninitialized [`LsmCursor`] is considered [`LsmErrorCode::LsmMisuse`].
909    /// # Example
910    ///
911    /// ```rust
912    /// use lsmlite_rs::*;
913    ///
914    /// let db_conf = DbConf::new(
915    ///                           "/tmp/",
916    ///                           "my_db_i".to_string(),
917    /// );
918    ///
919    /// let mut db: LsmDb = Default::default();
920    /// let rc = db.initialize(db_conf);
921    /// let rc = db.connect();
922    ///
923    /// // Insert data into the database, so that something gets traversed.
924    /// let key: usize = 1;
925    /// let key_serial = key.to_be_bytes();
926    /// // 1 KB zeroed payload.
927    /// let value = vec![0; 1024];
928    /// let rc = db.persist(&key_serial, &value)?;
929    ///
930    /// let key: usize = 2;
931    /// let key_serial = key.to_be_bytes();
932    /// let rc = db.persist(&key_serial, &value)?;
933    ///
934    /// let mut cursor = db.cursor_open()?;
935    ///
936    /// let rc = cursor.last();
937    /// assert!(rc.is_ok());
938    ///
939    /// let mut num_records = 0;
940    /// while cursor.valid().is_ok() {
941    ///     num_records += 1;
942    ///     let current_key = Cursor::get_key(&cursor)?;
943    ///     let current_value = Cursor::get_value(&cursor)?;
944    ///     cursor.prev()?;
945    /// }
946    /// assert_eq!(num_records, 2);
947    ///
948    /// // EOF
949    /// assert!(cursor.valid().is_err());
950    ///
951    /// # Result::<(), LsmErrorCode>::Ok(())
952    /// ```
953    fn last(&mut self) -> Result<(), LsmErrorCode> {
954        if self.db_cursor.is_null() {
955            return Err(LsmErrorCode::LsmMisuse);
956        }
957
958        let rc: i32;
959        unsafe {
960            rc = lsm_csr_last(self.db_cursor);
961        }
962        if rc != 0 {
963            return Err(LsmErrorCode::try_from(rc)?);
964        }
965
966        Ok(())
967    }
968
969    /// This positions the cursor on an entry of the database that depends on
970    /// the seek mode provided:
971    /// 1. If [`LsmCursorSeekOp::LsmCursorSeekLe`] is given, then the cursor will be positioned
972    ///    at the entry that is less or equal than the provided key depending on whether
973    ///    the key is found in the database or not.
974    /// 2. If [`LsmCursorSeekOp::LsmCursorSeekEq`] is given, then the cursor will be positioned
975    ///    at the entry that corresponding to the given key, or at the end of the database
976    ///    depending on whether the entry is found or not. If the entry is found, a call
977    ///    to `valid` on the cursor will return success, and otherwise an error.
978    /// 3. If [`LsmCursorSeekOp::LsmCursorSeekGe`] is given, then the cursor will be positioned
979    ///    at the entry that is greater or equal than the provided key depending on
980    ///    whether the key is found in the database or not.
981    fn seek(&mut self, key: &[u8], mode: LsmCursorSeekOp) -> Result<(), LsmErrorCode> {
982        if self.db_cursor.is_null() {
983            return Err(LsmErrorCode::LsmMisuse);
984        }
985
986        let rc: i32;
987        let key_len = key.len();
988        let key_ptr = key.as_ptr();
989        unsafe {
990            rc = lsm_csr_seek(self.db_cursor, key_ptr, key_len as i32, mode as i32);
991        }
992        if rc != 0 {
993            return Err(LsmErrorCode::try_from(rc)?);
994        }
995
996        Ok(())
997    }
998
999    /// Once a cursor is position at a valid entry, this function moves it to the next
1000    /// entry. This function can be called only when moving forward on the database. That is,
1001    /// when starting from [`Cursor::first`] or when seeking with [`LsmCursorSeekOp::LsmCursorSeekGe`].
1002    /// Otherwise an error will be issued.
1003    fn next(&mut self) -> Result<(), LsmErrorCode> {
1004        if self.db_cursor.is_null() {
1005            return Err(LsmErrorCode::LsmMisuse);
1006        }
1007
1008        let rc: i32;
1009        unsafe {
1010            rc = lsm_csr_next(self.db_cursor);
1011        }
1012        if rc != 0 {
1013            return Err(LsmErrorCode::try_from(rc)?);
1014        }
1015
1016        Ok(())
1017    }
1018
1019    /// Similar to [`Cursor::next`], but moving to the previous entry. This function can
1020    /// be called only when moving backwards on the database. That is, when starting from
1021    /// [`Cursor::last`] or when seeking with [`LsmCursorSeekOp::LsmCursorSeekLe`]. Otherwise an
1022    /// error will be issued.
1023    fn prev(&mut self) -> Result<(), LsmErrorCode> {
1024        if self.db_cursor.is_null() {
1025            return Err(LsmErrorCode::LsmMisuse);
1026        }
1027
1028        let rc: i32;
1029        unsafe {
1030            rc = lsm_csr_prev(self.db_cursor);
1031        }
1032        if rc != 0 {
1033            return Err(LsmErrorCode::try_from(rc)?);
1034        }
1035
1036        Ok(())
1037    }
1038
1039    /// If the cursor is [`Cursor::valid`], then this function retrieves the key of
1040    /// the entry the cursor is currently pointing to. The memory the key uses
1041    /// belongs to the parent call. If the cursor is not valid, an error is returned.
1042    fn get_key(&self) -> Result<Vec<u8>, LsmErrorCode> {
1043        self.valid()?;
1044
1045        let rc: i32;
1046        let key_ptr: *mut u8 = null_mut();
1047        let mut key_len: i32 = 0;
1048        let mut key: Vec<u8> = vec![];
1049        unsafe {
1050            rc = lsm_csr_key(self.db_cursor, &key_ptr, &mut key_len);
1051            if rc != 0 {
1052                return Err(LsmErrorCode::try_from(rc)?);
1053            }
1054            // We reserve enough space so that we can copy what the cursor returned.
1055            key.reserve(key_len as usize);
1056            // We copy the returned value onto new memory that the upper call
1057            // will own.
1058            key_ptr.copy_to_nonoverlapping(key.as_mut_ptr(), key_len as usize);
1059            key.set_len(key_len as usize);
1060        }
1061        // This memory belongs now to the upper layer.
1062        Ok(key)
1063    }
1064
1065    /// If the cursor is [`Cursor::valid`], then this function retrieves the value
1066    /// of the entry the cursor is currently pointing to. The memory the key uses
1067    /// belongs to the parent call. If the cursor is not valid, an error is returned.
1068    fn get_value(&self) -> Result<Vec<u8>, LsmErrorCode> {
1069        self.valid()?;
1070
1071        let rc: i32;
1072        let value_ptr: *mut u8 = null_mut();
1073        let mut value_len: i32 = 0;
1074        let mut value: Vec<u8> = vec![];
1075        unsafe {
1076            rc = lsm_csr_value(self.db_cursor, &value_ptr, &mut value_len);
1077            if rc != 0 {
1078                return Err(LsmErrorCode::try_from(rc)?);
1079            }
1080            // We reserve enough space so that we can copy what the cursor returned.
1081            value.reserve(value_len as usize);
1082            // We copy the returned value onto new memory that the upper call
1083            // will own.
1084            value_ptr.copy_to_nonoverlapping(value.as_mut_ptr(), value_len as usize);
1085            value.set_len(value_len as usize);
1086        }
1087        // This memory belongs now to the upper layer.
1088        Ok(value)
1089    }
1090
1091    /// If the cursor is [`Cursor::valid`], then this function compares the key of the
1092    /// entry the cursor is currently pointing to, with the given key. On success, the
1093    /// result of the comparison is returned. The comparison happens as per
1094    /// `memcmp`, that is, if the cursor's key is [`Ordering::Less`], [`Ordering::Equal`],
1095    /// or [`Ordering::Greater`] than the provided key, then the corresponding [`Ordering`]
1096    /// will be returned. On prefix comparison, that is, the given key is a strict prefix of
1097    /// the cursor key, [`Ordering::Greater`] will be returned.
1098    ///
1099    /// This function is useful when probing the database for a range.
1100    ///
1101    /// # Example
1102    ///
1103    /// ```rust
1104    /// use std::cmp::Ordering;
1105    /// use lsmlite_rs::*;
1106    ///
1107    /// let db_conf = DbConf::new(
1108    ///                           "/tmp/",
1109    ///                           "my_db_j".to_string(),
1110    /// );
1111    ///
1112    /// let mut db: LsmDb = Default::default();
1113    /// let rc = db.initialize(db_conf);
1114    /// let rc = db.connect();
1115    ///
1116    /// // Insert data into the database, so that something gets traversed.
1117    /// let key: usize = 1;
1118    /// let key_serial = key.to_be_bytes();
1119    /// // 1 KB zeroed payload.
1120    /// let value = vec![0; 1024];
1121    /// let rc = db.persist(&key_serial, &value)?;
1122    ///
1123    /// let key: usize = 2;
1124    /// let key_serial = key.to_be_bytes();
1125    /// let rc = db.persist(&key_serial, &value)?;
1126    ///
1127    /// let mut cursor = db.cursor_open()?;
1128    ///
1129    /// let rc = cursor.first();
1130    /// assert!(rc.is_ok());
1131    ///
1132    /// // Assume the very first record is smaller than this.
1133    /// let key_ub_value: usize = 2;
1134    /// let key_ub_serial = key_ub_value.to_be_bytes();
1135    /// // `Ordering::Less` tells that the key of the cursor is smaller
1136    /// // than `key_ub_value`.
1137    /// let mut key_cmp = cursor.compare(&key_ub_serial)?;
1138    ///
1139    /// let mut num_records = 0;
1140    /// while cursor.valid().is_ok() && key_cmp < Ordering::Equal {
1141    ///    num_records += 1;
1142    ///    cursor.next()?;
1143    ///    key_cmp = cursor.compare(&key_ub_serial)?;
1144    /// }
1145    ///
1146    /// assert_eq!(num_records, 1);
1147    ///
1148    /// // We either exhausted the database or found a key >= than `key_ub_value`.
1149    /// # Result::<(), LsmErrorCode>::Ok(())
1150    /// ```
1151    fn compare(&self, key: &[u8]) -> Result<Ordering, LsmErrorCode> {
1152        self.valid()?;
1153
1154        let rc: i32;
1155        let mut result: i32 = 0;
1156        let zero: i32 = 0;
1157        let key_len = key.len();
1158        let key_ptr = key.as_ptr();
1159        unsafe {
1160            rc = lsm_csr_cmp(self.db_cursor, key_ptr, key_len as i32, &mut result);
1161        }
1162        if rc != 0 {
1163            return Err(LsmErrorCode::try_from(rc)?);
1164        }
1165
1166        Ok(result.cmp(&zero))
1167    }
1168}
1169
1170/// Additional to implementing [`Disk`], the following helper functions are also available.
1171impl LsmDb {
1172    fn configure_bg_threads(&mut self, mode: LsmMode, id: usize) -> Result<(), LsmErrorCode> {
1173        let rc: i32;
1174        match mode {
1175            LsmMode::LsmNoBackgroundThreads => {
1176                // In single-threaded mode we do nothing but to set parameters of the connection.
1177                // These parameters are a good approximation in general. On bigger machines we would
1178                // want to consume more main-memory for example.
1179                unsafe {
1180                    // Modifying auto checkpointing, as a single thread will handle all operations.
1181                    let checkpoint_size: i32 = MAX_CHECKPOINT_SIZE_KB;
1182                    rc = lsm_config(
1183                        self.db_handle,
1184                        LsmParam::AutoCheckPoint as i32,
1185                        &checkpoint_size,
1186                    );
1187
1188                    if rc != 0 {
1189                        return Err(LsmErrorCode::try_from(rc)?);
1190                    }
1191                }
1192            }
1193            // If a single extra thread should handle work and checkpoint, then
1194            // we signal this.
1195            LsmMode::LsmBackgroundMerger => {
1196                // We now initialize the thread that will take care of working and checkpointing.
1197                self.db_bg_threads = LsmBgWorkers::new(&self.db_conf, &self.db_fq_name, id);
1198                // If the background thread was issued, then we output this information.
1199                if self.db_bg_threads.bg_threads[0].thread.is_some() {
1200                    // Disable auto work, which will be delegated to a thread. The main
1201                    // writer won't take care of this.
1202                    let auto_work: i32 = 0;
1203                    unsafe {
1204                        rc = lsm_config(self.db_handle, LsmParam::AutoWork as i32, &auto_work);
1205                    }
1206
1207                    if rc != 0 {
1208                        // Let's destroy the background threads.
1209                        self.db_bg_threads.shutdown();
1210                        return Err(LsmErrorCode::try_from(rc)?);
1211                    }
1212
1213                    // All good, go ahead and inform.
1214                    tracing::info!(
1215                        datafile = self.get_full_db_path()?,
1216                        "Combined merger and check-pointer thread scheduled.",
1217                    );
1218                }
1219
1220                // If the background thread was not issued (due to internal errors)
1221                // we change no property of the main connection to avoid problems.
1222            }
1223            LsmMode::LsmBackgroundCheckpointer => {
1224                // We first initialize the thread that will take care of checkpointing.
1225                self.db_bg_threads = LsmBgWorkers::new(&self.db_conf, &self.db_fq_name, id);
1226                // If the background thread was issued, then we output this information.
1227                if self.db_bg_threads.bg_threads[0].thread.is_some() {
1228                    // Disable auto checkpointing, which will be delegated to a thread. The main
1229                    // writer won't take care of this.
1230                    let auto_checkpoint: i32 = 0;
1231                    unsafe {
1232                        rc = lsm_config(
1233                            self.db_handle,
1234                            LsmParam::AutoCheckPoint as i32,
1235                            &auto_checkpoint,
1236                        );
1237                    }
1238
1239                    if rc != 0 {
1240                        // Let's destroy the background threads.
1241                        self.db_bg_threads.shutdown();
1242                        return Err(LsmErrorCode::try_from(rc)?);
1243                    }
1244
1245                    // All good, go ahead and inform.
1246                    tracing::info!(
1247                        datafile = self.get_full_db_path()?,
1248                        "Check-pointer thread scheduled.",
1249                    );
1250                }
1251
1252                // If the background thread was not issued (due to internal errors)
1253                // we change no property of the main connection to avoid problems.
1254            }
1255        }
1256        Ok(())
1257    }
1258
1259    fn deal_with_bg_threads(&mut self) -> Result<(), LsmErrorCode> {
1260        match self.db_conf.mode {
1261            LsmMode::LsmNoBackgroundThreads => {}
1262            LsmMode::LsmBackgroundMerger => {
1263                // We register the time it takes for the merger to work.
1264                let start = Instant::now();
1265                self.wait_on_merger()?;
1266                let current_request_duration = Instant::now()
1267                    .checked_duration_since(start)
1268                    .unwrap_or_default();
1269                match &self.db_conf.metrics {
1270                    None => {}
1271                    Some(metrics) => metrics
1272                        .work_times_s
1273                        .observe(current_request_duration.as_secs_f64()),
1274                }
1275            }
1276            LsmMode::LsmBackgroundCheckpointer => {
1277                // We register the time it takes for the checkpointer to work.
1278                let start = Instant::now();
1279                self.wait_on_checkpointer()?;
1280                let current_request_duration = Instant::now()
1281                    .checked_duration_since(start)
1282                    .unwrap_or_default();
1283                match &self.db_conf.metrics {
1284                    None => {}
1285                    Some(metrics) => metrics
1286                        .checkpoint_times_s
1287                        .observe(current_request_duration.as_secs_f64()),
1288                }
1289            }
1290        }
1291        Ok(())
1292    }
1293
1294    fn wait_on_merger(&mut self) -> Result<(), LsmErrorCode> {
1295        let mut rc: i32;
1296        let mut old_tree_size: i32 = -1;
1297        let mut new_tree_size: i32 = -1;
1298
1299        // Since the database is single-writer, we can safely query
1300        // the current sizes of the main memory structures (trees)
1301        // to decide how to proceed. Observe that this is done
1302        // only in the case that multiple threads are used.
1303        // Otherwise, LSM does this internally.
1304        unsafe {
1305            rc = lsm_info(
1306                self.db_handle,
1307                LsmInfo::LsmTreeSize as i32,
1308                &mut old_tree_size,
1309                &mut new_tree_size,
1310            );
1311        }
1312
1313        if rc != 0 {
1314            return Err(LsmErrorCode::try_from(rc)?);
1315        }
1316
1317        let mut written_kb: i32 = 0;
1318        let mut overall_written_kb = 0;
1319        let work_kb: i32 = 128;
1320        // We perform work until we have enough space in main memory to keep writing.
1321        // This is to avoid running over the amount of main memory allowed to use.
1322        while old_tree_size > 0 {
1323            unsafe {
1324                rc = lsm_work(self.db_handle, NUM_MERGE_SEGMENTS, work_kb, &mut written_kb);
1325                overall_written_kb += written_kb;
1326
1327                // Anything different than ok (0), or busy (5) is wrong!
1328                if rc != 0 && rc != 5 {
1329                    lsm_close(self.db_handle);
1330                    let ec = LsmErrorCode::try_from(rc);
1331                    tracing::error!(
1332                            datafile = ?self.get_full_db_path(),
1333                            rc = ?ec,
1334                        "Error occurred while working on the datafile. No work performed \
1335                        on the database.",
1336                    );
1337                    return Err(LsmErrorCode::try_from(rc)?);
1338                }
1339
1340                // After having performed some work, we query the sizes of the
1341                // main-memory components to decide whether another iteration will be done or not.
1342                rc = lsm_info(
1343                    self.db_handle,
1344                    LsmInfo::LsmTreeSize as i32,
1345                    &mut old_tree_size,
1346                    &mut new_tree_size,
1347                );
1348
1349                // Something went wrong!
1350                if rc != 0 {
1351                    let ec = LsmErrorCode::try_from(rc);
1352                    tracing::error!(
1353                        datafile = ?self.get_full_db_path(),
1354                        rc = ?ec,
1355                        "Error occurred while obtaining segment information for background thread. \
1356                        Exiting background thread.",
1357                    );
1358                    return Err(LsmErrorCode::try_from(rc)?);
1359                }
1360
1361                park_timeout(Duration::from_millis(WRITER_PARK_TIME_MS));
1362            }
1363        }
1364
1365        // We update the metric on the amount of data written.
1366        match &self.db_conf.metrics {
1367            None => {}
1368            Some(metrics) => metrics.work_kbs.observe(overall_written_kb as f64),
1369        }
1370
1371        // Should hold.
1372        debug_assert!(old_tree_size == 0);
1373
1374        // Background thread will now take care of file operations in the background,
1375        // but main thread will be allowed to write to memory as well.
1376        self.db_bg_threads.execute(LsmBgWorkerMessage::Merge)?;
1377
1378        Ok(())
1379    }
1380
1381    fn wait_on_checkpointer(&mut self) -> Result<(), LsmErrorCode> {
1382        let mut rc: i32;
1383        let mut amount_volatile_data: i32 = -1;
1384        let writer_park_time_ms = Duration::from_millis(WRITER_PARK_TIME_MS);
1385
1386        // Since the database is single-writer, we can safely query
1387        // the current sizes of the main memory structures (trees)
1388        // to decide how to proceed. Observe that this is done
1389        // only in the case that multiple threads are used.
1390        // Otherwise, LSM does this internally.
1391        unsafe {
1392            rc = lsm_info(
1393                self.db_handle,
1394                LsmInfo::LsmCheckpointSize as i32,
1395                &mut amount_volatile_data,
1396            );
1397        }
1398
1399        if rc != 0 {
1400            return Err(LsmErrorCode::try_from(rc)?);
1401        }
1402
1403        // If a checkpoint is due, then we wake up the background thread.
1404        if amount_volatile_data >= MAX_CHECKPOINT_SIZE_KB {
1405            // This asks the background thread to checkpoint the data file (needed at this point).
1406            self.db_bg_threads.execute(LsmBgWorkerMessage::Checkpoint)?;
1407
1408            // Once the message has been sent, we wait for the background thread to
1409            // finish before returning control to the upper layer.
1410            // To avoid busy waits we yield for a little bit in every iteration.
1411            while amount_volatile_data >= MAX_CHECKPOINT_SIZE_KB {
1412                // TODO: We currently assume that the background thread is running
1413                // doing stuff. If the background thread dies, then we have to see how we
1414                // proceed (re-spawning the thread most probably to not interfere with the
1415                // existing writer connection).
1416                park_timeout(writer_park_time_ms);
1417
1418                unsafe {
1419                    rc = lsm_info(
1420                        self.db_handle,
1421                        LsmInfo::LsmCheckpointSize as i32,
1422                        &mut amount_volatile_data,
1423                    );
1424                }
1425
1426                if rc != 0 {
1427                    return Err(LsmErrorCode::try_from(rc)?);
1428                }
1429            }
1430        }
1431
1432        // If this condition holds, we can keep "safely" writing to the database.
1433        debug_assert!(amount_volatile_data < MAX_CHECKPOINT_SIZE_KB);
1434
1435        Ok(())
1436    }
1437
1438    /// This function tests whether a database handle has been initialized.
1439    pub fn is_initialized(&self) -> bool {
1440        self.initialized
1441    }
1442
1443    /// This function tests whether a database handle is connected.
1444    pub fn is_connected(&self) -> bool {
1445        self.connected
1446    }
1447
1448    /// This function outputs the full-qualified path of the database.
1449    /// It errors if the database has not been initialized.
1450    pub fn get_full_db_path(&self) -> Result<String, LsmErrorCode> {
1451        if !self.initialized {
1452            return Err(LsmErrorCode::LsmMisuse);
1453        }
1454        Ok(String::from_utf8_lossy(self.db_fq_name.as_bytes()).to_string())
1455    }
1456
1457    /// This function outputs the compression id of the database. The only possible
1458    /// error is [`LsmErrorCode::LsmMismatch`] which means that records of the database
1459    /// have been compressed with a unknown library. At this point there is not much
1460    /// to do, and this error should be considered unrecoverable. That is, the database
1461    /// can be considered corrupted, and its data is most probably lost.
1462    pub fn get_compression_id(&self) -> Result<LsmCompressionLib, LsmErrorCode> {
1463        if !self.initialized || !self.connected {
1464            return Err(LsmErrorCode::LsmMisuse);
1465        }
1466
1467        let compression_id: i32 = -1;
1468        unsafe {
1469            let _ = lsm_info(
1470                self.db_handle,
1471                LsmInfo::LsmCompressionId as i32,
1472                &compression_id,
1473            );
1474        }
1475        LsmCompressionLib::try_from(compression_id)
1476    }
1477}
1478
1479/// A default database. This database is not useful without
1480/// getting first initialized using [`Disk::initialize`]. The purpose
1481/// of this method is to simply zero all attributes of [`LsmDb`].
1482impl Default for LsmDb {
1483    fn default() -> Self {
1484        Self {
1485            db_env: null_mut(),
1486            db_handle: null_mut(),
1487            db_compress: None,
1488            db_fq_name: Default::default(),
1489            db_conf: Default::default(),
1490            db_bg_threads: Default::default(),
1491            initialized: false,
1492            connected: false,
1493        }
1494    }
1495}
1496
1497/// A default cursor. This cursor is not useful by itself as it is not
1498/// bound to any database. This construction is provided to be used in cases
1499/// in which a cursor needs to be declared ahead of time, only to be later
1500/// assigned a cursor bound to a database.
1501impl Default for LsmCursor<'_> {
1502    fn default() -> Self {
1503        Self {
1504            db_cursor: null_mut(),
1505            _marker: Default::default(),
1506        }
1507    }
1508}
1509
1510/// Drop for [`LsmDb`] so that it gets properly terminated when it goes out of scope for example.
1511impl Drop for LsmDb {
1512    fn drop(&mut self) {
1513        // Now we close the data file so that it does not require recovery next time we open it.
1514        // We might fail thou. If we fail, for whatever reason, the handle will be leaked, but since
1515        // this is `Drop`, we have no way to signal this to the upper layer.
1516        let rc = self.disconnect();
1517        if rc == Err(LsmErrorCode::LsmMisuse) {
1518            tracing::warn!(
1519                ?rc,
1520                "Database could not be closed. Most probably there are still cursors accessing it. \
1521                A recovery procedure will be required next time the database is accessed. \
1522                DB handle has not been destroyed and is still fully functional.",
1523            );
1524        } else if rc != Ok(()) {
1525            tracing::error!(
1526                ?rc,
1527                "Database could not be closed. Unexpected error happened. \
1528                Resources occupied by the handle will be most probably leaked.",
1529            );
1530        }
1531    }
1532}
1533
1534/// Drop for `LsmCursor` so that it gets properly terminated when it goes out of scope for example.
1535impl Drop for LsmCursor<'_> {
1536    fn drop(&mut self) {
1537        // We simply close the cursor (thus releasing resources occupied by it like
1538        // snapshot(s) and memory.
1539        let _ = self.close();
1540    }
1541}
1542
1543/// A database handle is marked as [`Send`] as it can be safely sent to another
1544/// thread (for further usage), for example in async code.
1545unsafe impl Send for LsmDb {}
1546/// For convenience a database handle is marked as [`Sync`]. This is not because the same
1547/// handle can be safely shared among threads, but because in this manner a handle can be
1548/// wrapped in a [`std::sync::RwLock`] and be shared among threads safely as it captures
1549/// the single-writer, multiple-reader nature of a [`LsmDb`]. In this manner, multiple
1550/// cursors may be opened through the same handle, while writes through the handle
1551/// are exclusive (serialized).
1552unsafe impl Sync for LsmDb {}