1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
//! An ergonomic, multithreaded API for an LMDB datastore

use crate::{
    key_val_store::{error::KeyValStoreError, key_val_store::IterationResult},
    lmdb_store::error::LMDBError,
};
use lmdb_zero::{
    db,
    error::{self, LmdbResultExt},
    open,
    put,
    traits::AsLmdbBytes,
    ConstAccessor,
    Cursor,
    CursorIter,
    Database,
    DatabaseOptions,
    EnvBuilder,
    Environment,
    Ignore,
    MaybeOwned,
    ReadTransaction,
    Stat,
    WriteAccessor,
    WriteTransaction,
};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use std::{
    cmp::max,
    collections::HashMap,
    path::{Path, PathBuf},
    sync::Arc,
};

const LOG_TARGET: &str = "lmdb";

/// An atomic pointer to an LMDB database instance
type DatabaseRef = Arc<Database<'static>>;

/// A builder for [LMDBStore](struct.lmdbstore.html)
/// ## Example
///
/// Create a new LMDB database of 500MB in the `db` directory with two named databases: "db1" and "db2"
///
/// ```
/// # use tari_storage::lmdb_store::LMDBBuilder;
/// # use lmdb_zero::db;
/// # use std::env;
/// let mut store = LMDBBuilder::new()
///     .set_path(env::temp_dir())
///     .set_environment_size(500)
///     .set_max_number_of_databases(10)
///     .add_database("db1", db::CREATE)
///     .add_database("db2", db::CREATE)
///     .build()
///     .unwrap();
/// ```
#[derive(Default)]
pub struct LMDBBuilder {
    /// Directory in which the LMDB environment file lives; must already exist when `build()` is called
    path: PathBuf,
    /// Requested environment (memory map) size, in megabytes
    db_size_mb: usize,
    /// Maximum number of named databases the environment may hold
    max_dbs: usize,
    /// Named databases to open on `build()`, with their lmdb-zero database flags
    db_names: HashMap<String, db::Flags>,
}

impl LMDBBuilder {
    /// Create a new LMDBStore builder. Set up the database by calling `set_nnnn` and then create the database
    /// with `build()`. The default values for the database parameters are:
    ///
    /// | Parameter | Default |
    /// |:----------|---------|
    /// | path      | ./store/|
    /// | size      | 64 MB   |
    /// | named DBs | none    |
    pub fn new() -> LMDBBuilder {
        LMDBBuilder {
            path: "./store/".into(),
            db_size_mb: 64,
            db_names: HashMap::new(),
            max_dbs: 8,
        }
    }

    /// Set the directory where the LMDB database exists, or must be created.
    /// Note: The directory must exist already; it is not created for you. If it does not exist, `build()` will
    /// return `LMDBError::InvalidPath`.
    pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> LMDBBuilder {
        self.path = path.as_ref().to_owned();
        self
    }

    /// Sets the size of the environment, in MB.
    /// The actual memory will only be allocated when #build() is called
    pub fn set_environment_size(mut self, size: usize) -> LMDBBuilder {
        self.db_size_mb = size;
        self
    }

    /// Sets the maximum number of databases (tables) in the environment. If this value is less than the number of
    /// DBs that will be created when the environment is built, this value will be ignored.
    pub fn set_max_number_of_databases(mut self, size: usize) -> LMDBBuilder {
        self.max_dbs = size;
        self
    }

    /// Add an additional named database to the LMDB environment. If `add_database` isn't called at least once,
    /// only the `default` database is created.
    pub fn add_database(mut self, name: &str, flags: db::Flags) -> LMDBBuilder {
        // There will always be a 'default' database. Adding the same name twice just replaces the flags.
        let _ = self.db_names.insert(name.into(), flags);
        self
    }

