1use lmdb_rs::core::{CursorIterator, MdbResult};
65use lmdb_rs::{CursorKeyRangeIter, Database, MdbError, ReadonlyTransaction};
66use std::ops::Deref;
67
68pub use yrs_kvstore as store;
69use yrs_kvstore::error::Error;
70use yrs_kvstore::keys::Key;
71use yrs_kvstore::{DocOps, KVEntry, KVStore};
72
/// Conversion from a fallible lookup result into an "optional" result.
///
/// The implementation for `MdbResult<T>` in this file maps
/// `MdbError::NotFound` to `Ok(None)` and leaves every other outcome intact.
trait OptionalNotFound {
    /// Value produced on success.
    type Return;
    /// Error reported for failures that are not turned into `None`.
    type Error;

    /// Consumes the result and wraps its success value in an `Option`.
    fn optional(self) -> Result<Option<Self::Return>, Self::Error>;
}
79
80impl<T> OptionalNotFound for MdbResult<T> {
81 type Return = T;
82 type Error = MdbError;
83
84 fn optional(self) -> Result<Option<Self::Return>, Self::Error> {
86 match self {
87 Ok(value) => Ok(Some(value)),
88 Err(MdbError::NotFound) => Ok(None),
89 Err(err) => Err(err),
90 }
91 }
92}
93
/// Newtype over an LMDB [`Database`] handle.
///
/// This file implements `KVStore` and `DocOps` for it; the tests build it with
/// `LmdbStore::from(txn.bind(&handle))`.
#[repr(transparent)]
#[derive(Debug)]
pub struct LmdbStore<'db>(Database<'db>);
99
100impl<'db> From<Database<'db>> for LmdbStore<'db> {
101 #[inline(always)]
102 fn from(db: Database<'db>) -> Self {
103 LmdbStore(db)
104 }
105}
106
107impl<'db> Into<Database<'db>> for LmdbStore<'db> {
108 #[inline(always)]
109 fn into(self) -> Database<'db> {
110 self.0
111 }
112}
113
114impl<'db> Deref for LmdbStore<'db> {
115 type Target = Database<'db>;
116
117 fn deref(&self) -> &Self::Target {
118 &self.0
119 }
120}
121
// Empty impl: nothing is overridden here, so `LmdbStore` uses `DocOps` exactly
// as the trait declares it. The tests below call `insert_doc`, `load_doc`,
// `push_update`, etc. on `LmdbStore` with `DocOps` in scope.
impl<'db> DocOps<'db> for LmdbStore<'db> {}
123
124impl<'db> KVStore<'db> for LmdbStore<'db> {
125 type Error = MdbError;
126 type Cursor = LmdbRange<'db>;
127 type Entry = LmdbEntry<'db>;
128 type Return = &'db [u8];
129
130 fn get(&self, key: &[u8]) -> Result<Option<Self::Return>, Self::Error> {
131 let value = self.0.get(&key).optional()?;
132 Ok(value)
133 }
134
135 fn upsert(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> {
136 self.0.set(&key, &value)?;
137 Ok(())
138 }
139
140 fn remove(&self, key: &[u8]) -> Result<(), Self::Error> {
141 let prev: Option<&[u8]> = self.0.get(&key).optional()?;
142 if prev.is_some() {
143 self.0.del(&key)?;
144 }
145 Ok(())
146 }
147
148 fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<(), Self::Error> {
149 let mut c = self.0.new_cursor()?;
150 if c.to_gte_key(&from).optional()?.is_some() {
151 while c.get_key::<&[u8]>()? <= to {
152 c.del()?;
153 if c.to_next_key().optional()?.is_none() {
154 break;
155 }
156 }
157 }
158 Ok(())
159 }
160
161 fn iter_range(&self, from: &[u8], to: &[u8]) -> Result<Self::Cursor, Self::Error> {
162 let from = from.to_vec();
163 let to = to.to_vec();
164 let cursor = unsafe { std::mem::transmute(self.0.keyrange(&from, &to)?) };
165 Ok(LmdbRange { from, to, cursor })
166 }
167
168 fn peek_back(&self, key: &[u8]) -> Result<Option<Self::Entry>, Self::Error> {
169 let mut cursor = self.0.new_cursor()?;
170 cursor.to_gte_key(&key).optional()?;
171 if cursor.to_prev_key().optional()?.is_none() {
172 return Ok(None);
173 }
174 let key = cursor.get_key()?;
175 let value = cursor.get_value()?;
176 Ok(Some(LmdbEntry::new(key, value)))
177 }
178}
179
180pub struct LmdbRange<'a> {
181 from: Vec<u8>,
182 to: Vec<u8>,
183 cursor: CursorIterator<'a, CursorKeyRangeIter<'a>>,
184}
185
186impl<'a> Iterator for LmdbRange<'a> {
187 type Item = LmdbEntry<'a>;
188
189 fn next(&mut self) -> Option<Self::Item> {
190 let (key, value) = self.cursor.next()?.get();
191 Some(LmdbEntry::new(key, value))
192 }
193}
194
/// A borrowed key/value pair, as returned by `peek_back` and by iterating an
/// `LmdbRange`.
pub struct LmdbEntry<'a> {
    /// Key bytes of the entry.
    key: &'a [u8],
    /// Value bytes of the entry.
    value: &'a [u8],
}
199
200impl<'a> LmdbEntry<'a> {
201 fn new(key: &'a [u8], value: &'a [u8]) -> Self {
202 LmdbEntry { key, value }
203 }
204}
205
206impl<'a> KVEntry for LmdbEntry<'a> {
207 fn key(&self) -> &[u8] {
208 self.key
209 }
210 fn value(&self) -> &[u8] {
211 self.value
212 }
213}
214
215pub(crate) struct OwnedCursorRange<'a> {
216 txn: ReadonlyTransaction<'a>,
217 db: Database<'a>,
218 cursor: CursorIterator<'a, CursorKeyRangeIter<'a>>,
219 start: Vec<u8>,
220 end: Vec<u8>,
221}
222
223impl<'a> OwnedCursorRange<'a> {
224 pub(crate) fn new<const N: usize>(
225 txn: ReadonlyTransaction<'a>,
226 db: Database<'a>,
227 start: Key<N>,
228 end: Key<N>,
229 ) -> Result<Self, Error> {
230 let start = start.into();
231 let end = end.into();
232 let cursor = unsafe { std::mem::transmute(db.keyrange(&start, &end)?) };
233
234 Ok(OwnedCursorRange {
235 txn,
236 db,
237 cursor,
238 start,
239 end,
240 })
241 }
242
243 pub(crate) fn db(&self) -> &Database {
244 &self.db
245 }
246}
247
248impl<'a> Iterator for OwnedCursorRange<'a> {
249 type Item = (&'a [u8], &'a [u8]);
250
251 fn next(&mut self) -> Option<Self::Item> {
252 let v = self.cursor.next()?;
253 Some(v.get())
254 }
255}
256
257#[cfg(test)]
258mod test {
259 use crate::{DocOps, LmdbStore};
260 use lmdb_rs::core::DbCreate;
261 use lmdb_rs::Environment;
262 use std::path::Path;
263 use std::sync::Arc;
264 use tempdir::TempDir;
265 use yrs::{Doc, GetString, ReadTxn, Text, Transact};
266
267 fn init_env<P: AsRef<Path>>(dir: P) -> Environment {
268 let env = Environment::new()
269 .autocreate_dir(true)
270 .max_dbs(4)
271 .open(dir, 0o777)
272 .unwrap();
273 env
274 }
275
/// Round trip: insert a document, load it back, then clear it.
#[test]
fn create_get_remove() {
    let tmp = TempDir::new("lmdb-create_get_remove").unwrap();
    let env = init_env(&tmp);
    let handle = env.create_db("yrs", DbCreate).unwrap();

    // Step 1: persist a document whose text is "hello".
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        text.insert(&mut doc_txn, 0, "hello");

        let writer = env.new_transaction().unwrap();
        let store = LmdbStore::from(writer.bind(&handle));
        store.insert_doc("doc", &doc_txn).unwrap();
        writer.commit().unwrap();
    }

    // Step 2: load it into a fresh document and compare text and state vector.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        let reader = env.get_reader().unwrap();
        let store = LmdbStore::from(reader.bind(&handle));
        store.load_doc("doc", &mut doc_txn).unwrap();

        assert_eq!(text.get_string(&doc_txn), "hello");

        let (state_vector, completed) = store.get_state_vector("doc").unwrap();
        assert_eq!(state_vector, Some(doc_txn.state_vector()));
        assert!(completed);
    }

    // Step 3: clear the document; loading afterwards must yield nothing.
    {
        let writer = env.new_transaction().unwrap();
        let store = LmdbStore::from(writer.bind(&handle));

        store.clear_doc("doc").unwrap();

        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        store.load_doc("doc", &mut doc_txn).unwrap();

        assert_eq!(text.get_string(&doc_txn), "");

        let (state_vector, completed) = store.get_state_vector("doc").unwrap();
        assert!(state_vector.is_none());
        assert!(completed);
    }
}
/// Inserting the same document twice keeps the latest content.
#[test]
fn multi_insert() {
    let tmp = TempDir::new("lmdb-multi_insert").unwrap();
    let env = init_env(&tmp);
    let handle = env.create_db("yrs", DbCreate).unwrap();

    // Write the document twice within one database transaction.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        text.push(&mut doc_txn, "hello");

        let writer = env.new_transaction().unwrap();
        let store = LmdbStore::from(writer.bind(&handle));

        store.insert_doc("doc", &doc_txn).unwrap();

        text.push(&mut doc_txn, " world");

        store.insert_doc("doc", &doc_txn).unwrap();

        writer.commit().unwrap();
    }

    // Reload and check that the second insert is what was stored.
    {
        let reader = env.get_reader().unwrap();
        let store = LmdbStore::from(reader.bind(&handle));

        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        store.load_doc("doc", &mut doc_txn).unwrap();

        assert_eq!(text.get_string(&doc_txn), "hello world");
    }
}
368
/// Updates pushed one by one can be loaded back and flushed into a document.
#[test]
fn incremental_updates() {
    const DOC_NAME: &str = "doc";
    let tmp = TempDir::new("lmdb-incremental_updates").unwrap();
    let env = init_env(&tmp);
    let handle = env.create_db("yrs", DbCreate).unwrap();
    let env = Arc::new(env);
    let handle = Arc::new(handle);

    // Persist every update emitted by the document through an observer.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");

        let cb_env = env.clone();
        let cb_handle = handle.clone();
        let _subscription = doc.observe_update_v1(move |_, event| {
            let writer = cb_env.new_transaction().unwrap();
            let store = LmdbStore::from(writer.bind(&cb_handle));
            store.push_update(DOC_NAME, &event.update).unwrap();
            writer.commit().unwrap();
        });
        // Three separate document transactions.
        text.push(&mut doc.transact_mut(), "a");
        text.push(&mut doc.transact_mut(), "b");
        text.push(&mut doc.transact_mut(), "c");
    }

    // Loading must replay all stored updates.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();

        let reader = env.get_reader().unwrap();
        let store = LmdbStore::from(reader.bind(&handle));
        store.load_doc(DOC_NAME, &mut doc_txn).unwrap();

        assert_eq!(text.get_string(&doc_txn), "abc");
    }

    // Flushing returns a document with the same content.
    {
        let writer = env.new_transaction().unwrap();
        let store = LmdbStore::from(writer.bind(&handle));
        let flushed = store.flush_doc(DOC_NAME).unwrap().unwrap();
        writer.commit().unwrap();

        let text = flushed.get_or_insert_text("text");

        assert_eq!(text.get_string(&flushed.transact()), "abc");
    }
}
422
/// When only incremental updates were pushed for a document,
/// `get_state_vector` is expected to return `(None, false)`.
#[test]
fn state_vector_updates_only() {
    const DOC_NAME: &str = "doc";
    let dir = TempDir::new("lmdb-state_vector_updates_only").unwrap();
    let env = init_env(&dir);
    let h = env.create_db("yrs", DbCreate).unwrap();
    let env = Arc::new(env);
    let h = Arc::new(h);

    // Persist document updates through an observer. The scope only exists to
    // drop the document and its subscription before reading; the previously
    // computed (and discarded) local state vector was dead code and is gone.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let env = env.clone();
        let h = h.clone();
        let _sub = doc.observe_update_v1(move |_, u| {
            let db_txn = env.new_transaction().unwrap();
            let db = LmdbStore::from(db_txn.bind(&h));
            db.push_update(DOC_NAME, &u.update).unwrap();
            db_txn.commit().unwrap();
        });
        // Three separate document transactions.
        text.push(&mut doc.transact_mut(), "a");
        text.push(&mut doc.transact_mut(), "b");
        text.push(&mut doc.transact_mut(), "c");
    }

    let db_txn = env.get_reader().unwrap();
    let db = LmdbStore::from(db_txn.bind(&h));
    let (sv, completed) = db.get_state_vector(DOC_NAME).unwrap();
    assert!(sv.is_none());
    assert!(!completed);
}
459
/// `get_diff` on a document stored only as pushed updates must equal the diff
/// the live document encodes for the same state vector.
#[test]
fn state_diff_from_updates() {
    const DOC_NAME: &str = "doc";
    let tmp = TempDir::new("lmdb-state_diff_from_updates").unwrap();
    let env = init_env(&tmp);
    let handle = env.create_db("yrs", DbCreate).unwrap();
    let env = Arc::new(env);
    let handle = Arc::new(handle);

    let (state_vector, expected) = {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");

        let cb_env = env.clone();
        let cb_handle = handle.clone();
        let _subscription = doc.observe_update_v1(move |_, event| {
            let writer = cb_env.new_transaction().unwrap();
            let store = LmdbStore::from(writer.bind(&cb_handle));
            store.push_update(DOC_NAME, &event.update).unwrap();
            writer.commit().unwrap();
        });

        text.push(&mut doc.transact_mut(), "a");
        text.push(&mut doc.transact_mut(), "b");
        // Snapshot taken after "ab"; the diff then covers only "c".
        let snapshot = doc.transact().state_vector();
        text.push(&mut doc.transact_mut(), "c");
        let diff = doc.transact().encode_diff_v1(&snapshot);
        (snapshot, diff)
    };

    let reader = env.get_reader().unwrap();
    let store = LmdbStore::from(reader.bind(&handle));
    let actual = store.get_diff(DOC_NAME, &state_vector).unwrap();
    assert_eq!(actual, Some(expected));
}
496
/// `get_diff` on a document stored via `insert_doc` must equal the diff the
/// live document encodes for the same state vector.
#[test]
fn state_diff_from_doc() {
    const DOC_NAME: &str = "doc";
    let tmp = TempDir::new("lmdb-state_diff_from_doc").unwrap();
    let env = init_env(&tmp);
    let handle = env.create_db("yrs", DbCreate).unwrap();

    let (state_vector, expected) = {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        text.push(&mut doc.transact_mut(), "a");
        text.push(&mut doc.transact_mut(), "b");
        // Snapshot taken after "ab"; the diff then covers only "c".
        let snapshot = doc.transact().state_vector();
        text.push(&mut doc.transact_mut(), "c");
        let diff = doc.transact().encode_diff_v1(&snapshot);

        // Store the complete document state in one go.
        let writer = env.new_transaction().unwrap();
        let store = LmdbStore::from(writer.bind(&handle));
        store.insert_doc(DOC_NAME, &doc.transact()).unwrap();
        writer.commit().unwrap();

        (snapshot, diff)
    };

    let reader = env.get_reader().unwrap();
    let store = LmdbStore::from(reader.bind(&handle));
    let actual = store.get_diff(DOC_NAME, &state_vector).unwrap();
    assert_eq!(actual, Some(expected));
}
527
/// Metadata entries can be inserted, overwritten and removed.
#[test]
fn doc_meta() {
    const DOC_NAME: &str = "doc";
    let tmp = TempDir::new("lmdb-doc_meta").unwrap();
    let env = init_env(&tmp);
    let handle = env.create_db("yrs", DbCreate).unwrap();

    // Initially absent, then written for the first time.
    let writer = env.new_transaction().unwrap();
    let store = LmdbStore::from(writer.bind(&handle));
    let initial = store.get_meta(DOC_NAME, "key").unwrap();
    assert!(initial.is_none());
    store
        .insert_meta(DOC_NAME, "key", "value1".as_bytes())
        .unwrap();
    writer.commit().unwrap();

    // Overwrite; the value read beforehand is the first one.
    let writer = env.new_transaction().unwrap();
    let store = LmdbStore::from(writer.bind(&handle));
    let before = store.get_meta(DOC_NAME, "key").unwrap().map(Vec::from);
    store
        .insert_meta(DOC_NAME, "key", "value2".as_bytes())
        .unwrap();
    writer.commit().unwrap();
    assert_eq!(before.as_deref(), Some("value1".as_bytes()));

    // Remove; the value read beforehand is the second one, then it is gone.
    let writer = env.new_transaction().unwrap();
    let store = LmdbStore::from(writer.bind(&handle));
    let before = store.get_meta(DOC_NAME, "key").unwrap().map(Vec::from);
    store.remove_meta(DOC_NAME, "key").unwrap();
    assert_eq!(before.as_deref(), Some("value2".as_bytes()));
    let after = store.get_meta(DOC_NAME, "key").unwrap();
    assert!(after.is_none());
}
559
/// `iter_meta` yields only the metadata entries of the requested document.
#[test]
fn doc_meta_iter() {
    let tmp = TempDir::new("lmdb-doc_meta_iter").unwrap();
    let env = init_env(&tmp);
    let handle = env.create_db("yrs", DbCreate).unwrap();
    let writer = env.new_transaction().unwrap();
    let store = LmdbStore::from(writer.bind(&handle));

    // Metadata for three documents; only "B" has two keys.
    store.insert_meta("A", "key1", [1].as_ref()).unwrap();
    store.insert_meta("B", "key2", [2].as_ref()).unwrap();
    store.insert_meta("B", "key3", [3].as_ref()).unwrap();
    store.insert_meta("C", "key4", [4].as_ref()).unwrap();

    let mut entries = store.iter_meta("B").unwrap();
    assert_eq!(entries.next(), Some(("key2".as_bytes().into(), [2].into())));
    assert_eq!(entries.next(), Some(("key3".as_bytes().into(), [3].into())));
    assert!(entries.next().is_none());
}
578
/// `iter_docs` lists documents however they were created (metadata only,
/// full insert, pushed update) and stops listing a cleared document.
#[test]
fn doc_iter() {
    let tmp = TempDir::new("lmdb-doc_iter").unwrap();
    let env = init_env(&tmp);
    let handle = env.create_db("yrs", DbCreate).unwrap();
    let env = Arc::new(env);
    let handle = Arc::new(handle);

    // Document "A": only a metadata entry.
    {
        let writer = env.new_transaction().unwrap();
        let store = LmdbStore::from(writer.bind(&handle));
        store.insert_meta("A", "key1", [1].as_ref()).unwrap();
        writer.commit().unwrap();
    }

    // Document "B": stored with `insert_doc`.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        text.push(&mut doc_txn, "hello world");
        let writer = env.new_transaction().unwrap();
        let store = LmdbStore::from(writer.bind(&handle));
        store.insert_doc("B", &doc_txn).unwrap();
        writer.commit().unwrap();
    }

    // Document "C": stored through `push_update` from an update observer.
    {
        let doc = Doc::new();
        let cb_env = env.clone();
        let cb_handle = handle.clone();
        let _subscription = doc.observe_update_v1(move |_, event| {
            let writer = cb_env.new_transaction().unwrap();
            let store = LmdbStore::from(writer.bind(&cb_handle));
            store.push_update("C", &event.update).unwrap();
            writer.commit().unwrap();
        });
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        text.push(&mut doc_txn, "hello world");
    }

    // All three documents are listed.
    {
        let reader = env.get_reader().unwrap();
        let store = LmdbStore::from(reader.bind(&handle));
        let mut names = store.iter_docs().unwrap();
        assert_eq!(names.next(), Some("A".as_bytes().into()));
        assert_eq!(names.next(), Some("B".as_bytes().into()));
        assert_eq!(names.next(), Some("C".as_bytes().into()));
        assert!(names.next().is_none());
    }

    // Clear "B".
    {
        let writer = env.new_transaction().unwrap();
        let store = LmdbStore::from(writer.bind(&handle));
        store.clear_doc("B").unwrap();
        writer.commit().unwrap();
    }

    // Only "A" and "C" remain.
    {
        let reader = env.get_reader().unwrap();
        let store = LmdbStore::from(reader.bind(&handle));
        let mut names = store.iter_docs().unwrap();
        assert_eq!(names.next(), Some("A".as_bytes().into()));
        assert_eq!(names.next(), Some("C".as_bytes().into()));
        assert!(names.next().is_none());
    }
}
650}