sled/tree.rs
1use std::{
2 num::NonZeroU64,
3 borrow::Cow,
4 fmt::{self, Debug},
5 ops::{self, Deref, RangeBounds},
6 sync::atomic::Ordering::{Acquire, SeqCst},
7};
8
9use parking_lot::RwLock;
10
11use crate::{atomic_shim::AtomicU64, pagecache::NodeView, *};
12
13#[derive(Debug, Clone)]
14pub(crate) struct View<'g> {
15 pub node_view: NodeView<'g>,
16 pub pid: PageId,
17 pub size: u64,
18}
19
20impl<'g> Deref for View<'g> {
21 type Target = Node;
22
23 fn deref(&self) -> &Node {
24 &*self.node_view
25 }
26}
27
28impl IntoIterator for &'_ Tree {
29 type Item = Result<(IVec, IVec)>;
30 type IntoIter = Iter;
31
32 fn into_iter(self) -> Iter {
33 self.iter()
34 }
35}
36
37/// A flash-sympathetic persistent lock-free B+ tree.
38///
39/// A `Tree` represents a single logical keyspace / namespace / bucket.
40///
41/// Separate `Trees` may be opened to separate concerns using
42/// `Db::open_tree`.
43///
44/// `Db` implements `Deref<Target = Tree>` such that a `Db` acts
45/// like the "default" `Tree`. This is the only `Tree` that cannot
46/// be deleted via `Db::drop_tree`.
47///
48/// # Examples
49///
50/// ```
51/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
52/// use sled::IVec;
53///
54/// # let _ = std::fs::remove_dir_all("db");
55/// let db: sled::Db = sled::open("db")?;
56/// db.insert(b"yo!", b"v1".to_vec());
57/// assert_eq!(db.get(b"yo!"), Ok(Some(IVec::from(b"v1"))));
58///
59/// // Atomic compare-and-swap.
60/// db.compare_and_swap(
61/// b"yo!", // key
62/// Some(b"v1"), // old value, None for not present
63/// Some(b"v2"), // new value, None for delete
64/// )?;
65///
66/// // Iterates over key-value pairs, starting at the given key.
67/// let scan_key: &[u8] = b"a non-present key before yo!";
68/// let mut iter = db.range(scan_key..);
69/// assert_eq!(
70/// iter.next().unwrap(),
71/// Ok((IVec::from(b"yo!"), IVec::from(b"v2")))
72/// );
73/// assert_eq!(iter.next(), None);
74///
75/// db.remove(b"yo!");
76/// assert_eq!(db.get(b"yo!"), Ok(None));
77///
78/// let other_tree: sled::Tree = db.open_tree(b"cool db facts")?;
79/// other_tree.insert(
80/// b"k1",
81/// &b"a Db acts like a Tree due to implementing Deref<Target = Tree>"[..]
82/// )?;
83/// # let _ = std::fs::remove_dir_all("db");
84/// # Ok(()) }
85/// ```
86#[derive(Clone)]
87pub struct Tree(pub(crate) Arc<TreeInner>);
88
89#[allow(clippy::module_name_repetitions)]
90pub struct TreeInner {
91 pub(crate) tree_id: IVec,
92 pub(crate) context: Context,
93 pub(crate) subscribers: Subscribers,
94 pub(crate) root: AtomicU64,
95 pub(crate) merge_operator: RwLock<Option<Box<dyn MergeOperator>>>,
96}
97
98impl Drop for TreeInner {
99 fn drop(&mut self) {
100 // Flush the underlying system in a loop until we
101 // have flushed all dirty data.
102 loop {
103 match self.context.pagecache.flush() {
104 Ok(0) => return,
105 Ok(_) => continue,
106 Err(e) => {
107 error!("failed to flush data to disk: {:?}", e);
108 return
109 },
110 }
111 }
112 }
113}
114
115impl Deref for Tree {
116 type Target = TreeInner;
117
118 fn deref(&self) -> &TreeInner {
119 &self.0
120 }
121}
122
123#[allow(unsafe_code)]
124unsafe impl Send for Tree {}
125
126#[allow(unsafe_code)]
127unsafe impl Sync for Tree {}
128
129impl Tree {
130 #[doc(hidden)]
131 #[deprecated(since = "0.24.2", note = "replaced by `Tree::insert`")]
132 pub fn set<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
133 where
134 K: AsRef<[u8]>,
135 V: Into<IVec>,
136 {
137 self.insert(key, value)
138 }
139
140 /// Insert a key with a new value, returning the previous value if one
141 /// was set.
142 ///
143 /// # Examples
144 ///
145 /// ```
146 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
147 /// # let config = sled::Config::new().temporary(true);
148 /// # let db = config.open()?;
149 /// assert_eq!(db.insert(&[1, 2, 3], vec![0]), Ok(None));
150 /// assert_eq!(db.insert(&[1, 2, 3], vec![1]), Ok(Some(sled::IVec::from(&[0]))));
151 /// # Ok(()) }
152 /// ```
153 pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
154 where
155 K: AsRef<[u8]>,
156 V: Into<IVec>,
157 {
158 let value = value.into();
159 let mut guard = pin();
160 let _cc = concurrency_control::read();
161 loop {
162 trace!("setting key {:?}", key.as_ref());
163 if let Ok(res) =
164 self.insert_inner(key.as_ref(), Some(value.clone()), &mut guard)?
165 {
166 return Ok(res);
167 }
168 }
169 }
170
171 pub(crate) fn insert_inner(
172 &self,
173 key: &[u8],
174 mut value: Option<IVec>,
175 guard: &mut Guard,
176 ) -> Result<Conflictable<Option<IVec>>> {
177 let _measure = if value.is_some() {
178 Measure::new(&M.tree_set)
179 } else {
180 Measure::new(&M.tree_del)
181 };
182
183 let View { node_view, pid, .. } =
184 self.view_for_key(key.as_ref(), guard)?;
185
186 let mut subscriber_reservation = self.subscribers.reserve(&key);
187
188 let (encoded_key, last_value) = node_view.node_kv_pair(key.as_ref());
189
190 if value == last_value {
191 // short-circuit a no-op set or delete
192 return Ok(Ok(value))
193 }
194
195 let frag = if let Some(value) = value.clone() {
196 Link::Set(encoded_key, value)
197 } else {
198 Link::Del(encoded_key)
199 };
200
201 let link = self.context.pagecache.link(
202 pid,
203 node_view.0,
204 frag,
205 guard,
206 )?;
207
208 if link.is_ok() {
209 // success
210 if let Some(res) = subscriber_reservation.take() {
211 let event = if let Some(value) = value.take() {
212 subscriber::Event::Insert {
213 key: key.as_ref().into(),
214 value,
215 }
216 } else {
217 subscriber::Event::Remove { key: key.as_ref().into() }
218 };
219
220 res.complete(&event);
221 }
222
223 guard.writeset.push(pid);
224
225 Ok(Ok(last_value))
226 } else {
227 M.tree_looped();
228 Ok(Err(Conflict))
229 }
230 }
231
232 /// Perform a multi-key serializable transaction.
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// # use sled::{transaction::TransactionResult, Config};
238 /// # fn main() -> TransactionResult<()> {
239 /// # let config = sled::Config::new().temporary(true);
240 /// # let db = config.open()?;
241 /// // Use write-only transactions as a writebatch:
242 /// db.transaction(|tx_db| {
243 /// tx_db.insert(b"k1", b"cats")?;
244 /// tx_db.insert(b"k2", b"dogs")?;
245 /// Ok(())
246 /// })?;
247 ///
248 /// // Atomically swap two items:
249 /// db.transaction(|tx_db| {
250 /// let v1_option = tx_db.remove(b"k1")?;
251 /// let v1 = v1_option.unwrap();
252 /// let v2_option = tx_db.remove(b"k2")?;
253 /// let v2 = v2_option.unwrap();
254 ///
255 /// tx_db.insert(b"k1", v2)?;
256 /// tx_db.insert(b"k2", v1)?;
257 ///
258 /// Ok(())
259 /// })?;
260 ///
261 /// assert_eq!(&db.get(b"k1")?.unwrap(), b"dogs");
262 /// assert_eq!(&db.get(b"k2")?.unwrap(), b"cats");
263 /// # Ok(())
264 /// # }
265 /// ```
266 ///
267 /// A transaction may return information from
268 /// an intentionally-cancelled transaction by using
269 /// the abort function inside the closure in
270 /// combination with the try operator.
271 ///
272 /// ```
273 /// use sled::{transaction::{abort, TransactionError, TransactionResult}, Config};
274 ///
275 /// #[derive(Debug, PartialEq)]
276 /// struct MyBullshitError;
277 ///
278 /// fn main() -> TransactionResult<(), MyBullshitError> {
279 /// let config = Config::new().temporary(true);
280 /// let db = config.open()?;
281 ///
282 /// // Use write-only transactions as a writebatch:
283 /// let res = db.transaction(|tx_db| {
284 /// tx_db.insert(b"k1", b"cats")?;
285 /// tx_db.insert(b"k2", b"dogs")?;
286 /// // aborting will cause all writes to roll-back.
287 /// if true {
288 /// abort(MyBullshitError)?;
289 /// }
290 /// Ok(42)
291 /// }).unwrap_err();
292 ///
293 /// assert_eq!(res, TransactionError::Abort(MyBullshitError));
294 /// assert_eq!(db.get(b"k1")?, None);
295 /// assert_eq!(db.get(b"k2")?, None);
296 ///
297 /// Ok(())
298 /// }
299 /// ```
300 ///
301 ///
302 /// Transactions also work on tuples of `Tree`s,
303 /// preserving serializable ACID semantics!
304 /// In this example, we treat two trees like a
305 /// work queue, atomically apply updates to
306 /// data and move them from the unprocessed `Tree`
307 /// to the processed `Tree`.
308 ///
309 /// ```
310 /// # use sled::transaction::TransactionResult;
311 /// # fn main() -> TransactionResult<()> {
312 /// # let config = sled::Config::new().temporary(true);
313 /// # let db = config.open()?;
314 /// use sled::Transactional;
315 ///
316 /// let unprocessed = db.open_tree(b"unprocessed items")?;
317 /// let processed = db.open_tree(b"processed items")?;
318 ///
319 /// // An update somehow gets into the tree, which we
320 /// // later trigger the atomic processing of.
321 /// unprocessed.insert(b"k3", b"ligers")?;
322 ///
323 /// // Atomically process the new item and move it
324 /// // between `Tree`s.
325 /// (&unprocessed, &processed)
326 /// .transaction(|(tx_unprocessed, tx_processed)| {
327 /// let unprocessed_item = tx_unprocessed.remove(b"k3")?.unwrap();
328 /// let mut processed_item = b"yappin' ".to_vec();
329 /// processed_item.extend_from_slice(&unprocessed_item);
330 /// tx_processed.insert(b"k3", processed_item)?;
331 /// Ok(())
332 /// })?;
333 ///
334 /// assert_eq!(unprocessed.get(b"k3").unwrap(), None);
335 /// assert_eq!(&processed.get(b"k3").unwrap().unwrap(), b"yappin' ligers");
336 /// # Ok(()) }
337 /// ```
338 pub fn transaction<F, A, E>(
339 &self,
340 f: F,
341 ) -> transaction::TransactionResult<A, E>
342 where
343 F: Fn(
344 &transaction::TransactionalTree,
345 ) -> transaction::ConflictableTransactionResult<A, E>,
346 {
347 Transactional::transaction(&self, f)
348 }
349
350 /// Create a new batched update that can be
351 /// atomically applied.
352 ///
353 /// It is possible to apply a `Batch` in a transaction
354 /// as well, which is the way you can apply a `Batch`
355 /// to multiple `Tree`s atomically.
356 ///
357 /// # Examples
358 ///
359 /// ```
360 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
361 /// # let config = sled::Config::new().temporary(true);
362 /// # let db = config.open()?;
363 /// db.insert("key_0", "val_0")?;
364 ///
365 /// let mut batch = sled::Batch::default();
366 /// batch.insert("key_a", "val_a");
367 /// batch.insert("key_b", "val_b");
368 /// batch.insert("key_c", "val_c");
369 /// batch.remove("key_0");
370 ///
371 /// db.apply_batch(batch)?;
372 /// // key_0 no longer exists, and key_a, key_b, and key_c
373 /// // now do exist.
374 /// # Ok(()) }
375 /// ```
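    ///
    /// A hedged sketch of applying a `Batch` inside a transaction (this is
    /// how a batch can be applied to one or more `Tree`s atomically); it
    /// assumes the transactional tree's `apply_batch` takes the batch by
    /// reference:
    ///
    /// ```
    /// # use sled::transaction::TransactionResult;
    /// # fn main() -> TransactionResult<()> {
    /// # let config = sled::Config::new().temporary(true);
    /// # let db = config.open()?;
    /// let mut batch = sled::Batch::default();
    /// batch.insert("key_a", "val_a");
    /// batch.remove("key_0");
    ///
    /// db.transaction(|tx_db| {
    ///     tx_db.apply_batch(&batch)?;
    ///     Ok(())
    /// })?;
    /// # Ok(()) }
    /// ```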
376 pub fn apply_batch(&self, batch: Batch) -> Result<()> {
377 let _cc = concurrency_control::write();
378 let mut guard = pin();
379 self.apply_batch_inner(batch, &mut guard)
380 }
381
382 pub(crate) fn apply_batch_inner(
383 &self,
384 batch: Batch,
385 guard: &mut Guard,
386 ) -> Result<()> {
387 let peg = self.context.pin_log(guard)?;
388 trace!("applying batch {:?}", batch);
389 for (k, v_opt) in batch.writes {
390 loop {
391 if self.insert_inner(&k, v_opt.clone(), guard)?.is_ok() {
392 break;
393 }
394 }
395 }
396
397 // when the peg drops, it ensures all updates
398 // written to the log since its creation are
399 // recovered atomically
400 peg.seal_batch()
401 }
402
403 /// Retrieve a value from the `Tree` if it exists.
404 ///
405 /// # Examples
406 ///
407 /// ```
408 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
409 /// # let config = sled::Config::new().temporary(true);
410 /// # let db = config.open()?;
411 /// db.insert(&[0], vec![0])?;
412 /// assert_eq!(db.get(&[0]), Ok(Some(sled::IVec::from(vec![0]))));
413 /// assert_eq!(db.get(&[1]), Ok(None));
414 /// # Ok(()) }
415 /// ```
416 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
417 let mut guard = pin();
418 let _cc = concurrency_control::read();
419 loop {
420 if let Ok(get) = self.get_inner(key.as_ref(), &mut guard)? {
421 return Ok(get);
422 }
423 }
424 }
425
426 pub(crate) fn get_inner(
427 &self,
428 key: &[u8],
429 guard: &mut Guard,
430 ) -> Result<Conflictable<Option<IVec>>> {
431 let _measure = Measure::new(&M.tree_get);
432
433 trace!("getting key {:?}", key);
434
435 let View { node_view, pid, .. } = self.view_for_key(key.as_ref(), guard)?;
436
437 let pair = node_view.leaf_pair_for_key(key.as_ref());
438 let val = pair.map(|kv| kv.1.clone());
439
440 guard.readset.push(pid);
441
442 Ok(Ok(val))
443 }
444
445 #[doc(hidden)]
446 #[deprecated(since = "0.24.2", note = "replaced by `Tree::remove`")]
447 pub fn del<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
448 self.remove(key)
449 }
450
451 /// Delete a value, returning the old value if it existed.
452 ///
453 /// # Examples
454 ///
455 /// ```
456 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
457 /// # let config = sled::Config::new().temporary(true);
458 /// # let db = config.open()?;
459 /// db.insert(&[1], vec![1]);
460 /// assert_eq!(db.remove(&[1]), Ok(Some(sled::IVec::from(vec![1]))));
461 /// assert_eq!(db.remove(&[1]), Ok(None));
462 /// # Ok(()) }
463 /// ```
464 pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
465 let mut guard = pin();
466 let _cc = concurrency_control::read();
467 loop {
468 trace!("removing key {:?}", key.as_ref());
469
470 if let Ok(res) = self.insert_inner(key.as_ref(), None, &mut guard)? {
471 return Ok(res);
472 }
473 }
474 }
475
476 /// Compare and swap. Capable of unique creation, conditional modification,
477 /// or deletion. If old is `None`, this will only set the value if it
478 /// doesn't exist yet. If new is `None`, it will delete the value if old is
479 /// correct. If both old and new are `Some`, it will modify the value if
480 /// old is correct.
481 ///
482 /// It returns `Ok(Ok(()))` if the operation finishes successfully.
483 ///
484 /// If it fails it returns:
485 /// - `Ok(Err(CompareAndSwapError(current, proposed)))` if the operation
486 /// failed to set up a new value. `CompareAndSwapError` contains
487 /// current and proposed values.
488 /// - `Err(Error::Unsupported)` if the database is opened in read-only
489 /// mode.
490 ///
491 /// # Examples
492 ///
493 /// ```
494 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
495 /// # let config = sled::Config::new().temporary(true);
496 /// # let db = config.open()?;
497 /// // unique creation
498 /// assert_eq!(
499 /// db.compare_and_swap(&[1], None as Option<&[u8]>, Some(&[10])),
500 /// Ok(Ok(()))
501 /// );
502 ///
503 /// // conditional modification
504 /// assert_eq!(
505 /// db.compare_and_swap(&[1], Some(&[10]), Some(&[20])),
506 /// Ok(Ok(()))
507 /// );
508 ///
509 /// // failed conditional modification -- the current value is returned in
510 /// // the error variant
511 /// let operation = db.compare_and_swap(&[1], Some(&[30]), Some(&[40]));
512 /// assert!(operation.is_ok()); // the operation succeeded
513 /// let modification = operation.unwrap();
514 /// assert!(modification.is_err());
515 /// let actual_value = modification.unwrap_err();
516 /// assert_eq!(actual_value.current.map(|ivec| ivec.to_vec()), Some(vec![20]));
517 ///
518 /// // conditional deletion
519 /// assert_eq!(
520 /// db.compare_and_swap(&[1], Some(&[20]), None as Option<&[u8]>),
521 /// Ok(Ok(()))
522 /// );
523 /// assert_eq!(db.get(&[1]), Ok(None));
524 /// # Ok(()) }
525 /// ```
526 #[allow(clippy::needless_pass_by_value)]
527 pub fn compare_and_swap<K, OV, NV>(
528 &self,
529 key: K,
530 old: Option<OV>,
531 new: Option<NV>,
532 ) -> CompareAndSwapResult
533 where
534 K: AsRef<[u8]>,
535 OV: AsRef<[u8]>,
536 NV: Into<IVec>,
537 {
538 trace!("cas'ing key {:?}", key.as_ref());
539 let _measure = Measure::new(&M.tree_cas);
540
541 let guard = pin();
542 let _cc = concurrency_control::read();
543
544 let new = new.map(Into::into);
545
546 // we need to retry the CAS until old != cur, since just because
547 // the page CAS fails it doesn't mean our value was changed.
548 loop {
549 let View { pid, node_view, .. } =
550 self.view_for_key(key.as_ref(), &guard)?;
551
552 let (encoded_key, current_value) =
553 node_view.node_kv_pair(key.as_ref());
554 let matches = match (old.as_ref(), &current_value) {
555 (None, None) => true,
556 (Some(o), Some(ref c)) => o.as_ref() == &**c,
557 _ => false,
558 };
559
560 if !matches {
561 return Ok(Err(CompareAndSwapError {
562 current: current_value,
563 proposed: new,
564 }));
565 }
566
567 let mut subscriber_reservation = self.subscribers.reserve(&key);
568
569 let frag = if let Some(ref new) = new {
570 Link::Set(encoded_key, new.clone())
571 } else {
572 Link::Del(encoded_key)
573 };
574 let link =
575 self.context.pagecache.link(pid, node_view.0, frag, &guard)?;
576
577 if link.is_ok() {
578 if let Some(res) = subscriber_reservation.take() {
579 let event = if let Some(new) = new {
580 subscriber::Event::Insert {
581 key: key.as_ref().into(),
582 value: new,
583 }
584 } else {
585 subscriber::Event::Remove { key: key.as_ref().into() }
586 };
587
588 res.complete(&event);
589 }
590
591 return Ok(Ok(()));
592 }
593 M.tree_looped();
594 }
595 }
596
597 /// Fetch the value, apply a function to it and return the result.
598 ///
599 /// # Note
600 ///
601 /// This may call the function multiple times if the value has been
602 /// changed by other threads in the meantime.
603 ///
604 /// # Examples
605 ///
606 /// ```
607 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
608 /// use sled::{Config, Error, IVec};
609 /// use std::convert::TryInto;
610 ///
611 /// let config = Config::new().temporary(true);
612 /// let db = config.open()?;
613 ///
614 /// fn u64_to_ivec(number: u64) -> IVec {
615 /// IVec::from(number.to_be_bytes().to_vec())
616 /// }
617 ///
618 /// let zero = u64_to_ivec(0);
619 /// let one = u64_to_ivec(1);
620 /// let two = u64_to_ivec(2);
621 /// let three = u64_to_ivec(3);
622 ///
623 /// fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
624 /// let number = match old {
625 /// Some(bytes) => {
626 /// let array: [u8; 8] = bytes.try_into().unwrap();
627 /// let number = u64::from_be_bytes(array);
628 /// number + 1
629 /// }
630 /// None => 0,
631 /// };
632 ///
633 /// Some(number.to_be_bytes().to_vec())
634 /// }
635 ///
636 /// assert_eq!(db.update_and_fetch("counter", increment), Ok(Some(zero)));
637 /// assert_eq!(db.update_and_fetch("counter", increment), Ok(Some(one)));
638 /// assert_eq!(db.update_and_fetch("counter", increment), Ok(Some(two)));
639 /// assert_eq!(db.update_and_fetch("counter", increment), Ok(Some(three)));
640 /// # Ok(()) }
641 /// ```
642 pub fn update_and_fetch<K, V, F>(
643 &self,
644 key: K,
645 mut f: F,
646 ) -> Result<Option<IVec>>
647 where
648 K: AsRef<[u8]>,
649 F: FnMut(Option<&[u8]>) -> Option<V>,
650 V: Into<IVec>,
651 {
652 let key_ref = key.as_ref();
653 let mut current = self.get(key_ref)?;
654
655 loop {
656 let tmp = current.as_ref().map(AsRef::as_ref);
657 let next = f(tmp).map(Into::into);
658 match self.compare_and_swap::<_, _, IVec>(
659 key_ref,
660 tmp,
661 next.clone(),
662 )? {
663 Ok(()) => return Ok(next),
664 Err(CompareAndSwapError { current: cur, .. }) => {
665 current = cur;
666 }
667 }
668 }
669 }
670
671 /// Fetch the value, apply a function to it and return the previous value.
672 ///
673 /// # Note
674 ///
675 /// This may call the function multiple times if the value has been
676 /// changed by other threads in the meantime.
677 ///
678 /// # Examples
679 ///
680 /// ```
681 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
682 /// use sled::{Config, Error, IVec};
683 /// use std::convert::TryInto;
684 ///
685 /// let config = Config::new().temporary(true);
686 /// let db = config.open()?;
687 ///
688 /// fn u64_to_ivec(number: u64) -> IVec {
689 /// IVec::from(number.to_be_bytes().to_vec())
690 /// }
691 ///
692 /// let zero = u64_to_ivec(0);
693 /// let one = u64_to_ivec(1);
694 /// let two = u64_to_ivec(2);
695 ///
696 /// fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
697 /// let number = match old {
698 /// Some(bytes) => {
699 /// let array: [u8; 8] = bytes.try_into().unwrap();
700 /// let number = u64::from_be_bytes(array);
701 /// number + 1
702 /// }
703 /// None => 0,
704 /// };
705 ///
706 /// Some(number.to_be_bytes().to_vec())
707 /// }
708 ///
709 /// assert_eq!(db.fetch_and_update("counter", increment), Ok(None));
710 /// assert_eq!(db.fetch_and_update("counter", increment), Ok(Some(zero)));
711 /// assert_eq!(db.fetch_and_update("counter", increment), Ok(Some(one)));
712 /// assert_eq!(db.fetch_and_update("counter", increment), Ok(Some(two)));
713 /// # Ok(()) }
714 /// ```
715 pub fn fetch_and_update<K, V, F>(
716 &self,
717 key: K,
718 mut f: F,
719 ) -> Result<Option<IVec>>
720 where
721 K: AsRef<[u8]>,
722 F: FnMut(Option<&[u8]>) -> Option<V>,
723 V: Into<IVec>,
724 {
725 let key_ref = key.as_ref();
726 let mut current = self.get(key_ref)?;
727
728 loop {
729 let tmp = current.as_ref().map(AsRef::as_ref);
730 let next = f(tmp);
731 match self.compare_and_swap(key_ref, tmp, next)? {
732 Ok(()) => return Ok(current),
733 Err(CompareAndSwapError { current: cur, .. }) => {
734 current = cur;
735 }
736 }
737 }
738 }
739
740 /// Subscribe to `Event`s that happen to keys that have
741 /// the specified prefix. Events for particular keys are
742 /// guaranteed to be witnessed in the same order by all
743 /// threads, but threads may witness different interleavings
744 /// of `Event`s across different keys. If subscribers don't
745 /// keep up with new writes, they will cause new writes
746 /// to block. There is a buffer of 1024 items per
747 /// `Subscriber`. This can be used to build reactive
748 /// and replicated systems.
749 ///
750 /// `Subscriber` implements both `Iterator<Item = Event>`
751 /// and `Future<Output=Option<Event>>`
752 ///
753 /// # Examples
754 ///
755 /// Synchronous, blocking subscriber:
756 /// ```
757 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
758 /// # let config = sled::Config::new().temporary(true);
759 /// # let db = config.open()?;
760 /// // watch all events by subscribing to the empty prefix
761 /// let mut subscriber = db.watch_prefix(vec![]);
762 ///
763 /// let tree_2 = db.clone();
764 /// let thread = std::thread::spawn(move || {
765 /// tree_2.insert(vec![0], vec![1])
766 /// });
767 ///
768 /// // `Subscriber` implements `Iterator<Item = Event>`
769 /// for event in subscriber.take(1) {
770 /// match event {
771 /// sled::Event::Insert { key, value } => assert_eq!(key.as_ref(), &[0]),
772 /// sled::Event::Remove { key } => {}
773 /// }
774 /// }
775 ///
776 /// # thread.join().unwrap();
777 /// # Ok(()) }
778 /// ```
779/// Asynchronous, non-blocking subscriber:
780 ///
781/// `Subscriber` implements `Future<Output=Option<Event>>`.
782 ///
783 /// `while let Some(event) = (&mut subscriber).await { /* use it */ }`
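    ///
    /// A minimal async sketch (hypothetical `example` wrapper; it assumes the
    /// caller's executor drives the future):
    ///
    /// ```
    /// # async fn example(db: &sled::Db) {
    /// let mut subscriber = db.watch_prefix(vec![]);
    /// while let Some(event) = (&mut subscriber).await {
    ///     match event {
    ///         sled::Event::Insert { key, value } => { /* handle the new value */ }
    ///         sled::Event::Remove { key } => { /* handle the removal */ }
    ///     }
    /// }
    /// # }
    /// ```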
784 pub fn watch_prefix<P: AsRef<[u8]>>(&self, prefix: P) -> Subscriber {
785 self.subscribers.register(prefix.as_ref())
786 }
787
788 /// Synchronously flushes all dirty IO buffers and calls
789 /// fsync. If this succeeds, it is guaranteed that all
790 /// previous writes will be recovered if the system
791 /// crashes. Returns the number of bytes flushed during
792 /// this call.
793 ///
794 /// Flushing can take quite a lot of time, and you should
795 /// measure the performance impact of using it on
796 /// realistic sustained workloads running on realistic
797 /// hardware.
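    ///
    /// # Examples
    ///
    /// A small sketch; everything written before the call is durable once it
    /// returns:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::new().temporary(true);
    /// # let db = config.open()?;
    /// db.insert(b"a", vec![0])?;
    /// db.flush()?;
    /// # Ok(()) }
    /// ```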
798 pub fn flush(&self) -> Result<usize> {
799 self.context.pagecache.flush()
800 }
801
802 /// Asynchronously flushes all dirty IO buffers
803 /// and calls fsync. If this succeeds, it is
804 /// guaranteed that all previous writes will
805 /// be recovered if the system crashes. Returns
806 /// the number of bytes flushed during this call.
807 ///
808 /// Flushing can take quite a lot of time, and you
809 /// should measure the performance impact of
810 /// using it on realistic sustained workloads
811 /// running on realistic hardware.
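    ///
    /// # Examples
    ///
    /// A minimal sketch (hypothetical `example` wrapper; it assumes an async
    /// runtime drives the returned future):
    ///
    /// ```
    /// # async fn example(db: &sled::Db) -> sled::Result<()> {
    /// db.insert(b"a", vec![0])?;
    /// db.flush_async().await?;
    /// # Ok(()) }
    /// ```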
812 // this clippy check is mis-firing on async code.
813 #[allow(clippy::used_underscore_binding)]
814 pub async fn flush_async(&self) -> Result<usize> {
815 let pagecache = self.context.pagecache.clone();
816 if let Some(result) = threadpool::spawn(move || pagecache.flush())?.await
817 {
818 result
819 } else {
820 Err(Error::ReportableBug(
821 "threadpool failed to complete \
822 action before shutdown"
823 .to_string(),
824 ))
825 }
826 }
827
828 /// Returns `true` if the `Tree` contains a value for
829 /// the specified key.
830 ///
831 /// # Examples
832 ///
833 /// ```
834 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
835 /// # let config = sled::Config::new().temporary(true);
836 /// # let db = config.open()?;
837 /// db.insert(&[0], vec![0])?;
838 /// assert!(db.contains_key(&[0])?);
839 /// assert!(!db.contains_key(&[1])?);
840 /// # Ok(()) }
841 /// ```
842 pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> Result<bool> {
843 self.get(key).map(|v| v.is_some())
844 }
845
846 /// Retrieve the key and value before the provided key,
847 /// if one exists.
848 ///
849 /// # Examples
850 ///
851 /// ```
852 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
853 /// use sled::IVec;
854 /// # let config = sled::Config::new().temporary(true);
855 /// # let db = config.open()?;
856 /// for i in 0..10 {
857 /// db.insert(&[i], vec![i])
858 /// .expect("should write successfully");
859 /// }
860 ///
861 /// assert_eq!(db.get_lt(&[]), Ok(None));
862 /// assert_eq!(db.get_lt(&[0]), Ok(None));
863 /// assert_eq!(
864 /// db.get_lt(&[1]),
865 /// Ok(Some((IVec::from(&[0]), IVec::from(&[0]))))
866 /// );
867 /// assert_eq!(
868 /// db.get_lt(&[9]),
869 /// Ok(Some((IVec::from(&[8]), IVec::from(&[8]))))
870 /// );
871 /// assert_eq!(
872 /// db.get_lt(&[10]),
873 /// Ok(Some((IVec::from(&[9]), IVec::from(&[9]))))
874 /// );
875 /// assert_eq!(
876 /// db.get_lt(&[255]),
877 /// Ok(Some((IVec::from(&[9]), IVec::from(&[9]))))
878 /// );
879 /// # Ok(()) }
880 /// ```
881 pub fn get_lt<K>(&self, key: K) -> Result<Option<(IVec, IVec)>>
882 where
883 K: AsRef<[u8]>,
884 {
885 let _measure = Measure::new(&M.tree_get);
886 self.range(..key).next_back().transpose()
887 }
888
889 /// Retrieve the next key and value from the `Tree` after the
890 /// provided key.
891 ///
892 /// # Note
893 /// The order follows the Ord implementation for `Vec<u8>`:
894 ///
895 /// `[] < [0] < [255] < [255, 0] < [255, 255] ...`
896 ///
897 /// To retain the ordering of numerical types, use big-endian representation.
898 ///
899 /// # Examples
900 ///
901 /// ```
902 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
903 /// use sled::IVec;
904 /// # let config = sled::Config::new().temporary(true);
905 /// # let db = config.open()?;
906 /// for i in 0..10 {
907 /// db.insert(&[i], vec![i])?;
908 /// }
909 ///
910 /// assert_eq!(
911 /// db.get_gt(&[]),
912 /// Ok(Some((IVec::from(&[0]), IVec::from(&[0]))))
913 /// );
914 /// assert_eq!(
915 /// db.get_gt(&[0]),
916 /// Ok(Some((IVec::from(&[1]), IVec::from(&[1]))))
917 /// );
918 /// assert_eq!(
919 /// db.get_gt(&[1]),
920 /// Ok(Some((IVec::from(&[2]), IVec::from(&[2]))))
921 /// );
922 /// assert_eq!(
923 /// db.get_gt(&[8]),
924 /// Ok(Some((IVec::from(&[9]), IVec::from(&[9]))))
925 /// );
926 /// assert_eq!(db.get_gt(&[9]), Ok(None));
927 ///
928 /// db.insert(500u16.to_be_bytes(), vec![10]);
929 /// assert_eq!(
930 /// db.get_gt(&499u16.to_be_bytes()),
931 /// Ok(Some((IVec::from(&500u16.to_be_bytes()), IVec::from(&[10]))))
932 /// );
933 /// # Ok(()) }
934 /// ```
935 pub fn get_gt<K>(&self, key: K) -> Result<Option<(IVec, IVec)>>
936 where
937 K: AsRef<[u8]>,
938 {
939 let _measure = Measure::new(&M.tree_get);
940 self.range((ops::Bound::Excluded(key), ops::Bound::Unbounded))
941 .next()
942 .transpose()
943 }
944
945 /// Merge state directly into a given key's value using the
946 /// configured merge operator. This allows state to be written
947 /// into a value directly, without any read-modify-write steps.
948 /// Merge operators can be used to implement arbitrary data
949 /// structures.
950 ///
951 /// Calling `merge` will return an `Unsupported` error if it
952 /// is called without first setting a merge operator function.
953 ///
954 /// Merge operators are shared by all instances of a particular
955 /// `Tree`. Different merge operators may be set on different
956 /// `Tree`s.
957 ///
958 /// # Examples
959 ///
960 /// ```
961 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
962 /// # let config = sled::Config::new().temporary(true);
963 /// # let db = config.open()?;
964 /// use sled::IVec;
965 ///
966 /// fn concatenate_merge(
967 /// _key: &[u8], // the key being merged
968 /// old_value: Option<&[u8]>, // the previous value, if one existed
969 /// merged_bytes: &[u8] // the new bytes being merged in
970 /// ) -> Option<Vec<u8>> { // set the new value, return None to delete
971 /// let mut ret = old_value
972 /// .map(|ov| ov.to_vec())
973 /// .unwrap_or_else(|| vec![]);
974 ///
975 /// ret.extend_from_slice(merged_bytes);
976 ///
977 /// Some(ret)
978 /// }
979 ///
980 /// db.set_merge_operator(concatenate_merge);
981 ///
982 /// let k = b"k1";
983 ///
984 /// db.insert(k, vec![0]);
985 /// db.merge(k, vec![1]);
986 /// db.merge(k, vec![2]);
987 /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![0, 1, 2]))));
988 ///
989 /// // Replace previously merged data. The merge function will not be called.
990 /// db.insert(k, vec![3]);
991 /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![3]))));
992 ///
993 /// // Merges on non-present values will cause the merge function to be called
994 /// // with `old_value == None`. If the merge function returns something (which it
995 /// // does, in this case) a new value will be inserted.
996 /// db.remove(k);
997 /// db.merge(k, vec![4]);
998 /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![4]))));
999 /// # Ok(()) }
1000 /// ```
1001 pub fn merge<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
1002 where
1003 K: AsRef<[u8]>,
1004 V: AsRef<[u8]>,
1005 {
1006 let _cc = concurrency_control::read();
1007 loop {
1008 if let Ok(merge) = self.merge_inner(key.as_ref(), value.as_ref())? {
1009 return Ok(merge);
1010 }
1011 }
1012 }
1013
1014 pub(crate) fn merge_inner(
1015 &self,
1016 key: &[u8],
1017 value: &[u8],
1018 ) -> Result<Conflictable<Option<IVec>>> {
1019 trace!("merging key {:?}", key);
1020 let _measure = Measure::new(&M.tree_merge);
1021
1022 let merge_operator_opt = self.merge_operator.read();
1023
1024 if merge_operator_opt.is_none() {
1025 return Err(Error::Unsupported(
1026 "must set a merge operator on this Tree \
1027 before calling merge by calling \
1028 Tree::set_merge_operator"
1029 .to_owned(),
1030 ));
1031 }
1032
1033 let merge_operator = merge_operator_opt.as_ref().unwrap();
1034
1035 loop {
1036 let guard = pin();
1037 let View { pid, node_view, .. } =
1038 self.view_for_key(key.as_ref(), &guard)?;
1039
1040 let (encoded_key, current_value) =
1041 node_view.node_kv_pair(key.as_ref());
1042 let tmp = current_value.as_ref().map(AsRef::as_ref);
1043 let new = merge_operator(key, tmp, value).map(IVec::from);
1044
1045 let mut subscriber_reservation = self.subscribers.reserve(&key);
1046
1047 let frag = if let Some(ref new) = new {
1048 Link::Set(encoded_key, new.clone())
1049 } else {
1050 Link::Del(encoded_key)
1051 };
1052 let link =
1053 self.context.pagecache.link(pid, node_view.0, frag, &guard)?;
1054
1055 if link.is_ok() {
1056 if let Some(res) = subscriber_reservation.take() {
1057 let event = if let Some(new) = &new {
1058 subscriber::Event::Insert {
1059 key: key.as_ref().into(),
1060 value: new.clone(),
1061 }
1062 } else {
1063 subscriber::Event::Remove { key: key.as_ref().into() }
1064 };
1065
1066 res.complete(&event);
1067 }
1068
1069 return Ok(Ok(new));
1070 }
1071 M.tree_looped();
1072 }
1073 }
1074
1075 /// Sets a merge operator for use with the `merge` function.
1076 ///
1077 /// Merge state directly into a given key's value using the
1078 /// configured merge operator. This allows state to be written
1079 /// into a value directly, without any read-modify-write steps.
1080 /// Merge operators can be used to implement arbitrary data
1081 /// structures.
1082 ///
1083 /// # Note
1084 ///
1085 /// Calling `merge` will return an `Unsupported` error if no merge
1086 /// operator has been configured.
1087 ///
1088 /// # Examples
1089 ///
1090 /// ```
1091 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1092 /// # let config = sled::Config::new().temporary(true);
1093 /// # let db = config.open()?;
1094 /// use sled::IVec;
1095 ///
1096 /// fn concatenate_merge(
1097 /// _key: &[u8], // the key being merged
1098 /// old_value: Option<&[u8]>, // the previous value, if one existed
1099 /// merged_bytes: &[u8] // the new bytes being merged in
1100 /// ) -> Option<Vec<u8>> { // set the new value, return None to delete
1101 /// let mut ret = old_value
1102 /// .map(|ov| ov.to_vec())
1103 /// .unwrap_or_else(|| vec![]);
1104 ///
1105 /// ret.extend_from_slice(merged_bytes);
1106 ///
1107 /// Some(ret)
1108 /// }
1109 ///
1110 /// db.set_merge_operator(concatenate_merge);
1111 ///
1112 /// let k = b"k1";
1113 ///
1114 /// db.insert(k, vec![0]);
1115 /// db.merge(k, vec![1]);
1116 /// db.merge(k, vec![2]);
1117 /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![0, 1, 2]))));
1118 ///
1119 /// // Replace previously merged data. The merge function will not be called.
1120 /// db.insert(k, vec![3]);
1121 /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![3]))));
1122 ///
1123 /// // Merges on non-present values will cause the merge function to be called
1124 /// // with `old_value == None`. If the merge function returns something (which it
1125 /// // does, in this case) a new value will be inserted.
1126 /// db.remove(k);
1127 /// db.merge(k, vec![4]);
1128 /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![4]))));
1129 /// # Ok(()) }
1130 /// ```
1131 pub fn set_merge_operator(
1132 &self,
1133 merge_operator: impl MergeOperator + 'static,
1134 ) {
1135 let mut mo_write = self.merge_operator.write();
1136 *mo_write = Some(Box::new(merge_operator));
1137 }
1138
1139 /// Create a double-ended iterator over the tuples of keys and
1140 /// values in this tree.
1141 ///
1142 /// # Examples
1143 ///
1144 /// ```
1145 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1146 /// # let config = sled::Config::new().temporary(true);
1147 /// # let db = config.open()?;
1148 /// use sled::IVec;
1149 /// db.insert(&[1], vec![10]);
1150 /// db.insert(&[2], vec![20]);
1151 /// db.insert(&[3], vec![30]);
1152 /// let mut iter = db.iter();
1153 /// assert_eq!(
1154 /// iter.next().unwrap(),
1155 /// Ok((IVec::from(&[1]), IVec::from(&[10])))
1156 /// );
1157 /// assert_eq!(
1158 /// iter.next().unwrap(),
1159 /// Ok((IVec::from(&[2]), IVec::from(&[20])))
1160 /// );
1161 /// assert_eq!(
1162 /// iter.next().unwrap(),
1163 /// Ok((IVec::from(&[3]), IVec::from(&[30])))
1164 /// );
1165 /// assert_eq!(iter.next(), None);
1166 /// # Ok(()) }
1167 /// ```
1168 pub fn iter(&self) -> Iter {
1169 self.range::<Vec<u8>, _>(..)
1170 }
1171
1172 /// Create a double-ended iterator over tuples of keys and values,
1173 /// where the keys fall within the specified range.
1174 ///
1175 /// # Examples
1176 ///
1177 /// ```
1178 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1179 /// # let config = sled::Config::new().temporary(true);
1180 /// # let db = config.open()?;
1181 /// use sled::IVec;
1182 /// db.insert(&[0], vec![0])?;
1183 /// db.insert(&[1], vec![10])?;
1184 /// db.insert(&[2], vec![20])?;
1185 /// db.insert(&[3], vec![30])?;
1186 /// db.insert(&[4], vec![40])?;
1187 /// db.insert(&[5], vec![50])?;
1188 ///
1189 /// let start: &[u8] = &[2];
1190 /// let end: &[u8] = &[4];
1191 /// let mut r = db.range(start..end);
1192 /// assert_eq!(r.next().unwrap(), Ok((IVec::from(&[2]), IVec::from(&[20]))));
1193 /// assert_eq!(r.next().unwrap(), Ok((IVec::from(&[3]), IVec::from(&[30]))));
1194 /// assert_eq!(r.next(), None);
1195 ///
1196 /// let mut r = db.range(start..end).rev();
1197 /// assert_eq!(r.next().unwrap(), Ok((IVec::from(&[3]), IVec::from(&[30]))));
1198 /// assert_eq!(r.next().unwrap(), Ok((IVec::from(&[2]), IVec::from(&[20]))));
1199 /// assert_eq!(r.next(), None);
1200 /// # Ok(()) }
1201 /// ```
1202 pub fn range<K, R>(&self, range: R) -> Iter
1203 where
1204 K: AsRef<[u8]>,
1205 R: RangeBounds<K>,
1206 {
1207 let lo = match range.start_bound() {
1208 ops::Bound::Included(start) => {
1209 ops::Bound::Included(IVec::from(start.as_ref()))
1210 }
1211 ops::Bound::Excluded(start) => {
1212 ops::Bound::Excluded(IVec::from(start.as_ref()))
1213 }
1214 ops::Bound::Unbounded => ops::Bound::Included(IVec::from(&[])),
1215 };
1216
1217 let hi = match range.end_bound() {
1218 ops::Bound::Included(end) => {
1219 ops::Bound::Included(IVec::from(end.as_ref()))
1220 }
1221 ops::Bound::Excluded(end) => {
1222 ops::Bound::Excluded(IVec::from(end.as_ref()))
1223 }
1224 ops::Bound::Unbounded => ops::Bound::Unbounded,
1225 };
1226
1227 Iter {
1228 tree: self.clone(),
1229 hi,
1230 lo,
1231 cached_node: None,
1232 going_forward: true,
1233 }
1234 }
1235
1236 /// Create an iterator over tuples of keys and values,
1237 /// where all the keys start with the given prefix.
1238 ///
1239 /// # Examples
1240 ///
1241 /// ```
1242 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1243 /// # let config = sled::Config::new().temporary(true);
1244 /// # let db = config.open()?;
1245 /// use sled::IVec;
1246 /// db.insert(&[0, 0, 0], vec![0, 0, 0])?;
1247 /// db.insert(&[0, 0, 1], vec![0, 0, 1])?;
1248 /// db.insert(&[0, 0, 2], vec![0, 0, 2])?;
1249 /// db.insert(&[0, 0, 3], vec![0, 0, 3])?;
1250 /// db.insert(&[0, 1, 0], vec![0, 1, 0])?;
1251 /// db.insert(&[0, 1, 1], vec![0, 1, 1])?;
1252 ///
1253 /// let prefix: &[u8] = &[0, 0];
1254 /// let mut r = db.scan_prefix(prefix);
1255 /// assert_eq!(
1256 /// r.next(),
1257 /// Some(Ok((IVec::from(&[0, 0, 0]), IVec::from(&[0, 0, 0]))))
1258 /// );
1259 /// assert_eq!(
1260 /// r.next(),
1261 /// Some(Ok((IVec::from(&[0, 0, 1]), IVec::from(&[0, 0, 1]))))
1262 /// );
1263 /// assert_eq!(
1264 /// r.next(),
1265 /// Some(Ok((IVec::from(&[0, 0, 2]), IVec::from(&[0, 0, 2]))))
1266 /// );
1267 /// assert_eq!(
1268 /// r.next(),
1269 /// Some(Ok((IVec::from(&[0, 0, 3]), IVec::from(&[0, 0, 3]))))
1270 /// );
1271 /// assert_eq!(r.next(), None);
1272 /// # Ok(()) }
1273 /// ```
1274 pub fn scan_prefix<P>(&self, prefix: P) -> Iter
1275 where
1276 P: AsRef<[u8]>,
1277 {
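        // A prefix scan is implemented as a range scan: the lower bound is
        // the prefix itself, and the exclusive upper bound is the prefix with
        // its last non-0xFF byte incremented (any trailing 0xFF bytes are
        // dropped first). If the prefix is entirely 0xFF bytes there is no
        // finite upper bound, so we scan to the end of the keyspace.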
1278 let prefix_ref = prefix.as_ref();
1279 let mut upper = prefix_ref.to_vec();
1280
1281 while let Some(last) = upper.pop() {
1282 if last < u8::max_value() {
1283 upper.push(last + 1);
1284 return self.range(prefix_ref..&upper);
1285 }
1286 }
1287
1288 self.range(prefix..)
1289 }
1290
1291 /// Returns the first key and value in the `Tree`, or
1292 /// `None` if the `Tree` is empty.
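    ///
    /// # Examples
    ///
    /// A short sketch covering both `first` and `last`:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::new().temporary(true);
    /// # let db = config.open()?;
    /// use sled::IVec;
    /// db.insert(&[1], vec![10])?;
    /// db.insert(&[2], vec![20])?;
    /// assert_eq!(db.first()?, Some((IVec::from(&[1]), IVec::from(&[10]))));
    /// assert_eq!(db.last()?, Some((IVec::from(&[2]), IVec::from(&[20]))));
    /// # Ok(()) }
    /// ```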
1293 pub fn first(&self) -> Result<Option<(IVec, IVec)>> {
1294 self.iter().next().transpose()
1295 }
1296
1297 /// Returns the last key and value in the `Tree`, or
1298 /// `None` if the `Tree` is empty.
1299 pub fn last(&self) -> Result<Option<(IVec, IVec)>> {
1300 self.iter().next_back().transpose()
1301 }
1302
1303 /// Atomically removes the maximum item in the `Tree` instance.
1304 ///
1305 /// # Examples
1306 ///
1307 /// ```
1308 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1309 /// # let config = sled::Config::new().temporary(true);
1310 /// # let db = config.open()?;
1311 /// db.insert(&[0], vec![0])?;
1312 /// db.insert(&[1], vec![10])?;
1313 /// db.insert(&[2], vec![20])?;
1314 /// db.insert(&[3], vec![30])?;
1315 /// db.insert(&[4], vec![40])?;
1316 /// db.insert(&[5], vec![50])?;
1317 ///
1318 /// assert_eq!(&db.pop_max()?.unwrap().0, &[5]);
1319 /// assert_eq!(&db.pop_max()?.unwrap().0, &[4]);
1320 /// assert_eq!(&db.pop_max()?.unwrap().0, &[3]);
1321 /// assert_eq!(&db.pop_max()?.unwrap().0, &[2]);
1322 /// assert_eq!(&db.pop_max()?.unwrap().0, &[1]);
1323 /// assert_eq!(&db.pop_max()?.unwrap().0, &[0]);
1324 /// assert_eq!(db.pop_max()?, None);
1325 /// # Ok(()) }
1326 /// ```
1327 pub fn pop_max(&self) -> Result<Option<(IVec, IVec)>> {
1328 loop {
1329 if let Some(first_res) = self.iter().next_back() {
1330 let first = first_res?;
1331 if self
1332 .compare_and_swap::<_, _, &[u8]>(
1333 &first.0,
1334 Some(&first.1),
1335 None,
1336 )?
1337 .is_ok()
1338 {
1339 return Ok(Some(first));
1340 }
1341 // try again
1342 } else {
1343 return Ok(None);
1344 }
1345 }
1346 }
1347
1348 /// Atomically removes the minimum item in the `Tree` instance.
1349 ///
1350 /// # Examples
1351 ///
1352 /// ```
1353 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1354 /// # let config = sled::Config::new().temporary(true);
1355 /// # let db = config.open()?;
1356 /// db.insert(&[0], vec![0])?;
1357 /// db.insert(&[1], vec![10])?;
1358 /// db.insert(&[2], vec![20])?;
1359 /// db.insert(&[3], vec![30])?;
1360 /// db.insert(&[4], vec![40])?;
1361 /// db.insert(&[5], vec![50])?;
1362 ///
1363 /// assert_eq!(&db.pop_min()?.unwrap().0, &[0]);
1364 /// assert_eq!(&db.pop_min()?.unwrap().0, &[1]);
1365 /// assert_eq!(&db.pop_min()?.unwrap().0, &[2]);
1366 /// assert_eq!(&db.pop_min()?.unwrap().0, &[3]);
1367 /// assert_eq!(&db.pop_min()?.unwrap().0, &[4]);
1368 /// assert_eq!(&db.pop_min()?.unwrap().0, &[5]);
1369 /// assert_eq!(db.pop_min()?, None);
1370 /// # Ok(()) }
1371 /// ```
1372 pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
1373 loop {
1374 if let Some(first_res) = self.iter().next() {
1375 let first = first_res?;
1376 if self
1377 .compare_and_swap::<_, _, &[u8]>(
1378 &first.0,
1379 Some(&first.1),
1380 None,
1381 )?
1382 .is_ok()
1383 {
1384 return Ok(Some(first));
1385 }
1386 // try again
1387 } else {
1388 return Ok(None);
1389 }
1390 }
1391 }
1392
1393 /// Returns the number of elements in this tree.
1394 ///
1395 /// Beware: performs a full O(n) scan under the hood.
1396 ///
1397 /// # Examples
1398 ///
1399 /// ```
1400 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1401 /// # let config = sled::Config::new().temporary(true);
1402 /// # let db = config.open()?;
1403 /// db.insert(b"a", vec![0]);
1404 /// db.insert(b"b", vec![1]);
1405 /// assert_eq!(db.len(), 2);
1406 /// # Ok(()) }
1407 /// ```
1408 pub fn len(&self) -> usize {
1409 self.iter().count()
1410 }
1411
1412 /// Returns `true` if the `Tree` contains no elements.
1413 pub fn is_empty(&self) -> bool {
1414 self.iter().next().is_none()
1415 }
1416
1417 /// Clears the `Tree`, removing all values.
1418 ///
1419 /// Note that this is not atomic.
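    ///
    /// # Examples
    ///
    /// A brief sketch:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::new().temporary(true);
    /// # let db = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// db.clear()?;
    /// assert!(db.is_empty());
    /// # Ok(()) }
    /// ```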
1420 pub fn clear(&self) -> Result<()> {
1421 for k in self.iter().keys() {
1422 let key = k?;
1423 let _old = self.remove(key)?;
1424 }
1425 Ok(())
1426 }
1427
1428 /// Returns the name of the tree.
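    ///
    /// # Examples
    ///
    /// A short sketch (the tree name is the identifier passed to
    /// `Db::open_tree`):
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::new().temporary(true);
    /// # let db = config.open()?;
    /// let tree = db.open_tree(b"alternative")?;
    /// assert_eq!(tree.name(), sled::IVec::from(b"alternative".to_vec()));
    /// # Ok(()) }
    /// ```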
1429 pub fn name(&self) -> IVec {
1430 self.tree_id.clone()
1431 }
1432
1433 /// Returns the CRC32 of all keys and values
1434 /// in this Tree.
1435 ///
1436 /// This is O(N) and locks the underlying tree
1437 /// for the duration of the entire scan.
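    ///
    /// # Examples
    ///
    /// A small sketch; the concrete value depends on the exact contents:
    ///
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = sled::Config::new().temporary(true);
    /// # let db = config.open()?;
    /// db.insert(&[0], vec![0])?;
    /// let checksum = db.checksum()?;
    /// // stable as long as the contents do not change
    /// assert_eq!(checksum, db.checksum()?);
    /// # Ok(()) }
    /// ```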
1438 pub fn checksum(&self) -> Result<u32> {
1439 let mut hasher = crc32fast::Hasher::new();
1440 let mut iter = self.iter();
1441 let _cc = concurrency_control::write();
1442 while let Some(kv_res) = iter.next_inner() {
1443 let (k, v) = kv_res?;
1444 hasher.update(&k);
1445 hasher.update(&v);
1446 }
1447 Ok(hasher.finalize())
1448 }
1449
1450 fn split_node<'g>(
1451 &self,
1452 view: &View<'g>,
1453 parent_view: &Option<View<'g>>,
1454 root_pid: PageId,
1455 guard: &'g Guard,
1456 ) -> Result<()> {
1457 trace!("splitting node {}", view.pid);
1458 // split node
1459 let (mut lhs, rhs) = view.deref().clone().split();
1460 let rhs_lo = rhs.lo.clone();
1461
1462 // install right side
1463 let (rhs_pid, rhs_ptr) = self.context.pagecache.allocate(rhs, guard)?;
1464
1465 // replace node, pointing next to installed right
1466 lhs.next = Some(NonZeroU64::new(rhs_pid).unwrap());
1467 let replace = self.context.pagecache.replace(
1468 view.pid,
1469 view.node_view.0,
1470 lhs,
1471 guard,
1472 )?;
1473 M.tree_child_split_attempt();
1474 if replace.is_err() {
1475 // if we failed, don't follow through with the
1476 // parent split or root hoist.
1477 let _new_stack = self
1478 .context
1479 .pagecache
1480 .free(rhs_pid, rhs_ptr, guard)?
1481 .expect("could not free allocated page");
1482 return Ok(());
1483 }
1484 M.tree_child_split_success();
1485
1486 // either install parent split or hoist root
1487 if let Some(parent_view) = parent_view {
1488 M.tree_parent_split_attempt();
1489 let mut parent: Node = parent_view.deref().clone();
1490 let split_applied = parent.parent_split(&rhs_lo, rhs_pid);
1491
1492 if !split_applied {
1493 // due to deep races, it's possible for the
1494 // parent to already have a node for this lo key.
1495 // if this is the case, we can skip the parent split
1496 // because it's probably going to fail anyway.
1497 return Ok(());
1498 }
1499
1500 let replace = self.context.pagecache.replace(
1501 parent_view.pid,
1502 parent_view.node_view.0,
1503 parent,
1504 guard,
1505 )?;
1506 if replace.is_ok() {
1507 M.tree_parent_split_success();
1508 } else {
1509 // Parent splits are an optimization
1510 // so we don't need to care if we
1511 // failed.
1512 }
1513 } else {
1514 let _ = self.root_hoist(root_pid, rhs_pid, rhs_lo, guard)?;
1515 }
1516
1517 Ok(())
1518 }
1519
1520 fn root_hoist(
1521 &self,
1522 from: PageId,
1523 to: PageId,
1524 at: IVec,
1525 guard: &Guard,
1526 ) -> Result<bool> {
1527 M.tree_root_split_attempt();
1528 // hoist new root, pointing to lhs & rhs
1529
1530 let new_root = Node::new_hoisted_root(from, at, to);
1531
1532 let (new_root_pid, new_root_ptr) =
1533 self.context.pagecache.allocate(new_root, guard)?;
1534 debug!("allocated pid {} in root_hoist", new_root_pid);
1535
1536 debug_delay();
1537
1538 let cas = self.context.pagecache.cas_root_in_meta(
1539 &self.tree_id,
1540 Some(from),
1541 Some(new_root_pid),
1542 guard,
1543 )?;
1544 if cas.is_ok() {
1545 debug!("root hoist from {} to {} successful", from, new_root_pid);
1546 M.tree_root_split_success();
1547
1548 // we spin in a cas loop because it's possible
1549 // 2 threads are at this point, and we don't want
1550 // to cause roots to diverge between meta and
1551 // our version.
1552 while self.root.compare_and_swap(from, new_root_pid, SeqCst) != from
1553 {
1554 }
1555
1556 Ok(true)
1557 } else {
1558 debug!(
1559 "root hoist from {} to {} failed: {:?}",
1560 from, new_root_pid, cas
1561 );
1562 let _new_stack = self
1563 .context
1564 .pagecache
1565 .free(new_root_pid, new_root_ptr, guard)?
1566 .expect("could not free allocated page");
1567
1568 Ok(false)
1569 }
1570 }
1571
1572 pub(crate) fn view_for_pid<'g>(
1573 &self,
1574 pid: PageId,
1575 guard: &'g Guard,
1576 ) -> Result<Option<View<'g>>> {
1577 loop {
1578 let node_view_opt = self.context.pagecache.get(pid, guard)?;
1580 if let Some(node_view) = &node_view_opt {
1581 let size = node_view.0.log_size();
1582 let view = View { node_view: *node_view, pid, size };
1583 if view.merging_child.is_some() {
1584 self.merge_node(&view, view.merging_child.unwrap().get(), guard)?;
1585 } else {
1586 return Ok(Some(view));
1587 }
1588 } else {
1589 return Ok(None);
1590 }
1591 }
1592 }
1593
1594 // Returns the traversal path, completing any observed
1595 // partially complete splits or merges along the way.
1596 //
1597 // We intentionally leave the cyclomatic complexity
1598 // high because attempts to split it up have made
1599 // the inherent complexity of the operation more
1600 // challenging to understand.
1601 #[allow(clippy::cognitive_complexity)]
1602 pub(crate) fn view_for_key<'g, K>(
1603 &self,
1604 key: K,
1605 guard: &'g Guard,
1606 ) -> Result<View<'g>>
1607 where
1608 K: AsRef<[u8]>,
1609 {
1610 #[cfg(any(test, feature = "lock_free_delays"))]
1611 const MAX_LOOPS: usize = usize::max_value();
1612
1613 #[cfg(not(any(test, feature = "lock_free_delays")))]
1614 const MAX_LOOPS: usize = 1_000_000;
1615
1616 let _measure = Measure::new(&M.tree_traverse);
1617
1618 let mut cursor = self.root.load(Acquire);
1619 let mut root_pid = cursor;
1620 let mut parent_view = None;
1621 let mut unsplit_parent = None;
1622 let mut took_leftmost_branch = false;
1623
1624 macro_rules! retry {
1625 () => {
1626 trace!(
1627 "retrying at line {} when cursor was {}",
1628 line!(),
1629 cursor
1630 );
1631 cursor = self.root.load(Acquire);
1632 root_pid = cursor;
1633 parent_view = None;
1634 unsplit_parent = None;
1635 took_leftmost_branch = false;
1636 continue;
1637 };
1638 }
1639
1640 for _ in 0..MAX_LOOPS {
1641 if cursor == u64::max_value() {
1642 // this collection has been explicitly removed
1643 return Err(Error::CollectionNotFound(self.tree_id.clone()));
1644 }
1645
1646 let node_opt = self.view_for_pid(cursor, guard)?;
1647
1648 let view = if let Some(view) = node_opt {
1649 view
1650 } else {
1651 retry!();
1652 };
1653
1654 // When we encounter a merge intention, we collaboratively help out
1655 if view.merging_child.is_some() {
1656 self.merge_node(&view, view.merging_child.unwrap().get(), guard)?;
1657 retry!();
1658 } else if view.merging {
1659 // we missed the parent merge intention due to a benign race,
1660 // so go around again and try to help out if necessary
1661 retry!();
1662 }
1663
1664 let overshot = key.as_ref() < view.lo.as_ref();
1665 let undershot =
1666 key.as_ref() >= view.hi.as_ref() && !view.hi.is_empty();
1667
1668 if overshot {
1669 // merge interfered, reload root and retry
1670 retry!();
1671 }
1672
1673 if view.should_split() {
1674 self.split_node(&view, &parent_view, root_pid, guard)?;
1675 retry!();
1676 }
1677
1678 if undershot {
1679 // half-complete split detection & completion
1680 cursor = view.next.expect(
1681 "if our hi bound is not Inf (inity), \
1682 we should have a right sibling",
1683 ).get();
1684 if unsplit_parent.is_none() && parent_view.is_some() {
1685 unsplit_parent = parent_view.clone();
1686 } else if parent_view.is_none() && view.lo.is_empty() {
1687 assert!(unsplit_parent.is_none());
1688 assert_eq!(view.pid, root_pid);
1689 // we have found a partially-split root
1690 if self.root_hoist(
1691 root_pid,
1692 view.next.unwrap().get(),
1693 view.hi.clone(),
1694 guard,
1695 )? {
1696 M.tree_root_split_success();
1697 retry!();
1698 }
1699 }
1700 continue;
1701 } else if let Some(unsplit_parent) = unsplit_parent.take() {
1702 // we have found the proper page for
1703 // our cooperative parent split
1704 let mut parent: Node = unsplit_parent.deref().clone();
1705 let split_applied =
1706 parent.parent_split(view.lo.as_ref(), cursor);
1707
1708 if !split_applied {
1709 // due to deep races, it's possible for the
1710 // parent to already have a node for this lo key.
1711 // if this is the case, we can skip the parent split
1712 // because it's probably going to fail anyway.
1713 retry!();
1714 }
1715
1716 M.tree_parent_split_attempt();
1717 let replace = self.context.pagecache.replace(
1718 unsplit_parent.pid,
1719 unsplit_parent.node_view.0,
1720 parent,
1721 guard,
1722 )?;
1723 if replace.is_ok() {
1724 M.tree_parent_split_success();
1725 }
1726 }
1727
1728 // detect whether a node is mergeable, and begin
1729 // the merge process.
1730 // NB we can never begin merging a node that is
1731 // the leftmost child of an index, because it
1732 // would be merged into a different index, which
1733 // would add considerable complexity to this already
1734 // fairly complex implementation.
1735 if view.should_merge() && !took_leftmost_branch {
1736 if let Some(ref mut parent) = parent_view {
1737 assert!(parent.merging_child.is_none());
1738 if parent.can_merge_child() {
1739 let frag = Link::ParentMergeIntention(cursor);
1740
1741 let link = self.context.pagecache.link(
1742 parent.pid,
1743 parent.node_view.0,
1744 frag,
1745 guard,
1746 )?;
1747
1748 if let Ok(new_parent_ptr) = link {
1749 parent.node_view = NodeView(new_parent_ptr);
1750 self.merge_node(parent, cursor, guard)?;
1751 retry!();
1752 }
1753 }
1754 }
1755 }
1756
1757 if view.data.is_index() {
1758 let next = view.index_next_node(key.as_ref());
1759 took_leftmost_branch = next.0 == 0;
1760 parent_view = Some(view);
1761 cursor = next.1;
1762 } else {
1763 assert!(!overshot && !undershot);
1764 return Ok(view);
1765 }
1766 }
1767 panic!(
1768 "cannot find pid {} in view_for_key, looking for key {:?} in tree",
1769 cursor,
1770 key.as_ref(),
1771 );
1772 }
1773
1774 fn cap_merging_child<'g>(
1775 &'g self,
1776 child_pid: PageId,
1777 guard: &'g Guard,
1778 ) -> Result<Option<View<'g>>> {
1779 // Get the child node and try to install a `MergeCap` frag.
1780 // In case we succeed, we break, otherwise we try from the start.
1781 loop {
1782 let mut child_view = if let Some(child_view) =
1783 self.view_for_pid(child_pid, guard)?
1784 {
1785 child_view
1786 } else {
1787 // the child was already freed, meaning
1788 // somebody completed this whole loop already
1789 return Ok(None);
1790 };
1791
1792 if child_view.merging {
1793 trace!("child pid {} already merging", child_pid);
1794 return Ok(Some(child_view));
1795 }
1796
1797 let install_frag = self.context.pagecache.link(
1798 child_pid,
1799 child_view.node_view.0,
1800 Link::ChildMergeCap,
1801 guard,
1802 )?;
1803 match install_frag {
1804 Ok(new_ptr) => {
1805 trace!("child pid {} merge capped", child_pid);
1806 child_view.node_view = NodeView(new_ptr);
1807 return Ok(Some(child_view));
1808 }
1809 Err(Some((_, _))) => {
1810 trace!(
1811 "child pid {} merge cap failed, retrying",
1812 child_pid
1813 );
1814 continue;
1815 }
1816 Err(None) => {
1817 trace!("child pid {} already freed", child_pid);
1818 return Ok(None);
1819 }
1820 }
1821 }
1822 }
1823
1824 fn install_parent_merge<'g>(
1825 &self,
1826 parent_view: &View<'g>,
1827 child_pid: PageId,
1828 guard: &'g Guard,
1829 ) -> Result<bool> {
1830 let mut parent_view = Cow::Borrowed(parent_view);
1831 loop {
1832 let linked = self.context.pagecache.link(
1833 parent_view.pid,
1834 parent_view.node_view.0,
1835 Link::ParentMergeConfirm,
1836 guard,
1837 )?;
1838 match linked {
1839 Ok(_) => {
1840 trace!(
1841 "ParentMergeConfirm succeeded on parent pid {}, \
1842 now freeing child pid {}",
1843 parent_view.pid,
1844 child_pid
1845 );
1846 return Ok(true);
1847 }
1848 Err(None) => {
1849 trace!(
1850 "ParentMergeConfirm \
1851 failed on (now freed) parent pid {}",
1852 parent_view.pid
1853 );
1854 return Ok(false);
1855 }
1856 Err(_) => {
1857 let new_parent_view = if let Some(new_parent_view) =
1858 self.view_for_pid(parent_view.pid, guard)?
1859 {
1860 trace!(
1861 "failed to confirm merge \
1862 on parent pid {}, trying again",
1863 parent_view.pid
1864 );
1865 new_parent_view
1866 } else {
1867 trace!(
1868 "failed to confirm merge \
1869 on parent pid {}, which was freed",
1870 parent_view.pid
1871 );
1872 return Ok(false);
1873 };
1874
1875 if new_parent_view.merging_child.map(NonZeroU64::get) != Some(child_pid) {
1876 trace!(
1877 "someone else must have already \
1878 completed the merge, and now the \
1879 merging child for parent pid {} is {:?}",
1880 new_parent_view.pid,
1881 new_parent_view.merging_child
1882 );
1883 return Ok(false);
1884 }
1885
1886 parent_view = Cow::Owned(new_parent_view);
1887 }
1888 }
1889 }
1890 }
1891
1892 pub(crate) fn merge_node<'g>(
1893 &self,
1894 parent_view: &View<'g>,
1895 child_pid: PageId,
1896 guard: &'g Guard,
1897 ) -> Result<()> {
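        // The merge proceeds in phases, mirroring the helpers used below:
        // the child is first capped with a `ChildMergeCap` link so that it
        // accepts no further writes, its contents are folded into its left
        // sibling via `receive_merge` and a replace, the parent's merge
        // intention is then confirmed with a `ParentMergeConfirm` link, and
        // finally the child page is freed.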
1898 trace!(
1899 "merging child pid {} of parent pid {}",
1900 child_pid,
1901 parent_view.pid
1902 );
1903
1904 let child_view = if let Some(merging_child) =
1905 self.cap_merging_child(child_pid, guard)?
1906 {
1907 merging_child
1908 } else {
1909 return Ok(());
1910 };
1911
1912 let index = parent_view.data.index_ref().unwrap();
1913 let child_index =
1914 index.pointers.iter().position(|pid| pid == &child_pid).unwrap();
1915
1916 assert_ne!(
1917 child_index, 0,
1918 "merging child must not be the \
1919 leftmost child of its parent"
1920 );
1921
1922 let mut merge_index = child_index - 1;
1923
1924 // we assume caller only merges when
1925 // the node to be merged is not the
1926 // leftmost child.
1927 let mut cursor_pid = index.pointers[merge_index];
1928
1929 // searching for the left sibling to merge the target page into
1930 loop {
1931 // The only way this child could have been freed is if the original
1932 // merge has already been handled.
1934 trace!(
1935 "cursor_pid is {} while looking for left sibling",
1936 cursor_pid
1937 );
1938 let cursor_view = if let Some(cursor_view) =
1939 self.view_for_pid(cursor_pid, guard)?
1940 {
1941 cursor_view
1942 } else {
1943 trace!(
1944 "couldn't retrieve frags for freed \
1945 (possibly outdated) prospective left \
1946 sibling with pid {}",
1947 cursor_pid
1948 );
1949
1950 if merge_index == 0 {
1951 trace!(
1952 "failed to find any left sibling for \
1953 merging pid {}, which means this merge \
1954 must have already completed.",
1955 child_pid
1956 );
1957 return Ok(());
1958 }
1959
1960 merge_index -= 1;
1961 cursor_pid = index.pointers[merge_index];
1962
1963 continue;
1964 };
1965
1966 // If the cursor points at the merging child, it is the node we want to replace
1967 if cursor_view.next.map(NonZeroU64::get) == Some(child_pid) {
1968 trace!(
1969 "found left sibling pid {} points to merging node pid {}",
1970 cursor_view.pid,
1971 child_pid
1972 );
1973 let cursor_node = cursor_view.node_view;
1974
1975 let replacement = cursor_node.receive_merge(&child_view);
1976 let replace = self.context.pagecache.replace(
1977 cursor_pid,
1978 cursor_node.0,
1979 replacement,
1980 guard,
1981 )?;
1982 match replace {
1983 Ok(_) => {
1984 trace!(
1985 "merged node pid {} into left sibling pid {}",
1986 child_pid,
1987 cursor_pid
1988 );
1989 break;
1990 }
1991 Err(None) => {
1992 trace!(
1993 "failed to merge pid {} into \
1994 pid {} since pid {} doesn't exist anymore",
1995 child_pid,
1996 cursor_pid,
1997 cursor_pid
1998 );
1999 return Ok(());
2000 }
2001 Err(_) => {
2002 trace!(
2003 "failed to merge pid {} into \
2004 pid {} due to cas failure",
2005 child_pid,
2006 cursor_pid
2007 );
2008 continue;
2009 }
2010 }
2011 } else if cursor_view.hi >= child_view.lo {
2012 // we overshot the node being merged; stop searching
2013 trace!(
2014 "cursor pid {} has hi key {:?}, which is \
2015 >= merging child pid {}'s lo key of {:?}, breaking",
2016 cursor_pid,
2017 cursor_view.hi,
2018 child_pid,
2019 child_view.lo
2020 );
2021 break;
2022 } else {
2023 // we haven't found the child yet, so advance the cursor to its right sibling
2024 if let Some(next) = cursor_view.next {
2025 trace!(
2026 "traversing from cursor pid {} to right sibling pid {}",
2027 cursor_pid,
2028 next
2029 );
2030 cursor_pid = next.get();
2031 } else {
2032 trace!(
2033 "hit the right side of the tree without finding \
2034 a left sibling for merging child pid {}",
2035 child_pid
2036 );
2037 break;
2038 }
2039 }
2040 }
2041
2042 trace!(
2043 "trying to install parent merge \
2044 confirmation of merged child pid {} for parent pid {}",
2045 child_pid,
2046 parent_view.pid
2047 );
2048
2049 let should_continue =
2050 self.install_parent_merge(parent_view, child_pid, guard)?;
2051
2052 if !should_continue {
2053 return Ok(());
2054 }
2055
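// The child is only freed once the parent has acknowledged the merge;
// epoch-based reclamation defers reuse of the freed page until every
// thread that witnessed the merge has left its epoch.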
2056 match self.context.pagecache.free(
2057 child_pid,
2058 child_view.node_view.0,
2059 guard,
2060 )? {
2061 Ok(_) => {
2062 // we freed it
2063 trace!("freed merged pid {}", child_pid);
2064 }
2065 Err(None) => {
2066 // someone else freed it
2067 trace!("someone else freed merged pid {}", child_pid);
2068 }
2069 Err(Some(_)) => {
2070 trace!(
2071 "someone was able to reuse freed merged pid {}",
2072 child_pid
2073 );
2074 // it was reused somehow after we
2075 // observed it as being in the merging state
2076 panic!(
2077 "somehow the merging child was reused \
2078 before all threads that witnessed its previous \
2079 merge have left their epoch"
2080 )
2081 }
2082 }
2083
2084 trace!("finished with merge of pid {}", child_pid);
2085 Ok(())
2086 }
2087
2088 // Remove all pages for this tree from the underlying
2089 // PageCache. This will leave orphans behind if the
2090 // process crashes during gc.
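// `leftmost_chain` is expected to contain the leftmost pid of each
// level; for every entry we free the page and then walk right via
// `next`, retrying a pid whenever its free CAS fails.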
2091 pub(crate) fn gc_pages(
2092 &self,
2093 mut leftmost_chain: Vec<PageId>,
2094 ) -> Result<()> {
2095 let guard = pin();
2096
2097 while let Some(mut pid) = leftmost_chain.pop() {
2098 loop {
2099 let cursor_view =
2100 if let Some(view) = self.view_for_pid(pid, &guard)? {
2101 view
2102 } else {
2103 trace!("encountered Free node while GC'ing tree");
2104 break;
2105 };
2106
2107 let ret = self.context.pagecache.free(
2108 pid,
2109 cursor_view.node_view.0,
2110 &guard,
2111 )?;
2112
2113 if ret.is_ok() {
2114 let next_pid = if let Some(next_pid) = cursor_view.next {
2115 next_pid
2116 } else {
2117 break;
2118 };
2119 assert_ne!(pid, next_pid.get());
2120 pid = next_pid.get();
2121 }
2122 }
2123 }
2124
2125 Ok(())
2126 }
2127
2128 #[doc(hidden)]
2129 pub fn verify_integrity(&self) -> Result<()> {
2130 // verification happens in Debug impl
2131 let _out = format!("{:?}", self);
2132 Ok(())
2133 }
2134}
2135
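// The Debug impl doubles as the integrity check used by
// `verify_integrity`: it walks each level left to right following
// `next`, then descends through the first pointer of the leftmost
// index node, asserting that every pid referenced by the level above
// was visited and that no pid is encountered twice.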
2136impl Debug for Tree {
2137 fn fmt(
2138 &self,
2139 f: &mut fmt::Formatter<'_>,
2140 ) -> std::result::Result<(), fmt::Error> {
2141 let guard = pin();
2142
2143 let mut pid = self.root.load(SeqCst);
2144 let mut left_most = pid;
2145 let mut level = 0;
2146 let mut expected_pids = FastSet8::default();
2147 let mut referenced_pids = FastSet8::default();
2148 let mut loop_detector = FastSet8::default();
2149
2150 expected_pids.insert(pid);
2151
2152 f.write_str("Tree: \n\t")?;
2153 self.context.pagecache.fmt(f)?;
2154 f.write_str("\tlevel 0:\n")?;
2155
2156 loop {
2157 let get_res = self.view_for_pid(pid, &guard);
2158 let node = if let Ok(Some(ref view)) = get_res {
2159 expected_pids.remove(&pid);
2160 if loop_detector.contains(&pid) {
2161 if cfg!(feature = "testing") {
2162 panic!(
2163 "detected a loop while iterating over the Tree. \
2164 pid {} was encountered multiple times",
2165 pid
2166 );
2167 } else {
2168 error!(
2169 "detected a loop while iterating over the Tree. \
2170 pid {} was encountered multiple times",
2171 pid
2172 );
2173 }
2174 } else {
2175 loop_detector.insert(pid);
2176 }
2177
2178 view.deref()
2179 } else {
2180 if cfg!(feature = "testing") {
2181 panic!(
2182 "Tree::fmt failed to read node {} \
2183 that has been freed",
2184 pid,
2185 );
2186 } else {
2187 error!(
2188 "Tree::fmt failed to read node {} \
2189 that has been freed",
2190 pid,
2191 );
2192 }
2193 break;
2194 };
2195
2196 write!(f, "\t\t{}: ", pid)?;
2197 node.fmt(f)?;
2198 f.write_str("\n")?;
2199
2200 if let Data::Index(ref index) = &node.data {
2201 for pid in &index.pointers {
2202 referenced_pids.insert(*pid);
2203 }
2204 }
2205
2206 if let Some(next_pid) = node.next {
2207 pid = next_pid.get();
2208 } else {
2209 // we've traversed our level, time to bump down
2210 let left_get_res = self.view_for_pid(left_most, &guard);
2211 let left_node = if let Ok(Some(ref view)) = left_get_res {
2212 view
2213 } else {
2214 panic!(
2215 "pagecache returned non-base node: {:?}",
2216 left_get_res
2217 )
2218 };
2219
2220 match &left_node.data {
2221 Data::Index(index) => {
2222 if let Some(next_pid) = index.pointers.first() {
2223 pid = *next_pid;
2224 left_most = *next_pid;
2225 level += 1;
2226 write!(f, "\n\tlevel {}:\n", level)?;
2227 assert!(
2228 expected_pids.is_empty(),
2229 "expected pids {:?} but never \
2230 saw them on this level",
2231 expected_pids
2232 );
2233 std::mem::swap(&mut expected_pids, &mut referenced_pids);
2234 } else {
2235 panic!("trying to debug print empty index node");
2236 }
2237 }
2238 Data::Leaf(_items) => {
2239 // we've reached the end of our tree, all leaves are on
2240 // the lowest level.
2241 break;
2242 }
2243 }
2244 }
2245 }
2246
2247 Ok(())
2248 }
2249}
2250
2251/// Compare and swap result.
2252///
2253/// It returns `Ok(Ok(()))` if the operation finished successfully. Otherwise
2254/// it returns:
2255/// - `Ok(Err(CompareAndSwapError(current, proposed)))` if the operation
2256///   failed to set up the new value. `CompareAndSwapError` contains the
2257///   current and proposed values.
2258/// - `Err(Error::Unsupported)` if the database is opened in read-only mode.
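///
/// # Examples
///
/// A minimal sketch of handling the nested result; the temporary
/// "cas_doc_db" path and the keys below are illustrative only.
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let _ = std::fs::remove_dir_all("cas_doc_db");
/// let db: sled::Db = sled::open("cas_doc_db")?;
/// db.insert(b"k", b"v1".to_vec())?;
///
/// // The outer `Result` carries storage errors; the inner one carries
/// // the CAS conflict, so `?` alone never hides a failed swap.
/// match db.compare_and_swap(b"k", Some(b"v0"), Some(b"v2"))? {
///     Ok(()) => println!("swap succeeded"),
///     Err(sled::CompareAndSwapError { current, proposed }) => {
///         println!("conflict: current {:?}, proposed {:?}", current, proposed);
///     }
/// }
/// # let _ = std::fs::remove_dir_all("cas_doc_db");
/// # Ok(()) }
/// ```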
2259pub type CompareAndSwapResult =
2260 Result<std::result::Result<(), CompareAndSwapError>>;
2261
2262impl From<Error> for CompareAndSwapResult {
2263 fn from(error: Error) -> Self {
2264 Err(error)
2265 }
2266}
2267
2268/// Compare and swap error.
2269#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
2270pub struct CompareAndSwapError {
2271 /// The current value which caused your CAS to fail.
2272 pub current: Option<IVec>,
2273 /// The proposed value which failed to be installed.
2274 pub proposed: Option<IVec>,
2275}
2276
2277impl fmt::Display for CompareAndSwapError {
2278 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2279 write!(f, "Compare and swap conflict")
2280 }
2281}
2282
2283impl std::error::Error for CompareAndSwapError {}