// yrs_kvstore/lib.rs

//! yrs-kvstore is a generic library that makes it quick to implement
//! [Yrs](https://docs.rs/yrs/latest/yrs/index.html)-specific document operations over any kind
//! of persistent key-value store, e.g. LMDB or RocksDB.
//!
//! To do so, the store's unit of transaction must provide a set of basic operations by
//! implementing the [KVStore] trait. Once this is done, it can implement [DocOps]. The latter
//! offers a set of useful operations such as document metadata management, document and update
//! merging, etc. They are implemented automatically as long as the type correctly implements
//! [KVStore].
//!
//! ## Internal representation
//!
//! yrs-kvstore operates on a few key spaces. All keys inserted via [DocOps] are prefixed with the
//! [V1] constant. The key space is then further divided into:
//!
//! - [KEYSPACE_OID], used for the object ID (OID) index mapping. Whenever a new document is
//!   inserted, a new OID number is generated for it. While document names can be arbitrary byte
//!   strings, OIDs are guaranteed to have a constant size. Internally, all document contents are
//!   referred to via their OIDs.
//! - [KEYSPACE_DOC], used to store the [document state](crate::keys::SUB_DOC), its
//!   [state vector](crate::keys::SUB_STATE_VEC), the corresponding series of
//!   [updates](crate::keys::SUB_UPDATE) and [metadata](crate::keys::SUB_META). The document state
//!   and state vector may not represent the full system knowledge about the document, as they
//!   don't reflect information contained in pending document updates. Updates can be stored
//!   separately to avoid a costly read/parse/merge/store cycle of a big document binary on every
//!   update. It's a good idea to insert updates as they come and every once in a while call
//!   [DocOps::flush_doc] or [DocOps::flush_doc_with] to merge them into the document state itself.
//!
//! The variants and schemas of byte keys in use can be summarized as:
//!
//! ```text
//! 00{doc_name:N}0      - OID key pattern
//! 01{oid:4}0           - document key pattern
//! 01{oid:4}1           - state vector key pattern
//! 01{oid:4}2{seqNr:4}0 - document update key pattern
//! 01{oid:4}3{name:M}0  - document meta key pattern
//! ```

38pub mod error;
39pub mod keys;
40
41use crate::error::Error;
42use crate::keys::{
43    doc_oid_name, key_doc, key_doc_end, key_doc_start, key_meta, key_meta_end, key_meta_start,
44    key_oid, key_state_vector, key_update, Key, KEYSPACE_DOC, KEYSPACE_OID, OID, V1,
45};
46use std::convert::TryInto;
47use yrs::updates::decoder::Decode;
48use yrs::updates::encoder::Encode;
49use yrs::{Doc, ReadTxn, StateVector, Transact, TransactionMut, Update};
50
51/// A trait to be implemented by the specific key-value store transaction equivalent in order to
52/// auto-implement features provided by [DocOps] trait.
53pub trait KVStore<'a> {
54    /// Error type returned from the implementation.
55    type Error: std::error::Error;
56    /// Cursor type used to iterate over the ordered range of key-value entries.
57    type Cursor: Iterator<Item = Self::Entry>;
58    /// Entry type returned by cursor.
59    type Entry: KVEntry;
60    /// Type returned from the implementation. Different key-value stores have different
61    /// abstractions over the binary data they use.
62    type Return: AsRef<[u8]>;
63
64    /// Return a value stored under given `key` or `None` if key was not found.
65    fn get(&self, key: &[u8]) -> Result<Option<Self::Return>, Self::Error>;
66
67    /// Insert a new `value` under given `key` or replace an existing value with new one if
68    /// entry with that `key` already existed.
69    fn upsert(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>;
70
71    /// Return a value stored under the given `key` if it exists.
72    fn remove(&self, key: &[u8]) -> Result<(), Self::Error>;
73
74    /// Remove all keys between `from`..=`to` range of keys.
75    fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<(), Self::Error>;
76
77    /// Return an iterator over all entries between `from`..=`to` range of keys.
78    fn iter_range(&self, from: &[u8], to: &[u8]) -> Result<Self::Cursor, Self::Error>;
79
80    /// Looks into the last entry value prior to a given key. The provided key parameter may not
81    /// exist and it's used only to establish cursor position in ordered key collection.
82    ///
83    /// In example: in a key collection of `{1,2,5,7}`, this method with the key parameter of `4`
84    /// should return value of `2`.
85    fn peek_back(&self, key: &[u8]) -> Result<Option<Self::Entry>, Self::Error>;
86}
87
/// A single key-value pair, as yielded by the cursors of a [KVStore] implementation.
pub trait KVEntry {
    /// Borrows the raw key bytes of this entry.
    fn key(&self) -> &[u8];

