yrs_kvstore/lib.rs
//! yrs-kvstore is a generic library that makes it quick to implement
//! [Yrs](https://docs.rs/yrs/latest/yrs/index.html)-specific document operations over any kind
//! of persistent key-value store, e.g. LMDB or RocksDB.
4//!
//! To do so, the store's unit of transaction must define a set of basic operations by
//! implementing the [KVStore] trait. Once this is done, it can implement [DocOps]. The latter
//! offers a set of useful operations such as document metadata management, document and update
//! merging etc. They are implemented automatically as long as the type correctly implements
//! [KVStore].
9//!
10//! ## Internal representation
11//!
//! yrs-kvstore operates on a few key spaces. All keys inserted via [DocOps] are prefixed with
//! the [V1] constant. Beyond that prefix, the key space is further divided into:
14//!
//! - [KEYSPACE_OID] is used for the object ID (OID) index mapping. Whenever a new document is
//!   inserted, a new OID number is generated for it. While document names can be arbitrary byte
//!   strings, OIDs are guaranteed to have a constant size. Internally, all of the document
//!   contents are referred to via their OID identifiers.
//! - [KEYSPACE_DOC] is used to store the [document state](crate::keys::SUB_DOC), its
//!   [state vector](crate::keys::SUB_STATE_VEC), the corresponding series of
//!   [updates](crate::keys::SUB_UPDATE) and [metadata](crate::keys::SUB_META). The document state
//!   and state vector may not represent the full system knowledge about the document, as they
//!   don't reflect information stored in pending document updates. Updates can be stored
//!   separately to avoid a costly read/parse/merge/store cycle of the whole document binary on
//!   every update. It's a good idea to insert updates as they come and, every once in a while,
//!   call [DocOps::flush_doc] or [DocOps::flush_doc_with] to merge them into the document state
//!   itself.
27//!
//! The variants and schemas of the byte keys in use can be summarized as:
//!
//! ```text
//! 00{doc_name:N}0      - OID key pattern
//! 01{oid:4}0           - document key pattern
//! 01{oid:4}1           - state vector key pattern
//! 01{oid:4}2{seqNr:4}0 - document update key pattern
//! 01{oid:4}3{name:M}0  - document meta key pattern
//! ```
37
38pub mod error;
39pub mod keys;
40
41use crate::error::Error;
42use crate::keys::{
43 doc_oid_name, key_doc, key_doc_end, key_doc_start, key_meta, key_meta_end, key_meta_start,
44 key_oid, key_state_vector, key_update, Key, KEYSPACE_DOC, KEYSPACE_OID, OID, V1,
45};
46use std::convert::TryInto;
47use yrs::updates::decoder::Decode;
48use yrs::updates::encoder::Encode;
49use yrs::{Doc, ReadTxn, StateVector, Transact, TransactionMut, Update};
50
51/// A trait to be implemented by the specific key-value store transaction equivalent in order to
52/// auto-implement features provided by [DocOps] trait.
53pub trait KVStore<'a> {
54 /// Error type returned from the implementation.
55 type Error: std::error::Error;
56 /// Cursor type used to iterate over the ordered range of key-value entries.
57 type Cursor: Iterator<Item = Self::Entry>;
58 /// Entry type returned by cursor.
59 type Entry: KVEntry;
60 /// Type returned from the implementation. Different key-value stores have different
61 /// abstractions over the binary data they use.
62 type Return: AsRef<[u8]>;
63
64 /// Return a value stored under given `key` or `None` if key was not found.
65 fn get(&self, key: &[u8]) -> Result<Option<Self::Return>, Self::Error>;
66
67 /// Insert a new `value` under given `key` or replace an existing value with new one if
68 /// entry with that `key` already existed.
69 fn upsert(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>;
70
71 /// Return a value stored under the given `key` if it exists.
72 fn remove(&self, key: &[u8]) -> Result<(), Self::Error>;
73
74 /// Remove all keys between `from`..=`to` range of keys.
75 fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<(), Self::Error>;
76
77 /// Return an iterator over all entries between `from`..=`to` range of keys.
78 fn iter_range(&self, from: &[u8], to: &[u8]) -> Result<Self::Cursor, Self::Error>;
79
80 /// Looks into the last entry value prior to a given key. The provided key parameter may not
81 /// exist and it's used only to establish cursor position in ordered key collection.
82 ///
83 /// In example: in a key collection of `{1,2,5,7}`, this method with the key parameter of `4`
84 /// should return value of `2`.
85 fn peek_back(&self, key: &[u8]) -> Result<Option<Self::Entry>, Self::Error>;
86}
87
/// A single key-value pair yielded by the cursors of a [KVStore].
///
/// Both accessors borrow from the entry itself, so reading them does not copy any bytes.
pub trait KVEntry {
    /// Borrows the key bytes of this entry.
    fn key(&self) -> &[u8];