    /// Create a new LMDBStore instance and open the underlying database environment.
    ///
    /// # Errors
    /// Returns `LMDBError::InvalidPath` if the configured path does not exist or is not valid UTF-8. Any
    /// lmdb-zero error raised while opening the environment or the named databases is propagated.
    pub fn build(mut self) -> Result<LMDBStore, LMDBError> {
        // Make room for at least as many DBs as were registered via `add_database`
        let max_dbs = max(self.db_names.len(), self.max_dbs) as u32;
        if !self.path.exists() {
            return Err(LMDBError::InvalidPath);
        }
        let path = self
            .path
            .to_str()
            .map(String::from)
            .ok_or(LMDBError::InvalidPath)?;

        // SAFETY: `EnvBuilder::open` is unsafe in lmdb-zero; the invariants it requires (see the
        // lmdb-zero docs, e.g. not opening the same environment twice in one process) are delegated
        // to the caller of `build()`.
        let env = unsafe {
            let mut builder = EnvBuilder::new()?;
            builder.set_mapsize(self.db_size_mb * 1024 * 1024)?;
            builder.set_maxdbs(max_dbs)?;
            // Using open::Flags::NOTLS does not compile!?! NOTLS=0x200000
            let flags = open::Flags::from_bits(0x200_000).expect("LMDB open::Flag is correct");
            builder.open(&path, flags, 0o600)?
        };
        let env = Arc::new(env);

        // Increase map size if usage gets close to the db size
        let mut env_info = env.info()?;
        let env_stat = env.stat()?;
        let size_used = env_stat.psize as usize * env_info.last_pgno;
        let mut space_remaining = env_info.mapsize - size_used;
        let usage = (size_used as f64 / env_info.mapsize as f64) * 100.0;
        if space_remaining <= ((self.db_size_mb * 1024 * 1024) as f64 * 0.5) as usize {
            unsafe {
                env.set_mapsize(size_used + self.db_size_mb * 1024 * 1024)?;
            }
            env_info = env.info()?;
            space_remaining = env_info.mapsize - size_used;
            debug!(
                target: LOG_TARGET,
                "({}) LMDB environment usage factor {:.*} %., size used {:?} MB, increased by {:?} MB.",
                path,
                2,
                usage,
                size_used / (1024 * 1024),
                self.db_size_mb
            );
        };
        info!(
            target: LOG_TARGET,
            "({}) LMDB environment created with a capacity of {} MB, {} MB remaining.",
            path,
            env_info.mapsize / (1024 * 1024),
            space_remaining / (1024 * 1024)
        );

        let mut databases: HashMap<String, LMDBDatabase> = HashMap::new();
        if self.db_names.is_empty() {
            self = self.add_database("default", db::CREATE);
        }
        for (name, flags) in self.db_names.iter() {
            let db = Database::open(env.clone(), Some(name), &DatabaseOptions::new(*flags))?;
            let db = LMDBDatabase {
                name: name.to_string(),
                env: env.clone(),
                db: Arc::new(db),
            };
            databases.insert(name.to_string(), db);
            trace!(target: LOG_TARGET, "({}) LMDB database '{}' is ready", path, name);
        }
        Ok(LMDBStore { path, env, databases })
    }
}