    /// Borrows the raw value bytes of this entry.
    fn value(&self) -> &[u8];
}

96/// Trait used to automatically implement core operations over the Yrs document.
97pub trait DocOps<'a>: KVStore<'a> + Sized
98where
99    Error: From<<Self as KVStore<'a>>::Error>,
100{
101    /// Inserts or updates a document given it's read transaction and name. lib0 v1 encoding is
102    /// used for storing the document.
103    ///
104    /// This feature requires a write capabilities from the database transaction.
105    fn insert_doc<K: AsRef<[u8]> + ?Sized, T: ReadTxn>(
106        &self,
107        name: &K,
108        txn: &T,
109    ) -> Result<(), Error> {
110        let doc_state = txn.encode_diff_v1(&StateVector::default());
111        let state_vector = txn.state_vector().encode_v1();
112        self.insert_doc_raw_v1(name.as_ref(), &doc_state, &state_vector)
113    }
114
115    /// Inserts or updates a document given it's binary update and state vector. lib0 v1 encoding is
116    /// assumed as a format for storing the document.
117    ///
118    /// This is useful when you i.e. want to pre-serialize big document prior to acquiring
119    /// a database transaction.
120    ///
121    /// This feature requires a write capabilities from the database transaction.
122    fn insert_doc_raw_v1(
123        &self,
124        name: &[u8],
125        doc_state_v1: &[u8],
126        doc_sv_v1: &[u8],
127    ) -> Result<(), Error> {
128        let oid = get_or_create_oid(self, name)?;
129        insert_inner_v1(self, oid, doc_state_v1, doc_sv_v1)?;
130        Ok(())
131    }
132
133    /// Loads the document state stored in current database under given document `name` into
134    /// in-memory Yrs document using provided [TransactionMut]. This includes potential update
135    /// entries that may not have been merged with the main document state yet.
136    ///
137    /// This feature requires only a read capabilities from the database transaction.
138    fn load_doc<K: AsRef<[u8]> + ?Sized>(
139        &self,
140        name: &K,
141        txn: &mut TransactionMut,
142    ) -> Result<bool, Error> {
143        if let Some(oid) = get_oid(self, name.as_ref())? {
144            let loaded = load_doc(self, oid, txn)?;
145            Ok(loaded != 0)
146        } else {
147            Ok(false)
148        }
149    }
150
151    /// Merges all updates stored via [Self::push_update] that were detached from the main document
152    /// state, updates the document and its state vector and finally prunes the updates that have
153    /// been integrated this way. Returns the [Doc] with the most recent state produced this way.
154    ///
155    /// This feature requires a write capabilities from the database transaction.
156    fn flush_doc<K: AsRef<[u8]> + ?Sized>(&self, name: &K) -> Result<Option<Doc>, Error> {
157        self.flush_doc_with(name, yrs::Options::default())
158    }
159
160    /// Merges all updates stored via [Self::push_update] that were detached from the main document
161    /// state, updates the document and its state vector and finally prunes the updates that have
162    /// been integrated this way. `options` are used to drive the details of integration process.
163    /// Returns the [Doc] with the most recent state produced this way, initialized using
164    /// `options` parameter.
165    ///
166    /// This feature requires a write capabilities from the database transaction.
167    fn flush_doc_with<K: AsRef<[u8]> + ?Sized>(
168        &self,
169        name: &K,
170        options: yrs::Options,
171    ) -> Result<Option<Doc>, Error> {
172        if let Some(oid) = get_oid(self, name.as_ref())? {
173            let doc = flush_doc(self, oid, options)?;
174            Ok(doc)
175        } else {
176            Ok(None)
177        }
178    }
179
180    /// Returns the [StateVector] stored directly for the document with a given `name`.
181    /// Returns `None` if the state vector was not stored.
182    ///
183    /// Keep in mind that this method only returns a state vector that's stored directly. A second
184    /// tuple parameter boolean informs if returned value is up to date. If that's not the case, it
185    /// means that state vector exists but must be recalculated from the collection of persisted
186    /// updates using either [Self::load_doc] (read-only) or [Self::flush_doc] (read-write).
187    ///
188    /// This feature requires only the read capabilities from the database transaction.
189    fn get_state_vector<K: AsRef<[u8]> + ?Sized>(
190        &self,
191        name: &K,
192    ) -> Result<(Option<StateVector>, bool), Error> {
193        if let Some(oid) = get_oid(self, name.as_ref())? {
194            let key = key_state_vector(oid);
195            let data = self.get(&key)?;
196            let sv = if let Some(data) = data {
197                let state_vector = StateVector::decode_v1(data.as_ref())?;
198                Some(state_vector)
199            } else {
200                None
201            };
202            let update_range_start = key_update(oid, 0);
203            let update_range_end = key_update(oid, u32::MAX);
204            let mut iter = self.iter_range(&update_range_start, &update_range_end)?;
205            let up_to_date = iter.next().is_none();
206            Ok((sv, up_to_date))
207        } else {
208            Ok((None, true))
209        }
210    }
211
212    /// Appends new update without integrating it directly into document store (which is faster
213    /// than persisting full document state on every update). Updates are assumed to be serialized
214    /// using lib0 v1 encoding.
215    ///
216    /// Returns a sequence number of a stored update. Once updates are integrated into document and
217    /// pruned (using [Self::flush_doc] method), sequence number is reset.
218    ///
219    /// This feature requires a write capabilities from the database transaction.
220    fn push_update<K: AsRef<[u8]> + ?Sized>(&self, name: &K, update: &[u8]) -> Result<u32, Error> {
221        let oid = get_or_create_oid(self, name.as_ref())?;
222        let last_clock = {
223            let end = key_update(oid, u32::MAX);
224            if let Some(e) = self.peek_back(&end)? {
225                let last_key = e.key();
226                let len = last_key.len();
227                let last_clock = &last_key[(len - 5)..(len - 1)]; // update key scheme: 01{name:n}1{clock:4}0
228                u32::from_be_bytes(last_clock.try_into().unwrap())
229            } else {
230                0
231            }
232        };
233        let clock = last_clock + 1;
234        let update_key = key_update(oid, clock);
235        self.upsert(&update_key, &update)?;
236        Ok(clock)
237    }
238
239    /// Returns an update (encoded using lib0 v1 encoding) which contains all new changes that
240    /// happened since provided state vector for a given document.
241    ///
242    /// This feature requires only the read capabilities from the database transaction.
243    fn get_diff<K: AsRef<[u8]> + ?Sized>(
244        &self,
245        name: &K,
246        sv: &StateVector,
247    ) -> Result<Option<Vec<u8>>, Error> {
248        let doc = Doc::new();
249        let found = {
250            let mut txn = doc.transact_mut();
251            self.load_doc(name, &mut txn)?
252        };
253        if found {
254            Ok(Some(doc.transact().encode_diff_v1(sv)))
255        } else {
256            Ok(None)
257        }
258    }
259
260    /// Removes all data associated with the current document (including its updates and metadata).
261    ///
262    /// This feature requires a write capabilities from the database transaction.
263    fn clear_doc<K: AsRef<[u8]> + ?Sized>(&self, name: &K) -> Result<(), Error> {
264        let oid_key = key_oid(name.as_ref());
265        if let Some(oid) = self.get(&oid_key)? {
266            // all document related elements are stored within bounds [0,1,..oid,0]..[0,1,..oid,255]
267            let oid: [u8; 4] = oid.as_ref().try_into().unwrap();
268            let oid = OID::from_be_bytes(oid);
269            self.remove(&oid_key)?;
270            let start = key_doc_start(oid);
271            let end = key_doc_end(oid);
272            for v in self.iter_range(&start, &end)? {
273                let key: &[u8] = v.key();
274                if key > &end {
275                    break; //TODO: for some reason key range doesn't always work
276                }
277                self.remove(&key)?;
278            }
279        }
280        Ok(())
281    }
282
283    /// Returns a metadata value stored under its metadata `key` for a document with given `name`.
284    ///
285    /// This feature requires only the read capabilities from the database transaction.
286    fn get_meta<K1: AsRef<[u8]> + ?Sized, K2: AsRef<[u8]> + ?Sized>(
287        &self,
288        name: &K1,
289        meta_key: &K2,
290    ) -> Result<Option<Self::Return>, Error> {
291        if let Some(oid) = get_oid(self, name.as_ref())? {
292            let key = key_meta(oid, meta_key.as_ref());
293            Ok(self.get(&key)?)
294        } else {
295            Ok(None)
296        }
297    }
298
299    /// Inserts or updates new `meta` value stored under its metadata `key` for a document with
300    /// given `name`.
301    ///
302    /// This feature requires write capabilities from the database transaction.
303    fn insert_meta<K1: AsRef<[u8]> + ?Sized, K2: AsRef<[u8]> + ?Sized>(
304        &self,
305        name: &K1,
306        meta_key: &K2,
307        meta: &[u8],
308    ) -> Result<(), Error> {
309        let oid = get_or_create_oid(self, name.as_ref())?;
310        let key = key_meta(oid, meta_key.as_ref());
311        self.upsert(&key, meta)?;
312        Ok(())
313    }
314
315    /// Removes an metadata entry stored under given metadata `key` for a document with provided `name`.
316    ///
317    /// This feature requires write capabilities from the database transaction.
318    fn remove_meta<K1: AsRef<[u8]> + ?Sized, K2: AsRef<[u8]> + ?Sized>(
319        &self,
320        name: &K1,
321        meta_key: &K2,
322    ) -> Result<(), Error> {
323        if let Some(oid) = get_oid(self, name.as_ref())? {
324            let key = key_meta(oid, meta_key.as_ref());
325            self.remove(&key)?;
326        }
327        Ok(())
328    }
329
330    /// Returns an iterator over all document names stored in current database.
331    fn iter_docs(&self) -> Result<DocsNameIter<Self::Cursor, Self::Entry>, Error> {
332        let start = Key::from_const([V1, KEYSPACE_OID]);
333        let end = Key::from_const([V1, KEYSPACE_DOC]);
334        let cursor = self.iter_range(&start, &end)?;
335        Ok(DocsNameIter { cursor })
336    }
337
338    /// Returns an iterator over all metadata entries stored for a given document.
339    fn iter_meta<K: AsRef<[u8]> + ?Sized>(
340        &self,
341        doc_name: &K,
342    ) -> Result<MetadataIter<Self::Cursor, Self::Entry>, Error> {
343        if let Some(oid) = get_oid(self, doc_name.as_ref())? {
344            let start = key_meta_start(oid).to_vec();
345            let end = key_meta_end(oid).to_vec();
346            let cursor = self.iter_range(&start, &end)?;
347            Ok(MetadataIter(Some((cursor, start, end))))
348        } else {
349            Ok(MetadataIter(None))
350        }
351    }
352}
353
354fn get_oid<'a, DB: DocOps<'a> + ?Sized>(db: &DB, name: &[u8]) -> Result<Option<OID>, Error>
355where
356    Error: From<<DB as KVStore<'a>>::Error>,
357{
358    let key = key_oid(name);
359    let value = db.get(&key)?;
360    if let Some(value) = value {
361        let bytes: [u8; 4] = value.as_ref().try_into().unwrap();
362        let oid = OID::from_be_bytes(bytes);
363        Ok(Some(oid))
364    } else {
365        Ok(None)
366    }
367}
368
369fn get_or_create_oid<'a, DB: DocOps<'a> + ?Sized>(db: &DB, name: &[u8]) -> Result<OID, Error>
370where
371    Error: From<<DB as KVStore<'a>>::Error>,
372{
373    if let Some(oid) = get_oid(db, name)? {
374        Ok(oid)
375    } else {
376        /*
377           Since pattern is:
378
379           00{doc_name:n}0      - OID key pattern
380           01{oid:4}0           - document key pattern
381
382           Use 00{0000}0 to try to move cursor to GTE first document, then move cursor 1 position
383           back to get the latest OID or not found.
384        */
385        let last_oid = if let Some(e) = db.peek_back([V1, KEYSPACE_DOC].as_ref())? {
386            let value = e.value();
387            let last_value = OID::from_be_bytes(value.try_into().unwrap());
388            last_value
389        } else {
390            0
391        };
392        let new_oid = last_oid + 1;
393        let key = key_oid(name);
394        db.upsert(&key, new_oid.to_be_bytes().as_ref())?;
395        Ok(new_oid)
396    }
397}
398
399fn load_doc<'a, DB: DocOps<'a> + ?Sized>(
400    db: &DB,
401    oid: OID,
402    txn: &mut TransactionMut,
403) -> Result<u32, Error>
404where
405    Error: From<<DB as KVStore<'a>>::Error>,
406{
407    let mut found = false;
408    {
409        let doc_key = key_doc(oid);
410        if let Some(doc_state) = db.get(&doc_key)? {
411            let update = Update::decode_v1(doc_state.as_ref())?;
412            txn.apply_update(update);
413            found = true;
414        }
415    }
416    let mut update_count = 0;
417    {
418        let update_key_start = key_update(oid, 0);
419        let update_key_end = key_update(oid, u32::MAX);
420        let mut iter = db.iter_range(&update_key_start, &update_key_end)?;
421        while let Some(e) = iter.next() {
422            let value = e.value();
423            let update = Update::decode_v1(value)?;
424            txn.apply_update(update);
425            update_count += 1;
426        }
427    }
428    if found {
429        update_count |= 1 << 31; // mark hi bit to note that document core state was used
430    }
431    Ok(update_count)
432}
433
434fn delete_updates<'a, DB: DocOps<'a> + ?Sized>(db: &DB, oid: OID) -> Result<(), Error>
435where
436    Error: From<<DB as KVStore<'a>>::Error>,
437{
438    let start = key_update(oid, 0);
439    let end = key_update(oid, u32::MAX);
440    db.remove_range(&start, &end)?;
441    Ok(())
442}
443
444fn flush_doc<'a, DB: DocOps<'a> + ?Sized>(
445    db: &DB,
446    oid: OID,
447    options: yrs::Options,
448) -> Result<Option<Doc>, Error>
449where
450    Error: From<<DB as KVStore<'a>>::Error>,
451{
452    let doc = Doc::with_options(options);
453    let found = load_doc(db, oid, &mut doc.transact_mut())?;
454    if found & !(1 << 31) != 0 {
455        // loaded doc was generated from updates
456        let txn = doc.transact();
457        let doc_state = txn.encode_state_as_update_v1(&StateVector::default());
458        let state_vec = txn.state_vector().encode_v1();
459        drop(txn);
460
461        insert_inner_v1(db, oid, &doc_state, &state_vec)?;
462        delete_updates(db, oid)?;
463        Ok(Some(doc))
464    } else {
465        Ok(None)
466    }
467}
468
469fn insert_inner_v1<'a, DB: DocOps<'a> + ?Sized>(
470    db: &DB,
471    oid: OID,
472    doc_state_v1: &[u8],
473    doc_sv_v1: &[u8],
474) -> Result<(), Error>
475where
476    error::Error: From<<DB as KVStore<'a>>::Error>,
477{
478    let key_doc = key_doc(oid);
479    let key_sv = key_state_vector(oid);
480    db.upsert(&key_doc, doc_state_v1)?;
481    db.upsert(&key_sv, doc_sv_v1)?;
482    Ok(())
483}
484
485pub struct DocsNameIter<I, E>
486where
487    I: Iterator<Item = E>,
488    E: KVEntry,
489{
490    cursor: I,
491}
492
493impl<I, E> Iterator for DocsNameIter<I, E>
494where
495    I: Iterator<Item = E>,
496    E: KVEntry,
497{
498    type Item = Box<[u8]>;
499
500    fn next(&mut self) -> Option<Self::Item> {
501        let e = self.cursor.next()?;
502        Some(doc_oid_name(e.key()).into())
503    }
504}
505
506pub struct MetadataIter<I, E>(Option<(I, Vec<u8>, Vec<u8>)>)
507where
508    I: Iterator<Item = E>,
509    E: KVEntry;
510
511impl<I, E> Iterator for MetadataIter<I, E>
512where
513    I: Iterator<Item = E>,
514    E: KVEntry,
515{
516    type Item = (Box<[u8]>, Box<[u8]>);
517
518    fn next(&mut self) -> Option<Self::Item> {
519        let (cursor, _, _) = self.0.as_mut()?;
520        let v = cursor.next()?;
521        let key = v.key();
522        let value = v.value();
523        let meta_key = &key[7..key.len() - 1];
524        Some((meta_key.into(), value.into()))
525    }
526}