yrs_rocksdb/
lib.rs

1//! **yrs-rocksdb** is a persistence layer allowing to store [Yrs](https://docs.rs/yrs/latest/yrs/index.html)
2//! documents and providing convenient utility functions to work with them, using RocksDB for persistent backed.
3//!
4//! # Example
5//!
6//! ```rust
7//! use std::sync::Arc;
8//! use rocksdb::TransactionDB;
9//! use yrs::{Doc, Text, Transact};
10//! use yrs_kvstore::DocOps;
11//! use yrs_rocksdb::RocksDBStore;
12//!
13//! let db: Arc<TransactionDB> = Arc::new(TransactionDB::open_default("my-db-path").unwrap());
14//!
15//! let doc = Doc::new();
16//! let text = doc.get_or_insert_text("text");
17//!
18//! // restore document state from DB
19//! {
20//!   let db_txn = RocksDBStore::from(db.transaction());
21//!   db_txn.load_doc("my-doc-name", &mut doc.transact_mut()).unwrap();
22//! }
23//!
24//! // another options is to flush document state right away, but
25//! // this requires a read-write transaction
26//! {
27//!   let db_txn = RocksDBStore::from(db.transaction());
28//!   let doc = db_txn.flush_doc_with("my-doc-name", yrs::Options::default()).unwrap();
29//!   db_txn.commit().unwrap(); // flush may change store state
30//! }
31//!
32//! // configure document to persist every update and
33//! // occassionaly compact them into document state
34//! let sub = {
35//!   let db = db.clone();
36//!   let options = doc.options().clone();
37//!   doc.observe_update_v1(move |_,e| {
38//!       let db_txn = RocksDBStore::from(db.transaction());
39//!       let seq_nr = db_txn.push_update("my-doc-name", &e.update).unwrap();
40//!       if seq_nr % 64 == 0 {
41//!           // occassinally merge updates into the document state
42//!           db_txn.flush_doc_with("my-doc-name", options.clone()).unwrap();
43//!       }
44//!       db_txn.commit().unwrap();
45//!   })
46//! };
47//!
48//! text.insert(&mut doc.transact_mut(), 0, "a");
49//! text.insert(&mut doc.transact_mut(), 1, "b");
50//! text.insert(&mut doc.transact_mut(), 2, "c");
51//! ```
52
53use rocksdb::{
54    DBIteratorWithThreadMode, DBPinnableSlice, Direction, IteratorMode, ReadOptions, Transaction,
55};
56use std::ops::Deref;
57use yrs_kvstore::{DocOps, KVEntry, KVStore};
58
59pub use yrs_kvstore as store;
60
61/// Type wrapper around RocksDB [Transaction] struct. Used to extend it with [DocOps]
62/// methods used for convenience when working with Yrs documents.
63#[repr(transparent)]
64pub struct RocksDBStore<'a, DB>(Transaction<'a, DB>);
65
66impl<'a, DB> RocksDBStore<'a, DB> {
67    #[inline(always)]
68    pub fn commit(self) -> Result<(), rocksdb::Error> {
69        self.0.commit()
70    }
71}
72
73impl<'a, DB> From<Transaction<'a, DB>> for RocksDBStore<'a, DB> {
74    #[inline(always)]
75    fn from(txn: Transaction<'a, DB>) -> Self {
76        RocksDBStore(txn)
77    }
78}
79
80impl<'a, DB> Into<Transaction<'a, DB>> for RocksDBStore<'a, DB> {
81    #[inline(always)]
82    fn into(self) -> Transaction<'a, DB> {
83        self.0
84    }
85}
86
87impl<'a, DB> Deref for RocksDBStore<'a, DB> {
88    type Target = Transaction<'a, DB>;
89
90    #[inline(always)]
91    fn deref(&self) -> &Self::Target {
92        &self.0
93    }
94}
95
96impl<'a, DB> DocOps<'a> for RocksDBStore<'a, DB> {}
97
98impl<'a, DB> KVStore<'a> for RocksDBStore<'a, DB> {
99    type Error = rocksdb::Error;
100    type Cursor = RocksDBIter<'a, DB>;
101    type Entry = RocksDBEntry;
102    type Return = DBPinnableSlice<'a>;
103
104    fn get(&self, key: &[u8]) -> Result<Option<Self::Return>, Self::Error> {
105        if let Some(pinned) = self.0.get_pinned(key)? {
106            Ok(Some(unsafe { std::mem::transmute(pinned) }))
107        } else {
108            Ok(None)
109        }
110    }
111
112    fn upsert(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> {
113        self.0.put(key, value)?;
114        Ok(())
115    }
116
117    fn remove(&self, key: &[u8]) -> Result<(), Self::Error> {
118        self.0.delete(key)?;
119        Ok(())
120    }
121
122    fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<(), Self::Error> {
123        let mut opt = ReadOptions::default();
124        opt.set_iterate_lower_bound(from);
125        opt.set_iterate_upper_bound(to);
126        let mut i = self
127            .0
128            .iterator_opt(IteratorMode::From(from, Direction::Forward), opt);
129        while let Some(res) = i.next() {
130            let (key, _) = res?;
131            self.0.delete(key)?;
132        }
133        Ok(())
134    }
135
136    fn iter_range(&self, from: &[u8], to: &[u8]) -> Result<Self::Cursor, Self::Error> {
137        let mut opt = ReadOptions::default();
138        opt.set_iterate_lower_bound(from);
139        opt.set_iterate_upper_bound(to);
140        let raw = self
141            .0
142            .iterator_opt(IteratorMode::From(from, Direction::Forward), opt);
143        Ok(RocksDBIter::new(
144            unsafe { std::mem::transmute(raw) },
145            to.to_vec(),
146        ))
147    }
148
149    fn peek_back(&self, key: &[u8]) -> Result<Option<Self::Entry>, Self::Error> {
150        let opt = ReadOptions::default();
151        let mut raw = self.0.raw_iterator_opt(opt);
152        raw.seek_for_prev(key);
153        if let Some((key, value)) = raw.item() {
154            Ok(Some(RocksDBEntry::new(key.into(), value.into())))
155        } else {
156            Ok(None)
157        }
158    }
159}
160
161pub struct RocksDBIter<'a, DB> {
162    inner: DBIteratorWithThreadMode<'a, Transaction<'a, DB>>,
163    to: Vec<u8>,
164}
165
166impl<'a, DB> RocksDBIter<'a, DB> {
167    fn new(inner: DBIteratorWithThreadMode<'a, Transaction<'a, DB>>, to: Vec<u8>) -> Self {
168        RocksDBIter { inner, to }
169    }
170}
171
172impl<'a, DB> Iterator for RocksDBIter<'a, DB> {
173    type Item = RocksDBEntry;
174
175    fn next(&mut self) -> Option<Self::Item> {
176        let n = self.inner.next()?;
177        if let Ok((key, value)) = n {
178            if key.as_ref() >= &self.to {
179                None
180            } else {
181                Some(RocksDBEntry::new(key, value))
182            }
183        } else {
184            None
185        }
186    }
187}
188
189pub struct RocksDBEntry {
190    key: Box<[u8]>,
191    value: Box<[u8]>,
192}
193
194impl RocksDBEntry {
195    fn new(key: Box<[u8]>, value: Box<[u8]>) -> Self {
196        RocksDBEntry { key, value }
197    }
198}
199
200impl Into<(Box<[u8]>, Box<[u8]>)> for RocksDBEntry {
201    fn into(self) -> (Box<[u8]>, Box<[u8]>) {
202        (self.key, self.value)
203    }
204}
205
206impl KVEntry for RocksDBEntry {
207    fn key(&self) -> &[u8] {
208        &self.key
209    }
210
211    fn value(&self) -> &[u8] {
212        &self.value
213    }
214}
215
216#[cfg(test)]
217mod test {
218    use crate::RocksDBStore;
219    use rocksdb::TransactionDB;
220    use std::path::Path;
221    use std::sync::Arc;
222    use tempdir::TempDir;
223    use yrs::{Doc, GetString, ReadTxn, Text, Transact};
224    use yrs_kvstore::DocOps;
225
226    fn init_env<P: AsRef<Path>>(dir: P) -> TransactionDB {
227        let db = TransactionDB::open_default(dir).unwrap();
228        db
229    }
230
231    #[test]
232    fn create_get_remove() {
233        let tmp = TempDir::new("rocksdb-create_get_remove").unwrap();
234        let db = init_env(&tmp);
235
236        // insert document
237        {
238            let doc = Doc::new();
239            let text = doc.get_or_insert_text("text");
240            let mut txn = doc.transact_mut();
241            text.insert(&mut txn, 0, "hello");
242
243            let db_txn = RocksDBStore::from(db.transaction());
244            db_txn.insert_doc("doc", &txn).unwrap();
245            db_txn.commit().unwrap();
246        }
247
248        // retrieve document
249        {
250            let doc = Doc::new();
251            let text = doc.get_or_insert_text("text");
252            let mut txn = doc.transact_mut();
253            let db_txn = RocksDBStore::from(db.transaction());
254            db_txn.load_doc("doc", &mut txn).unwrap();
255
256            assert_eq!(text.get_string(&txn), "hello");
257
258            let (sv, completed) = db_txn.get_state_vector("doc").unwrap();
259            assert_eq!(sv, Some(txn.state_vector()));
260            assert!(completed);
261        }
262
263        // remove document
264        {
265            let db_txn = RocksDBStore::from(db.transaction());
266
267            db_txn.clear_doc("doc").unwrap();
268
269            let doc = Doc::new();
270            let text = doc.get_or_insert_text("text");
271            let mut txn = doc.transact_mut();
272            db_txn.load_doc("doc", &mut txn).unwrap();
273
274            assert_eq!(text.get_string(&txn), "");
275
276            let (sv, completed) = db_txn.get_state_vector("doc").unwrap();
277            assert!(sv.is_none());
278            assert!(completed);
279        }
280    }
281    #[test]
282    fn multi_insert() {
283        let tmp = TempDir::new("rocksdb-multi_insert").unwrap();
284        let db = init_env(&tmp);
285
286        // insert document twice
287        {
288            let doc = Doc::new();
289            let text = doc.get_or_insert_text("text");
290            let mut txn = doc.transact_mut();
291            text.push(&mut txn, "hello");
292
293            let db_txn = RocksDBStore::from(db.transaction());
294
295            db_txn.insert_doc("doc", &txn).unwrap();
296
297            text.push(&mut txn, " world");
298
299            db_txn.insert_doc("doc", &txn).unwrap();
300            db_txn.commit().unwrap();
301        }
302
303        // retrieve document
304        {
305            let db_txn = RocksDBStore::from(db.transaction());
306
307            let doc = Doc::new();
308            let text = doc.get_or_insert_text("text");
309            let mut txn = doc.transact_mut();
310            db_txn.load_doc("doc", &mut txn).unwrap();
311
312            assert_eq!(text.get_string(&txn), "hello world");
313        }
314    }
315
316    #[test]
317    fn incremental_updates() {
318        const DOC_NAME: &str = "doc";
319        let tmp = TempDir::new("rocksdb-incremental_updates").unwrap();
320        let db = init_env(&tmp);
321        let db = Arc::new(db);
322
323        // store document updates
324        {
325            let doc = Doc::new();
326            let text = doc.get_or_insert_text("text");
327
328            let db = db.clone();
329            let _sub = doc.observe_update_v1(move |_, u| {
330                let db_txn = RocksDBStore::from(db.transaction());
331                db_txn.push_update(DOC_NAME, &u.update).unwrap();
332                db_txn.commit().unwrap();
333            });
334            // generate 3 updates
335            text.push(&mut doc.transact_mut(), "a");
336            text.push(&mut doc.transact_mut(), "b");
337            text.push(&mut doc.transact_mut(), "c");
338        }
339
340        // load document
341        {
342            let doc = Doc::new();
343            let text = doc.get_or_insert_text("text");
344            let mut txn = doc.transact_mut();
345
346            let db_txn = RocksDBStore::from(db.transaction());
347            db_txn.load_doc(DOC_NAME, &mut txn).unwrap();
348
349            assert_eq!(text.get_string(&txn), "abc");
350        }
351
352        // flush document
353        {
354            let db_txn = RocksDBStore::from(db.transaction());
355            let doc = db_txn.flush_doc(DOC_NAME).unwrap().unwrap();
356            db_txn.commit().unwrap();
357
358            let text = doc.get_or_insert_text("text");
359
360            assert_eq!(text.get_string(&doc.transact()), "abc");
361        }
362    }
363
364    #[test]
365    fn state_vector_updates_only() {
366        const DOC_NAME: &str = "doc";
367        let tmp = TempDir::new("rocksdb-state_vector_updates_only").unwrap();
368        let db = init_env(&tmp);
369        let db = Arc::new(db);
370
371        // store document updates
372        {
373            let doc = Doc::new();
374            let text = doc.get_or_insert_text("text");
375            let db = db.clone();
376            let _sub = doc.observe_update_v1(move |_, u| {
377                let db_txn = RocksDBStore::from(db.transaction());
378                db_txn.push_update(DOC_NAME, &u.update).unwrap();
379                db_txn.commit().unwrap();
380            });
381            // generate 3 updates
382            text.push(&mut doc.transact_mut(), "a");
383            text.push(&mut doc.transact_mut(), "b");
384            text.push(&mut doc.transact_mut(), "c");
385
386            let sv = doc.transact().state_vector();
387            sv
388        };
389
390        let db_txn = RocksDBStore::from(db.transaction());
391        let (sv, completed) = db_txn.get_state_vector(DOC_NAME).unwrap();
392        assert!(sv.is_none());
393        assert!(!completed); // since it's not completed, we should recalculate state vector from doc state
394    }
395
396    #[test]
397    fn state_diff_from_updates() {
398        const DOC_NAME: &str = "doc";
399        let tmp = TempDir::new("rocksdb-state_diff_from_updates").unwrap();
400        let db = init_env(&tmp);
401        let db = Arc::new(db);
402
403        let (sv, expected) = {
404            let doc = Doc::new();
405            let text = doc.get_or_insert_text("text");
406
407            let db = db.clone();
408            let _sub = doc.observe_update_v1(move |_, u| {
409                let db_txn = RocksDBStore::from(db.transaction());
410                db_txn.push_update(DOC_NAME, &u.update).unwrap();
411                db_txn.commit().unwrap();
412            });
413
414            // generate 3 updates
415            text.push(&mut doc.transact_mut(), "a");
416            text.push(&mut doc.transact_mut(), "b");
417            let sv = doc.transact().state_vector();
418            text.push(&mut doc.transact_mut(), "c");
419            let update = doc.transact().encode_diff_v1(&sv);
420            (sv, update)
421        };
422
423        let db_txn = RocksDBStore::from(db.transaction());
424        let actual = db_txn.get_diff(DOC_NAME, &sv).unwrap();
425        assert_eq!(actual, Some(expected));
426    }
427
428    #[test]
429    fn state_diff_from_doc() {
430        const DOC_NAME: &str = "doc";
431        let tmp = TempDir::new("rocksdb-state_diff_from_doc").unwrap();
432        let db = init_env(&tmp);
433        let db = Arc::new(db);
434
435        let (sv, expected) = {
436            let doc = Doc::new();
437            let text = doc.get_or_insert_text("text");
438            // generate 3 updates
439            text.push(&mut doc.transact_mut(), "a");
440            text.push(&mut doc.transact_mut(), "b");
441            let sv = doc.transact().state_vector();
442            text.push(&mut doc.transact_mut(), "c");
443            let update = doc.transact().encode_diff_v1(&sv);
444
445            let db_txn = RocksDBStore::from(db.transaction());
446            db_txn.insert_doc(DOC_NAME, &doc.transact()).unwrap();
447            db_txn.commit().unwrap();
448
449            (sv, update)
450        };
451
452        let db_txn = RocksDBStore::from(db.transaction());
453        let actual = db_txn.get_diff(DOC_NAME, &sv).unwrap();
454        assert_eq!(actual, Some(expected));
455    }
456
457    #[test]
458    fn doc_meta() {
459        const DOC_NAME: &str = "doc";
460        let tmp = TempDir::new("rocksdb-doc_meta").unwrap();
461        let db = init_env(&tmp);
462        let db = Arc::new(db);
463
464        let db_txn = RocksDBStore::from(db.transaction());
465        let value = db_txn.get_meta(DOC_NAME, "key").unwrap();
466        assert!(value.is_none());
467        db_txn
468            .insert_meta(DOC_NAME, "key", "value1".as_bytes())
469            .unwrap();
470        db_txn.commit().unwrap();
471
472        let db_txn = RocksDBStore::from(db.transaction());
473        let prev = db_txn.get_meta(DOC_NAME, "key").unwrap();
474        db_txn
475            .insert_meta(DOC_NAME, "key", "value2".as_bytes())
476            .unwrap();
477        db_txn.commit().unwrap();
478        assert_eq!(prev.as_deref(), Some("value1".as_bytes()));
479
480        let db_txn = RocksDBStore::from(db.transaction());
481        let prev = db_txn.get_meta(DOC_NAME, "key").unwrap();
482        db_txn.remove_meta(DOC_NAME, "key").unwrap();
483        assert_eq!(prev.as_deref(), Some("value2".as_bytes()));
484        let value = db_txn.get_meta(DOC_NAME, "key").unwrap();
485        assert!(value.is_none());
486    }
487
488    #[test]
489    fn doc_meta_iter() {
490        let tmp = TempDir::new("rocksdb-doc_meta_iter").unwrap();
491        let db = init_env(&tmp);
492        let db_txn = RocksDBStore::from(db.transaction());
493
494        db_txn.insert_meta("A", "key1", [1].as_ref()).unwrap();
495        db_txn.insert_meta("B", "key2", [2].as_ref()).unwrap();
496        db_txn.insert_meta("B", "key3", [3].as_ref()).unwrap();
497        db_txn.insert_meta("C", "key4", [4].as_ref()).unwrap();
498
499        let mut i = db_txn.iter_meta("B").unwrap();
500        assert_eq!(i.next(), Some(("key2".as_bytes().into(), [2].into())));
501        assert_eq!(i.next(), Some(("key3".as_bytes().into(), [3].into())));
502        assert!(i.next().is_none());
503    }
504
505    #[test]
506    fn doc_iter() {
507        let tmp = TempDir::new("rocksdb-doc_iter").unwrap();
508        let db = init_env(&tmp);
509        let db = Arc::new(db);
510
511        // insert metadata
512        {
513            let db_txn = RocksDBStore::from(db.transaction());
514            db_txn.insert_meta("A", "key1", [1].as_ref()).unwrap();
515            db_txn.commit().unwrap();
516        }
517
518        // insert full doc state
519        {
520            let doc = Doc::new();
521            let text = doc.get_or_insert_text("text");
522            let mut txn = doc.transact_mut();
523            text.push(&mut txn, "hello world");
524
525            let db_txn = RocksDBStore::from(db.transaction());
526            db_txn.insert_doc("B", &txn).unwrap();
527            db_txn.commit().unwrap();
528        }
529
530        // insert update
531        {
532            let doc = Doc::new();
533            let db = db.clone();
534            let _sub = doc.observe_update_v1(move |_, u| {
535                let db_txn = RocksDBStore::from(db.transaction());
536                db_txn.push_update("C", &u.update).unwrap();
537                db_txn.commit().unwrap();
538            });
539            let text = doc.get_or_insert_text("text");
540            let mut txn = doc.transact_mut();
541            text.push(&mut txn, "hello world");
542        }
543
544        {
545            let db_txn = RocksDBStore::from(db.transaction());
546            let mut i = db_txn.iter_docs().unwrap();
547            assert_eq!(i.next(), Some("A".as_bytes().into()));
548            assert_eq!(i.next(), Some("B".as_bytes().into()));
549            assert_eq!(i.next(), Some("C".as_bytes().into()));
550            assert!(i.next().is_none());
551        }
552
553        // clear doc
554        {
555            let db_txn = RocksDBStore::from(db.transaction());
556            db_txn.clear_doc("B").unwrap();
557            db_txn.commit().unwrap();
558        }
559
560        {
561            let db_txn = RocksDBStore::from(db.transaction());
562            let mut i = db_txn.iter_docs().unwrap();
563            assert_eq!(i.next(), Some("A".as_bytes().into()));
564            assert_eq!(i.next(), Some("C".as_bytes().into()));
565            assert!(i.next().is_none());
566        }
567    }
568}