/// A Struct for holding state for an LM Database. LMDB is memory mapped, so you can treat the DB as an (essentially)
/// infinitely large memory-backed hashmap. A single environment is stored in one file. The individual databases
/// are key-value tables stored within the file.
///
/// LMDB databases are thread-safe.
///
/// To create an instance of LMDBStore, use [LMDBBuilder](struct.lmdbbuilder.html).
///
/// ## Memory efficiency
///
/// LMDB really only understands raw byte arrays. Complex structures need to be referenced as (what looks like) a
/// single contiguous blob of memory. This presents some trade offs we need to make when `insert`ing and `get`ting
/// data to/from LMDB.
///
/// ### Writing
///
/// For simple types, like `PublicKey([u8; 32])`, it's most efficient to pass a pointer to the memory position; and
/// LMDB will do (at most) a single copy into its memory structures. The lmdb-zero crate assumes this by only
/// requiring the `AsLmdbBytes` trait when `insert`ing data. i.e. `insert` does not take ownership of the key or
/// value; it just wants to be able to read the `[u8]`.
///
/// This poses something of a problem for complex structures. Structs typically don't have a contiguous block of
/// memory backing the instance, and so you either need to impose one (which isn't a great idea-- now you have to write
/// some sort of memory management software), or you eat the cost of doing an intermediate copy into a buffer every
/// time you need to commit a structure to LMDB.
///
/// However, this cost is mitigated if there's any kind of processing that needs to be done in converting `T` to
/// `[u8]` (e.g. if an IP address is stored as a string for some reason, you might want to represent it as `[u8; 4]`)
/// , which probably happens more often than we think, and offers maximum flexibility.
///
/// Furthermore, the "simple" types are typically quite small, so an additional copy is not usually incurring much
/// overhead.
///
/// So this library makes the trade-off of carrying out two copies per write whilst gaining a significant amount of
/// flexibility in the process.
///
/// ### Reading
///
/// When LMDB returns data from a `get` request, it returns a `&[u8]` - you cannot take ownership of this data.
/// Therefore we necessarily need to copy data anyway in order to pull data into the final Struct instance.
/// So the `From<&[u8]> for T` trait implementation will work for reading, and this works fine for both simple and
/// complex data structures.
///
/// `FromLmdbBytes` is not quite what we want because the trait function returns a reference to an object, rather
/// than the object itself.
///
/// An additional consideration is: how was this data serialised? If the writing was a straight memory dump, we
/// don't always have enough information to reconstruct our data object (how long was a string? How many elements
/// were in the array? Was it big- or little-endian ordering of integers?).
///
/// If we have to store this metadata when reading in byte strings, it means it had to be stored too. This is a
/// further roadblock to the "zero-copy" ideal for writing. And since we're now basically serialising and
/// de-serialising, we may as well use a well-known, highly efficient binary format to do so.
///
/// ## Serialisation
///
/// The ideal serialisation format is the one that does the least "bit-twiddling" between memory and the byte array;
/// as well as being as compact as possible.
///
/// Candidates include: Bincode, MsgPack, and Protobuf / Cap'nProto. Without spending ages on a comparison, I just
/// took the benchmark results from [this project](https://github.com/erickt/rust-serialization-benchmarks):
///
/// ```text
/// test clone                             ... bench:       1,179 ns/iter (+/- 115) = 444 MB/s
///
/// test capnp_deserialize                 ... bench:         277 ns/iter (+/- 27) = 1617 MB/s  **
/// test flatbuffers_deserialize           ... bench:           0 ns/iter (+/- 0) = 472000 MB/s ***
/// test rust_bincode_deserialize          ... bench:       1,533 ns/iter (+/- 228) = 260 MB/s
/// test rmp_serde_deserialize             ... bench:       1,859 ns/iter (+/- 186) = 154 MB/s
/// test rust_protobuf_deserialize         ... bench:         558 ns/iter (+/- 29) = 512 MB/s   *
/// test serde_json_deserialize            ... bench:       2,244 ns/iter (+/- 249) = 269 MB/s
///
/// test capnp_serialize                   ... bench:          28 ns/iter (+/- 5) = 16000 MB/s  **
/// test flatbuffers_serialize             ... bench:           0 ns/iter (+/- 0) = 472000 MB/s ***
/// test rmp_serde_serialize               ... bench:         278 ns/iter (+/- 27) = 1032 MB/s
/// test rust_bincode_serialize            ... bench:         190 ns/iter (+/- 43) = 2105 MB/s  *
/// test rust_protobuf_serialize           ... bench:         468 ns/iter (+/- 18) = 611 MB/s
/// test serde_json_serialize              ... bench:       1,012 ns/iter (+/- 55) = 597 MB/s
/// ```
///
/// Based on these benchmarks, Flatbuffers and Cap'nProto are far and away the quickest. However, looking at the
/// benchmarks more closely, we see that these aren't strictly Orange to Orange comparisons. The flatbuffers and
/// capnproto tests don't actually serialise to and from the general Rust struct (an HTTP request type template), but
/// from specially generated structs based on the schema.
///
/// Strictly speaking, if we're going to serialise arbitrary key-value types, these benchmarks should include the
/// time it takes to populate a flatbuffer / capnproto structure.
///
/// A quick modification of the benchmarks to take this into account reveals:
///
/// ```text
/// test rust_bincode_deserialize          ... bench:       1,505 ns/iter (+/- 361) = 265 MB/s *
/// test capnp_deserialize                 ... bench:         282 ns/iter (+/- 37) = 1588 MB/s ***
/// test rmp_serde_deserialize             ... bench:       1,800 ns/iter (+/- 144) = 159 MB/s *
///
/// test capnp_serialize                   ... bench:         941 ns/iter (+/- 40) = 476 MB/s  *
/// test rmp_serde_serialize               ... bench:         269 ns/iter (+/- 19) = 1066 MB/s **
/// test rust_bincode_serialize            ... bench:         191 ns/iter (+/- 41) = 1114 MB/s ***
/// ```
///
/// Now bincode emerges as a reasonable contender. Another positive to bincode is that one doesn't have to update and
/// maintain a schema for the data types being serialized, nor is a separate compilation step required.
///
/// So after all this, we'll use bincode for the time being to handle serialisation to- and from- LMDB
pub struct LMDBStore {
    /// Filesystem path of the environment, as a string (used in log messages)
    path: String,
    /// Shared handle to the LMDB environment
    pub(crate) env: Arc<Environment>,
    /// All named databases opened in this environment, keyed by database name
    pub(crate) databases: HashMap<String, LMDBDatabase>,
}

