1use rocksdb::{
54 DBIteratorWithThreadMode, DBPinnableSlice, Direction, IteratorMode, ReadOptions, Transaction,
55};
56use std::ops::Deref;
57use yrs_kvstore::{DocOps, KVEntry, KVStore};
58
59pub use yrs_kvstore as store;
60
61#[repr(transparent)]
64pub struct RocksDBStore<'a, DB>(Transaction<'a, DB>);
65
66impl<'a, DB> RocksDBStore<'a, DB> {
67 #[inline(always)]
68 pub fn commit(self) -> Result<(), rocksdb::Error> {
69 self.0.commit()
70 }
71}
72
73impl<'a, DB> From<Transaction<'a, DB>> for RocksDBStore<'a, DB> {
74 #[inline(always)]
75 fn from(txn: Transaction<'a, DB>) -> Self {
76 RocksDBStore(txn)
77 }
78}
79
80impl<'a, DB> Into<Transaction<'a, DB>> for RocksDBStore<'a, DB> {
81 #[inline(always)]
82 fn into(self) -> Transaction<'a, DB> {
83 self.0
84 }
85}
86
87impl<'a, DB> Deref for RocksDBStore<'a, DB> {
88 type Target = Transaction<'a, DB>;
89
90 #[inline(always)]
91 fn deref(&self) -> &Self::Target {
92 &self.0
93 }
94}
95
96impl<'a, DB> DocOps<'a> for RocksDBStore<'a, DB> {}
97
98impl<'a, DB> KVStore<'a> for RocksDBStore<'a, DB> {
99 type Error = rocksdb::Error;
100 type Cursor = RocksDBIter<'a, DB>;
101 type Entry = RocksDBEntry;
102 type Return = DBPinnableSlice<'a>;
103
104 fn get(&self, key: &[u8]) -> Result<Option<Self::Return>, Self::Error> {
105 if let Some(pinned) = self.0.get_pinned(key)? {
106 Ok(Some(unsafe { std::mem::transmute(pinned) }))
107 } else {
108 Ok(None)
109 }
110 }
111
112 fn upsert(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error> {
113 self.0.put(key, value)?;
114 Ok(())
115 }
116
117 fn remove(&self, key: &[u8]) -> Result<(), Self::Error> {
118 self.0.delete(key)?;
119 Ok(())
120 }
121
122 fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<(), Self::Error> {
123 let mut opt = ReadOptions::default();
124 opt.set_iterate_lower_bound(from);
125 opt.set_iterate_upper_bound(to);
126 let mut i = self
127 .0
128 .iterator_opt(IteratorMode::From(from, Direction::Forward), opt);
129 while let Some(res) = i.next() {
130 let (key, _) = res?;
131 self.0.delete(key)?;
132 }
133 Ok(())
134 }
135
136 fn iter_range(&self, from: &[u8], to: &[u8]) -> Result<Self::Cursor, Self::Error> {
137 let mut opt = ReadOptions::default();
138 opt.set_iterate_lower_bound(from);
139 opt.set_iterate_upper_bound(to);
140 let raw = self
141 .0
142 .iterator_opt(IteratorMode::From(from, Direction::Forward), opt);
143 Ok(RocksDBIter::new(
144 unsafe { std::mem::transmute(raw) },
145 to.to_vec(),
146 ))
147 }
148
149 fn peek_back(&self, key: &[u8]) -> Result<Option<Self::Entry>, Self::Error> {
150 let opt = ReadOptions::default();
151 let mut raw = self.0.raw_iterator_opt(opt);
152 raw.seek_for_prev(key);
153 if let Some((key, value)) = raw.item() {
154 Ok(Some(RocksDBEntry::new(key.into(), value.into())))
155 } else {
156 Ok(None)
157 }
158 }
159}
160
161pub struct RocksDBIter<'a, DB> {
162 inner: DBIteratorWithThreadMode<'a, Transaction<'a, DB>>,
163 to: Vec<u8>,
164}
165
166impl<'a, DB> RocksDBIter<'a, DB> {
167 fn new(inner: DBIteratorWithThreadMode<'a, Transaction<'a, DB>>, to: Vec<u8>) -> Self {
168 RocksDBIter { inner, to }
169 }
170}
171
172impl<'a, DB> Iterator for RocksDBIter<'a, DB> {
173 type Item = RocksDBEntry;
174
175 fn next(&mut self) -> Option<Self::Item> {
176 let n = self.inner.next()?;
177 if let Ok((key, value)) = n {
178 if key.as_ref() >= &self.to {
179 None
180 } else {
181 Some(RocksDBEntry::new(key, value))
182 }
183 } else {
184 None
185 }
186 }
187}
188
/// Owned key/value pair read from the store.
pub struct RocksDBEntry {
    /// Raw key bytes.
    key: Box<[u8]>,
    /// Raw value bytes.
    value: Box<[u8]>,
}

