hala_leveldb/
lib.rs

1use std::{
2    borrow::Borrow,
3    ffi::{c_char, c_void, CString},
4    io,
5    marker::PhantomData,
6    mem::size_of,
7    path::Path,
8    ptr::{null_mut, slice_from_raw_parts},
9    rc::Rc,
10    sync::Arc,
11};
12
13use leveldb_sys::*;
14
/// Conversion of a key or value into the raw bytes handed to leveldb.
///
/// Every type passed as a key or value to a write or lookup call must
/// implement this trait.
pub trait KeyValue {
    /// Byte container produced by [`KeyValue::to_bytes`]; anything that can be
    /// viewed as a `&[u8]`.
    type Bytes: AsRef<[u8]>;

    /// Consume `self` and return its byte representation, or an [`io::Error`]
    /// when the value cannot be encoded.
    fn to_bytes(self) -> io::Result<Self::Bytes>;
}
21
/// Decoding of raw leveldb bytes into a rust value.
///
/// Every type returned by a lookup or read from an iterator item must
/// implement this trait.
pub trait KeyValueReturn {
    /// Build a value from the bytes in `buf`, or return an [`io::Error`] when
    /// the bytes cannot be decoded.
    fn from_bytes(buf: &[u8]) -> io::Result<Self>
    where
        Self: Sized;
}
28
29/// Convert leveldb information to rust [`io::Result`] and release error object memory.
30pub fn to_io_error<T>(error: *mut c_char) -> io::Result<T> {
31    use std::ffi::CStr;
32    use std::str::from_utf8;
33
34    let err_string = from_utf8(unsafe { CStr::from_ptr(error).to_bytes() })
35        .unwrap()
36        .to_string();
37
38    let err = Err(io::Error::new(
39        io::ErrorKind::Other,
40        format!("leveldb: {}", err_string),
41    ));
42
43    unsafe {
44        leveldb_free(error as *mut c_void);
45    }
46
47    err
48}
49
50/// Database open options.
51pub struct OpenOptions {
52    c_ops: *mut leveldb_options_t,
53}
54
55impl OpenOptions {
56    /// Create default leveldb open options.
57    pub fn new() -> Self {
58        let c_ops = unsafe {
59            let c_ops = leveldb_options_create();
60
61            leveldb_options_set_create_if_missing(c_ops, 1);
62
63            c_ops
64        };
65        Self { c_ops }
66    }
67    /// Create database if none exists yet
68    pub fn create_if_missing(&self, flag: bool) -> &Self {
69        unsafe {
70            leveldb_options_set_create_if_missing(self.c_ops, if flag { 1 } else { 0 });
71        }
72        self
73    }
74    /// Return error if database already exists.
75    pub fn error_if_exists(&self, flag: bool) -> &Self {
76        unsafe {
77            leveldb_options_set_error_if_exists(self.c_ops, if flag { 1 } else { 0 });
78        }
79
80        self
81    }
82    /// Set database write buffer size.
83    pub fn write_buffer_size(&self, size: usize) -> &Self {
84        unsafe {
85            leveldb_options_set_write_buffer_size(self.c_ops, size);
86        }
87
88        self
89    }
90
91    /// Set block size.
92    pub fn block_size(&self, size: usize) -> &Self {
93        unsafe {
94            leveldb_options_set_block_size(self.c_ops, size);
95        }
96
97        self
98    }
99
100    /// Open snappy compression.
101    pub fn compression(&self, flag: bool) -> &Self {
102        unsafe {
103            leveldb_options_set_compression(
104                self.c_ops,
105                if flag {
106                    Compression::Snappy
107                } else {
108                    Compression::No
109                },
110            );
111        }
112        self
113    }
114}
115
116impl Drop for OpenOptions {
117    fn drop(&mut self) {
118        unsafe {
119            leveldb_options_destroy(self.c_ops);
120        }
121    }
122}
123
124struct SnapshotInner<'a> {
125    c_db: *mut leveldb_t,
126    c_snapshot: *mut leveldb_snapshot_t,
127    _marked: PhantomData<&'a ()>,
128}
129
130impl<'a> Drop for SnapshotInner<'a> {
131    fn drop(&mut self) {
132        unsafe { leveldb_release_snapshot(self.c_db, self.c_snapshot) }
133    }
134}
135
136/// Leveldb snapshot object.
137#[derive(Clone)]
138pub struct Snapshot<'a> {
139    _c_snapshot: Arc<SnapshotInner<'a>>,
140}
141
142impl<'a> Snapshot<'a> {
143    fn new(c_db: *mut leveldb_t) -> Self {
144        let c_snapshot = unsafe { leveldb_create_snapshot(c_db) };
145
146        Self {
147            _c_snapshot: Arc::new(SnapshotInner {
148                c_db,
149                c_snapshot,
150                _marked: PhantomData,
151            }),
152        }
153    }
154}
155
156/// Leveldb read options.
157pub struct ReadOps<'a> {
158    c_ops: *mut leveldb_readoptions_t,
159    _snapshot: Option<Snapshot<'a>>,
160}
161
162impl ReadOps<'static> {
163    /// Create new read options with default values.
164    pub fn new() -> Self {
165        let c_ops = unsafe { leveldb_readoptions_create() };
166
167        Self {
168            c_ops,
169            _snapshot: None,
170        }
171    }
172}
173
174impl<'a> ReadOps<'a> {
175    /// Create new read options with default values.
176    pub fn new_with<S>(snapshot: S) -> Self
177    where
178        S: Borrow<Snapshot<'a>>,
179    {
180        let c_ops = unsafe { leveldb_readoptions_create() };
181
182        Self {
183            c_ops,
184            _snapshot: Some(snapshot.borrow().clone()),
185        }
186    }
187}
188
189impl<'a> Drop for ReadOps<'a> {
190    fn drop(&mut self) {
191        unsafe {
192            leveldb_readoptions_destroy(self.c_ops);
193        }
194    }
195}
196
197/// Leveldb write options.
198pub struct WriteOps {
199    c_ops: *mut leveldb_writeoptions_t,
200}
201
202impl WriteOps {
203    /// Create new write options with default values.
204    pub fn new() -> Self {
205        let c_ops = unsafe { leveldb_writeoptions_create() };
206
207        Self { c_ops }
208    }
209
210    /// Open sync write flag.
211    pub fn sync(&self, flag: bool) -> &Self {
212        unsafe {
213            leveldb_writeoptions_set_sync(self.c_ops, if flag { 1 } else { 0 });
214        }
215
216        self
217    }
218}
219
220impl Drop for WriteOps {
221    fn drop(&mut self) {
222        unsafe {
223            leveldb_writeoptions_destroy(self.c_ops);
224        }
225    }
226}
227
228pub struct WriteBatch<'a> {
229    database: &'a Database,
230    c_write_batch: *mut leveldb_writebatch_t,
231}
232
233impl<'a> WriteBatch<'a> {
234    fn new(database: &'a Database) -> Self {
235        let c_write_batch = unsafe { leveldb_writebatch_create() };
236
237        Self {
238            database,
239            c_write_batch,
240        }
241    }
242
243    /// Put new object into leveldb.
244    pub fn put<K, V>(&self, key: K, value: V) -> io::Result<()>
245    where
246        K: KeyValue,
247        V: KeyValue,
248    {
249        let key = key.to_bytes()?;
250        let value = value.to_bytes()?;
251
252        let key = key.as_ref();
253        let value = value.as_ref();
254
255        unsafe {
256            leveldb_writebatch_put(
257                self.c_write_batch,
258                key.as_ptr() as *mut c_char,
259                key.len(),
260                value.as_ptr() as *mut c_char,
261                value.len(),
262            );
263        }
264
265        Ok(())
266    }
267
268    /// Delete one row date from leveldb by provided key.
269    pub fn delete<K: KeyValue>(&self, key: K) -> io::Result<()> {
270        let key = key.to_bytes()?;
271        let key = key.as_ref();
272
273        unsafe {
274            leveldb_writebatch_delete(self.c_write_batch, key.as_ptr() as *mut c_char, key.len());
275        }
276
277        Ok(())
278    }
279
280    /// Commmit batch write to leveldb.
281    pub fn commit(self, ops: Option<WriteOps>) -> io::Result<()> {
282        let ops = ops.unwrap_or_else(|| WriteOps::new());
283
284        let mut error = null_mut();
285
286        unsafe {
287            leveldb_write(
288                self.database.c_db,
289                ops.c_ops,
290                self.c_write_batch,
291                &mut error,
292            )
293        };
294
295        if error != null_mut() {
296            to_io_error(error)
297        } else {
298            Ok(())
299        }
300    }
301}
302
303impl<'a> Drop for WriteBatch<'a> {
304    fn drop(&mut self) {
305        unsafe {
306            leveldb_writebatch_destroy(self.c_write_batch);
307        }
308    }
309}
310
311struct RawIterator<'a> {
312    c_iter: *mut leveldb_iterator_t,
313    _database: &'a Database,
314}
315
316impl<'a> RawIterator<'a> {
317    fn new(database: &'a Database, ops: Option<ReadOps<'a>>) -> Self {
318        let ops = ops.unwrap_or_else(|| ReadOps::new());
319
320        let c_iter = unsafe { leveldb_create_iterator(database.c_db, ops.c_ops) };
321
322        Self {
323            c_iter,
324            _database: database,
325        }
326    }
327}
328
329impl<'a> Drop for RawIterator<'a> {
330    fn drop(&mut self) {
331        unsafe {
332            leveldb_iter_destroy(self.c_iter);
333        }
334    }
335}
336
337/// Key / Value item.
338pub struct Item<'a> {
339    raw: Rc<RawIterator<'a>>,
340}
341
342impl<'a> Item<'a> {
343    pub fn key<K>(&self) -> io::Result<K>
344    where
345        K: KeyValueReturn,
346    {
347        unsafe {
348            let mut length: usize = 0;
349            let raw = leveldb_iter_key(self.raw.c_iter, &mut length);
350
351            let buf = &*slice_from_raw_parts(raw as *const u8, length);
352
353            let r = K::from_bytes(buf);
354
355            // leveldb_free(raw as *mut c_void);
356
357            r
358        }
359    }
360
361    pub fn value<V>(&self) -> io::Result<V>
362    where
363        V: KeyValueReturn,
364    {
365        unsafe {
366            let mut length: usize = 0;
367            let raw = leveldb_iter_value(self.raw.c_iter, &mut length);
368
369            let buf = &*slice_from_raw_parts(raw as *const u8, length);
370
371            let v = V::from_bytes(buf);
372
373            // leveldb_free(raw as *mut c_void);
374
375            v
376        }
377    }
378}
379
380/// Leveldb iterator for kv.
381pub struct KeyValueIterator<'a> {
382    raw: Rc<RawIterator<'a>>,
383    seek: bool,
384}
385
386impl<'a> KeyValueIterator<'a> {
387    fn new(database: &'a Database, ops: Option<ReadOps<'a>>) -> Self {
388        let this = Self {
389            raw: Rc::new(RawIterator::new(database, ops)),
390            seek: false,
391        };
392
393        this
394    }
395
396    pub fn seek<K>(mut self, k: K) -> io::Result<Self>
397    where
398        K: KeyValue,
399    {
400        let k = k.to_bytes()?;
401
402        let k = k.as_ref();
403
404        unsafe { leveldb_iter_seek(self.raw.c_iter, k.as_ptr() as *mut c_char, k.len()) }
405
406        self.seek = true;
407
408        Ok(self)
409    }
410}
411
412impl<'a> Iterator for KeyValueIterator<'a> {
413    type Item = Item<'a>;
414
415    #[inline]
416    fn next(&mut self) -> Option<Self::Item> {
417        unsafe {
418            if leveldb_iter_valid(self.raw.c_iter) == 0 {
419                leveldb_iter_seek_to_first(self.raw.c_iter);
420            } else if self.seek {
421                self.seek = false;
422            } else {
423                leveldb_iter_next(self.raw.c_iter);
424            }
425
426            if leveldb_iter_valid(self.raw.c_iter) == 0 {
427                return None;
428            }
429        };
430
431        Some(Item {
432            raw: self.raw.clone(),
433        })
434    }
435}
436
437/// Leveldb database main entery
438#[must_use = "Must be bound to a value or leveldb will shut down immediately!"]
439pub struct Database {
440    c_db: *mut leveldb_t,
441}
442
443unsafe impl Send for Database {}
444unsafe impl Sync for Database {}
445
446impl Database {
447    /// Open database with optional [`OpenOptions`]
448    pub fn open<P: AsRef<Path>>(path: P, ops: Option<OpenOptions>) -> io::Result<Self> {
449        let ops = ops.unwrap_or_else(|| {
450            let ops = OpenOptions::new();
451
452            ops.create_if_missing(true);
453
454            ops
455        });
456
457        let c_string = CString::new(path.as_ref().to_str().unwrap().as_bytes())?;
458
459        let mut error = null_mut();
460
461        let c_db = unsafe {
462            let db = leveldb_open(
463                ops.c_ops,
464                c_string.as_bytes_with_nul().as_ptr() as *const c_char,
465                &mut error,
466            );
467
468            if error != null_mut() {
469                to_io_error(error)?;
470            }
471
472            db
473        };
474
475        Ok(Self { c_db })
476    }
477    /// Create a snapshot of this leveldb instance.
478    pub fn snapshot(&self) -> Snapshot<'_> {
479        Snapshot::new(self.c_db)
480    }
481
482    /// Put new object into leveldb.
483    pub fn put<K, V>(&self, key: K, value: V, ops: Option<WriteOps>) -> io::Result<()>
484    where
485        K: KeyValue,
486        V: KeyValue,
487    {
488        let ops = ops.unwrap_or_else(|| WriteOps::new());
489
490        let key = key.to_bytes()?;
491        let value = value.to_bytes()?;
492
493        let key = key.as_ref();
494        let value = value.as_ref();
495
496        let mut error = null_mut();
497
498        unsafe {
499            leveldb_put(
500                self.c_db,
501                ops.c_ops,
502                key.as_ptr() as *mut c_char,
503                key.len(),
504                value.as_ptr() as *mut c_char,
505                value.len(),
506                &mut error,
507            );
508        }
509
510        if error != null_mut() {
511            to_io_error(error)
512        } else {
513            Ok(())
514        }
515    }
516
517    /// Read data from leveldb with provided key.
518    pub fn get<'a, K, V>(&self, key: K, ops: Option<ReadOps<'a>>) -> io::Result<V>
519    where
520        K: KeyValue,
521        V: KeyValueReturn,
522    {
523        let ops = ops.unwrap_or_else(|| ReadOps::new());
524
525        let key = key.to_bytes()?;
526
527        let key = key.as_ref();
528
529        let mut error = null_mut();
530        let mut length: usize = 0;
531
532        let raw = unsafe {
533            leveldb_get(
534                self.c_db,
535                ops.c_ops,
536                key.as_ptr() as *mut c_char,
537                key.len(),
538                &mut length,
539                &mut error,
540            )
541        };
542
543        if error != null_mut() {
544            return to_io_error(error);
545        }
546
547        let buf = unsafe { &*slice_from_raw_parts(raw as *const u8, length) };
548
549        let r = V::from_bytes(buf);
550
551        //release buf.
552        unsafe { leveldb_free(raw as *mut c_void) };
553
554        r
555    }
556
557    /// Delete one row date from leveldb by provided key.
558    pub fn delete<K: KeyValue>(&self, key: K, ops: Option<WriteOps>) -> io::Result<bool> {
559        let ops = ops.unwrap_or_else(|| WriteOps::new());
560
561        let key = key.to_bytes()?;
562        let key = key.as_ref();
563
564        let mut error = null_mut();
565
566        let read_ops = ReadOps::new();
567        let mut read_len: usize = 0;
568
569        unsafe {
570            let raw = leveldb_get(
571                self.c_db,
572                read_ops.c_ops,
573                key.as_ptr() as *mut c_char,
574                key.len(),
575                &mut read_len,
576                &mut error,
577            );
578
579            if error != null_mut() {
580                return to_io_error(error);
581            }
582
583            if raw != null_mut() && read_len > 0 {
584                leveldb_free(raw as *mut c_void);
585
586                leveldb_delete(
587                    self.c_db,
588                    ops.c_ops,
589                    key.as_ptr() as *mut c_char,
590                    key.len(),
591                    &mut error,
592                );
593
594                if error != null_mut() {
595                    return to_io_error(error);
596                } else {
597                    return Ok(true);
598                }
599            } else {
600                return Ok(false);
601            }
602        }
603    }
604    /// Create a batch write session
605    pub fn write(&self) -> WriteBatch<'_> {
606        WriteBatch::new(self)
607    }
608
609    /// Create kv iterator.
610    pub fn iter<'a>(&'a self, ops: Option<ReadOps<'a>>) -> KeyValueIterator<'a> {
611        KeyValueIterator::new(self, ops)
612    }
613}
614
615impl Drop for Database {
616    fn drop(&mut self) {
617        unsafe {
618            leveldb_close(self.c_db);
619        }
620    }
621}
622
623impl<'a> KeyValue for &'a str {
624    type Bytes = &'a str;
625
626    fn to_bytes(self) -> io::Result<Self::Bytes> {
627        Ok(self)
628    }
629}
630
631impl KeyValue for String {
632    type Bytes = String;
633
634    fn to_bytes(self) -> io::Result<Self::Bytes> {
635        Ok(self)
636    }
637}
638
639impl KeyValueReturn for String {
640    fn from_bytes(buf: &[u8]) -> io::Result<Self>
641    where
642        Self: Sized,
643    {
644        String::from_utf8(buf.to_vec())
645            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
646    }
647}
648
649impl<'a> KeyValue for &'a [u8] {
650    type Bytes = &'a [u8];
651    fn to_bytes(self) -> io::Result<Self::Bytes> {
652        Ok(self)
653    }
654}
655
656impl KeyValue for Vec<u8> {
657    type Bytes = Self;
658
659    fn to_bytes(self) -> io::Result<Self::Bytes> {
660        Ok(self)
661    }
662}
663
664impl KeyValueReturn for Vec<u8> {
665    fn from_bytes(buf: &[u8]) -> io::Result<Self>
666    where
667        Self: Sized,
668    {
669        Ok(buf.to_vec())
670    }
671}
672
673macro_rules! num_def {
674    ($t: tt) => {
675        impl KeyValue for $t {
676            type Bytes = [u8; size_of::<$t>()];
677            fn to_bytes(self) -> io::Result<Self::Bytes> {
678                Ok(self.to_ne_bytes())
679            }
680        }
681
682        impl KeyValueReturn for $t {
683            fn from_bytes(buf: &[u8]) -> io::Result<Self>
684            where
685                Self: Sized,
686            {
687                if buf.len() != size_of::<$t>() {
688                    return Err(io::Error::new(
689                        io::ErrorKind::InvalidInput,
690                        format!("num data len mismatch. {}", size_of::<$t>()),
691                    ));
692                }
693
694                let mut array = [0; size_of::<$t>()];
695
696                array.copy_from_slice(buf);
697
698                Ok($t::from_ne_bytes(array))
699            }
700        }
701    };
702}
703
704num_def!(i8);
705num_def!(i16);
706num_def!(i32);
707num_def!(i64);
708num_def!(i128);
709num_def!(isize);
710
711num_def!(u8);
712num_def!(u16);
713num_def!(u32);
714num_def!(u64);
715num_def!(u128);
716num_def!(usize);