/// Operations on an open LMDB environment: flushing buffers to disk, logging environment statistics, and
/// retrieving handles to the named databases.
/// NOTE(review): an earlier version of this doc described a `shutdown` method that closes the environment, but no
/// such method exists on this impl — confirm whether it was removed or lives elsewhere before relying on it.
impl LMDBStore {
    /// Flush buffered writes to disk by calling `Environment::sync(true)` (a forced, synchronous flush).
    pub fn flush(&self) -> Result<(), lmdb_zero::error::Error> {
        trace!(target: LOG_TARGET, "Forcing flush of buffers to disk");
        self.env.sync(true)?;
        debug!(target: LOG_TARGET, "LMDB Buffers have been flushed");
        Ok(())
    }

    /// Log the environment's info and statistics at debug level. If either query fails, a warning is
    /// logged instead of returning an error.
    pub fn log_info(&self) {
        match self.env.info() {
            Err(e) => warn!(
                target: LOG_TARGET,
                "Could not retrieve LMDB information for {}. {}",
                self.path,
                e.to_string()
            ),
            Ok(info) => {
                let size_mb = info.mapsize / 1024 / 1024;
                debug!(
                    target: LOG_TARGET,
                    "LMDB Environment information ({}). Map Size={} MB. Last page no={}. Last tx id={}",
                    self.path,
                    size_mb,
                    info.last_pgno,
                    info.last_txnid
                )
            },
        }
        match self.env.stat() {
            Err(e) => warn!(
                target: LOG_TARGET,
                "Could not retrieve LMDB statistics for {}. {}",
                self.path,
                e.to_string()
            ),
            Ok(stats) => {
                let page_size = stats.psize / 1024;
                debug!(
                    target: LOG_TARGET,
                    "LMDB Environment statistics ({}). Page size={}kB. Tree depth={}. Branch pages={}. Leaf Pages={}, \
                     Overflow pages={}, Entries={}",
                    self.path,
                    page_size,
                    stats.depth,
                    stats.branch_pages,
                    stats.leaf_pages,
                    stats.overflow_pages,
                    stats.entries
                );
            },
        }
    }

    /// Returns a handle to the database given in `db_name`, if it exists, otherwise return None.
    pub fn get_handle(&self, db_name: &str) -> Option<LMDBDatabase> {
        // Handles are cheap to clone: LMDBDatabase only holds Arc references
        self.databases.get(db_name).cloned()
    }

    /// Returns a shared reference to the underlying LMDB environment.
    pub fn env(&self) -> Arc<Environment> {
        self.env.clone()
    }
}

#[derive(Clone)]
pub struct LMDBDatabase {
    /// The database name (used in log messages)
    name: String,
    /// The environment this database belongs to
    env: Arc<Environment>,
    /// Shared handle to the underlying lmdb-zero database
    db: DatabaseRef,
}

impl LMDBDatabase {
    /// Inserts a record into the database. This is an atomic operation. Internally, `insert` creates a new
    /// write transaction, writes the value, and then commits the transaction.
    pub fn insert<K, V>(&self, key: &K, value: &V) -> Result<(), LMDBError>
    where
        K: AsLmdbBytes + ?Sized,
        V: Serialize,
    {
        let env = &(*self.db.env());
        let tx = WriteTransaction::new(env)?;
        {
            let mut accessor = tx.access();
            // Serialise via bincode before handing the bytes to LMDB
            let buf = LMDBWriteTransaction::convert_value(value)?;
            accessor.put(&*self.db, key, &buf, put::Flags::empty())?;
        }
        tx.commit().map_err(LMDBError::from)
    }