impl RocksDBEntry {
    /// Creates an entry from already owned key and value buffers.
    fn new(key: Box<[u8]>, value: Box<[u8]>) -> Self {
        RocksDBEntry { key, value }
    }
}

/// Splits an entry into its `(key, value)` buffers without copying.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// library's blanket impl still provides `Into<(Box<[u8]>, Box<[u8]>)>` for
/// `RocksDBEntry`, so existing `.into()` call sites keep working.
impl From<RocksDBEntry> for (Box<[u8]>, Box<[u8]>) {
    fn from(entry: RocksDBEntry) -> Self {
        (entry.key, entry.value)
    }
}
205
206impl KVEntry for RocksDBEntry {
207 fn key(&self) -> &[u8] {
208 &self.key
209 }
210
211 fn value(&self) -> &[u8] {
212 &self.value
213 }
214}
215
216#[cfg(test)]
217mod test {
218 use crate::RocksDBStore;
219 use rocksdb::TransactionDB;
220 use std::path::Path;
221 use std::sync::Arc;
222 use tempdir::TempDir;
223 use yrs::{Doc, GetString, ReadTxn, Text, Transact};
224 use yrs_kvstore::DocOps;
225
226 fn init_env<P: AsRef<Path>>(dir: P) -> TransactionDB {
227 let db = TransactionDB::open_default(dir).unwrap();
228 db
229 }
230
/// Round trip: insert a document, load it back, then clear it.
#[test]
fn create_get_remove() {
    let tmp_dir = TempDir::new("rocksdb-create_get_remove").unwrap();
    let database = init_env(&tmp_dir);

    // Write a document containing "hello" and commit the RocksDB transaction.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        text.insert(&mut doc_txn, 0, "hello");

        let store = RocksDBStore::from(database.transaction());
        store.insert_doc("doc", &doc_txn).unwrap();
        store.commit().unwrap();
    }

    // Load it into a fresh document and compare content and state vector.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        let store = RocksDBStore::from(database.transaction());
        store.load_doc("doc", &mut doc_txn).unwrap();

        assert_eq!(text.get_string(&doc_txn), "hello");

        let (state_vector, is_complete) = store.get_state_vector("doc").unwrap();
        assert_eq!(state_vector, Some(doc_txn.state_vector()));
        assert!(is_complete);
    }

    // Clear the document; loading it afterwards must yield nothing.
    {
        let store = RocksDBStore::from(database.transaction());

        store.clear_doc("doc").unwrap();

        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        store.load_doc("doc", &mut doc_txn).unwrap();

        assert_eq!(text.get_string(&doc_txn), "");

        let (state_vector, is_complete) = store.get_state_vector("doc").unwrap();
        assert!(state_vector.is_none());
        assert!(is_complete);
    }
}
/// Inserting the same document twice within one RocksDB transaction keeps the
/// latest content.
#[test]
fn multi_insert() {
    let tmp_dir = TempDir::new("rocksdb-multi_insert").unwrap();
    let database = init_env(&tmp_dir);

    // Insert the document, extend it, insert again, then commit once.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        text.push(&mut doc_txn, "hello");

        let store = RocksDBStore::from(database.transaction());

        store.insert_doc("doc", &doc_txn).unwrap();

        text.push(&mut doc_txn, " world");

        store.insert_doc("doc", &doc_txn).unwrap();
        store.commit().unwrap();
    }

    // A fresh document loaded from the store sees the full text.
    {
        let store = RocksDBStore::from(database.transaction());

        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        store.load_doc("doc", &mut doc_txn).unwrap();

        assert_eq!(text.get_string(&doc_txn), "hello world");
    }
}
315
/// Updates pushed one by one can be loaded back and flushed into a document.
#[test]
fn incremental_updates() {
    const DOC_NAME: &str = "doc";
    let tmp_dir = TempDir::new("rocksdb-incremental_updates").unwrap();
    let database = Arc::new(init_env(&tmp_dir));

    // Push every update the document emits into the store, one RocksDB
    // transaction per update.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");

        let database = Arc::clone(&database);
        let _sub = doc.observe_update_v1(move |_, event| {
            let store = RocksDBStore::from(database.transaction());
            store.push_update(DOC_NAME, &event.update).unwrap();
            store.commit().unwrap();
        });
        // Each push runs in its own document transaction.
        text.push(&mut doc.transact_mut(), "a");
        text.push(&mut doc.transact_mut(), "b");
        text.push(&mut doc.transact_mut(), "c");
    }

    // Loading the document replays the stored updates.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();

        let store = RocksDBStore::from(database.transaction());
        store.load_doc(DOC_NAME, &mut doc_txn).unwrap();

        assert_eq!(text.get_string(&doc_txn), "abc");
    }

    // Flushing returns a document with the same content.
    {
        let store = RocksDBStore::from(database.transaction());
        let flushed = store.flush_doc(DOC_NAME).unwrap().unwrap();
        store.commit().unwrap();

        let text = flushed.get_or_insert_text("text");

        assert_eq!(text.get_string(&flushed.transact()), "abc");
    }
}
363
/// When only incremental updates were pushed, `get_state_vector` is expected
/// to report no state vector and `completed == false`.
#[test]
fn state_vector_updates_only() {
    const DOC_NAME: &str = "doc";
    let tmp = TempDir::new("rocksdb-state_vector_updates_only").unwrap();
    let db = init_env(&tmp);
    let db = Arc::new(db);

    // Store document updates only; the document itself is never inserted.
    // (The block previously also computed a state vector and discarded it;
    // that dead computation is gone.)
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let db = db.clone();
        let _sub = doc.observe_update_v1(move |_, u| {
            let db_txn = RocksDBStore::from(db.transaction());
            db_txn.push_update(DOC_NAME, &u.update).unwrap();
            db_txn.commit().unwrap();
        });
        // Each push runs in its own document transaction, so the observer
        // fires once per push.
        text.push(&mut doc.transact_mut(), "a");
        text.push(&mut doc.transact_mut(), "b");
        text.push(&mut doc.transact_mut(), "c");
    }

    let db_txn = RocksDBStore::from(db.transaction());
    let (sv, completed) = db_txn.get_state_vector(DOC_NAME).unwrap();
    assert!(sv.is_none());
    assert!(!completed);
}
395
/// `get_diff` over a document stored as incremental updates matches the diff
/// encoded by the live document.
#[test]
fn state_diff_from_updates() {
    const DOC_NAME: &str = "doc";
    let tmp_dir = TempDir::new("rocksdb-state_diff_from_updates").unwrap();
    let database = Arc::new(init_env(&tmp_dir));

    let (state_vector, expected_diff) = {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");

        let database = Arc::clone(&database);
        let _sub = doc.observe_update_v1(move |_, event| {
            let store = RocksDBStore::from(database.transaction());
            store.push_update(DOC_NAME, &event.update).unwrap();
            store.commit().unwrap();
        });

        // Each push runs in its own document transaction.
        text.push(&mut doc.transact_mut(), "a");
        text.push(&mut doc.transact_mut(), "b");
        // Snapshot the state before the last change, then diff against it.
        let snapshot = doc.transact().state_vector();
        text.push(&mut doc.transact_mut(), "c");
        let diff = doc.transact().encode_diff_v1(&snapshot);
        (snapshot, diff)
    };

    let store = RocksDBStore::from(database.transaction());
    let actual_diff = store.get_diff(DOC_NAME, &state_vector).unwrap();
    assert_eq!(actual_diff, Some(expected_diff));
}
427
/// `get_diff` over a document stored with `insert_doc` matches the diff
/// encoded by the live document.
#[test]
fn state_diff_from_doc() {
    const DOC_NAME: &str = "doc";
    let tmp_dir = TempDir::new("rocksdb-state_diff_from_doc").unwrap();
    let database = Arc::new(init_env(&tmp_dir));

    let (state_vector, expected_diff) = {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        // Each push runs in its own document transaction.
        text.push(&mut doc.transact_mut(), "a");
        text.push(&mut doc.transact_mut(), "b");
        // Snapshot the state before the last change, then diff against it.
        let snapshot = doc.transact().state_vector();
        text.push(&mut doc.transact_mut(), "c");
        let diff = doc.transact().encode_diff_v1(&snapshot);

        // Store the whole document in one go.
        let store = RocksDBStore::from(database.transaction());
        store.insert_doc(DOC_NAME, &doc.transact()).unwrap();
        store.commit().unwrap();

        (snapshot, diff)
    };

    let store = RocksDBStore::from(database.transaction());
    let actual_diff = store.get_diff(DOC_NAME, &state_vector).unwrap();
    assert_eq!(actual_diff, Some(expected_diff));
}
456
/// Metadata entries can be inserted, overwritten and removed.
#[test]
fn doc_meta() {
    const DOC_NAME: &str = "doc";
    let tmp_dir = TempDir::new("rocksdb-doc_meta").unwrap();
    let database = Arc::new(init_env(&tmp_dir));

    // Nothing is stored initially; write the first value.
    let store = RocksDBStore::from(database.transaction());
    let initial = store.get_meta(DOC_NAME, "key").unwrap();
    assert!(initial.is_none());
    store
        .insert_meta(DOC_NAME, "key", "value1".as_bytes())
        .unwrap();
    store.commit().unwrap();

    // Read the first value, overwrite it, and check what was read only after
    // the commit.
    let store = RocksDBStore::from(database.transaction());
    let previous = store.get_meta(DOC_NAME, "key").unwrap();
    store
        .insert_meta(DOC_NAME, "key", "value2".as_bytes())
        .unwrap();
    store.commit().unwrap();
    assert_eq!(previous.as_deref(), Some("value1".as_bytes()));

    // Read the second value, remove the key, and verify it is gone within the
    // same (uncommitted) RocksDB transaction.
    let store = RocksDBStore::from(database.transaction());
    let previous = store.get_meta(DOC_NAME, "key").unwrap();
    store.remove_meta(DOC_NAME, "key").unwrap();
    assert_eq!(previous.as_deref(), Some("value2".as_bytes()));
    let after_removal = store.get_meta(DOC_NAME, "key").unwrap();
    assert!(after_removal.is_none());
}
487
/// `iter_meta` yields only the metadata entries of the requested document.
#[test]
fn doc_meta_iter() {
    let tmp_dir = TempDir::new("rocksdb-doc_meta_iter").unwrap();
    let database = init_env(&tmp_dir);
    let store = RocksDBStore::from(database.transaction());

    // Metadata for three documents; only "B" has two entries.
    store.insert_meta("A", "key1", [1].as_ref()).unwrap();
    store.insert_meta("B", "key2", [2].as_ref()).unwrap();
    store.insert_meta("B", "key3", [3].as_ref()).unwrap();
    store.insert_meta("C", "key4", [4].as_ref()).unwrap();

    let mut entries = store.iter_meta("B").unwrap();
    assert_eq!(entries.next(), Some(("key2".as_bytes().into(), [2].into())));
    assert_eq!(entries.next(), Some(("key3".as_bytes().into(), [3].into())));
    assert!(entries.next().is_none());
}
504
/// `iter_docs` lists documents known through metadata, full inserts or
/// pushed updates, and no longer lists a cleared document.
#[test]
fn doc_iter() {
    let tmp_dir = TempDir::new("rocksdb-doc_iter").unwrap();
    let database = Arc::new(init_env(&tmp_dir));

    // "A" exists only through a metadata entry.
    {
        let store = RocksDBStore::from(database.transaction());
        store.insert_meta("A", "key1", [1].as_ref()).unwrap();
        store.commit().unwrap();
    }

    // "B" is stored as a whole document.
    {
        let doc = Doc::new();
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        text.push(&mut doc_txn, "hello world");

        let store = RocksDBStore::from(database.transaction());
        store.insert_doc("B", &doc_txn).unwrap();
        store.commit().unwrap();
    }

    // "C" is stored through the update observer, which pushes each update.
    {
        let doc = Doc::new();
        let database = Arc::clone(&database);
        let _sub = doc.observe_update_v1(move |_, event| {
            let store = RocksDBStore::from(database.transaction());
            store.push_update("C", &event.update).unwrap();
            store.commit().unwrap();
        });
        let text = doc.get_or_insert_text("text");
        let mut doc_txn = doc.transact_mut();
        text.push(&mut doc_txn, "hello world");
    }

    // All three documents are listed in order.
    {
        let store = RocksDBStore::from(database.transaction());
        let mut names = store.iter_docs().unwrap();
        assert_eq!(names.next(), Some("A".as_bytes().into()));
        assert_eq!(names.next(), Some("B".as_bytes().into()));
        assert_eq!(names.next(), Some("C".as_bytes().into()));
        assert!(names.next().is_none());
    }

    // Clear "B" and commit.
    {
        let store = RocksDBStore::from(database.transaction());
        store.clear_doc("B").unwrap();
        store.commit().unwrap();
    }

    // Only "A" and "C" remain.
    {
        let store = RocksDBStore::from(database.transaction());
        let mut names = store.iter_docs().unwrap();
        assert_eq!(names.next(), Some("A".as_bytes().into()));
        assert_eq!(names.next(), Some("C".as_bytes().into()));
        assert!(names.next().is_none());
    }
}
568}