    /// Borrows the value bytes of this entry.
    fn value(&self) -> &[u8];
}
95
96/// Trait used to automatically implement core operations over the Yrs document.
97pub trait DocOps<'a>: KVStore<'a> + Sized
98where
99 Error: From<<Self as KVStore<'a>>::Error>,
100{
101 /// Inserts or updates a document given it's read transaction and name. lib0 v1 encoding is
102 /// used for storing the document.
103 ///
104 /// This feature requires a write capabilities from the database transaction.
105 fn insert_doc<K: AsRef<[u8]> + ?Sized, T: ReadTxn>(
106 &self,
107 name: &K,
108 txn: &T,
109 ) -> Result<(), Error> {
110 let doc_state = txn.encode_diff_v1(&StateVector::default());
111 let state_vector = txn.state_vector().encode_v1();
112 self.insert_doc_raw_v1(name.as_ref(), &doc_state, &state_vector)
113 }
114
115 /// Inserts or updates a document given it's binary update and state vector. lib0 v1 encoding is
116 /// assumed as a format for storing the document.
117 ///
118 /// This is useful when you i.e. want to pre-serialize big document prior to acquiring
119 /// a database transaction.
120 ///
121 /// This feature requires a write capabilities from the database transaction.
122 fn insert_doc_raw_v1(
123 &self,
124 name: &[u8],
125 doc_state_v1: &[u8],
126 doc_sv_v1: &[u8],
127 ) -> Result<(), Error> {
128 let oid = get_or_create_oid(self, name)?;
129 insert_inner_v1(self, oid, doc_state_v1, doc_sv_v1)?;
130 Ok(())
131 }
132
133 /// Loads the document state stored in current database under given document `name` into
134 /// in-memory Yrs document using provided [TransactionMut]. This includes potential update
135 /// entries that may not have been merged with the main document state yet.
136 ///
137 /// This feature requires only a read capabilities from the database transaction.
138 fn load_doc<K: AsRef<[u8]> + ?Sized>(
139 &self,
140 name: &K,
141 txn: &mut TransactionMut,
142 ) -> Result<bool, Error> {
143 if let Some(oid) = get_oid(self, name.as_ref())? {
144 let loaded = load_doc(self, oid, txn)?;
145 Ok(loaded != 0)
146 } else {
147 Ok(false)
148 }
149 }
150
151 /// Merges all updates stored via [Self::push_update] that were detached from the main document
152 /// state, updates the document and its state vector and finally prunes the updates that have
153 /// been integrated this way. Returns the [Doc] with the most recent state produced this way.
154 ///
155 /// This feature requires a write capabilities from the database transaction.
156 fn flush_doc<K: AsRef<[u8]> + ?Sized>(&self, name: &K) -> Result<Option<Doc>, Error> {
157 self.flush_doc_with(name, yrs::Options::default())
158 }
159
160 /// Merges all updates stored via [Self::push_update] that were detached from the main document
161 /// state, updates the document and its state vector and finally prunes the updates that have
162 /// been integrated this way. `options` are used to drive the details of integration process.
163 /// Returns the [Doc] with the most recent state produced this way, initialized using
164 /// `options` parameter.
165 ///
166 /// This feature requires a write capabilities from the database transaction.
167 fn flush_doc_with<K: AsRef<[u8]> + ?Sized>(
168 &self,
169 name: &K,
170 options: yrs::Options,
171 ) -> Result<Option<Doc>, Error> {
172 if let Some(oid) = get_oid(self, name.as_ref())? {
173 let doc = flush_doc(self, oid, options)?;
174 Ok(doc)
175 } else {
176 Ok(None)
177 }
178 }
179
180 /// Returns the [StateVector] stored directly for the document with a given `name`.
181 /// Returns `None` if the state vector was not stored.
182 ///
183 /// Keep in mind that this method only returns a state vector that's stored directly. A second
184 /// tuple parameter boolean informs if returned value is up to date. If that's not the case, it
185 /// means that state vector exists but must be recalculated from the collection of persisted
186 /// updates using either [Self::load_doc] (read-only) or [Self::flush_doc] (read-write).
187 ///
188 /// This feature requires only the read capabilities from the database transaction.
189 fn get_state_vector<K: AsRef<[u8]> + ?Sized>(
190 &self,
191 name: &K,
192 ) -> Result<(Option<StateVector>, bool), Error> {
193 if let Some(oid) = get_oid(self, name.as_ref())? {
194 let key = key_state_vector(oid);
195 let data = self.get(&key)?;
196 let sv = if let Some(data) = data {
197 let state_vector = StateVector::decode_v1(data.as_ref())?;
198 Some(state_vector)
199 } else {
200 None
201 };
202 let update_range_start = key_update(oid, 0);
203 let update_range_end = key_update(oid, u32::MAX);
204 let mut iter = self.iter_range(&update_range_start, &update_range_end)?;
205 let up_to_date = iter.next().is_none();
206 Ok((sv, up_to_date))
207 } else {
208 Ok((None, true))
209 }
210 }
211
212 /// Appends new update without integrating it directly into document store (which is faster
213 /// than persisting full document state on every update). Updates are assumed to be serialized
214 /// using lib0 v1 encoding.
215 ///
216 /// Returns a sequence number of a stored update. Once updates are integrated into document and
217 /// pruned (using [Self::flush_doc] method), sequence number is reset.
218 ///
219 /// This feature requires a write capabilities from the database transaction.
220 fn push_update<K: AsRef<[u8]> + ?Sized>(&self, name: &K, update: &[u8]) -> Result<u32, Error> {
221 let oid = get_or_create_oid(self, name.as_ref())?;
222 let last_clock = {
223 let end = key_update(oid, u32::MAX);
224 if let Some(e) = self.peek_back(&end)? {
225 let last_key = e.key();
226 let len = last_key.len();
227 let last_clock = &last_key[(len - 5)..(len - 1)]; // update key scheme: 01{name:n}1{clock:4}0
228 u32::from_be_bytes(last_clock.try_into().unwrap())
229 } else {
230 0
231 }
232 };
233 let clock = last_clock + 1;
234 let update_key = key_update(oid, clock);
235 self.upsert(&update_key, &update)?;
236 Ok(clock)
237 }
238
239 /// Returns an update (encoded using lib0 v1 encoding) which contains all new changes that
240 /// happened since provided state vector for a given document.
241 ///
242 /// This feature requires only the read capabilities from the database transaction.
243 fn get_diff<K: AsRef<[u8]> + ?Sized>(
244 &self,
245 name: &K,
246 sv: &StateVector,
247 ) -> Result<Option<Vec<u8>>, Error> {
248 let doc = Doc::new();
249 let found = {
250 let mut txn = doc.transact_mut();
251 self.load_doc(name, &mut txn)?
252 };
253 if found {
254 Ok(Some(doc.transact().encode_diff_v1(sv)))
255 } else {
256 Ok(None)
257 }
258 }
259
260 /// Removes all data associated with the current document (including its updates and metadata).
261 ///
262 /// This feature requires a write capabilities from the database transaction.
263 fn clear_doc<K: AsRef<[u8]> + ?Sized>(&self, name: &K) -> Result<(), Error> {
264 let oid_key = key_oid(name.as_ref());
265 if let Some(oid) = self.get(&oid_key)? {
266 // all document related elements are stored within bounds [0,1,..oid,0]..[0,1,..oid,255]
267 let oid: [u8; 4] = oid.as_ref().try_into().unwrap();
268 let oid = OID::from_be_bytes(oid);
269 self.remove(&oid_key)?;
270 let start = key_doc_start(oid);
271 let end = key_doc_end(oid);
272 for v in self.iter_range(&start, &end)? {
273 let key: &[u8] = v.key();
274 if key > &end {
275 break; //TODO: for some reason key range doesn't always work
276 }
277 self.remove(&key)?;
278 }
279 }
280 Ok(())
281 }
282
283 /// Returns a metadata value stored under its metadata `key` for a document with given `name`.
284 ///
285 /// This feature requires only the read capabilities from the database transaction.
286 fn get_meta<K1: AsRef<[u8]> + ?Sized, K2: AsRef<[u8]> + ?Sized>(
287 &self,
288 name: &K1,
289 meta_key: &K2,
290 ) -> Result<Option<Self::Return>, Error> {
291 if let Some(oid) = get_oid(self, name.as_ref())? {
292 let key = key_meta(oid, meta_key.as_ref());
293 Ok(self.get(&key)?)
294 } else {
295 Ok(None)
296 }
297 }
298
299 /// Inserts or updates new `meta` value stored under its metadata `key` for a document with
300 /// given `name`.
301 ///
302 /// This feature requires write capabilities from the database transaction.
303 fn insert_meta<K1: AsRef<[u8]> + ?Sized, K2: AsRef<[u8]> + ?Sized>(
304 &self,
305 name: &K1,
306 meta_key: &K2,
307 meta: &[u8],
308 ) -> Result<(), Error> {
309 let oid = get_or_create_oid(self, name.as_ref())?;
310 let key = key_meta(oid, meta_key.as_ref());
311 self.upsert(&key, meta)?;
312 Ok(())
313 }
314
315 /// Removes an metadata entry stored under given metadata `key` for a document with provided `name`.
316 ///
317 /// This feature requires write capabilities from the database transaction.
318 fn remove_meta<K1: AsRef<[u8]> + ?Sized, K2: AsRef<[u8]> + ?Sized>(
319 &self,
320 name: &K1,
321 meta_key: &K2,
322 ) -> Result<(), Error> {
323 if let Some(oid) = get_oid(self, name.as_ref())? {
324 let key = key_meta(oid, meta_key.as_ref());
325 self.remove(&key)?;
326 }
327 Ok(())
328 }
329
330 /// Returns an iterator over all document names stored in current database.
331 fn iter_docs(&self) -> Result<DocsNameIter<Self::Cursor, Self::Entry>, Error> {
332 let start = Key::from_const([V1, KEYSPACE_OID]);
333 let end = Key::from_const([V1, KEYSPACE_DOC]);
334 let cursor = self.iter_range(&start, &end)?;
335 Ok(DocsNameIter { cursor })
336 }
337
338 /// Returns an iterator over all metadata entries stored for a given document.
339 fn iter_meta<K: AsRef<[u8]> + ?Sized>(
340 &self,
341 doc_name: &K,
342 ) -> Result<MetadataIter<Self::Cursor, Self::Entry>, Error> {
343 if let Some(oid) = get_oid(self, doc_name.as_ref())? {
344 let start = key_meta_start(oid).to_vec();
345 let end = key_meta_end(oid).to_vec();
346 let cursor = self.iter_range(&start, &end)?;
347 Ok(MetadataIter(Some((cursor, start, end))))
348 } else {
349 Ok(MetadataIter(None))
350 }
351 }
352}
353
354fn get_oid<'a, DB: DocOps<'a> + ?Sized>(db: &DB, name: &[u8]) -> Result<Option<OID>, Error>
355where
356 Error: From<<DB as KVStore<'a>>::Error>,
357{
358 let key = key_oid(name);
359 let value = db.get(&key)?;
360 if let Some(value) = value {
361 let bytes: [u8; 4] = value.as_ref().try_into().unwrap();
362 let oid = OID::from_be_bytes(bytes);
363 Ok(Some(oid))
364 } else {
365 Ok(None)
366 }
367}
368
369fn get_or_create_oid<'a, DB: DocOps<'a> + ?Sized>(db: &DB, name: &[u8]) -> Result<OID, Error>
370where
371 Error: From<<DB as KVStore<'a>>::Error>,
372{
373 if let Some(oid) = get_oid(db, name)? {
374 Ok(oid)
375 } else {
376 /*
377 Since pattern is:
378
379 00{doc_name:n}0 - OID key pattern
380 01{oid:4}0 - document key pattern
381
382 Use 00{0000}0 to try to move cursor to GTE first document, then move cursor 1 position
383 back to get the latest OID or not found.
384 */
385 let last_oid = if let Some(e) = db.peek_back([V1, KEYSPACE_DOC].as_ref())? {
386 let value = e.value();
387 let last_value = OID::from_be_bytes(value.try_into().unwrap());
388 last_value
389 } else {
390 0
391 };
392 let new_oid = last_oid + 1;
393 let key = key_oid(name);
394 db.upsert(&key, new_oid.to_be_bytes().as_ref())?;
395 Ok(new_oid)
396 }
397}
398
399fn load_doc<'a, DB: DocOps<'a> + ?Sized>(
400 db: &DB,
401 oid: OID,
402 txn: &mut TransactionMut,
403) -> Result<u32, Error>
404where
405 Error: From<<DB as KVStore<'a>>::Error>,
406{
407 let mut found = false;
408 {
409 let doc_key = key_doc(oid);
410 if let Some(doc_state) = db.get(&doc_key)? {
411 let update = Update::decode_v1(doc_state.as_ref())?;
412 txn.apply_update(update);
413 found = true;
414 }
415 }
416 let mut update_count = 0;
417 {
418 let update_key_start = key_update(oid, 0);
419 let update_key_end = key_update(oid, u32::MAX);
420 let mut iter = db.iter_range(&update_key_start, &update_key_end)?;
421 while let Some(e) = iter.next() {
422 let value = e.value();
423 let update = Update::decode_v1(value)?;
424 txn.apply_update(update);
425 update_count += 1;
426 }
427 }
428 if found {
429 update_count |= 1 << 31; // mark hi bit to note that document core state was used
430 }
431 Ok(update_count)
432}
433
434fn delete_updates<'a, DB: DocOps<'a> + ?Sized>(db: &DB, oid: OID) -> Result<(), Error>
435where
436 Error: From<<DB as KVStore<'a>>::Error>,
437{
438 let start = key_update(oid, 0);
439 let end = key_update(oid, u32::MAX);
440 db.remove_range(&start, &end)?;
441 Ok(())
442}
443
444fn flush_doc<'a, DB: DocOps<'a> + ?Sized>(
445 db: &DB,
446 oid: OID,
447 options: yrs::Options,
448) -> Result<Option<Doc>, Error>
449where
450 Error: From<<DB as KVStore<'a>>::Error>,
451{
452 let doc = Doc::with_options(options);
453 let found = load_doc(db, oid, &mut doc.transact_mut())?;
454 if found & !(1 << 31) != 0 {
455 // loaded doc was generated from updates
456 let txn = doc.transact();
457 let doc_state = txn.encode_state_as_update_v1(&StateVector::default());
458 let state_vec = txn.state_vector().encode_v1();
459 drop(txn);
460
461 insert_inner_v1(db, oid, &doc_state, &state_vec)?;
462 delete_updates(db, oid)?;
463 Ok(Some(doc))
464 } else {
465 Ok(None)
466 }
467}
468
469fn insert_inner_v1<'a, DB: DocOps<'a> + ?Sized>(
470 db: &DB,
471 oid: OID,
472 doc_state_v1: &[u8],
473 doc_sv_v1: &[u8],
474) -> Result<(), Error>
475where
476 error::Error: From<<DB as KVStore<'a>>::Error>,
477{
478 let key_doc = key_doc(oid);
479 let key_sv = key_state_vector(oid);
480 db.upsert(&key_doc, doc_state_v1)?;
481 db.upsert(&key_sv, doc_sv_v1)?;
482 Ok(())
483}
484
485pub struct DocsNameIter<I, E>
486where
487 I: Iterator<Item = E>,
488 E: KVEntry,
489{
490 cursor: I,
491}
492
493impl<I, E> Iterator for DocsNameIter<I, E>
494where
495 I: Iterator<Item = E>,
496 E: KVEntry,
497{
498 type Item = Box<[u8]>;
499
500 fn next(&mut self) -> Option<Self::Item> {
501 let e = self.cursor.next()?;
502 Some(doc_oid_name(e.key()).into())
503 }
504}
505
506pub struct MetadataIter<I, E>(Option<(I, Vec<u8>, Vec<u8>)>)
507where
508 I: Iterator<Item = E>,
509 E: KVEntry;
510
511impl<I, E> Iterator for MetadataIter<I, E>
512where
513 I: Iterator<Item = E>,
514 E: KVEntry,
515{
516 type Item = (Box<[u8]>, Box<[u8]>);
517
518 fn next(&mut self) -> Option<Self::Item> {
519 let (cursor, _, _) = self.0.as_mut()?;
520 let v = cursor.next()?;
521 let key = v.key();
522 let value = v.value();
523 let meta_key = &key[7..key.len() - 1];
524 Some((meta_key.into(), value.into()))
525 }
526}