    /// Get a value from the database. This is an atomic operation. A read transaction is created, the value
    /// extracted, copied and converted to V before closing the transaction. A copy is unavoidable because the
    /// extracted byte string is released when the transaction is closed. If you are doing many `gets`, it is more
    /// efficient to use `with_read_transaction`
    pub fn get<K, V>(&self, key: &K) -> Result<Option<V>, LMDBError>
    where
        K: AsLmdbBytes + ?Sized,
        V: DeserializeOwned,
    {
        let env = &(*self.db.env());
        let txn = ReadTransaction::new(env)?;
        let accessor = txn.access();
        // `to_opt` maps NOTFOUND to Ok(None) instead of an error
        let val = accessor.get(&self.db, key).to_opt();
        LMDBReadTransaction::convert_value(val)
    }

    /// Return statistics about the database, See [Stat](lmdb_zero/struct.Stat.html) for more details.
    pub fn get_stats(&self) -> Result<Stat, LMDBError> {
        let env = &(*self.db.env());
        Ok(ReadTransaction::new(env).and_then(|txn| txn.db_stat(&self.db))?)
    }

    /// Log some pretty printed stats.See [Stat](lmdb_zero/struct.Stat.html) for more details.
    pub fn log_info(&self) {
        match self.get_stats() {
            Err(e) => warn!(
                target: LOG_TARGET,
                "Could not retrieve LMDB statistics for {}. {}",
                self.name,
                e.to_string()
            ),
            Ok(stats) => {
                let page_size = stats.psize / 1024;
                debug!(
                    target: LOG_TARGET,
                    "LMDB Database statistics ({}). Page size={}kB. Tree depth={}. Branch pages={}. Leaf Pages={}, \
                     Overflow pages={}, Entries={}",
                    self.name,
                    page_size,
                    stats.depth,
                    stats.branch_pages,
                    stats.leaf_pages,
                    stats.overflow_pages,
                    stats.entries
                );
            },
        }
    }

    /// Returns `true` if the database holds no entries.
    pub fn is_empty(&self) -> Result<bool, LMDBError> {
        // BUG FIX: this previously returned `entries > 0`, i.e. the *inverse* of "is empty"
        self.get_stats().map(|s| s.entries == 0)
    }

    /// Returns the total number of entries in this database.
    pub fn len(&self) -> Result<usize, LMDBError> {
        self.get_stats().map(|s| s.entries)
    }

    /// Execute function `f` for each value in the database.
    ///
    /// The underlying LMDB library does not permit database cursors to be returned from functions to preserve Rust
    /// memory guarantees, so this is the closest thing to an iterator that you're going to get :/
    ///
    /// `f` is a closure of form `|pair: Result<(K,V), LMDBError>| -> IterationResult`. If `IterationResult::Break` is
    /// returned the closure will not be called again and `for_each` will return. You will usually need to include
    /// type inference to let Rust know which type to deserialise to:
    /// ```nocompile
    ///    let res = db.for_each::<Key, User, _>(|pair| {
    ///        let (key, user) = pair.unwrap();
    ///        //.. do stuff with key and user..
    ///    });
    /// ```
    pub fn for_each<K, V, F>(&self, mut f: F) -> Result<(), LMDBError>
    where
        K: DeserializeOwned,
        V: DeserializeOwned,
        F: FnMut(Result<(K, V), KeyValStoreError>) -> IterationResult,
    {
        let env = self.env.clone();
        let db = self.db.clone();
        let txn = ReadTransaction::new(env)?;

        let access = txn.access();
        let cursor = txn.cursor(db)?;

        // `head` positions the cursor on the first record; subsequent records come via `next`
        let head = |c: &mut Cursor, a: &ConstAccessor| {
            let (key_bytes, val_bytes) = c.first(a)?;
            ReadOnlyIterator::deserialize::<K, V>(key_bytes, val_bytes)
        };

        let cursor = MaybeOwned::Owned(cursor);
        let iter = CursorIter::new(cursor, &access, head, ReadOnlyIterator::next)?;

        for p in iter {
            match f(p.map_err(|e| KeyValStoreError::DatabaseError(e.to_string()))) {
                IterationResult::Break => break,
                IterationResult::Continue => {},
            }
        }

        Ok(())
    }

