yrs_lmdb/
lib.rs

//! **yrs-lmdb** is a persistence layer for storing [Yrs](https://docs.rs/yrs/latest/yrs/index.html)
//! documents. It uses LMDB as its persistent backend and provides convenient utility functions
//! for working with the stored documents.
4//!
5//! # Example
6//!
7//! ```rust
8//! use std::sync::Arc;
9//! use lmdb_rs::core::DbCreate;
10//! use lmdb_rs::Environment;
11//! use yrs::{Doc, Text, Transact};
12//! use yrs_kvstore::DocOps;
13//! use yrs_lmdb::LmdbStore;
14//!
15//! let env = Arc::new(Environment::new()
16//!     .autocreate_dir(true)
17//!     .max_dbs(4)
18//!     .open("my-lmdb-dir", 0o777)
19//!     .unwrap());
20//! let h = Arc::new(env.create_db("yrs", DbCreate).unwrap());
21//!
22//! let doc = Doc::new();
23//! let text = doc.get_or_insert_text("text");
24//!
25//! // restore document state from DB
26//! {
27//!   let db_txn = env.get_reader().unwrap();
28//!   let db = LmdbStore::from(db_txn.bind(&h));
29//!   db.load_doc("my-doc-name", &mut doc.transact_mut()).unwrap();
30//! }
31//!
//! // another option is to flush the document state right away, but
//! // this requires a read-write transaction
34//! {
35//!   let db_txn = env.new_transaction().unwrap();
36//!   let db = LmdbStore::from(db_txn.bind(&h));
37//!   let doc = db.flush_doc_with("my-doc-name", yrs::Options::default()).unwrap();
38//!   db_txn.commit().unwrap(); // flush may modify internal store state
39//! }
40//!
//! // configure document to persist every update and
//! // occasionally compact them into document state
43//! let sub = {
44//!   let env = env.clone();
45//!   let h = h.clone();
46//!   let options = doc.options().clone();
47//!   doc.observe_update_v1(move |_,e| {
48//!       let db_txn = env.new_transaction().unwrap();
49//!       let db = LmdbStore::from(db_txn.bind(&h));
50//!       let seq_nr = db.push_update("my-doc-name", &e.update).unwrap();
51//!       if seq_nr % 64 == 0 {
//!           // occasionally merge updates into the document state
53//!           db.flush_doc_with("my-doc-name", options.clone()).unwrap();
54//!       }
55//!       db_txn.commit().unwrap();
56//!   })
57//! };
58//!
59//! text.insert(&mut doc.transact_mut(), 0, "a");
60//! text.insert(&mut doc.transact_mut(), 1, "b");
61//! text.insert(&mut doc.transact_mut(), 2, "c");
62//! ```
63
64use lmdb_rs::core::{CursorIterator, MdbResult};
65use lmdb_rs::{CursorKeyRangeIter, Database, MdbError, ReadonlyTransaction};
66use std::ops::Deref;
67
68pub use yrs_kvstore as store;
69use yrs_kvstore::error::Error;
70use yrs_kvstore::keys::Key;
71use yrs_kvstore::{DocOps, KVEntry, KVStore};
72
/// Extension for fallible results that lets an "entry not found" failure be handled as an
/// absent value instead of an error.
trait OptionalNotFound {
    /// Value carried by the successful case.
    type Return;
    /// Error type that remains once the "not found" case has been separated out.
    type Error;

