lsmlite_rs/lsmdb.rs
1// Copyright 2023 Helsing GmbH
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::cmp::Ordering;
15use std::convert::TryFrom;
16use std::ffi::CString;
17use std::os::raw::c_char;
18use std::ptr::null_mut;
19use std::thread::park_timeout;
20use std::time::{Duration, Instant};
21
22use crate::compression::lz4::LsmLz4;
23use crate::compression::zlib::LsmZLib;
24use crate::compression::zstd::LsmZStd;
25use crate::compression::Compression;
26use crate::threads::NUM_MERGE_SEGMENTS;
27use crate::{
28 lsm_cursor, lsm_db, lsm_env, Cursor, DbConf, Disk, LsmBgWorkerMessage, LsmBgWorkers,
29 LsmCompressionLib, LsmCursor, LsmCursorSeekOp, LsmDb, LsmErrorCode, LsmHandleMode, LsmInfo,
30 LsmMode, LsmParam, LsmSafety,
31};
32
// This is the amount of time a writer sleeps while a background worker does some work.
// It is only relevant when background threads are spawned.
const WRITER_PARK_TIME_MS: u64 = 1; // milliseconds.

// This is the hard main memory limit we keep for the main memory component of the LSM
// (a main-memory b-tree). The value is expressed in KiB.
pub(crate) const SIZE_MAIN_MEMORY_TREE_KB: i32 = 16 << 10; // 16 * 1024 KiB = 16 MiB

// These are lower and upper bounds on the amount of unchecked data that we can bear
// to lose. Observe that there is also the main memory component that needs to be
// flushed. Thus, at any point in time, there is at most
// (2 * SIZE_MAIN_MEMORY_TREE_KB) + MAX_CHECKPOINT_SIZE_KB worth of data at risk.
pub(crate) const MIN_CHECKPOINT_SIZE_KB: i32 = 2 << 10; // 2 * 1024 KiB = 2 MiB
pub(crate) const MAX_CHECKPOINT_SIZE_KB: i32 = 2 * MIN_CHECKPOINT_SIZE_KB; // 4 MiB