    /// Checks whether a key exists in this database
    pub fn contains_key<K>(&self, key: &K) -> Result<bool, LMDBError>
    where K: AsLmdbBytes + ?Sized {
        let txn = ReadTransaction::new(&(*self.db.env()))?;
        let accessor = txn.access();
        // `Ignore` skips deserialising the value; we only care about presence
        let res: error::Result<&Ignore> = accessor.get(&self.db, key);
        let res = res.to_opt()?.is_some();
        Ok(res)
    }

    /// Delete a record associated with `key` from the database. If the key does not exist, the underlying
    /// LMDB error is propagated as `Err` (it is not converted to `Ok`).
    pub fn remove<K>(&self, key: &K) -> Result<(), LMDBError>
    where K: AsLmdbBytes + ?Sized {
        let tx = WriteTransaction::new(&(*self.db.env()))?;
        {
            let mut accessor = tx.access();
            accessor.del_key(&self.db, key)?;
        }
        tx.commit().map_err(Into::into)
    }

    /// Create a read-only transaction on the current database and execute the instructions given in the closure. The
    /// transaction is automatically committed when the closure goes out of scope. You may provide the results of the
    /// transaction to the calling scope by populating a `Vec<V>` with the results of `txn.get(k)`. Otherwise, if the
    /// results are not needed, or you did not call `get`, just return `Ok(None)`.
    pub fn with_read_transaction<F, V>(&self, f: F) -> Result<Option<Vec<V>>, LMDBError>
    where
        V: serde::de::DeserializeOwned,
        F: FnOnce(LMDBReadTransaction) -> Result<Option<Vec<V>>, LMDBError>,
    {
        let txn = ReadTransaction::new(self.env.clone())?;
        let access = txn.access();
        let wrapper = LMDBReadTransaction { db: &self.db, access };
        f(wrapper)
    }

    /// Create a transaction with write access on the current table. The transaction is committed only if the
    /// closure returns `Ok`; a commit failure is reported as `LMDBError::CommitError`.
    pub fn with_write_transaction<F>(&self, f: F) -> Result<(), LMDBError>
    where F: FnOnce(LMDBWriteTransaction) -> Result<(), LMDBError> {
        let txn = WriteTransaction::new(self.env.clone())?;
        let access = txn.access();
        let wrapper = LMDBWriteTransaction { db: &self.db, access };
        f(wrapper)?;
        txn.commit().map_err(|e| LMDBError::CommitError(e.to_string()))
    }

    /// Returns an owned atomic reference to the database
    pub fn db(&self) -> DatabaseRef {
        self.db.clone()
    }
}

/// Helper functions for the `for_each` method
struct ReadOnlyIterator {}
impl ReadOnlyIterator {
    /// Decode a raw LMDB (key, value) byte pair into `(K, V)` using bincode. Decoding failures are
    /// reported as `error::Error::ValRejected` so they flow through the lmdb-zero cursor machinery.
    fn deserialize<K, V>(key_bytes: &[u8], val_bytes: &[u8]) -> Result<(K, V), error::Error>
    where
        // NOTE: the original `for<'t>` higher-ranked bounds were no-ops ('t was never used) and have
        // been removed; `DeserializeOwned` alone expresses the same requirement.
        K: serde::de::DeserializeOwned,
        V: serde::de::DeserializeOwned,
    {
        let key = bincode::deserialize(key_bytes).map_err(|e| error::Error::ValRejected(e.to_string()))?;
        let val = bincode::deserialize(val_bytes).map_err(|e| error::Error::ValRejected(e.to_string()))?;
        Ok((key, val))
    }

    /// Advance the cursor and decode the next (key, value) pair. Used as the `next` callback for
    /// `CursorIter` in `LMDBDatabase::for_each`.
    fn next<'r, K, V>(c: &mut Cursor, access: &'r ConstAccessor) -> Result<(K, V), error::Error>
    where
        K: serde::de::DeserializeOwned,
        V: serde::de::DeserializeOwned,
    {
        let (key_bytes, val_bytes) = c.next(access)?;
        ReadOnlyIterator::deserialize(key_bytes, val_bytes)
    }
}

/// A read-only accessor scoped to a single LMDB database, handed to the closure passed to
/// `LMDBDatabase::with_read_transaction`.
pub struct LMDBReadTransaction<'txn, 'db: 'txn> {
    db: &'db Database<'db>,
    access: ConstAccessor<'txn>,
}