    /// Converts `self` into a `Result` whose success value is optional.
    fn optional(self) -> Result<Option<Self::Return>, Self::Error>;
}
79
80impl<T> OptionalNotFound for MdbResult<T> {
81    type Return = T;
82    type Error = MdbError;
83
84    /// Changes [MdbError::NotFound] onto [None] case.
85    fn optional(self) -> Result<Option<Self::Return>, Self::Error> {
86        match self {
87            Ok(value) => Ok(Some(value)),
88            Err(MdbError::NotFound) => Ok(None),
89            Err(err) => Err(err),
90        }
91    }
92}
93
/// Newtype wrapper around LMDB's [Database] struct. It exists so that the [KVStore] and
/// [DocOps] traits can be implemented for it, extending an LMDB database with the convenience
/// methods used when working with Yrs documents.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as the wrapped [Database].
#[repr(transparent)]
#[derive(Debug)]
pub struct LmdbStore<'db>(Database<'db>);
99
100impl<'db> From<Database<'db>> for LmdbStore<'db> {
101    #[inline(always)]
102    fn from(db: Database<'db>) -> Self {
103        LmdbStore(db)
104    }
105}
106
107impl<'db> Into<Database<'db>> for LmdbStore<'db> {
108    #[inline(always)]
109    fn into(self) -> Database<'db> {
110        self.0
111    }
112}
113
114impl<'db> Deref for LmdbStore<'db> {
115    type Target = Database<'db>;
116
117    fn deref(&self) -> &Self::Target {
118        &self.0
119    }
120}
121
// The impl body is empty: no `DocOps` method is overridden here, so every document-level
// operation comes from the trait's own provided methods (presumably built on top of the
// `KVStore` operations implemented for this type).
impl<'db> DocOps<'db> for LmdbStore<'db> {}
123
124impl<'db> KVStore<'db> for LmdbStore<'db> {
125    type Error = MdbError;
126    type Cursor = LmdbRange<'db>;
127    type Entry = LmdbEntry<'db>;
128    type Return = &'db [u8];
129
130    fn get(&self, key: &[u8]) -> Result<Option<Self::Return>, Self::Error> {
131        let value = self.0.get(&key).optional()?;
132        Ok(value)
133    }
134
135    fn upsert(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> {
136        self.0.set(&key, &value)?;
137        Ok(())
138    }
139
140    fn remove(&self, key: &[u8]) -> Result<(), Self::Error> {
141        let prev: Option<&[u8]> = self.0.get(&key).optional()?;
142        if prev.is_some() {
143            self.0.del(&key)?;
144        }
145        Ok(())
146    }
147
148    fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<(), Self::Error> {
149        let mut c = self.0.new_cursor()?;
150        if c.to_gte_key(&from).optional()?.is_some() {
151            while c.get_key::<&[u8]>()? <= to {
152                c.del()?;
153                if c.to_next_key().optional()?.is_none() {
154                    break;
155                }
156            }
157        }
158        Ok(())
159    }
160
161    fn iter_range(&self, from: &[u8], to: &[u8]) -> Result<Self::Cursor, Self::Error> {
162        let from = from.to_vec();
163        let to = to.to_vec();
164        let cursor = unsafe { std::mem::transmute(self.0.keyrange(&from, &to)?) };
165        Ok(LmdbRange { from, to, cursor })
166    }
167
168    fn peek_back(&self, key: &[u8]) -> Result<Option<Self::Entry>, Self::Error> {
169        let mut cursor = self.0.new_cursor()?;
170        cursor.to_gte_key(&key).optional()?;
171        if cursor.to_prev_key().optional()?.is_none() {
172            return Ok(None);
173        }
174        let key = cursor.get_key()?;
175        let value = cursor.get_value()?;
176        Ok(Some(LmdbEntry::new(key, value)))
177    }
178}
179
180pub struct LmdbRange<'a> {
181    from: Vec<u8>,
182    to: Vec<u8>,
183    cursor: CursorIterator<'a, CursorKeyRangeIter<'a>>,
184}
185
186impl<'a> Iterator for LmdbRange<'a> {
187    type Item = LmdbEntry<'a>;
188
189    fn next(&mut self) -> Option<Self::Item> {
190        let (key, value) = self.cursor.next()?.get();
191        Some(LmdbEntry::new(key, value))
192    }
193}
194
/// A single key-value pair, holding both parts as byte slices borrowed for `'a`.
pub struct LmdbEntry<'a> {
    /// Raw bytes of the entry's key.
    key: &'a [u8],
    /// Raw bytes of the entry's value.
    value: &'a [u8],
}
199
200impl<'a> LmdbEntry<'a> {
201    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
202        LmdbEntry { key, value }
203    }
204}
205
/// Exposes the borrowed key and value bytes through the generic [KVEntry] interface.
impl<'a> KVEntry for LmdbEntry<'a> {
    /// Returns the key bytes of this entry.
    fn key(&self) -> &[u8] {
        self.key
    }
    /// Returns the value bytes of this entry.
    fn value(&self) -> &[u8] {
        self.value
    }
}
214
215pub(crate) struct OwnedCursorRange<'a> {
216    txn: ReadonlyTransaction<'a>,
217    db: Database<'a>,
218    cursor: CursorIterator<'a, CursorKeyRangeIter<'a>>,
219    start: Vec<u8>,
220    end: Vec<u8>,
221}
222
223impl<'a> OwnedCursorRange<'a> {
224    pub(crate) fn new<const N: usize>(
225        txn: ReadonlyTransaction<'a>,
226        db: Database<'a>,
227        start: Key<N>,
228        end: Key<N>,
229    ) -> Result<Self, Error> {
230        let start = start.into();
231        let end = end.into();
232        let cursor = unsafe { std::mem::transmute(db.keyrange(&start, &end)?) };
233
234        Ok(OwnedCursorRange {
235            txn,
236            db,
237            cursor,
238            start,
239            end,
240        })
241    }
242
243    pub(crate) fn db(&self) -> &Database {
244        &self.db
245    }
246}
247
248impl<'a> Iterator for OwnedCursorRange<'a> {
249    type Item = (&'a [u8], &'a [u8]);
250
251    fn next(&mut self) -> Option<Self::Item> {
252        let v = self.cursor.next()?;
253        Some(v.get())
254    }
255}
256
257#[cfg(test)]
258mod test {
259    use crate::{DocOps, LmdbStore};
260    use lmdb_rs::core::DbCreate;
261    use lmdb_rs::Environment;
262    use std::path::Path;
263    use std::sync::Arc;
264    use tempdir::TempDir;
265    use yrs::{Doc, GetString, ReadTxn, Text, Transact};
266
267    fn init_env<P: AsRef<Path>>(dir: P) -> Environment {
268        let env = Environment::new()
269            .autocreate_dir(true)
270            .max_dbs(4)
271            .open(dir, 0o777)
272            .unwrap();
273        env
274    }
275
276    #[test]
277    fn create_get_remove() {
278        let dir = TempDir::new("lmdb-create_get_remove").unwrap();
279        let env = init_env(&dir);
280        let h = env.create_db("yrs", DbCreate).unwrap();
281
282        // insert document
283        {
284            let doc = Doc::new();
285            let text = doc.get_or_insert_text("text");
286            let mut txn = doc.transact_mut();
287            text.insert(&mut txn, 0, "hello");
288
289            let db_txn = env.new_transaction().unwrap();
290            let db = LmdbStore::from(db_txn.bind(&h));
291            db.insert_doc("doc", &txn).unwrap();
292            db_txn.commit().unwrap();
293        }
294
295        // retrieve document
296        {
297            let doc = Doc::new();
298            let text = doc.get_or_insert_text("text");
299            let mut txn = doc.transact_mut();
300            let db_txn = env.get_reader().unwrap();
301            let db = LmdbStore::from(db_txn.bind(&h));
302            db.load_doc("doc", &mut txn).unwrap();
303
304            assert_eq!(text.get_string(&txn), "hello");
305
306            let (sv, completed) = db.get_state_vector("doc").unwrap();
307            assert_eq!(sv, Some(txn.state_vector()));
308            assert!(completed);
309        }
310
311        // remove document
312        {
313            let db_txn = env.new_transaction().unwrap();
314            let db = LmdbStore::from(db_txn.bind(&h));
315
316            db.clear_doc("doc").unwrap();
317
318            let doc = Doc::new();
319            let text = doc.get_or_insert_text("text");
320            let mut txn = doc.transact_mut();
321            db.load_doc("doc", &mut txn).unwrap();
322
323            assert_eq!(text.get_string(&txn), "");
324
325            let (sv, completed) = db.get_state_vector("doc").unwrap();
326            assert!(sv.is_none());
327            assert!(completed);
328        }
329    }
330    #[test]
331    fn multi_insert() {
332        let dir = TempDir::new("lmdb-multi_insert").unwrap();
333        let env = init_env(&dir);
334        let h = env.create_db("yrs", DbCreate).unwrap();
335
336        // insert document twice
337        {
338            let doc = Doc::new();
339            let text = doc.get_or_insert_text("text");
340            let mut txn = doc.transact_mut();
341            text.push(&mut txn, "hello");
342
343            let db_txn = env.new_transaction().unwrap();
344            let db = LmdbStore::from(db_txn.bind(&h));
345
346            db.insert_doc("doc", &txn).unwrap();
347
348            text.push(&mut txn, " world");
349
350            db.insert_doc("doc", &txn).unwrap();
351
352            db_txn.commit().unwrap();
353        }
354
355        // retrieve document
356        {
357            let db_txn = env.get_reader().unwrap();
358            let db = LmdbStore::from(db_txn.bind(&h));
359
360            let doc = Doc::new();
361            let text = doc.get_or_insert_text("text");
362            let mut txn = doc.transact_mut();
363            db.load_doc("doc", &mut txn).unwrap();
364
365            assert_eq!(text.get_string(&txn), "hello world");
366        }
367    }
368
369    #[test]
370    fn incremental_updates() {
371        const DOC_NAME: &str = "doc";
372        let dir = TempDir::new("lmdb-incremental_updates").unwrap();
373        let env = init_env(&dir);
374        let h = env.create_db("yrs", DbCreate).unwrap();
375        let env = Arc::new(env);
376        let h = Arc::new(h);
377
378        // store document updates
379        {
380            let doc = Doc::new();
381            let text = doc.get_or_insert_text("text");
382
383            let env = env.clone();
384            let h = h.clone();
385            let _sub = doc.observe_update_v1(move |_, u| {
386                let db_txn = env.new_transaction().unwrap();
387                let db = LmdbStore::from(db_txn.bind(&h));
388                db.push_update(DOC_NAME, &u.update).unwrap();
389                db_txn.commit().unwrap();
390            });
391            // generate 3 updates
392            text.push(&mut doc.transact_mut(), "a");
393            text.push(&mut doc.transact_mut(), "b");
394            text.push(&mut doc.transact_mut(), "c");
395        }
396
397        // load document
398        {
399            let doc = Doc::new();
400            let text = doc.get_or_insert_text("text");
401            let mut txn = doc.transact_mut();
402
403            let db_txn = env.get_reader().unwrap();
404            let db = LmdbStore::from(db_txn.bind(&h));
405            db.load_doc(DOC_NAME, &mut txn).unwrap();
406
407            assert_eq!(text.get_string(&txn), "abc");
408        }
409
410        // flush document
411        {
412            let db_txn = env.new_transaction().unwrap();
413            let db = LmdbStore::from(db_txn.bind(&h));
414            let doc = db.flush_doc(DOC_NAME).unwrap().unwrap();
415            db_txn.commit().unwrap();
416
417            let text = doc.get_or_insert_text("text");
418
419            assert_eq!(text.get_string(&doc.transact()), "abc");
420        }
421    }
422
423    #[test]
424    fn state_vector_updates_only() {
425        const DOC_NAME: &str = "doc";
426        let dir = TempDir::new("lmdb-state_vector_updates_only").unwrap();
427        let env = init_env(&dir);
428        let h = env.create_db("yrs", DbCreate).unwrap();
429        let env = Arc::new(env);
430        let h = Arc::new(h);
431
432        // store document updates
433        {
434            let doc = Doc::new();
435            let text = doc.get_or_insert_text("text");
436            let env = env.clone();
437            let h = h.clone();
438            let _sub = doc.observe_update_v1(move |_, u| {
439                let db_txn = env.new_transaction().unwrap();
440                let db = LmdbStore::from(db_txn.bind(&h));
441                db.push_update(DOC_NAME, &u.update).unwrap();
442                db_txn.commit().unwrap();
443            });
444            // generate 3 updates
445            text.push(&mut doc.transact_mut(), "a");
446            text.push(&mut doc.transact_mut(), "b");
447            text.push(&mut doc.transact_mut(), "c");
448
449            let sv = doc.transact().state_vector();
450            sv
451        };
452
453        let db_txn = env.get_reader().unwrap();
454        let db = LmdbStore::from(db_txn.bind(&h));
455        let (sv, completed) = db.get_state_vector(DOC_NAME).unwrap();
456        assert!(sv.is_none());
457        assert!(!completed); // since it's not completed, we should recalculate state vector from doc state
458    }
459
460    #[test]
461    fn state_diff_from_updates() {
462        const DOC_NAME: &str = "doc";
463        let dir = TempDir::new("lmdb-state_diff_from_updates").unwrap();
464        let env = init_env(&dir);
465        let h = env.create_db("yrs", DbCreate).unwrap();
466        let env = Arc::new(env);
467        let h = Arc::new(h);
468
469        let (sv, expected) = {
470            let doc = Doc::new();
471            let text = doc.get_or_insert_text("text");
472
473            let env = env.clone();
474            let h = h.clone();
475            let _sub = doc.observe_update_v1(move |_, u| {
476                let db_txn = env.new_transaction().unwrap();
477                let db = LmdbStore::from(db_txn.bind(&h));
478                db.push_update(DOC_NAME, &u.update).unwrap();
479                db_txn.commit().unwrap();
480            });
481
482            // generate 3 updates
483            text.push(&mut doc.transact_mut(), "a");
484            text.push(&mut doc.transact_mut(), "b");
485            let sv = doc.transact().state_vector();
486            text.push(&mut doc.transact_mut(), "c");
487            let update = doc.transact().encode_diff_v1(&sv);
488            (sv, update)
489        };
490
491        let db_txn = env.get_reader().unwrap();
492        let db = LmdbStore::from(db_txn.bind(&h));
493        let actual = db.get_diff(DOC_NAME, &sv).unwrap();
494        assert_eq!(actual, Some(expected));
495    }
496
497    #[test]
498    fn state_diff_from_doc() {
499        const DOC_NAME: &str = "doc";
500        let dir = TempDir::new("lmdb-state_diff_from_doc").unwrap();
501        let env = init_env(&dir);
502        let h = env.create_db("yrs", DbCreate).unwrap();
503
504        let (sv, expected) = {
505            let doc = Doc::new();
506            let text = doc.get_or_insert_text("text");
507            // generate 3 updates
508            text.push(&mut doc.transact_mut(), "a");
509            text.push(&mut doc.transact_mut(), "b");
510            let sv = doc.transact().state_vector();
511            text.push(&mut doc.transact_mut(), "c");
512            let update = doc.transact().encode_diff_v1(&sv);
513
514            let db_txn = env.new_transaction().unwrap();
515            let db = LmdbStore::from(db_txn.bind(&h));
516            db.insert_doc(DOC_NAME, &doc.transact()).unwrap();
517            db_txn.commit().unwrap();
518
519            (sv, update)
520        };
521
522        let db_txn = env.get_reader().unwrap();
523        let db = LmdbStore::from(db_txn.bind(&h));
524        let actual = db.get_diff(DOC_NAME, &sv).unwrap();
525        assert_eq!(actual, Some(expected));
526    }
527
528    #[test]
529    fn doc_meta() {
530        const DOC_NAME: &str = "doc";
531        let dir = TempDir::new("lmdb-doc_meta").unwrap();
532        let env = init_env(&dir);
533        let h = env.create_db("yrs", DbCreate).unwrap();
534
535        let db_txn = env.new_transaction().unwrap();
536        let db = LmdbStore::from(db_txn.bind(&h));
537        let value = db.get_meta(DOC_NAME, "key").unwrap();
538        assert!(value.is_none());
539        db.insert_meta(DOC_NAME, "key", "value1".as_bytes())
540            .unwrap();
541        db_txn.commit().unwrap();
542
543        let db_txn = env.new_transaction().unwrap();
544        let db = LmdbStore::from(db_txn.bind(&h));
545        let prev = db.get_meta(DOC_NAME, "key").unwrap().map(Vec::from);
546        db.insert_meta(DOC_NAME, "key", "value2".as_bytes())
547            .unwrap();
548        db_txn.commit().unwrap();
549        assert_eq!(prev.as_deref(), Some("value1".as_bytes()));
550
551        let db_txn = env.new_transaction().unwrap();
552        let db = LmdbStore::from(db_txn.bind(&h));
553        let prev = db.get_meta(DOC_NAME, "key").unwrap().map(Vec::from);
554        db.remove_meta(DOC_NAME, "key").unwrap();
555        assert_eq!(prev.as_deref(), Some("value2".as_bytes()));
556        let value = db.get_meta(DOC_NAME, "key").unwrap();
557        assert!(value.is_none());
558    }
559
560    #[test]
561    fn doc_meta_iter() {
562        let dir = TempDir::new("lmdb-doc_meta_iter").unwrap();
563        let env = init_env(&dir);
564        let h = env.create_db("yrs", DbCreate).unwrap();
565        let db_txn = env.new_transaction().unwrap();
566        let db = LmdbStore::from(db_txn.bind(&h));
567
568        db.insert_meta("A", "key1", [1].as_ref()).unwrap();
569        db.insert_meta("B", "key2", [2].as_ref()).unwrap();
570        db.insert_meta("B", "key3", [3].as_ref()).unwrap();
571        db.insert_meta("C", "key4", [4].as_ref()).unwrap();
572
573        let mut i = db.iter_meta("B").unwrap();
574        assert_eq!(i.next(), Some(("key2".as_bytes().into(), [2].into())));
575        assert_eq!(i.next(), Some(("key3".as_bytes().into(), [3].into())));
576        assert!(i.next().is_none());
577    }
578
579    #[test]
580    fn doc_iter() {
581        let dir = TempDir::new("lmdb-doc_iter").unwrap();
582        let env = init_env(&dir);
583        let h = env.create_db("yrs", DbCreate).unwrap();
584        let env = Arc::new(env);
585        let h = Arc::new(h);
586
587        // insert metadata
588        {
589            let db_txn = env.new_transaction().unwrap();
590            let db = LmdbStore::from(db_txn.bind(&h));
591            db.insert_meta("A", "key1", [1].as_ref()).unwrap();
592            db_txn.commit().unwrap();
593        }
594
595        // insert full doc state
596        {
597            let doc = Doc::new();
598            let text = doc.get_or_insert_text("text");
599            let mut txn = doc.transact_mut();
600            text.push(&mut txn, "hello world");
601            let db_txn = env.new_transaction().unwrap();
602            let db = LmdbStore::from(db_txn.bind(&h));
603            db.insert_doc("B", &txn).unwrap();
604            db_txn.commit().unwrap();
605        }
606
607        // insert update
608        {
609            let doc = Doc::new();
610            let env = env.clone();
611            let h = h.clone();
612            let _sub = doc.observe_update_v1(move |_, u| {
613                let db_txn = env.new_transaction().unwrap();
614                let db = LmdbStore::from(db_txn.bind(&h));
615                db.push_update("C", &u.update).unwrap();
616                db_txn.commit().unwrap();
617            });
618            let text = doc.get_or_insert_text("text");
619            let mut txn = doc.transact_mut();
620            text.push(&mut txn, "hello world");
621        }
622
623        {
624            let db_txn = env.get_reader().unwrap();
625            let db = LmdbStore::from(db_txn.bind(&h));
626            let mut i = db.iter_docs().unwrap();
627            assert_eq!(i.next(), Some("A".as_bytes().into()));
628            assert_eq!(i.next(), Some("B".as_bytes().into()));
629            assert_eq!(i.next(), Some("C".as_bytes().into()));
630            assert!(i.next().is_none());
631        }
632
633        // clear doc
634        {
635            let db_txn = env.new_transaction().unwrap();
636            let db = LmdbStore::from(db_txn.bind(&h));
637            db.clear_doc("B").unwrap();
638            db_txn.commit().unwrap();
639        }
640
641        {
642            let db_txn = env.get_reader().unwrap();
643            let db = LmdbStore::from(db_txn.bind(&h));
644            let mut i = db.iter_docs().unwrap();
645            assert_eq!(i.next(), Some("A".as_bytes().into()));
646            assert_eq!(i.next(), Some("C".as_bytes().into()));
647            assert!(i.next().is_none());
648        }
649    }
650}