// Block size of the database (amount of data to be written as a unit), in KiB.
pub(crate) const BLOCK_SIZE_KB: i32 = 8 << 10; // 8 * 1024 KiB = 8 MiB
// Page size of the database (unit of bytes into which blocks are divided), in bytes.
pub(crate) const PAGE_SIZE_B: i32 = 4 << 10; // 4 * 1024 B = 4 KiB
52
// These functions translate to internal LSM functions. Thus the signatures have
// to match. Observe that we treat LSM's types as opaque, and thus they are passed
// around as memory references that are fully visible inside LSM, but not so
// outside of it.
//
// All functions return an `i32` code; callers in this file treat 0 as success and
// convert any other value through `LsmErrorCode::try_from`.
extern "C" {
    // These functions are the ones we need in other files (like threads.rs).

    // Variadic: the trailing argument depends on `e_conf`.
    pub(crate) fn lsm_info(db: *mut lsm_db, e_conf: i32, ...) -> i32;
    // Variadic: callers in this file pass a pointer to an `i32` (or to the compression
    // descriptor) as the trailing argument and read the integer back afterwards, so
    // the engine is expected to write through that pointer.
    pub(crate) fn lsm_config(db: *mut lsm_db, e_param: i32, ...) -> i32;
    pub(crate) fn lsm_work(db: *mut lsm_db, n_segments: i32, n_kb: i32, p_nwrite: *mut i32) -> i32;
    pub(crate) fn lsm_checkpoint(db: *mut lsm_db, p_n_kb: *mut i32) -> i32;
    // Allocates a new handle and stores it in `*db`.
    pub(crate) fn lsm_new(env: *mut lsm_env, db: *mut *mut lsm_db) -> i32;
    // `file_name` must be a NUL-terminated string.
    pub(crate) fn lsm_open(db: *mut lsm_db, file_name: *const c_char) -> i32;
    pub(crate) fn lsm_close(db: *mut lsm_db) -> i32;

    // These functions are private to this file.

    // Keys and values are passed as (pointer, length-in-bytes) pairs.
    fn lsm_insert(
        db: *mut lsm_db,
        p_key: *const u8,
        n_key: i32,
        p_val: *const u8,
        n_val: i32,
    ) -> i32;
    fn lsm_delete(db: *mut lsm_db, p_key: *const u8, n_key: i32) -> i32;
    fn lsm_delete_range(
        db: *mut lsm_db,
        p_key1: *const u8,
        n_key1: i32,
        p_key2: *const u8,
        n_key2: i32,
    ) -> i32;
    fn lsm_begin(db: *mut lsm_db, level: i32) -> i32;
    fn lsm_commit(db: *mut lsm_db, level: i32) -> i32;
    fn lsm_rollback(db: *mut lsm_db, level: i32) -> i32;
    // NOTE(review): `cursor` is declared `*const` although the caller expects the
    // engine to store the new cursor through it — confirm against the C header.
    fn lsm_csr_open(db: *mut lsm_db, cursor: *const *mut lsm_cursor) -> i32;
    fn lsm_csr_close(cursor: *mut lsm_cursor) -> i32;
    fn lsm_csr_first(cursor: *mut lsm_cursor) -> i32;
    fn lsm_csr_seek(cursor: *mut lsm_cursor, p_key: *const u8, n_key: i32, e_seek: i32) -> i32;
    fn lsm_csr_last(cursor: *mut lsm_cursor) -> i32;
    fn lsm_csr_next(cursor: *mut lsm_cursor) -> i32;
    fn lsm_csr_prev(cursor: *mut lsm_cursor) -> i32;
    // Unlike the other functions, callers treat the result as a boolean (1 == valid).
    fn lsm_csr_valid(cursor: *mut lsm_cursor) -> i32;
    fn lsm_csr_key(cursor: *mut lsm_cursor, pp_key: *const *mut u8, pn_key: *mut i32) -> i32; // # spellchecker:disable-line
    fn lsm_csr_value(cursor: *mut lsm_cursor, pp_val: *const *mut u8, pn_val: *mut i32) -> i32; // # spellchecker:disable-line
    fn lsm_csr_cmp(cursor: *mut lsm_cursor, p_key: *const u8, n_key: i32, pi_res: *mut i32) -> i32;
    // Not called anywhere in the visible part of this file, hence the lint exemption.
    #[allow(dead_code)]
    fn lsm_free(env: *mut lsm_env, ptr: *mut c_char);
}
100
101/// Custom implementation of [`Disk`] for [`LsmDb`].
102impl Disk for LsmDb {
103 type C<'a> = LsmCursor<'a>;
104 /// This function sets up general variables about the database. Initializing
105 /// a handle more than once is considered [`LsmErrorCode::LsmMisuse`].
106 ///
107 /// # Example
108 ///
109 /// ```rust
110 /// use lsmlite_rs::*;
111 ///
112 /// let db_conf = DbConf::new(
113 /// "/tmp/",
114 /// "my_db_b".to_string(),
115 /// );
116 ///
117 /// let mut db: LsmDb = Default::default();
118 /// assert!(!db.is_initialized());
119 ///
120 /// let rc = db.initialize(db_conf.clone());
121 /// assert_eq!(rc, Ok(()));
122 /// assert!(db.is_initialized());
123 ///
124 /// let rc = db.initialize(db_conf);
125 /// assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
126 /// ```
127 fn initialize(&mut self, conf: DbConf) -> Result<(), LsmErrorCode> {
128 if self.initialized {
129 // If the database handle has already been initialized, we signal
130 // trying to initialize it again as an error.
131 return Err(LsmErrorCode::LsmMisuse);
132 }
133 self.db_conf = conf;
134
135 self.db_env = null_mut();
136 self.db_handle = null_mut();
137
138 // This is the fully-qualified name of the database.
139 let db_fq_name = format!(
140 "{}/{}.lsm",
141 self.db_conf.db_path.display(),
142 self.db_conf.db_base_name
143 );
144 // This is the c-string version of the name of the database.
145 self.db_fq_name = CString::new(db_fq_name).map_err(|e| {
146 tracing::error!(?e, "Name of the data base is not a valid c-string.");
147 LsmErrorCode::LsmError
148 })?;
149
150 self.initialized = true;
151
152 Ok(())
153 }
154
155 /// This method produces a main-memory handle to connect to the database. At this point
156 /// the database file is created at the given path, and upon success, the database can be
157 /// operated using any other available method for it.
158 ///
159 /// Connecting to a database using the same handle more than once, or connecting using
160 /// an uninitialized handle, is considered [`LsmErrorCode::LsmMisuse`].
161 ///
162 /// # Example
163 ///
164 /// ```rust
165 /// use lsmlite_rs::*;
166 ///
167 /// let db_conf = DbConf::new(
168 /// "/tmp/",
169 /// "my_db_c".to_string(),
170 /// );
171 ///
172 /// let mut db: LsmDb = Default::default();
173 ///
174 /// let rc = db.connect();
175 /// assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
176 ///
177 /// let rc = db.initialize(db_conf);
178 /// let rc = db.connect();
179 /// assert_eq!(rc, Ok(()));
180 /// assert!(db.is_connected());
181 ///
182 /// let rc = db.connect();
183 /// assert_eq!(rc, Err(LsmErrorCode::LsmMisuse));
184 /// ```
185 fn connect(&mut self) -> Result<(), LsmErrorCode> {
186 if !self.initialized || self.connected {
187 return Err(LsmErrorCode::LsmMisuse);
188 }
189
190 // Database has been initialized, thus we can proceed.
191 let mut rc: i32;
192 let mut db_handle = null_mut();
193 unsafe {
194 // Get a new handle to connect to the database on disk.
195 rc = lsm_new(null_mut(), &mut db_handle);
196 }
197
198 if rc != 0 {
199 // The only error that can occur is memory allocation. Thus,
200 // if we fail while allocating, we do not have to deallocate anything.
201 return Err(LsmErrorCode::try_from(rc)?);
202 }
203
204 self.db_handle = db_handle;
205
206 // In here we configure parameters of the database. These parameters
207 // are a good approximation in general. On bigger machines we would
208 // consume more main-memory. We differentiate between opening in read-only mode
209 // or not, as in read-only we need no extra threads or write-oriented parameters.
210 unsafe {
211 // These parameters are independent of the mode we open the file in.
212
213 // Disable multi-process support to improve performance
214 // (no OS advisory locks are used to synchronize access
215 // to the database file).
216 let multi_process: i32 = 0;
217 rc = lsm_config(
218 self.db_handle,
219 LsmParam::MultipleProcesses as i32,
220 &multi_process,
221 );
222
223 if rc != 0 {
224 self.disconnect()?;
225 return Err(LsmErrorCode::try_from(rc)?);
226 }
227
228 // These are our default parameters of any handle (whether it writes
229 // to the database or not).
230
231 // Maximum size of a main-memory tree before it can be marked as old.
232 let autoflush: i32 = SIZE_MAIN_MEMORY_TREE_KB;
233 rc = lsm_config(self.db_handle, LsmParam::AutoFlush as i32, &autoflush);
234
235 if rc != 0 {
236 self.disconnect()?;
237 return Err(LsmErrorCode::try_from(rc)?);
238 }
239
240 rc = lsm_config(self.db_handle, LsmParam::PageSize as i32, &PAGE_SIZE_B);
241
242 if rc != 0 {
243 self.disconnect()?;
244 return Err(LsmErrorCode::try_from(rc)?);
245 }
246
247 rc = lsm_config(self.db_handle, LsmParam::BlockSize as i32, &BLOCK_SIZE_KB);
248
249 if rc != 0 {
250 self.disconnect()?;
251 return Err(LsmErrorCode::try_from(rc)?);
252 }
253
254 // How much of the file is kept in memory.
255 let mmap_size: i32 = 0;
256 rc = lsm_config(self.db_handle, LsmParam::Mmap as i32, &mmap_size);
257
258 if rc != 0 {
259 self.disconnect()?;
260 return Err(LsmErrorCode::try_from(rc)?);
261 }
262
263 let safety: i32 = LsmSafety::Normal as i32;
264 rc = lsm_config(self.db_handle, LsmParam::Safety as i32, &safety);
265
266 if rc != 0 {
267 self.disconnect()?;
268 return Err(LsmErrorCode::try_from(rc)?);
269 }
270
271 if self.db_conf.handle_mode == LsmHandleMode::ReadOnly {
272 // Here are parameters set that are only relevant in read-only mode.
273 // Observe that this overwrites the mode the handle operates in,
274 // as only reads are performed.
275 let read_only_int = 1;
276 rc = lsm_config(self.db_handle, LsmParam::ReadOnly as i32, &read_only_int);
277
278 if rc != 0 {
279 self.disconnect()?;
280 return Err(LsmErrorCode::try_from(rc)?);
281 }
282
283 // When opening in read-only mode, we ignore the mode provided by the user
284 // as no background threads (operating on the file) are needed.
285 self.db_conf.mode = LsmMode::LsmNoBackgroundThreads;
286 }
287
288 // If we are instructed to configure compression, we do it now. Setting the
289 // compression can be done only when the database was created, once set,
290 // trying to do it again or not setting it to the same compression scheme
291 // will be considered an error (LsmErrorCode::LsmMismatch).
292 match self.db_conf.compression {
293 LsmCompressionLib::NoCompression => {}
294 LsmCompressionLib::LZ4 => {
295 let lz4 = LsmLz4::new();
296 self.db_compress = lz4.get_compression_methods().ok();
297 }
298 LsmCompressionLib::ZLib => {
299 let zlib = LsmZLib::new();
300 self.db_compress = zlib.get_compression_methods().ok();
301 }
302 LsmCompressionLib::ZStd => {
303 let zstd = LsmZStd::new();
304 self.db_compress = zstd.get_compression_methods().ok();
305 }
306 };
307
308 // Only if the compression library is defined we pass it onto
309 // the engine. Otherwise no compression whatsoever.
310 if let Some(lsm_compress) = self.db_compress.as_ref() {
311 rc = lsm_config(
312 self.db_handle,
313 LsmParam::SetCompression as i32,
314 lsm_compress,
315 );
316
317 if rc != 0 {
318 self.disconnect()?;
319 return Err(LsmErrorCode::try_from(rc)?);
320 }
321 }
322
323 rc = lsm_open(self.db_handle, self.db_fq_name.as_ptr());
324
325 if rc != 0 {
326 self.disconnect()?;
327 return Err(LsmErrorCode::try_from(rc)?);
328 }
329
330 self.connected = true;
331
332 // Whether we spawn background threads is at this point properly set.
333 // Currently we spawn only one background thread at most, and thus its
334 // id is set to 0. This has to be executed after we have connected
335 // to the database.
336 self.configure_bg_threads(self.db_conf.mode, 0)?;
337 }
338
339 // We output the current parameters of the writer.
340 unsafe {
341 let auto_flush: i32 = -1;
342 let _ = lsm_config(self.db_handle, LsmParam::AutoFlush as i32, &auto_flush);
343
344 let page_size_b: i32 = -1;
345 let _ = lsm_config(self.db_handle, LsmParam::PageSize as i32, &page_size_b);
346
347 let block_size_kb: i32 = -1;
348 let _ = lsm_config(self.db_handle, LsmParam::BlockSize as i32, &block_size_kb);
349
350 let auto_checkpoint_kb: i32 = -1;
351 let _ = lsm_config(
352 self.db_handle,
353 LsmParam::AutoCheckPoint as i32,
354 &auto_checkpoint_kb,
355 );
356
357 let auto_work: i32 = -1;
358 let _ = lsm_config(self.db_handle, LsmParam::AutoWork as i32, &auto_work);
359
360 let multi_process: i32 = -1;
361 let _ = lsm_config(
362 self.db_handle,
363 LsmParam::MultipleProcesses as i32,
364 &multi_process,
365 );
366
367 let read_only: i32 = -1;
368 let _ = lsm_config(self.db_handle, LsmParam::ReadOnly as i32, &read_only);
369
370 let mmap_size: i32 = -1;
371 let _ = lsm_config(self.db_handle, LsmParam::Mmap as i32, &mmap_size);
372
373 let safety: i32 = -1;
374 let _ = lsm_config(self.db_handle, LsmParam::Safety as i32, &safety);
375
376 tracing::info!(
377 auto_flush = format!("{auto_flush} KBs"),
378 page_size = format!("{page_size_b} Bs"),
379 block_size = format!("{block_size_kb} KBs"),
380 auto_checkpoint = format!("{auto_checkpoint_kb} KBs"),
381 auto_work = if auto_work != 0 { "yes" } else { "no" },
382 multi_process = if multi_process != 0 { "yes" } else { "no" },
383 read_only = if read_only != 0 { "yes" } else { "no" },
384 background_threads = if self.db_conf.mode != LsmMode::LsmNoBackgroundThreads {
385 "yes"
386 } else {
387 "no"
388 },
389 mmap_overhead = format!("{mmap_size} KBs"),
390 compression = ?self.db_conf.compression,
391 safety = if safety == 0 { "None" } else if safety == 1 { "Normal" } else { "Full" },
392 "lsmlite-rs parameters.",
393 );
394 }
395
396 // If we get through, then everything is fine.
397 Ok(())
398 }
399
400 /// This method frees up all the resources used by the main-memory handle. A call
401 /// to `connect` has to have the corresponding call to `disconnect`, otherwise:
402 /// 1. The resources that belong to the memory handle will be leaked.
403 /// 2. The database (file) won't be closed and the next time we open it the recovery
404 /// process will kick-in (which can take considerable time. Depending on the
405 /// size of the log).
406 ///
407 /// For completeness, [`LsmDb`] also implements [`Drop`] so that a handle
408 /// gets automatically released once it goes out of scope.
409 ///
410 /// Disconnecting using an uninitialized handle, or a handle that is not yet connected
411 /// is not considered an error.
412 ///
413 /// # Example
414 ///
415 /// ```rust
416 /// use lsmlite_rs::*;
417 ///
418 /// let db_conf = DbConf::new(
419 /// "/tmp/",
420 /// "my_db_d".to_string(),
421 /// );
422 ///
423 /// let mut db: LsmDb = Default::default();
424 /// let rc = db.disconnect();
425 /// assert_eq!(rc, Ok(()));
426 ///
427 /// let rc = db.initialize(db_conf);
428 /// let rc = db.connect();
429 /// let rc = db.disconnect();
430 /// assert_eq!(rc, Ok(()));
431 /// assert!(!db.is_connected());
432 /// ```
433 fn disconnect(&mut self) -> Result<(), LsmErrorCode> {
434 if !self.initialized || !self.connected {
435 return Ok(());
436 }
437
438 // First, we explicitly shutdown all background threads (it might take some time
439 // depending on what they are doing). In this manner, they will all flush their
440 // data to disk.
441 if self.db_conf.mode != LsmMode::LsmNoBackgroundThreads {
442 self.db_bg_threads.shutdown();
443 }
444
445 // We now proceed to close the database and destroy all allocated resources of the handle.
446 let rc: i32;
447 unsafe {
448 rc = lsm_close(self.db_handle);
449 }
450 if rc != 0 {
451 return Err(LsmErrorCode::try_from(rc)?);
452 }
453
454 // We reset the pointer once we know we were able to cleanly close the database.
455 self.db_handle = null_mut();
456 self.connected = false;
457
458 // If we get through, then everything is fine.
459 Ok(())
460 }
461
462 /// This function writes the given entry on the database file in a transactional
463 /// manner. That is, it either writes it completely, or not, but it leaves the
464 /// database in no inconsistent state.
465 ///
466 /// Trying to persist data using an uninitialized handle, or one that is not yet
467 /// connected to a database, is considered [`LsmErrorCode::LsmMisuse`].
468 ///
469 /// # Example
470 ///
471 /// ```rust
472 /// use lsmlite_rs::*;
473 /// use rand::{thread_rng, Rng};
474 ///
475 /// let db_conf = DbConf::new(
476 /// "/tmp/",
477 /// "my_db_e".to_string(),
478 /// );
479 ///
480 /// let mut db: LsmDb = Default::default();
481 /// let rc = db.initialize(db_conf);
482 /// let rc = db.connect();
483 ///
484 /// let mut prng = thread_rng();
485 /// // 16-byte random key (not very useful in practice).
486 /// let key: Vec<u8> = (0..16).map(|_| prng.gen_range(0..=255)).collect();
487 /// // 1 KB zeroed payload.
488 /// let value = vec![0; 1024];
489 ///
490 /// let rc = db.persist(&key, &value);
491 /// assert_eq!(rc, Ok(()));
492 ///
493 /// // This is also possible (would overwrite the entry)
494 /// let rc = Disk::persist(&mut db, &key, &value);
495 /// assert_eq!(rc, Ok(()));
496 ///
497 /// let rc = db.disconnect();
498 /// ```
499 fn persist(&mut self, key: &[u8], value: &[u8]) -> Result<(), LsmErrorCode> {
500 if !self.initialized || !self.connected {
501 return Err(LsmErrorCode::LsmMisuse);
502 }
503
504 let start = Instant::now();
505 let serial_key = key;
506 let serial_key_len = serial_key.len() as i32;
507 let serial_blob = value;
508 let serial_blob_len = serial_blob.len() as i32;
509 let rc: i32;
510
511 unsafe {
512 // If we have background threads, we have to synchronize to avoid running
513 // out of memory. That is, we cannot write to the database until we make sure that
514 // we haven't exceeded the resources we are told (main memory for example).
515 self.deal_with_bg_threads()?;
516
517 rc = lsm_insert(
518 self.db_handle,
519 serial_key.as_ptr(),
520 serial_key_len,
521 serial_blob.as_ptr(),
522 serial_blob_len,
523 );
524 if rc != 0 {
525 return Err(LsmErrorCode::try_from(rc)?);
526 }
527 }
528
529 let current_request_duration = Instant::now()
530 .checked_duration_since(start)
531 .unwrap_or_default();
532 match &self.db_conf.metrics {
533 None => {}
534 Some(metrics) => metrics
535 .write_times_s
536 .observe(current_request_duration.as_secs_f64()),
537 }
538 Ok(())
539 }
540
541 /// This function is just sugar. The database file can be considered a primary
542 /// index in which only one entry under a given key can exist. If another entry
543 /// with an existing key is persisted, it overwrites the existing one.
544 fn update(&mut self, key: &[u8], value: &[u8]) -> Result<(), LsmErrorCode> {
545 Disk::persist(self, key, value)
546 }
547
548 /// This deletes the entry under the given key (in a transactional manner).
549 ///
550 /// Trying to delete data using an uninitialized handle, or one that is not yet
551 /// connected to a database, is considered [`LsmErrorCode::LsmMisuse`].
552 ///
553 /// # Example
554 ///
555 /// ```rust
556 /// use lsmlite_rs::*;
557 /// use rand::{thread_rng, Rng};
558 ///
559 /// let db_conf = DbConf::new(
560 /// "/tmp/",
561 /// "my_db_f".to_string(),
562 /// );
563 ///
564 /// let mut db: LsmDb = Default::default();
565 /// let rc = db.initialize(db_conf);
566 /// let rc = db.connect();
567 ///
568 /// let mut prng = thread_rng();
569 /// // 16-byte random key (not very useful in practice).
570 /// let key: Vec<u8> = (0..16).map(|_| prng.gen_range(0..=255)).collect();
571 /// // 1 KB zeroed payload.
572 /// let value = vec![0; 1024];
573 ///
574 /// let rc = db.persist(&key, &value);
575 /// assert_eq!(rc, Ok(()));
576 ///
577 /// // Entry under `key` will disappear.
578 /// let rc = db.delete(&key);
579 /// assert_eq!(rc, Ok(()));
580 ///
581 /// let rc = db.disconnect();
582 /// ```
583 fn delete(&mut self, key: &[u8]) -> Result<(), LsmErrorCode> {
584 if !self.initialized || !self.connected {
585 return Err(LsmErrorCode::LsmMisuse);
586 }
587
588 let rc: i32;
589 let key_len = key.len();
590 let key_ptr = key.as_ptr();
591 unsafe { rc = lsm_delete(self.db_handle, key_ptr, key_len as i32) }
592 if rc != 0 {
593 return Err(LsmErrorCode::try_from(rc)?);
594 }
595
596 Ok(())
597 }
598
599 /// This function deletes the open interval of keys (being, end) (in a transactional
600 /// manner as well).
601 fn delete_range(&mut self, begin: &[u8], end: &[u8]) -> Result<(), LsmErrorCode> {
602 if !self.initialized || !self.connected {
603 return Err(LsmErrorCode::LsmMisuse);
604 }
605
606 let rc: i32;
607 let starting_key_len = begin.len();
608 let starting_key_ptr = begin.as_ptr();
609 let ending_key_len = end.len();
610 let ending_key_ptr = end.as_ptr();
611 unsafe {
612 rc = lsm_delete_range(
613 self.db_handle,
614 starting_key_ptr,
615 starting_key_len as i32,
616 ending_key_ptr,
617 ending_key_len as i32,
618 )
619 }
620 if rc != 0 {
621 return Err(LsmErrorCode::try_from(rc)?);
622 }
623
624 Ok(())
625 }
626
627 /// This function optimizes a database to make it occupy as little space as possible.
628 /// Essentially, this function compacts the whole database into a single tightly-packed
629 /// B-tree: Thus, read I/O is optimized.
630 /// This function is thought to be used in an offline fashion - once
631 /// all writers have finished with the database.
632 fn optimize(&mut self) -> Result<(), LsmErrorCode> {
633 if !self.initialized || !self.connected {
634 return Err(LsmErrorCode::LsmMisuse);
635 }
636
637 let rc: i32;
638 unsafe {
639 // Let's work all the way through.
640 rc = lsm_work(self.db_handle, 1, -1, null_mut());
641
642 // Anything different than ok (0), or busy (5) is wrong!
643 if rc != 0 && rc != 5 {
644 lsm_close(self.db_handle);
645 let ec = LsmErrorCode::try_from(rc)?;
646 tracing::error!(
647 datafile = self.get_full_db_path()?,
648 rc = ?ec,
649 "Error occurred while working on the datafile. Exiting background thread.",
650 );
651 }
652 }
653
654 Ok(())
655 }
656
657 /// This function opens a transaction explicitly. All operations contained between
658 /// opening a transaction and committing it using [`Disk::commit_transaction`] will be
659 /// performed atomically. Similarly, if the transaction is explicitly rolled back using
660 /// [`Disk::rollback_transaction`], all enclosed operations will not be persistent.
661 /// Observe that every database operation is contained in an implicit transaction. This
662 /// function is thought to encapsulate multiple operations into a single transaction.
663 fn begin_transaction(&mut self) -> Result<(), LsmErrorCode> {
664 if !self.initialized || !self.connected {
665 return Err(LsmErrorCode::LsmMisuse);
666 }
667
668 // TODO: For the time being we accept only a single
669 // transaction, no nested ones.
670 let rc: i32;
671 unsafe {
672 rc = lsm_begin(self.db_handle, 1);
673 }
674
675 // A transaction is easy to deal with, it's either successfully
676 // opened or not.
677 if rc != 0 {
678 return Err(LsmErrorCode::try_from(rc)?);
679 }
680
681 Ok(())
682 }
683
684 /// This function commits an opened transaction. Without committing a transaction,
685 /// all enclosed operations will remain hidden from the consistent state of the database.
686 fn commit_transaction(&mut self) -> Result<(), LsmErrorCode> {
687 if !self.initialized || !self.connected {
688 return Err(LsmErrorCode::LsmMisuse);
689 }
690
691 // TODO: For the time being we do not support nested
692 // transactions, but we try to commit all open ones anyway.
693 let rc: i32;
694 unsafe {
695 rc = lsm_commit(self.db_handle, 0);
696 }
697
698 // The commit either succeeds or not.
699 if rc != 0 {
700 return Err(LsmErrorCode::try_from(rc)?);
701 }
702
703 Ok(())
704 }
705
706 /// This function rollbacks an opened transaction explicitly. All enclosed
707 /// operations will remain hidden from the consistent state of the database.
708 fn rollback_transaction(&mut self) -> Result<(), LsmErrorCode> {
709 if !self.initialized || !self.connected {
710 return Err(LsmErrorCode::LsmMisuse);
711 }
712
713 // TODO: For the time being we do not support nested
714 // transactions, thus we simply rollback and close the top-level
715 // transaction.
716 let rc: i32;
717 unsafe {
718 rc = lsm_rollback(self.db_handle, 0);
719 }
720
721 // The rollback should succeed, otherwise there are pretty bad
722 // issues down the pipeline.
723 if rc != 0 {
724 return Err(LsmErrorCode::try_from(rc)?);
725 }
726
727 Ok(())
728 }
729
730 /// This function returns a cursor to the underlying database.
731 /// This cursor can be operated by the methods provided by the [`Cursor`] trait.
732 /// When opening a cursor, a snapshot of the database will be created for it. No
733 /// new data arriving after the cursor has been created will be visible to the
734 /// cursor. A cursor is used to performed read-only operations over the database.
735 /// That is, no data of the database can be modified through a cursor.
736 ///
737 /// Trying to open a cursor using a uninitialized handle, or one that is not yet
738 /// connected to a database, is considered [`LsmErrorCode::LsmMisuse`].
739 ///
740 /// # Example
741 ///
742 /// ```rust
743 /// use lsmlite_rs::*;
744 ///
745 /// let db_conf = DbConf::new(
746 /// "/tmp/",
747 /// "my_db_g".to_string(),
748 /// );
749 ///
750 /// let mut db: LsmDb = Default::default();
751 /// let rc = db.initialize(db_conf);
752 /// let rc = db.connect();
753 ///
754 /// // Opening a cursor for `db`. This cursor is currently
755 /// // not positioned anywhere, and thus no data can be extracted
756 /// // from it.
757 /// let cursor = db.cursor_open();
758 /// assert!(cursor.is_ok());
759 /// ```
760 fn cursor_open(&self) -> Result<LsmCursor, LsmErrorCode> {
761 if !self.initialized || !self.connected {
762 return Err(LsmErrorCode::LsmMisuse);
763 }
764
765 let cursor: *mut lsm_cursor = null_mut();
766 let rc: i32;
767 unsafe {
768 rc = lsm_csr_open(self.db_handle, &cursor);
769 }
770 if rc != 0 {
771 return Err(LsmErrorCode::try_from(rc)?);
772 }
773 Ok(LsmCursor {
774 db_cursor: cursor,
775 _marker: Default::default(),
776 })
777 }
778}
779
780/// Custom implementation of [`Cursor`] for [`LsmCursor`].
781impl Cursor for LsmCursor<'_> {
782 /// This function closes an existing cursor over the underlying
783 /// database. A call to [`Disk::cursor_open`] must be paired up (in the end)
784 /// with a call to [`Cursor::close`]. Otherwise the database won't be cleanly
785 /// closed (due to the snapshot that belongs to the cursor), and
786 /// a recovery process will be spawn the next time the database
787 /// file is opened.
788 ///
789 /// For completeness, [`LsmCursor`] also implements [`Drop`] so that a cursor
790 /// gets automatically released once it goes out of scope (thus releasing
791 /// resources).
792 ///
793 /// Closing an uninitialized [`LsmCursor`] is considered [`LsmErrorCode::LsmMisuse`].
794 fn close(&mut self) -> Result<(), LsmErrorCode> {
795 if self.db_cursor.is_null() {
796 return Err(LsmErrorCode::LsmMisuse);
797 }
798
799 let rc: i32;
800 // Free resources.
801 unsafe {
802 // This call is infallible, it always returns Ok.
803 rc = lsm_csr_close(self.db_cursor);
804 // From here on, this cursor is not useful any longer.
805 self.db_cursor = null_mut();
806 }
807 if rc != 0 {
808 return Err(LsmErrorCode::try_from(rc)?);
809 }
810
811 Ok(())
812 }
813
814 /// Tests whether the cursor is currently pointing to a valid database record.
815 /// When operating a cursor, this function has to be called before extracting
816 /// records from it (key and/or value) to make sure that the values can be
817 /// trusted. That is, a cursor might internally retain the last output value
818 /// for a while after it has become invalid (say moved past the end of the
819 /// database), or empty values can be extracted from it before positioning
820 /// the cursor on a valid record.
821 ///
822 /// Testing for validity of an uninitialized uninitialized [`LsmCursor`] is
823 /// considered [`LsmErrorCode::LsmMisuse`].
824 fn valid(&self) -> Result<(), LsmErrorCode> {
825 if self.db_cursor.is_null() {
826 return Err(LsmErrorCode::LsmMisuse);
827 }
828
829 let mut rc: i32;
830 unsafe {
831 rc = lsm_csr_valid(self.db_cursor);
832 // Internally, lsm_csr_valid returns true (== 1) when valid,
833 // but LsmOk == 0, thus we exchange the value to represent
834 // true as LsmOk, and false as LsmError.
835 rc = 1 - rc;
836 }
837 if rc != 0 {
838 return Err(LsmErrorCode::try_from(rc)?);
839 }
840
841 Ok(())
842 }
843
844 /// Moves the cursor to the very first record in the database.
845 /// Positioning an uninitialized [`LsmCursor`] is considered [`LsmErrorCode::LsmMisuse`].
846 ///
847 /// # Example
848 ///
849 /// ```rust
850 /// use lsmlite_rs::*;
851 ///
852 /// let db_conf = DbConf::new(
853 /// "/tmp/",
854 /// "my_db_h".to_string(),
855 /// );
856 ///
857 /// let mut db: LsmDb = Default::default();
858 /// let rc = db.initialize(db_conf);
859 /// let rc = db.connect();
860 ///
861 /// // Insert data into the database, so that something gets traversed.
862 /// let key: usize = 1;
863 /// let key_serial = key.to_be_bytes();
864 /// // 1 KB zeroed payload.
865 /// let value = vec![0; 1024];
866 /// let rc = db.persist(&key_serial, &value)?;
867 ///
868 /// let key: usize = 2;
869 /// let key_serial = key.to_be_bytes();
870 /// let rc = db.persist(&key_serial, &value)?;
871 ///
872 /// let mut cursor = db.cursor_open()?;
873 ///
874 /// let rc = cursor.first();
875 /// assert!(rc.is_ok());
876 ///
877 /// let mut num_records = 0;
878 /// while cursor.valid().is_ok() {
879 /// num_records += 1;
880 /// let current_key = Cursor::get_key(&cursor)?;
881 /// let current_value = Cursor::get_value(&cursor)?;
882 /// cursor.next()?;
883 /// }
884 /// assert_eq!(num_records, 2);
885 ///
886 /// // EOF
887 /// assert!(cursor.valid().is_err());
888 ///
889 /// # Result::<(), LsmErrorCode>::Ok(())
890 /// ```
891 fn first(&mut self) -> Result<(), LsmErrorCode> {
892 if self.db_cursor.is_null() {
893 return Err(LsmErrorCode::LsmMisuse);
894 }
895
896 let rc: i32;
897 unsafe {
898 rc = lsm_csr_first(self.db_cursor);
899 }
900 if rc != 0 {
901 return Err(LsmErrorCode::try_from(rc)?);
902 }
903
904 Ok(())
905 }
906
907 /// Moves the cursor to the very last record in the database.
908 /// Positioning an uninitialized [`LsmCursor`] is considered [`LsmErrorCode::LsmMisuse`].
909 /// # Example
910 ///
911 /// ```rust
912 /// use lsmlite_rs::*;
913 ///
914 /// let db_conf = DbConf::new(
915 /// "/tmp/",
916 /// "my_db_i".to_string(),
917 /// );
918 ///
919 /// let mut db: LsmDb = Default::default();
920 /// let rc = db.initialize(db_conf);
921 /// let rc = db.connect();
922 ///
923 /// // Insert data into the database, so that something gets traversed.
924 /// let key: usize = 1;
925 /// let key_serial = key.to_be_bytes();
926 /// // 1 KB zeroed payload.
927 /// let value = vec![0; 1024];
928 /// let rc = db.persist(&key_serial, &value)?;
929 ///
930 /// let key: usize = 2;
931 /// let key_serial = key.to_be_bytes();
932 /// let rc = db.persist(&key_serial, &value)?;
933 ///
934 /// let mut cursor = db.cursor_open()?;
935 ///
936 /// let rc = cursor.last();
937 /// assert!(rc.is_ok());
938 ///
939 /// let mut num_records = 0;
940 /// while cursor.valid().is_ok() {
941 /// num_records += 1;
942 /// let current_key = Cursor::get_key(&cursor)?;
943 /// let current_value = Cursor::get_value(&cursor)?;
944 /// cursor.prev()?;
945 /// }
946 /// assert_eq!(num_records, 2);
947 ///
948 /// // EOF
949 /// assert!(cursor.valid().is_err());
950 ///
951 /// # Result::<(), LsmErrorCode>::Ok(())
952 /// ```
953 fn last(&mut self) -> Result<(), LsmErrorCode> {
954 if self.db_cursor.is_null() {
955 return Err(LsmErrorCode::LsmMisuse);
956 }
957
958 let rc: i32;
959 unsafe {
960 rc = lsm_csr_last(self.db_cursor);
961 }
962 if rc != 0 {
963 return Err(LsmErrorCode::try_from(rc)?);
964 }
965
966 Ok(())
967 }
968
/// This positions the cursor on an entry of the database that depends on
/// the seek mode provided:
/// 1. If [`LsmCursorSeekOp::LsmCursorSeekLe`] is given, then the cursor will be positioned
///    at the entry that is less than or equal to the provided key, depending on whether
///    the key is found in the database or not.
/// 2. If [`LsmCursorSeekOp::LsmCursorSeekEq`] is given, then the cursor will be positioned
///    at the entry that corresponds to the given key, or at the end of the database,
///    depending on whether the entry is found or not. If the entry is found, a call
///    to `valid` on the cursor will return success, and otherwise an error.
/// 3. If [`LsmCursorSeekOp::LsmCursorSeekGe`] is given, then the cursor will be positioned
///    at the entry that is greater than or equal to the provided key, depending on
///    whether the key is found in the database or not.
981 fn seek(&mut self, key: &[u8], mode: LsmCursorSeekOp) -> Result<(), LsmErrorCode> {
982 if self.db_cursor.is_null() {
983 return Err(LsmErrorCode::LsmMisuse);
984 }
985
986 let rc: i32;
987 let key_len = key.len();
988 let key_ptr = key.as_ptr();
989 unsafe {
990 rc = lsm_csr_seek(self.db_cursor, key_ptr, key_len as i32, mode as i32);
991 }
992 if rc != 0 {
993 return Err(LsmErrorCode::try_from(rc)?);
994 }
995
996 Ok(())
997 }
998
/// Once a cursor is positioned at a valid entry, this function moves it to the next
/// entry. This function can be called only when moving forward on the database. That is,
/// when starting from [`Cursor::first`] or when seeking with [`LsmCursorSeekOp::LsmCursorSeekGe`].
/// Otherwise an error will be issued.
1003 fn next(&mut self) -> Result<(), LsmErrorCode> {
1004 if self.db_cursor.is_null() {
1005 return Err(LsmErrorCode::LsmMisuse);
1006 }
1007
1008 let rc: i32;
1009 unsafe {
1010 rc = lsm_csr_next(self.db_cursor);
1011 }
1012 if rc != 0 {
1013 return Err(LsmErrorCode::try_from(rc)?);
1014 }
1015
1016 Ok(())
1017 }
1018
1019 /// Similar to [`Cursor::next`], but moving to the previous entry. This function can
1020 /// be called only when moving backwards on the database. That is, when starting from
1021 /// [`Cursor::last`] or when seeking with [`LsmCursorSeekOp::LsmCursorSeekLe`]. Otherwise an
1022 /// error will be issued.
1023 fn prev(&mut self) -> Result<(), LsmErrorCode> {
1024 if self.db_cursor.is_null() {
1025 return Err(LsmErrorCode::LsmMisuse);
1026 }
1027
1028 let rc: i32;
1029 unsafe {
1030 rc = lsm_csr_prev(self.db_cursor);
1031 }
1032 if rc != 0 {
1033 return Err(LsmErrorCode::try_from(rc)?);
1034 }
1035
1036 Ok(())
1037 }
1038
1039 /// If the cursor is [`Cursor::valid`], then this function retrieves the key of
1040 /// the entry the cursor is currently pointing to. The memory the key uses
1041 /// belongs to the parent call. If the cursor is not valid, an error is returned.
1042 fn get_key(&self) -> Result<Vec<u8>, LsmErrorCode> {
1043 self.valid()?;
1044
1045 let rc: i32;
1046 let key_ptr: *mut u8 = null_mut();
1047 let mut key_len: i32 = 0;
1048 let mut key: Vec<u8> = vec![];
1049 unsafe {
1050 rc = lsm_csr_key(self.db_cursor, &key_ptr, &mut key_len);
1051 if rc != 0 {
1052 return Err(LsmErrorCode::try_from(rc)?);
1053 }
1054 // We reserve enough space so that we can copy what the cursor returned.
1055 key.reserve(key_len as usize);
1056 // We copy the returned value onto new memory that the upper call
1057 // will own.
1058 key_ptr.copy_to_nonoverlapping(key.as_mut_ptr(), key_len as usize);
1059 key.set_len(key_len as usize);
1060 }
1061 // This memory belongs now to the upper layer.
1062 Ok(key)
1063 }
1064
/// If the cursor is [`Cursor::valid`], then this function retrieves the value
/// of the entry the cursor is currently pointing to. The memory the value uses
/// belongs to the caller. If the cursor is not valid, an error is returned.
1068 fn get_value(&self) -> Result<Vec<u8>, LsmErrorCode> {
1069 self.valid()?;
1070
1071 let rc: i32;
1072 let value_ptr: *mut u8 = null_mut();
1073 let mut value_len: i32 = 0;
1074 let mut value: Vec<u8> = vec![];
1075 unsafe {
1076 rc = lsm_csr_value(self.db_cursor, &value_ptr, &mut value_len);
1077 if rc != 0 {
1078 return Err(LsmErrorCode::try_from(rc)?);
1079 }
1080 // We reserve enough space so that we can copy what the cursor returned.
1081 value.reserve(value_len as usize);
1082 // We copy the returned value onto new memory that the upper call
1083 // will own.
1084 value_ptr.copy_to_nonoverlapping(value.as_mut_ptr(), value_len as usize);
1085 value.set_len(value_len as usize);
1086 }
1087 // This memory belongs now to the upper layer.
1088 Ok(value)
1089 }
1090
1091 /// If the cursor is [`Cursor::valid`], then this function compares the key of the
1092 /// entry the cursor is currently pointing to, with the given key. On success, the
1093 /// result of the comparison is returned. The comparison happens as per
1094 /// `memcmp`, that is, if the cursor's key is [`Ordering::Less`], [`Ordering::Equal`],
1095 /// or [`Ordering::Greater`] than the provided key, then the corresponding [`Ordering`]
1096 /// will be returned. On prefix comparison, that is, the given key is a strict prefix of
1097 /// the cursor key, [`Ordering::Greater`] will be returned.
1098 ///
1099 /// This function is useful when probing the database for a range.
1100 ///
1101 /// # Example
1102 ///
1103 /// ```rust
1104 /// use std::cmp::Ordering;
1105 /// use lsmlite_rs::*;
1106 ///
1107 /// let db_conf = DbConf::new(
1108 /// "/tmp/",
1109 /// "my_db_j".to_string(),
1110 /// );
1111 ///
1112 /// let mut db: LsmDb = Default::default();
1113 /// let rc = db.initialize(db_conf);
1114 /// let rc = db.connect();
1115 ///
1116 /// // Insert data into the database, so that something gets traversed.
1117 /// let key: usize = 1;
1118 /// let key_serial = key.to_be_bytes();
1119 /// // 1 KB zeroed payload.
1120 /// let value = vec![0; 1024];
1121 /// let rc = db.persist(&key_serial, &value)?;
1122 ///
1123 /// let key: usize = 2;
1124 /// let key_serial = key.to_be_bytes();
1125 /// let rc = db.persist(&key_serial, &value)?;
1126 ///
1127 /// let mut cursor = db.cursor_open()?;
1128 ///
1129 /// let rc = cursor.first();
1130 /// assert!(rc.is_ok());
1131 ///
1132 /// // Assume the very first record is smaller than this.
1133 /// let key_ub_value: usize = 2;
1134 /// let key_ub_serial = key_ub_value.to_be_bytes();
1135 /// // `Ordering::Less` tells that the key of the cursor is smaller
1136 /// // than `key_ub_value`.
1137 /// let mut key_cmp = cursor.compare(&key_ub_serial)?;
1138 ///
1139 /// let mut num_records = 0;
1140 /// while cursor.valid().is_ok() && key_cmp < Ordering::Equal {
1141 /// num_records += 1;
1142 /// cursor.next()?;
1143 /// key_cmp = cursor.compare(&key_ub_serial)?;
1144 /// }
1145 ///
1146 /// assert_eq!(num_records, 1);
1147 ///
1148 /// // We either exhausted the database or found a key >= than `key_ub_value`.
1149 /// # Result::<(), LsmErrorCode>::Ok(())
1150 /// ```
1151 fn compare(&self, key: &[u8]) -> Result<Ordering, LsmErrorCode> {
1152 self.valid()?;
1153
1154 let rc: i32;
1155 let mut result: i32 = 0;
1156 let zero: i32 = 0;
1157 let key_len = key.len();
1158 let key_ptr = key.as_ptr();
1159 unsafe {
1160 rc = lsm_csr_cmp(self.db_cursor, key_ptr, key_len as i32, &mut result);
1161 }
1162 if rc != 0 {
1163 return Err(LsmErrorCode::try_from(rc)?);
1164 }
1165
1166 Ok(result.cmp(&zero))
1167 }
1168}
1169
/// In addition to implementing [`Disk`], the following helper functions are also available.
1171impl LsmDb {
1172 fn configure_bg_threads(&mut self, mode: LsmMode, id: usize) -> Result<(), LsmErrorCode> {
1173 let rc: i32;
1174 match mode {
1175 LsmMode::LsmNoBackgroundThreads => {
1176 // In single-threaded mode we do nothing but to set parameters of the connection.
1177 // These parameters are a good approximation in general. On bigger machines we would
1178 // want to consume more main-memory for example.
1179 unsafe {
1180 // Modifying auto checkpointing, as a single thread will handle all operations.
1181 let checkpoint_size: i32 = MAX_CHECKPOINT_SIZE_KB;
1182 rc = lsm_config(
1183 self.db_handle,
1184 LsmParam::AutoCheckPoint as i32,
1185 &checkpoint_size,
1186 );
1187
1188 if rc != 0 {
1189 return Err(LsmErrorCode::try_from(rc)?);
1190 }
1191 }
1192 }
1193 // If a single extra thread should handle work and checkpoint, then
1194 // we signal this.
1195 LsmMode::LsmBackgroundMerger => {
1196 // We now initialize the thread that will take care of working and checkpointing.
1197 self.db_bg_threads = LsmBgWorkers::new(&self.db_conf, &self.db_fq_name, id);
1198 // If the background thread was issued, then we output this information.
1199 if self.db_bg_threads.bg_threads[0].thread.is_some() {
1200 // Disable auto work, which will be delegated to a thread. The main
1201 // writer won't take care of this.
1202 let auto_work: i32 = 0;
1203 unsafe {
1204 rc = lsm_config(self.db_handle, LsmParam::AutoWork as i32, &auto_work);
1205 }
1206
1207 if rc != 0 {
1208 // Let's destroy the background threads.
1209 self.db_bg_threads.shutdown();
1210 return Err(LsmErrorCode::try_from(rc)?);
1211 }
1212
1213 // All good, go ahead and inform.
1214 tracing::info!(
1215 datafile = self.get_full_db_path()?,
1216 "Combined merger and check-pointer thread scheduled.",
1217 );
1218 }
1219
1220 // If the background thread was not issued (due to internal errors)
1221 // we change no property of the main connection to avoid problems.
1222 }
1223 LsmMode::LsmBackgroundCheckpointer => {
1224 // We first initialize the thread that will take care of checkpointing.
1225 self.db_bg_threads = LsmBgWorkers::new(&self.db_conf, &self.db_fq_name, id);
1226 // If the background thread was issued, then we output this information.
1227 if self.db_bg_threads.bg_threads[0].thread.is_some() {
1228 // Disable auto checkpointing, which will be delegated to a thread. The main
1229 // writer won't take care of this.
1230 let auto_checkpoint: i32 = 0;
1231 unsafe {
1232 rc = lsm_config(
1233 self.db_handle,
1234 LsmParam::AutoCheckPoint as i32,
1235 &auto_checkpoint,
1236 );
1237 }
1238
1239 if rc != 0 {
1240 // Let's destroy the background threads.
1241 self.db_bg_threads.shutdown();
1242 return Err(LsmErrorCode::try_from(rc)?);
1243 }
1244
1245 // All good, go ahead and inform.
1246 tracing::info!(
1247 datafile = self.get_full_db_path()?,
1248 "Check-pointer thread scheduled.",
1249 );
1250 }
1251
1252 // If the background thread was not issued (due to internal errors)
1253 // we change no property of the main connection to avoid problems.
1254 }
1255 }
1256 Ok(())
1257 }
1258
1259 fn deal_with_bg_threads(&mut self) -> Result<(), LsmErrorCode> {
1260 match self.db_conf.mode {
1261 LsmMode::LsmNoBackgroundThreads => {}
1262 LsmMode::LsmBackgroundMerger => {
1263 // We register the time it takes for the merger to work.
1264 let start = Instant::now();
1265 self.wait_on_merger()?;
1266 let current_request_duration = Instant::now()
1267 .checked_duration_since(start)
1268 .unwrap_or_default();
1269 match &self.db_conf.metrics {
1270 None => {}
1271 Some(metrics) => metrics
1272 .work_times_s
1273 .observe(current_request_duration.as_secs_f64()),
1274 }
1275 }
1276 LsmMode::LsmBackgroundCheckpointer => {
1277 // We register the time it takes for the checkpointer to work.
1278 let start = Instant::now();
1279 self.wait_on_checkpointer()?;
1280 let current_request_duration = Instant::now()
1281 .checked_duration_since(start)
1282 .unwrap_or_default();
1283 match &self.db_conf.metrics {
1284 None => {}
1285 Some(metrics) => metrics
1286 .checkpoint_times_s
1287 .observe(current_request_duration.as_secs_f64()),
1288 }
1289 }
1290 }
1291 Ok(())
1292 }
1293
1294 fn wait_on_merger(&mut self) -> Result<(), LsmErrorCode> {
1295 let mut rc: i32;
1296 let mut old_tree_size: i32 = -1;
1297 let mut new_tree_size: i32 = -1;
1298
1299 // Since the database is single-writer, we can safely query
1300 // the current sizes of the main memory structures (trees)
1301 // to decide how to proceed. Observe that this is done
1302 // only in the case that multiple threads are used.
1303 // Otherwise, LSM does this internally.
1304 unsafe {
1305 rc = lsm_info(
1306 self.db_handle,
1307 LsmInfo::LsmTreeSize as i32,
1308 &mut old_tree_size,
1309 &mut new_tree_size,
1310 );
1311 }
1312
1313 if rc != 0 {
1314 return Err(LsmErrorCode::try_from(rc)?);
1315 }
1316
1317 let mut written_kb: i32 = 0;
1318 let mut overall_written_kb = 0;
1319 let work_kb: i32 = 128;
1320 // We perform work until we have enough space in main memory to keep writing.
1321 // This is to avoid running over the amount of main memory allowed to use.
1322 while old_tree_size > 0 {
1323 unsafe {
1324 rc = lsm_work(self.db_handle, NUM_MERGE_SEGMENTS, work_kb, &mut written_kb);
1325 overall_written_kb += written_kb;
1326
1327 // Anything different than ok (0), or busy (5) is wrong!
1328 if rc != 0 && rc != 5 {
1329 lsm_close(self.db_handle);
1330 let ec = LsmErrorCode::try_from(rc);
1331 tracing::error!(
1332 datafile = ?self.get_full_db_path(),
1333 rc = ?ec,
1334 "Error occurred while working on the datafile. No work performed \
1335 on the database.",
1336 );
1337 return Err(LsmErrorCode::try_from(rc)?);
1338 }
1339
1340 // After having performed some work, we query the sizes of the
1341 // main-memory components to decide whether another iteration will be done or not.
1342 rc = lsm_info(
1343 self.db_handle,
1344 LsmInfo::LsmTreeSize as i32,
1345 &mut old_tree_size,
1346 &mut new_tree_size,
1347 );
1348
1349 // Something went wrong!
1350 if rc != 0 {
1351 let ec = LsmErrorCode::try_from(rc);
1352 tracing::error!(
1353 datafile = ?self.get_full_db_path(),
1354 rc = ?ec,
1355 "Error occurred while obtaining segment information for background thread. \
1356 Exiting background thread.",
1357 );
1358 return Err(LsmErrorCode::try_from(rc)?);
1359 }
1360
1361 park_timeout(Duration::from_millis(WRITER_PARK_TIME_MS));
1362 }
1363 }
1364
1365 // We update the metric on the amount of data written.
1366 match &self.db_conf.metrics {
1367 None => {}
1368 Some(metrics) => metrics.work_kbs.observe(overall_written_kb as f64),
1369 }
1370
1371 // Should hold.
1372 debug_assert!(old_tree_size == 0);
1373
1374 // Background thread will now take care of file operations in the background,
1375 // but main thread will be allowed to write to memory as well.
1376 self.db_bg_threads.execute(LsmBgWorkerMessage::Merge)?;
1377
1378 Ok(())
1379 }
1380
1381 fn wait_on_checkpointer(&mut self) -> Result<(), LsmErrorCode> {
1382 let mut rc: i32;
1383 let mut amount_volatile_data: i32 = -1;
1384 let writer_park_time_ms = Duration::from_millis(WRITER_PARK_TIME_MS);
1385
1386 // Since the database is single-writer, we can safely query
1387 // the current sizes of the main memory structures (trees)
1388 // to decide how to proceed. Observe that this is done
1389 // only in the case that multiple threads are used.
1390 // Otherwise, LSM does this internally.
1391 unsafe {
1392 rc = lsm_info(
1393 self.db_handle,
1394 LsmInfo::LsmCheckpointSize as i32,
1395 &mut amount_volatile_data,
1396 );
1397 }
1398
1399 if rc != 0 {
1400 return Err(LsmErrorCode::try_from(rc)?);
1401 }
1402
1403 // If a checkpoint is due, then we wake up the background thread.
1404 if amount_volatile_data >= MAX_CHECKPOINT_SIZE_KB {
1405 // This asks the background thread to checkpoint the data file (needed at this point).
1406 self.db_bg_threads.execute(LsmBgWorkerMessage::Checkpoint)?;
1407
1408 // Once the message has been sent, we wait for the background thread to
1409 // finish before returning control to the upper layer.
1410 // To avoid busy waits we yield for a little bit in every iteration.
1411 while amount_volatile_data >= MAX_CHECKPOINT_SIZE_KB {
1412 // TODO: We currently assume that the background thread is running
1413 // doing stuff. If the background thread dies, then we have to see how we
1414 // proceed (re-spawning the thread most probably to not interfere with the
1415 // existing writer connection).
1416 park_timeout(writer_park_time_ms);
1417
1418 unsafe {
1419 rc = lsm_info(
1420 self.db_handle,
1421 LsmInfo::LsmCheckpointSize as i32,
1422 &mut amount_volatile_data,
1423 );
1424 }
1425
1426 if rc != 0 {
1427 return Err(LsmErrorCode::try_from(rc)?);
1428 }
1429 }
1430 }
1431
1432 // If this condition holds, we can keep "safely" writing to the database.
1433 debug_assert!(amount_volatile_data < MAX_CHECKPOINT_SIZE_KB);
1434
1435 Ok(())
1436 }
1437
1438 /// This function tests whether a database handle has been initialized.
1439 pub fn is_initialized(&self) -> bool {
1440 self.initialized
1441 }
1442
1443 /// This function tests whether a database handle is connected.
1444 pub fn is_connected(&self) -> bool {
1445 self.connected
1446 }
1447
1448 /// This function outputs the full-qualified path of the database.
1449 /// It errors if the database has not been initialized.
1450 pub fn get_full_db_path(&self) -> Result<String, LsmErrorCode> {
1451 if !self.initialized {
1452 return Err(LsmErrorCode::LsmMisuse);
1453 }
1454 Ok(String::from_utf8_lossy(self.db_fq_name.as_bytes()).to_string())
1455 }
1456
/// This function outputs the compression id of the database. If the handle has not
/// been initialized and connected, [`LsmErrorCode::LsmMisuse`] is returned. Otherwise, the
/// only possible error is [`LsmErrorCode::LsmMismatch`], which means that records of the
/// database have been compressed with an unknown library. At this point there is not much
/// to do, and this error should be considered unrecoverable. That is, the database
/// can be considered corrupted, and its data is most probably lost.
1462 pub fn get_compression_id(&self) -> Result<LsmCompressionLib, LsmErrorCode> {
1463 if !self.initialized || !self.connected {
1464 return Err(LsmErrorCode::LsmMisuse);
1465 }
1466
1467 let compression_id: i32 = -1;
1468 unsafe {
1469 let _ = lsm_info(
1470 self.db_handle,
1471 LsmInfo::LsmCompressionId as i32,
1472 &compression_id,
1473 );
1474 }
1475 LsmCompressionLib::try_from(compression_id)
1476 }
1477}
1478
1479/// A default database. This database is not useful without
1480/// getting first initialized using [`Disk::initialize`]. The purpose
1481/// of this method is to simply zero all attributes of [`LsmDb`].
1482impl Default for LsmDb {
1483 fn default() -> Self {
1484 Self {
1485 db_env: null_mut(),
1486 db_handle: null_mut(),
1487 db_compress: None,
1488 db_fq_name: Default::default(),
1489 db_conf: Default::default(),
1490 db_bg_threads: Default::default(),
1491 initialized: false,
1492 connected: false,
1493 }
1494 }
1495}
1496
1497/// A default cursor. This cursor is not useful by itself as it is not
1498/// bound to any database. This construction is provided to be used in cases
1499/// in which a cursor needs to be declared ahead of time, only to be later
1500/// assigned a cursor bound to a database.
1501impl Default for LsmCursor<'_> {
1502 fn default() -> Self {
1503 Self {
1504 db_cursor: null_mut(),
1505 _marker: Default::default(),
1506 }
1507 }
1508}
1509
1510/// Drop for [`LsmDb`] so that it gets properly terminated when it goes out of scope for example.
1511impl Drop for LsmDb {
1512 fn drop(&mut self) {
1513 // Now we close the data file so that it does not require recovery next time we open it.
1514 // We might fail thou. If we fail, for whatever reason, the handle will be leaked, but since
1515 // this is `Drop`, we have no way to signal this to the upper layer.
1516 let rc = self.disconnect();
1517 if rc == Err(LsmErrorCode::LsmMisuse) {
1518 tracing::warn!(
1519 ?rc,
1520 "Database could not be closed. Most probably there are still cursors accessing it. \
1521 A recovery procedure will be required next time the database is accessed. \
1522 DB handle has not been destroyed and is still fully functional.",
1523 );
1524 } else if rc != Ok(()) {
1525 tracing::error!(
1526 ?rc,
1527 "Database could not be closed. Unexpected error happened. \
1528 Resources occupied by the handle will be most probably leaked.",
1529 );
1530 }
1531 }
1532}
1533
1534/// Drop for `LsmCursor` so that it gets properly terminated when it goes out of scope for example.
1535impl Drop for LsmCursor<'_> {
1536 fn drop(&mut self) {
1537 // We simply close the cursor (thus releasing resources occupied by it like
1538 // snapshot(s) and memory.
1539 let _ = self.close();
1540 }
1541}
1542
/// A database handle is marked as [`Send`] as it can be safely sent to another
/// thread (for further usage), for example in async code.
///
/// The marker has to be written by hand because [`LsmDb`] stores raw pointers
/// (its environment and connection handles), which are never [`Send`] automatically.
// NOTE(review): soundness relies on the underlying engine allowing a connection to be
// used from a thread other than the one that opened it — confirm against the engine.
unsafe impl Send for LsmDb {}
/// For convenience a database handle is marked as [`Sync`]. This is not because the same
/// handle can be safely shared among threads, but because in this manner a handle can be
/// wrapped in a [`std::sync::RwLock`] and be shared among threads safely as it captures
/// the single-writer, multiple-reader nature of a [`LsmDb`]. In this manner, multiple
/// cursors may be opened through the same handle, while writes through the handle
/// are exclusive (serialized).
// NOTE(review): as the text above says, this marker alone does not make concurrent use
// of one handle safe; callers are expected to provide the locking themselves.
unsafe impl Sync for LsmDb {}