impl<'txn, 'db: 'txn> LMDBReadTransaction<'txn, 'db> {
    /// Get and deserialise a value from the database. Returns `Ok(None)` when the key is absent.
    pub fn get<K, V>(&self, key: &K) -> Result<Option<V>, LMDBError>
    where
        K: AsLmdbBytes + ?Sized,
        V: serde::de::DeserializeOwned,
    {
        // `to_opt` maps NOTFOUND to Ok(None) instead of an error
        let val = self.access.get(&self.db, key).to_opt();
        LMDBReadTransaction::convert_value(val)
    }

    /// Checks whether a key exists in this database
    pub fn exists<K>(&self, key: &K) -> Result<bool, LMDBError>
    where K: AsLmdbBytes + ?Sized {
        // `Ignore` skips value deserialisation; we only test for presence
        let res: error::Result<&Ignore> = self.access.get(&self.db, key);
        let res = res.to_opt()?.is_some();
        Ok(res)
    }

    /// Convert the raw result of an LMDB `get` into a deserialised value, mapping lookup and bincode
    /// failures to `LMDBError::GetError`.
    fn convert_value<V>(val: Result<Option<&[u8]>, error::Error>) -> Result<Option<V>, LMDBError>
    where V: serde::de::DeserializeOwned {
        match val {
            Ok(None) => Ok(None),
            // `e` implements Display, so no explicit `to_string()` is needed in the format args
            Err(e) => Err(LMDBError::GetError(format!("LMDB get error: {}", e))),
            Ok(Some(v)) => match bincode::deserialize(v) {
                // The reference to v is about to be dropped, so we must copy the data now
                Ok(val) => Ok(Some(val)),
                Err(e) => Err(LMDBError::GetError(format!("LMDB get error: {}", e))),
            },
        }
    }
}

/// A read-write accessor scoped to a single LMDB database, handed to the closure passed to
/// `LMDBDatabase::with_write_transaction`.
pub struct LMDBWriteTransaction<'txn, 'db: 'txn> {
    db: &'db Database<'db>,
    access: WriteAccessor<'txn>,
}

impl<'txn, 'db: 'txn> LMDBWriteTransaction<'txn, 'db> {
    pub fn insert<K, V>(&mut self, key: &K, value: &V) -> Result<(), LMDBError>
    where
        K: AsLmdbBytes + ?Sized,
        V: serde::Serialize,
    {
        let buf = Self::convert_value(value)?;
        self.access.put(&self.db, key, &buf, put::Flags::empty())?;
        Ok(())
    }

    /// Checks whether a key exists in this database
    pub fn exists<K>(&self, key: &K) -> Result<bool, LMDBError>
    where K: AsLmdbBytes + ?Sized {
        let res: error::Result<&Ignore> = self.access.get(&self.db, key);
        let res = res.to_opt()?.is_some();
        Ok(res)
    }

    pub fn delete<K>(&mut self, key: &K) -> Result<(), LMDBError>
    where K: AsLmdbBytes + ?Sized {
        Ok(self.access.del_key(&self.db, key)?)
    }

    fn convert_value<V>(value: &V) -> Result<Vec<u8>, LMDBError>
    where V: serde::Serialize {
        let size = bincode::serialized_size(value).map_err(|e| LMDBError::SerializationErr(e.to_string()))?;
        let mut buf = Vec::with_capacity(size as usize);
        bincode::serialize_into(&mut buf, value).map_err(|e| LMDBError::SerializationErr(e.to_string()))?;
        Ok(buf)
    }
}

#[cfg(test)]
mod test {
    use crate::lmdb_store::LMDBBuilder;
    use lmdb_zero::db;
    use std::env;

    /// Building a store with two named databases must register exactly those two handles.
    #[test]
    fn test_lmdb_builder() {
        let store = LMDBBuilder::new()
            .set_path(env::temp_dir())
            .set_environment_size(500)
            .set_max_number_of_databases(10)
            .add_database("db1", db::CREATE)
            .add_database("db2", db::CREATE)
            .build()
            .unwrap();
        // assert_eq! reports both values on failure; the old `assert!(&a == &b)` compared references
        // and gave no diagnostic output
        assert_eq!(store.databases.len(), 2);
    }
}