#![allow(unsafe_code)]

pub mod constants;
pub mod logger;

mod blob_io;
mod disk_pointer;
mod header;
mod iobuf;
mod iterator;
mod pagetable;
#[cfg(any(all(not(unix), not(windows)), miri))]
mod parallel_io_polyfill;
#[cfg(all(unix, not(miri)))]
mod parallel_io_unix;
#[cfg(all(windows, not(miri)))]
mod parallel_io_windows;
mod reservation;
mod segment;
mod snapshot;

use std::{collections::BinaryHeap, ops::Deref};

use crate::*;

#[cfg(any(all(not(unix), not(windows)), miri))]
use parallel_io_polyfill::{pread_exact, pread_exact_or_eof, pwrite_all};

#[cfg(all(unix, not(miri)))]
use parallel_io_unix::{pread_exact, pread_exact_or_eof, pwrite_all};

#[cfg(all(windows, not(miri)))]
use parallel_io_windows::{pread_exact, pread_exact_or_eof, pwrite_all};

use self::{
    blob_io::{gc_blobs, read_blob, remove_blob, write_blob},
    constants::{
        BATCH_MANIFEST_PID, COUNTER_PID, META_PID,
        PAGE_CONSOLIDATION_THRESHOLD, SEGMENT_CLEANUP_THRESHOLD,
    },
    header::Header,
    iobuf::{roll_iobuf, IoBuf, IoBufs},
    iterator::{raw_segment_iter_from, LogIter},
    pagetable::PageTable,
    segment::{SegmentAccountant, SegmentCleaner, SegmentOp},
};

pub(crate) use self::{
    logger::{
        read_message, read_segment_header, MessageHeader, SegmentHeader,
        SegmentNumber,
    },
    reservation::Reservation,
    snapshot::{read_snapshot_or_default, PageState, Snapshot},
};

pub use self::{
    constants::{
        MAX_MSG_HEADER_LEN, MAX_SPACE_AMPLIFICATION, MINIMUM_ITEMS_PER_SEGMENT,
        SEG_HEADER_LEN,
    },
    disk_pointer::DiskPtr,
    logger::{Log, LogRead},
};

/// The logical index of a segment in the log file.
pub type SegmentId = usize;

/// A byte offset into the log file.
pub type LogOffset = u64;

/// The `Lsn` that a blob was written at, used to locate its backing file.
pub type BlobPointer = Lsn;

/// A log sequence number.
pub type Lsn = i64;

/// A page identifier.
pub type PageId = u64;

/// The highest `Lsn` of an atomically-written batch, persisted so that
/// recovery can verify that the entire batch reached stable storage.
#[derive(Default, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Debug)]
#[repr(transparent)]
pub struct BatchManifest(pub Lsn);

/// A buffer paired with the log offset it was read from.
#[derive(Debug)]
pub struct BasedBuf {
    pub buf: Vec<u8>,
    pub offset: LogOffset,
}

/// The kinds of messages that may be written to the log.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum MessageKind {
    /// A torn or otherwise unreadable message.
    Corrupted = 0,
    /// A reservation that was aborted before completion;
    /// skipped during recovery.
    Canceled = 1,
    /// Padding that fills out the remainder of a segment
    /// before it is sealed.
    Cap = 2,
    /// Carries the maximum `Lsn` of a write batch, allowing
    /// batches to be recovered atomically.
    BatchManifest = 3,
    /// Marks a page as freed.
    Free = 4,
    /// The ID-generator counter page.
    Counter = 5,
    /// The meta page, stored inline in the log.
    InlineMeta = 6,
    /// The meta page, stored in a separate blob file.
    BlobMeta = 7,
    /// A full node, stored inline in the log.
    InlineNode = 8,
    /// A full node, stored in a separate blob file.
    BlobNode = 9,
    /// A link applied to a node, stored inline in the log.
    InlineLink = 10,
    /// A link applied to a node, stored in a separate blob file.
    BlobLink = 11,
}

impl MessageKind {
    pub(crate) const fn into(self) -> u8 {
        self as u8
    }
}

impl From<u8> for MessageKind {
    fn from(byte: u8) -> Self {
        use MessageKind::*;
        match byte {
            0 => Corrupted,
            1 => Canceled,
            2 => Cap,
            3 => BatchManifest,
            4 => Free,
            5 => Counter,
            6 => InlineMeta,
            7 => BlobMeta,
            8 => InlineNode,
            9 => BlobNode,
            10 => InlineLink,
            11 => BlobLink,
            other => {
                debug!("encountered unexpected message kind byte {}", other);
                Corrupted
            }
        }
    }
}
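
// A minimal round-trip sketch (not part of the original source): every
// known discriminant must survive `u8 -> MessageKind -> u8`, and unknown
// bytes must decode as `Corrupted` rather than panicking.
#[cfg(test)]
mod message_kind_round_trip {
    use super::MessageKind;

    #[test]
    fn known_discriminants_round_trip() {
        for byte in 0..=11_u8 {
            assert_eq!(MessageKind::from(byte).into(), byte);
        }
        // out-of-range bytes are treated as corruption
        assert_eq!(MessageKind::from(250), MessageKind::Corrupted);
    }
}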

/// The high-level interpretation of a message in the log.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogKind {
    /// Persist a full page replacement.
    Replace,
    /// Persist a delta to be applied to an existing page.
    Link,
    /// Persist a page free.
    Free,
    /// A message that should be skipped during recovery.
    Skip,
    /// A message that could not be understood.
    Corrupted,
}

fn log_kind_from_update(update: &Update) -> LogKind {
    match update {
        Update::Free => LogKind::Free,
        Update::Link(..) => LogKind::Link,
        Update::Node(..) | Update::Counter(..) | Update::Meta(..) => {
            LogKind::Replace
        }
    }
}

impl From<MessageKind> for LogKind {
    fn from(kind: MessageKind) -> Self {
        match kind {
            MessageKind::Free => LogKind::Free,
            MessageKind::InlineNode
            | MessageKind::Counter
            | MessageKind::BlobNode
            | MessageKind::InlineMeta
            | MessageKind::BlobMeta => LogKind::Replace,
            MessageKind::InlineLink | MessageKind::BlobLink => LogKind::Link,
            MessageKind::Canceled
            | MessageKind::Cap
            | MessageKind::BatchManifest => LogKind::Skip,
            other => {
                debug!("encountered unexpected message kind {:?}", other);
                LogKind::Corrupted
            }
        }
    }
}
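
// Sketch (not part of the original source): blob and inline encodings of
// the same logical operation must map to the same `LogKind`.
#[cfg(test)]
mod log_kind_mapping {
    use super::{LogKind, MessageKind};

    #[test]
    fn blob_and_inline_encodings_agree() {
        assert_eq!(
            LogKind::from(MessageKind::InlineNode),
            LogKind::from(MessageKind::BlobNode)
        );
        assert_eq!(
            LogKind::from(MessageKind::InlineMeta),
            LogKind::from(MessageKind::BlobMeta)
        );
        assert_eq!(
            LogKind::from(MessageKind::InlineLink),
            LogKind::from(MessageKind::BlobLink)
        );
    }
}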

fn assert_usize<T>(from: T) -> usize
where
    usize: TryFrom<T, Error = std::num::TryFromIntError>,
{
    usize::try_from(from).expect("lost data cast while converting to usize")
}

// Monotonically raises `atomic_lsn` to `to` via a CAS loop; bumps to a
// lower value are no-ops.
fn bump_atomic_lsn(atomic_lsn: &AtomicLsn, to: Lsn) {
    let mut current = atomic_lsn.load(Acquire);
    loop {
        if current >= to {
            return;
        }
        let last = atomic_lsn.compare_and_swap(current, to, SeqCst);
        if last == current {
            return;
        }
        current = last;
    }
}
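
// A minimal sketch (not part of the original source) of the monotonicity
// contract above, assuming `AtomicLsn` is the crate's `AtomicI64` alias:
// bumps below the current value are no-ops, bumps above it take effect.
#[cfg(test)]
mod bump_atomic_lsn_props {
    use super::*;

    #[test]
    fn bump_is_monotonic() {
        let lsn = AtomicLsn::new(5);
        bump_atomic_lsn(&lsn, 3);
        assert_eq!(lsn.load(SeqCst), 5);
        bump_atomic_lsn(&lsn, 9);
        assert_eq!(lsn.load(SeqCst), 9);
    }
}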

use std::convert::{TryFrom, TryInto};

#[inline]
pub(crate) fn lsn_to_arr(number: Lsn) -> [u8; 8] {
    number.to_le_bytes()
}

#[inline]
pub(crate) fn arr_to_lsn(arr: &[u8]) -> Lsn {
    Lsn::from_le_bytes(arr.try_into().unwrap())
}

#[inline]
pub(crate) fn u64_to_arr(number: u64) -> [u8; 8] {
    number.to_le_bytes()
}

#[inline]
pub(crate) fn arr_to_u32(arr: &[u8]) -> u32 {
    u32::from_le_bytes(arr.try_into().unwrap())
}

#[inline]
pub(crate) fn u32_to_arr(number: u32) -> [u8; 4] {
    number.to_le_bytes()
}
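
// Endianness round-trip sketch (not part of the original source): these
// helpers always use little-endian encoding, so on-disk values remain
// portable across architectures.
#[cfg(test)]
mod arr_round_trip {
    use super::*;

    #[test]
    fn lsn_and_u32_round_trip() {
        assert_eq!(arr_to_lsn(&lsn_to_arr(-42)), -42);
        assert_eq!(arr_to_u32(&u32_to_arr(7)), 7);
        assert_eq!(u64_to_arr(1), [1, 0, 0, 0, 0, 0, 0, 0]);
    }
}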

#[allow(clippy::needless_pass_by_value)]
pub(crate) fn maybe_decompress(in_buf: Vec<u8>) -> std::io::Result<Vec<u8>> {
    #[cfg(feature = "compression")]
    {
        use zstd::stream::decode_all;

        // compressed buffers are prefixed with a serialized length frame,
        // which is consumed and discarded before decoding the zstd payload
        let scootable_in_buf = &mut &*in_buf;
        let _ivec_varint = u64::deserialize(scootable_in_buf)
            .expect("this had to be serialized with an extra length frame");
        let _measure = Measure::new(&M.decompress);
        let out_buf = decode_all(scootable_in_buf).expect(
            "failed to decompress data. \
             This is not expected, please open an issue on \
             https://github.com/spacejam/sled so we can \
             fix this critical issue ASAP. Thank you :)",
        );

        Ok(out_buf)
    }

    #[cfg(not(feature = "compression"))]
    Ok(in_buf)
}

#[derive(Debug, Clone, Copy)]
pub struct NodeView<'g>(pub(crate) PageView<'g>);

impl<'g> Deref for NodeView<'g> {
    type Target = Node;
    fn deref(&self) -> &Node {
        self.0.as_node()
    }
}

unsafe impl<'g> Send for NodeView<'g> {}
unsafe impl<'g> Sync for NodeView<'g> {}

#[derive(Debug, Clone, Copy)]
pub struct MetaView<'g>(PageView<'g>);

impl<'g> Deref for MetaView<'g> {
    type Target = Meta;
    fn deref(&self) -> &Meta {
        self.0.as_meta()
    }
}

unsafe impl<'g> Send for MetaView<'g> {}
unsafe impl<'g> Sync for MetaView<'g> {}

#[derive(Debug, Clone, Copy)]
pub struct PageView<'g> {
    pub(crate) read: Shared<'g, Page>,
    pub(crate) entry: &'g Atomic<Page>,
}

unsafe impl<'g> Send for PageView<'g> {}
unsafe impl<'g> Sync for PageView<'g> {}

impl<'g> Deref for PageView<'g> {
    type Target = Page;

    fn deref(&self) -> &Page {
        unsafe { self.read.deref() }
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CacheInfo {
    pub ts: u64,
    pub lsn: Lsn,
    pub pointer: DiskPtr,
    pub log_size: u64,
}

#[cfg(test)]
impl quickcheck::Arbitrary for CacheInfo {
    fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> CacheInfo {
        use rand::Rng;

        CacheInfo {
            ts: g.gen(),
            lsn: g.gen(),
            pointer: DiskPtr::arbitrary(g),
            log_size: g.gen(),
        }
    }
}

/// `Update` denotes a state, or a change in a sequence of updates,
/// of which a page consists.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum Update {
    Link(Link),
    Node(Node),
    Free,
    Counter(u64),
    Meta(Meta),
}

impl Update {
    fn as_node(&self) -> &Node {
        match self {
            Update::Node(node) => node,
            other => panic!("called as_node on non-Node: {:?}", other),
        }
    }

    fn as_node_mut(&mut self) -> &mut Node {
        match self {
            Update::Node(node) => node,
            other => panic!("called as_node_mut on non-Node: {:?}", other),
        }
    }

    fn as_link(&self) -> &Link {
        match self {
            Update::Link(link) => link,
            other => panic!("called as_link on non-Link: {:?}", other),
        }
    }

    pub(crate) fn as_meta(&self) -> &Meta {
        if let Update::Meta(meta) = self {
            meta
        } else {
            panic!("called as_meta on {:?}", self)
        }
    }

    pub(crate) fn as_counter(&self) -> u64 {
        if let Update::Counter(counter) = self {
            *counter
        } else {
            panic!("called as_counter on {:?}", self)
        }
    }

    fn is_free(&self) -> bool {
        if let Update::Free = self {
            true
        } else {
            false
        }
    }
}
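
// Sketch (not part of the original source): the typed accessors return
// the inner value for a matching variant and panic otherwise, so they
// should only be called where the variant is already known.
#[cfg(test)]
mod update_accessors {
    use super::Update;

    #[test]
    fn counter_and_free() {
        assert_eq!(Update::Counter(7).as_counter(), 7);
        assert!(Update::Free.is_free());
        assert!(!Update::Counter(7).is_free());
    }
}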

/// Ensures that any operations written to the log between the creation
/// of this `RecoveryGuard` and the call to `seal_batch` are recovered
/// atomically: if the entire batch did not reach stable storage before
/// a crash, recovery stops just before any of it.
#[derive(Debug)]
pub struct RecoveryGuard<'a> {
    batch_res: Reservation<'a>,
}

impl<'a> RecoveryGuard<'a> {
    /// Writes the maximum reserved `Lsn` into the batch manifest
    /// reservation, committing the batch.
    pub(crate) fn seal_batch(self) -> Result<()> {
        let max_reserved =
            self.batch_res.log.iobufs.max_reserved_lsn.load(Acquire);
        self.batch_res.mark_writebatch(max_reserved).map(|_| ())
    }
}

/// A page consists of a sequence of state transformations
/// with associated storage parameters like disk position,
/// lsn, and timestamp.
#[derive(Debug, Clone)]
pub struct Page {
    pub(crate) update: Option<Box<Update>>,
    pub(crate) cache_infos: Vec<CacheInfo>,
}

impl Page {
    pub(crate) fn to_page_state(&self) -> PageState {
        let base = &self.cache_infos[0];
        if self.is_free() {
            PageState::Free(base.lsn, base.pointer)
        } else {
            let mut frags: Vec<(Lsn, DiskPtr, u64)> = vec![];

            for cache_info in self.cache_infos.iter().skip(1) {
                frags.push((
                    cache_info.lsn,
                    cache_info.pointer,
                    cache_info.log_size,
                ));
            }

            PageState::Present {
                base: (base.lsn, base.pointer, base.log_size),
                frags,
            }
        }
    }

    pub(crate) fn as_node(&self) -> &Node {
        self.update.as_ref().unwrap().as_node()
    }

    pub(crate) fn as_meta(&self) -> &Meta {
        self.update.as_ref().unwrap().as_meta()
    }

    pub(crate) fn as_counter(&self) -> u64 {
        self.update.as_ref().unwrap().as_counter()
    }

    pub(crate) fn is_free(&self) -> bool {
        self.update.as_ref().map_or(false, |u| u.is_free())
            || self.cache_infos.is_empty()
    }

    pub(crate) fn last_lsn(&self) -> Lsn {
        self.cache_infos.last().map(|ci| ci.lsn).unwrap()
    }

    pub(crate) fn log_size(&self) -> u64 {
        self.cache_infos.iter().map(|ci| ci.log_size).sum()
    }

    fn ts(&self) -> u64 {
        self.cache_infos.last().map_or(0, |ci| ci.ts)
    }

    fn lone_blob(&self) -> Option<DiskPtr> {
        if self.cache_infos.len() == 1 && self.cache_infos[0].pointer.is_blob()
        {
            Some(self.cache_infos[0].pointer)
        } else {
            None
        }
    }
}

/// A lock-free pagecache which supports linkable pages and the
/// deferred materialization of them.
pub struct PageCache {
    pub(crate) config: RunningConfig,
    inner: PageTable,
    next_pid_to_allocate: Mutex<PageId>,
    free: Arc<Mutex<BinaryHeap<PageId>>>,
    #[doc(hidden)]
    pub log: Log,
    lru: Lru,
    idgen: Arc<AtomicU64>,
    idgen_persists: Arc<AtomicU64>,
    idgen_persist_mu: Arc<Mutex<()>>,
    was_recovered: bool,
}

unsafe impl Send for PageCache {}

unsafe impl Sync for PageCache {}

impl Debug for PageCache {
    fn fmt(
        &self,
        f: &mut fmt::Formatter<'_>,
    ) -> std::result::Result<(), fmt::Error> {
        f.write_str(&*format!(
            "PageCache {{ max: {:?} free: {:?} }}\n",
            *self.next_pid_to_allocate.lock(),
            self.free
        ))
    }
}

#[cfg(feature = "event_log")]
impl Drop for PageCache {
    fn drop(&mut self) {
        use std::collections::HashMap;

        trace!("dropping pagecache");

        if self.log.iobufs.config.global_error().is_ok() {
            let mut pages_before_restart = HashMap::new();

            let guard = pin();

            self.config.event_log.meta_before_restart(
                self.get_meta(&guard)
                    .expect("should get meta under test")
                    .deref()
                    .clone(),
            );

            for pid in 0..*self.next_pid_to_allocate.lock() {
                let pte = if let Some(pte) = self.inner.get(pid, &guard) {
                    pte
                } else {
                    continue;
                };
                let pointers =
                    pte.cache_infos.iter().map(|ci| ci.pointer).collect();
                pages_before_restart.insert(pid, pointers);
            }

            self.config.event_log.pages_before_restart(pages_before_restart);
        }

        trace!("pagecache dropped");
    }
}

impl PageCache {
    /// Instantiate a new `PageCache`, recovering any state present
    /// in the log referenced by the supplied configuration.
    pub(crate) fn start(config: RunningConfig) -> Result<Self> {
        trace!("starting pagecache");

        config.reset_global_error();

        let snapshot = read_snapshot_or_default(&config)?;

        #[cfg(feature = "testing")]
        {
            // these checks verify that snapshot generation is idempotent
            trace!(
                "\n\n~~~~ regenerating snapshot for idempotency test ~~~~\n"
            );

            let snapshot2 = read_snapshot_or_default(&config)
                .expect("second read snapshot");
            assert_eq!(
                snapshot.active_segment, snapshot2.active_segment,
                "snapshot active_segment diverged across recoveries.\n\n \
                 first: {:?}\n\n \
                 second: {:?}\n\n",
                snapshot, snapshot2
            );
            assert_eq!(
                snapshot.stable_lsn, snapshot2.stable_lsn,
                "snapshot stable_lsn diverged across recoveries.\n\n \
                 first: {:?}\n\n \
                 second: {:?}\n\n",
                snapshot, snapshot2
            );
            for (pid, (p1, p2)) in
                snapshot.pt.iter().zip(snapshot2.pt.iter()).enumerate()
            {
                assert_eq!(
                    p1, p2,
                    "snapshot pid {} diverged across recoveries.\n\n \
                     first: {:?}\n\n \
                     second: {:?}\n\n",
                    pid, p1, p2
                );
            }
            assert_eq!(
                snapshot.pt.len(),
                snapshot2.pt.len(),
                "snapshots' number of pages diverged across recoveries.\n\n \
                 first: {:?}\n\n \
                 second: {:?}\n\n",
                snapshot.pt,
                snapshot2.pt
            );
            assert_eq!(
                snapshot, snapshot2,
                "snapshots diverged across recoveries.\n\n \
                 first: {:?}\n\n \
                 second: {:?}\n\n",
                snapshot, snapshot2
            );
        }

        let _measure = Measure::new(&M.start_pagecache);

        let cache_capacity = config.cache_capacity;
        let lru = Lru::new(cache_capacity);

        let mut pc = Self {
            config: config.clone(),
            inner: PageTable::default(),
            next_pid_to_allocate: Mutex::new(0),
            free: Arc::new(Mutex::new(BinaryHeap::new())),
            log: Log::start(config, &snapshot)?,
            lru,
            idgen_persist_mu: Arc::new(Mutex::new(())),
            idgen: Arc::new(AtomicU64::new(0)),
            idgen_persists: Arc::new(AtomicU64::new(0)),
            was_recovered: false,
        };

        pc.load_snapshot(&snapshot)?;

        #[cfg(feature = "testing")]
        {
            use std::collections::HashMap;

            let guard = pin();

            let mut pages_after_restart = HashMap::new();

            for pid in 0..*pc.next_pid_to_allocate.lock() {
                let pte = if let Some(pte) = pc.inner.get(pid, &guard) {
                    pte
                } else {
                    continue;
                };
                let pointers =
                    pte.cache_infos.iter().map(|ci| ci.pointer).collect();
                pages_after_restart.insert(pid, pointers);
            }

            pc.config.event_log.pages_after_restart(pages_after_restart);
        }

        let mut was_recovered = true;

        {
            let guard = pin();

            if let Err(Error::ReportableBug(..)) = pc.get_meta(&guard) {
                // set up meta
                was_recovered = false;

                let meta_update = Update::Meta(Meta::default());

                let (meta_id, _) = pc.allocate_inner(meta_update, &guard)?;

                assert_eq!(
                    meta_id, META_PID,
                    "we expect the meta page to have pid {}, but it had pid {} instead",
                    META_PID, meta_id,
                );
            }

            if let Err(Error::ReportableBug(..)) = pc.get_idgen(&guard) {
                // set up idgen
                was_recovered = false;

                let counter_update = Update::Counter(0);

                let (counter_id, _) =
                    pc.allocate_inner(counter_update, &guard)?;

                assert_eq!(
                    counter_id, COUNTER_PID,
                    "we expect the counter to have pid {}, but it had pid {} instead",
                    COUNTER_PID, counter_id,
                );
            }

            let (_, counter) = pc.get_idgen(&guard)?;
            let idgen_recovery = if was_recovered {
                // leave a gap above the last persisted counter so that
                // IDs handed out before the last flush can never repeat
                counter + (2 * pc.config.idgen_persist_interval)
            } else {
                0
            };
            let idgen_persists = counter / pc.config.idgen_persist_interval
                * pc.config.idgen_persist_interval;

            pc.idgen.store(idgen_recovery, Release);
            pc.idgen_persists.store(idgen_persists, Release);
        }

        pc.was_recovered = was_recovered;

        #[cfg(feature = "event_log")]
        {
            let guard = pin();

            pc.config.event_log.meta_after_restart(
                pc.get_meta(&guard)
                    .expect("should be able to get meta under test")
                    .deref()
                    .clone(),
            );
        }

        trace!("pagecache started");

        Ok(pc)
    }

    /// Flushes any pending IO buffers to disk to ensure durability,
    /// returning the number of bytes written.
    pub(crate) fn flush(&self) -> Result<usize> {
        self.log.flush()
    }

    /// Create a new page, trying to reuse old freed pages if possible
    /// to maximize underlying `PageTable` pointer density. Returns
    /// the page ID and its view for use in future atomic `replace`
    /// and `link` operations.
    pub(crate) fn allocate<'g>(
        &self,
        new: Node,
        guard: &'g Guard,
    ) -> Result<(PageId, PageView<'g>)> {
        self.allocate_inner(Update::Node(new), guard)
    }

    fn allocate_inner<'g>(
        &self,
        new: Update,
        guard: &'g Guard,
    ) -> Result<(PageId, PageView<'g>)> {
        let mut allocation_serializer;

        let free_opt = self.free.lock().pop();

        let (pid, page_view) = if let Some(pid) = free_opt {
            trace!("re-allocating pid {}", pid);

            let page_view = match self.inner.get(pid, guard) {
                None => panic!(
                    "expected to find existing stack \
                     for re-allocated pid {}",
                    pid
                ),
                Some(p) => p,
            };
            assert!(
                page_view.is_free(),
                "failed to re-allocate pid {} which \
                 contained unexpected state {:?}",
                pid,
                page_view,
            );
            (pid, page_view)
        } else {
            // serialize fresh pid allocation with installation into the
            // pagetable, so that observers of `next_pid_to_allocate` see
            // installed pages for all previously allocated pids
            allocation_serializer = self.next_pid_to_allocate.lock();
            let pid = *allocation_serializer;
            *allocation_serializer += 1;

            trace!("allocating pid {} for the first time", pid);

            let new_page = Page { update: None, cache_infos: Vec::default() };

            let page_view = self.inner.insert(pid, new_page, guard);

            (pid, page_view)
        };

        let new_pointer = self
            .cas_page(pid, page_view, new, false, guard)?
            .unwrap_or_else(|e| {
                panic!(
                    "should always be able to install \
                     a new page during allocation, but \
                     failed for pid {}: {:?}",
                    pid, e
                )
            });

        Ok((pid, new_pointer))
    }

    /// Attempt to rewrite a page away from the segment most in need
    /// of cleaning, returning `true` if a page was relocated.
    #[cfg(all(
        not(miri),
        any(
            windows,
            target_os = "linux",
            target_os = "macos",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "openbsd",
            target_os = "netbsd",
        )
    ))]
    pub(crate) fn attempt_gc(&self) -> Result<bool> {
        let guard = pin();
        let cc = concurrency_control::read();
        let to_clean = self.log.iobufs.segment_cleaner.pop();
        let ret = if let Some((pid_to_clean, segment_to_clean)) = to_clean {
            self.rewrite_page(pid_to_clean, segment_to_clean, &guard)
                .map(|_| true)
        } else {
            Ok(false)
        };
        drop(cc);
        guard.flush();
        ret
    }

    /// Initiate an atomic sequence of writes to the underlying log,
    /// returning a `RecoveryGuard` whose `seal_batch` must be called
    /// for the batch to be recovered as a unit.
    pub(crate) fn pin_log(&self, guard: &Guard) -> Result<RecoveryGuard<'_>> {
        // roll the active iobuf so that the batch manifest reservation
        // lands at the current tip of the log
        self.log.roll_iobuf()?;

        let batch_res = self.log.reserve(
            LogKind::Skip,
            BATCH_MANIFEST_PID,
            &BatchManifest::default(),
            guard,
        )?;

        iobuf::maybe_seal_and_write_iobuf(
            &self.log.iobufs,
            &batch_res.iobuf,
            batch_res.iobuf.get_header(),
            false,
        )?;

        Ok(RecoveryGuard { batch_res })
    }

    #[doc(hidden)]
    #[cfg(feature = "failpoints")]
    #[cfg(all(
        not(miri),
        any(
            windows,
            target_os = "linux",
            target_os = "macos",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "openbsd",
            target_os = "netbsd",
        )
    ))]
    pub(crate) fn set_failpoint(&self, e: Error) {
        if let Error::FailPoint = e {
            self.config.set_global_error(e);

            // acquire and drop the intervals mutex to linearize with any
            // threads blocked on the interval condvar...
            let intervals = self.log.iobufs.intervals.lock();

            // ...then wake them so they can observe the error
            drop(intervals);

            let _notified = self.log.iobufs.interval_updated.notify_all();
        }
    }

    /// Free a particular page.
    pub(crate) fn free<'g>(
        &self,
        pid: PageId,
        old: PageView<'g>,
        guard: &'g Guard,
    ) -> Result<CasResult<'g, ()>> {
        trace!("attempting to free pid {}", pid);

        if pid == COUNTER_PID || pid == META_PID || pid == BATCH_MANIFEST_PID {
            return Err(Error::Unsupported(
                "the first few pages are allocated for \
                 system-internal purposes and may not be freed"
                    .into(),
            ));
        }

        let new_pointer =
            self.cas_page(pid, old, Update::Free, false, guard)?;

        if new_pointer.is_ok() {
            let free = self.free.clone();
            guard.defer(move || {
                let mut free = free.lock();
                // panic if we double-freed a page
                if free.iter().any(|e| e == &pid) {
                    panic!("pid {} was double-freed", pid);
                }

                free.push(pid);
            });
        }

        Ok(new_pointer.map_err(|o| o.map(|(pointer, _)| (pointer, ()))))
    }

    /// Try to atomically add a `Link` to the page. Returns `Ok(Ok(_))`
    /// with the new view if the operation was successful, and
    /// `Ok(Err(_))` with the current state if it failed.
    pub(crate) fn link<'g>(
        &'g self,
        pid: PageId,
        mut old: PageView<'g>,
        new: Link,
        guard: &'g Guard,
    ) -> Result<CasResult<'g, Link>> {
        let _measure = Measure::new(&M.link_page);

        trace!("linking pid {} with {:?}", pid, new);

        // a randomized failure injector for tests and delay-based fuzzing
        #[cfg(any(test, feature = "lock_free_delays"))]
        {
            use std::cell::RefCell;
            use std::time::{SystemTime, UNIX_EPOCH};

            thread_local! {
                pub static COUNT: RefCell<u32> = RefCell::new(1);
            }

            let time_now =
                SystemTime::now().duration_since(UNIX_EPOCH).unwrap();

            #[allow(clippy::cast_possible_truncation)]
            let fail_seed = std::cmp::max(3, time_now.as_nanos() as u32 % 128);

            let inject_failure = COUNT.with(|c| {
                let mut cr = c.borrow_mut();
                *cr += 1;
                *cr % fail_seed == 0
            });

            if inject_failure {
                debug!(
                    "injecting a randomized failure in the link of pid {}",
                    pid
                );
                if let Some(current_pointer) = self.get(pid, guard)? {
                    return Ok(Err(Some((current_pointer.0, new))));
                } else {
                    return Ok(Err(None));
                }
            }
        }

        let mut node: Node = old.as_node().clone();
        node.apply(&new);

        // consolidate the page with a full replace once its chain of
        // deltas has grown long enough
        if old.cache_infos.len() >= PAGE_CONSOLIDATION_THRESHOLD {
            let short_circuit = self.replace(pid, old, node, guard)?;
            return Ok(short_circuit.map_err(|a| a.map(|b| (b.0, new))));
        }

        let mut new_page = Some(Owned::new(Page {
            update: Some(Box::new(Update::Node(node))),
            cache_infos: Vec::default(),
        }));

        loop {
            // reserve log space for the link before installing it
            // in the pagetable
            let log_reservation =
                self.log.reserve(LogKind::Link, pid, &new, guard)?;
            let lsn = log_reservation.lsn();
            let pointer = log_reservation.pointer();

            // bump the page's timestamp so that concurrent operations
            // working from a stale view will fail their own CAS
            let ts = old.ts() + 1;

            let cache_info = CacheInfo {
                lsn,
                pointer,
                ts,
                log_size: log_reservation.reservation_len() as u64,
            };

            let mut new_cache_infos =
                Vec::with_capacity(old.cache_infos.len() + 1);
            new_cache_infos.extend_from_slice(&old.cache_infos);
            new_cache_infos.push(cache_info);

            let mut page_ptr = new_page.take().unwrap();
            page_ptr.cache_infos = new_cache_infos;

            debug_delay();
            let result =
                old.entry.compare_and_set(old.read, page_ptr, SeqCst, guard);

            match result {
                Ok(new_shared) => {
                    trace!("link of pid {} succeeded", pid);

                    unsafe {
                        guard.defer_destroy(old.read);
                    }

                    assert_ne!(old.last_lsn(), 0);

                    self.log.iobufs.sa_mark_link(pid, cache_info, guard);

                    // NB complete must happen AFTER calls to SA, because
                    // when the iobuf's n_writers hits 0, we may transition
                    // the segment to inactive, resulting in a race otherwise.
                    log_reservation.complete()?;

                    // possibly evict an item now that our cache has grown
                    let total_page_size =
                        unsafe { new_shared.deref().log_size() };
                    let to_evict =
                        self.lru.accessed(pid, total_page_size, guard);
                    trace!(
                        "accessed pid {} -> paging out pids {:?}",
                        pid,
                        to_evict
                    );
                    if !to_evict.is_empty() {
                        self.page_out(to_evict, guard)?;
                    }

                    old.read = new_shared;

                    return Ok(Ok(old));
                }
                Err(cas_error) => {
                    log_reservation.abort()?;
                    let actual = cas_error.current;
                    let actual_ts = unsafe { actual.deref().ts() };
                    if actual_ts == old.ts() {
                        trace!(
                            "link of pid {} failed due to movement, retrying",
                            pid
                        );
                        new_page = Some(cas_error.new);

                        old.read = actual;
                    } else {
                        trace!("link of pid {} failed due to new update", pid);
                        let mut page_view = old;
                        page_view.read = actual;
                        return Ok(Err(Some((page_view, new))));
                    }
                }
            }
        }
    }

    /// Atomically replace a page's contents with a new `Node`.
    /// Returns `Ok(Ok(_))` with the new view if the operation was
    /// successful, and `Ok(Err(_))` with the current state if it failed.
    pub(crate) fn replace<'g>(
        &self,
        pid: PageId,
        old: PageView<'g>,
        new: Node,
        guard: &'g Guard,
    ) -> Result<CasResult<'g, Node>> {
        let _measure = Measure::new(&M.replace_page);

        trace!("replacing pid {} with {:?}", pid, new);

        // a randomized failure injector for tests and delay-based fuzzing
        #[cfg(any(test, feature = "lock_free_delays"))]
        {
            use std::cell::RefCell;
            use std::time::{SystemTime, UNIX_EPOCH};

            thread_local! {
                pub static COUNT: RefCell<u32> = RefCell::new(1);
            }

            let time_now =
                SystemTime::now().duration_since(UNIX_EPOCH).unwrap();

            #[allow(clippy::cast_possible_truncation)]
            let fail_seed = std::cmp::max(3, time_now.as_nanos() as u32 % 128);

            let inject_failure = COUNT.with(|c| {
                let mut cr = c.borrow_mut();
                *cr += 1;
                *cr % fail_seed == 0
            });

            if inject_failure {
                debug!(
                    "injecting a randomized failure in the replace of pid {}",
                    pid
                );
                if let Some(current_pointer) = self.get(pid, guard)? {
                    return Ok(Err(Some((current_pointer.0, new))));
                } else {
                    return Ok(Err(None));
                }
            }
        }

        let result =
            self.cas_page(pid, old, Update::Node(new), false, guard)?;

        if let Some((pid_to_clean, segment_to_clean)) =
            self.log.iobufs.segment_cleaner.pop()
        {
            self.rewrite_page(pid_to_clean, segment_to_clean, guard)?;
        }

        Ok(result.map_err(|fail| {
            let (pointer, shared) = fail.unwrap();
            if let Update::Node(rejected_new) = shared {
                Some((pointer, rejected_new))
            } else {
                unreachable!();
            }
        }))
    }

    // `rewrite_page` relocates a page out of the segment that is
    // currently being purged, in service of segment garbage collection.
    fn rewrite_page(
        &self,
        pid: PageId,
        segment_to_purge: LogOffset,
        guard: &Guard,
    ) -> Result<()> {
        let _measure = Measure::new(&M.rewrite_page);

        trace!("rewriting pid {}", pid);

        let purge_segment_id =
            segment_to_purge / self.config.segment_size as u64;

        loop {
            let page_view = if let Some(page_view) = self.inner.get(pid, guard)
            {
                page_view
            } else {
                panic!("rewriting pid {} failed (no longer exists)", pid);
            };

            let already_moved = !unsafe { page_view.read.deref() }
                .cache_infos
                .iter()
                .any(|ce| {
                    ce.pointer.lid() / self.config.segment_size as u64
                        == purge_segment_id
                });
            if already_moved {
                return Ok(());
            }

            // if the page is just a single blob pointer, rewrite only the
            // pointer rather than the blob itself
            if let Some(disk_pointer) = page_view.lone_blob() {
                trace!("rewriting blob with pid {}", pid);
                let blob_pointer = disk_pointer.blob().1;

                let log_reservation =
                    self.log.rewrite_blob_pointer(pid, blob_pointer, guard)?;

                let cache_info = CacheInfo {
                    ts: page_view.ts(),
                    lsn: log_reservation.lsn,
                    pointer: log_reservation.pointer,
                    log_size: u64::try_from(log_reservation.reservation_len())
                        .unwrap(),
                };

                let new_page = Owned::new(Page {
                    update: page_view.update.clone(),
                    cache_infos: vec![cache_info],
                });

                debug_delay();
                let result = page_view.entry.compare_and_set(
                    page_view.read,
                    new_page,
                    SeqCst,
                    guard,
                );

                if let Ok(new_shared) = result {
                    unsafe {
                        guard.defer_destroy(page_view.read);
                    }

                    let lsn = log_reservation.lsn();

                    self.log.iobufs.sa_mark_replace(
                        pid,
                        lsn,
                        &page_view.cache_infos,
                        cache_info,
                        guard,
                    )?;

                    // NB complete must happen AFTER calls to SA, because
                    // when the iobuf's n_writers hits 0, we may transition
                    // the segment to inactive, resulting in a race otherwise.
                    let _pointer = log_reservation.complete()?;

                    // possibly evict an item now that our cache has grown
                    let total_page_size =
                        unsafe { new_shared.deref().log_size() };
                    let to_evict =
                        self.lru.accessed(pid, total_page_size, guard);
                    trace!(
                        "accessed pid {} -> paging out pids {:?}",
                        pid,
                        to_evict
                    );
                    if !to_evict.is_empty() {
                        self.page_out(to_evict, guard)?;
                    }

                    trace!("rewriting pid {} succeeded", pid);

                    return Ok(());
                } else {
                    let _pointer = log_reservation.abort()?;

                    trace!("rewriting pid {} failed", pid);
                }
            } else {
                trace!("rewriting page with pid {}", pid);

                // page-in the whole page with a get
                let (key, update): (_, Update) = if pid == META_PID {
                    let meta_view = self.get_meta(guard)?;
                    (meta_view.0, Update::Meta(meta_view.deref().clone()))
                } else if pid == COUNTER_PID {
                    let (key, counter) = self.get_idgen(guard)?;
                    (key, Update::Counter(counter))
                } else if let Some(node_view) = self.get(pid, guard)? {
                    (node_view.0, Update::Node(node_view.deref().clone()))
                } else {
                    let page_view = match self.inner.get(pid, guard) {
                        None => panic!("expected page missing in rewrite"),
                        Some(p) => p,
                    };

                    if page_view.is_free() {
                        (page_view, Update::Free)
                    } else {
                        debug!(
                            "when rewriting pid {} \
                             we encountered a rewritten \
                             node with a link {:?} that \
                             we previously witnessed a Free \
                             for (PageCache::get returned None), \
                             assuming we can just return now since \
                             the Free was replace'd",
                            pid, page_view.update
                        );
                        return Ok(());
                    }
                };

                let res = self.cas_page(pid, key, update, true, guard).map(
                    |res| {
                        trace!(
                            "rewriting pid {} success: {}",
                            pid,
                            res.is_ok()
                        );
                        res
                    },
                )?;
                if res.is_ok() {
                    return Ok(());
                }
            }
        }
    }

    /// Returns the current space amplification: the ratio of the size
    /// of the database on disk to the logical size of its pages.
    #[allow(clippy::cast_precision_loss)]
    #[allow(clippy::float_arithmetic)]
    #[doc(hidden)]
    pub(crate) fn space_amplification(&self) -> Result<f64> {
        let on_disk_bytes = self.size_on_disk()? as f64;
        let logical_size = (self.logical_size_of_all_pages()?
            + self.config.segment_size as u64)
            as f64;

        Ok(on_disk_bytes / logical_size)
    }

    pub(crate) fn size_on_disk(&self) -> Result<u64> {
        let mut size = self.config.file.metadata()?.len();

        let stable = self.config.blob_path(0);
        let blob_dir = stable.parent().expect(
            "should be able to determine the parent for the blob directory",
        );
        let blob_files = std::fs::read_dir(blob_dir)?;

        for blob_file in blob_files {
            let blob_file = if let Ok(bf) = blob_file {
                bf
            } else {
                continue;
            };

            // a blob file may have been removed concurrently,
            // so default to a size of 0 if its metadata is gone
            #[cfg(not(miri))]
            {
                size += blob_file.metadata().map(|m| m.len()).unwrap_or(0);
            }

            // `DirEntry::metadata` is not supported under miri,
            // so fall back to a fresh `fs::metadata` call
            #[cfg(miri)]
            {
                size += std::fs::metadata(blob_file.path())
                    .map(|m| m.len())
                    .unwrap_or(0);
            }
        }

        Ok(size)
    }

    fn logical_size_of_all_pages(&self) -> Result<u64> {
        let guard = pin();
        let meta_size = self.get_meta(&guard)?.rss();
        let idgen_size = std::mem::size_of::<u64>() as u64;

        let mut ret = meta_size + idgen_size;
        let min_pid = COUNTER_PID + 1;
        let next_pid_to_allocate = *self.next_pid_to_allocate.lock();
        for pid in min_pid..next_pid_to_allocate {
            if let Some(node_cell) = self.get(pid, &guard)? {
                ret += node_cell.rss();
            }
        }
        Ok(ret)
    }

    fn cas_page<'g>(
        &self,
        pid: PageId,
        mut old: PageView<'g>,
        update: Update,
        is_rewrite: bool,
        guard: &'g Guard,
    ) -> Result<CasResult<'g, Update>> {
        trace!(
            "cas_page called on pid {} to {:?} with old ts {:?}",
            pid,
            update,
            old.ts()
        );

        let log_kind = log_kind_from_update(&update);
        trace!("cas_page on pid {} has log kind: {:?}", pid, log_kind);

        let mut new_page = Some(Owned::new(Page {
            update: Some(Box::new(update)),
            cache_infos: Vec::default(),
        }));

        loop {
            let mut page_ptr = new_page.take().unwrap();
            let log_reservation = match &**page_ptr.update.as_ref().unwrap() {
                Update::Counter(ref c) => {
                    self.log.reserve(log_kind, pid, c, guard)?
                }
                Update::Meta(ref m) => {
                    self.log.reserve(log_kind, pid, m, guard)?
                }
                Update::Free => self.log.reserve(log_kind, pid, &(), guard)?,
                Update::Node(ref node) => {
                    self.log.reserve(log_kind, pid, node, guard)?
                }
                other => {
                    panic!("non-replacement used in cas_page: {:?}", other)
                }
            };
            let lsn = log_reservation.lsn();
            let new_pointer = log_reservation.pointer();

            // rewrites do not bump the timestamp, since they do not change
            // the logical state of the page; this lets operations that
            // raced with a relocation retry instead of failing outright
            let ts = if is_rewrite { old.ts() } else { old.ts() + 1 };

            let cache_info = CacheInfo {
                ts,
                lsn,
                pointer: new_pointer,
                log_size: u64::try_from(log_reservation.reservation_len())
                    .unwrap(),
            };

            page_ptr.cache_infos = vec![cache_info];

            debug_delay();
            let result =
                old.entry.compare_and_set(old.read, page_ptr, SeqCst, guard);

            match result {
                Ok(new_shared) => {
                    unsafe {
                        guard.defer_destroy(old.read);
                    }

                    trace!("cas_page succeeded on pid {}", pid);
                    self.log.iobufs.sa_mark_replace(
                        pid,
                        lsn,
                        &old.cache_infos,
                        cache_info,
                        guard,
                    )?;

                    // NB complete must happen AFTER calls to SA, because
                    // when the iobuf's n_writers hits 0, we may transition
                    // the segment to inactive, resulting in a race otherwise.
                    let _pointer = log_reservation.complete()?;

                    // possibly evict an item now that our cache has grown
                    let total_page_size =
                        unsafe { new_shared.deref().log_size() };
                    let to_evict =
                        self.lru.accessed(pid, total_page_size, guard);
                    trace!(
                        "accessed pid {} -> paging out pids {:?}",
                        pid,
                        to_evict
                    );
                    if !to_evict.is_empty() {
                        self.page_out(to_evict, guard)?;
                    }

                    return Ok(Ok(PageView {
                        read: new_shared,
                        entry: old.entry,
                    }));
                }
                Err(cas_error) => {
                    trace!("cas_page failed on pid {}", pid);
                    let _pointer = log_reservation.abort()?;

                    let current: Shared<'_, _> = cas_error.current;
                    let actual_ts = unsafe { current.deref().ts() };

                    let mut returned_update: Owned<_> = cas_error.new;

                    if actual_ts != old.ts() || is_rewrite {
                        return Ok(Err(Some((
                            PageView { read: current, entry: old.entry },
                            *returned_update.update.take().unwrap(),
                        ))));
                    }
                    trace!(
                        "retrying CAS on pid {} with same ts of {}",
                        pid,
                        old.ts()
                    );
                    old.read = current;
                    new_page = Some(returned_update);
                }
            }
        }
    }

    /// Retrieve the current meta page.
    pub(crate) fn get_meta<'g>(
        &self,
        guard: &'g Guard,
    ) -> Result<MetaView<'g>> {
        trace!("getting page iter for META");

        let page_view = match self.inner.get(META_PID, guard) {
            None => {
                return Err(Error::ReportableBug(
                    "failed to retrieve META page \
                     which should always be present"
                        .into(),
                ));
            }
            Some(p) => p,
        };

        if page_view.update.is_some() {
            Ok(MetaView(page_view))
        } else {
            Err(Error::ReportableBug(
                "failed to retrieve META page \
                 which should always be present"
                    .into(),
            ))
        }
    }

    /// Retrieve the persisted ID-generator counter page.
    pub(crate) fn get_idgen<'g>(
        &self,
        guard: &'g Guard,
    ) -> Result<(PageView<'g>, u64)> {
        trace!("getting page iter for idgen");

        let page_view = match self.inner.get(COUNTER_PID, guard) {
            None => {
                return Err(Error::ReportableBug(
                    "failed to retrieve counter page \
                     which should always be present"
                        .into(),
                ));
            }
            Some(p) => p,
        };

        if page_view.update.is_some() {
            let counter = page_view.as_counter();
            Ok((page_view, counter))
        } else {
            Err(Error::ReportableBug(
                "failed to retrieve counter page \
                 which should always be present"
                    .into(),
            ))
        }
    }

    /// Try to retrieve a page by its logical ID.
    pub(crate) fn get<'g>(
        &self,
        pid: PageId,
        guard: &'g Guard,
    ) -> Result<Option<NodeView<'g>>> {
        trace!("getting page iterator for pid {}", pid);
        let _measure = Measure::new(&M.get_page);

        if pid == COUNTER_PID || pid == META_PID || pid == BATCH_MANIFEST_PID {
            return Err(Error::Unsupported(
                "the first few pages are reserved for storing metadata \
                 and the monotonic ID generator, and cannot be iterated \
                 over directly"
                    .into(),
            ));
        }

        let mut last_attempted_cache_info = None;
        let mut last_err = None;
        let mut page_view;

        let mut updates: Vec<Update> = loop {
            // loop and retry if the page moves underneath us, but bail
            // with the last error if we see the same base cache info twice
            page_view = match self.inner.get(pid, guard) {
                None => return Ok(None),
                Some(p) => p,
            };

            if page_view.is_free() {
                return Ok(None);
            }

            if page_view.update.is_some() {
                // possibly evict an item now that our cache has grown
                let total_page_size = page_view.log_size();
                let to_evict = self.lru.accessed(pid, total_page_size, guard);
                trace!(
                    "accessed pid {} -> paging out pids {:?}",
                    pid,
                    to_evict
                );
                if !to_evict.is_empty() {
                    self.page_out(to_evict, guard)?;
                }
                return Ok(Some(NodeView(page_view)));
            }

            trace!(
                "pulling pid {} view {:?} deref {:?}",
                pid,
                page_view,
                page_view.deref()
            );
            if page_view.cache_infos.first()
                == last_attempted_cache_info.as_ref()
            {
                return Err(last_err.unwrap());
            } else {
                last_attempted_cache_info =
                    page_view.cache_infos.first().copied();
            }

            // page-in the base update and any links from disk
            let updates_result: Result<Vec<Update>> = page_view
                .cache_infos
                .iter()
                .map(|ci| self.pull(pid, ci.lsn, ci.pointer))
                .collect();

            last_err = if let Ok(updates) = updates_result {
                break updates;
            } else {
                Some(updates_result.unwrap_err())
            };
        };

        let (base_slice, links) = updates.split_at_mut(1);

        let base: &mut Node = base_slice[0].as_node_mut();

        for link_update in links {
            let link: &Link = link_update.as_link();
            base.apply(link);
        }

        updates.truncate(1);
        let base = updates.pop().unwrap();

        let page = Owned::new(Page {
            update: Some(Box::new(base)),
            cache_infos: page_view.cache_infos.clone(),
        });

        debug_delay();
        let result = page_view.entry.compare_and_set(
            page_view.read,
            page,
            SeqCst,
            guard,
        );

        if let Ok(new_shared) = result {
            trace!("fix-up for pid {} succeeded", pid);

            unsafe {
                guard.defer_destroy(page_view.read);
            }

            // possibly evict an item now that our cache has grown
            let total_page_size = unsafe { new_shared.deref().log_size() };
            let to_evict = self.lru.accessed(pid, total_page_size, guard);
            trace!("accessed pid {} -> paging out pids {:?}", pid, to_evict);
            if !to_evict.is_empty() {
                self.page_out(to_evict, guard)?;
            }

            let mut page_view = page_view;
            page_view.read = new_shared;

            Ok(Some(NodeView(page_view)))
        } else {
            trace!("fix-up for pid {} failed", pid);

            self.get(pid, guard)
        }
    }

    /// Returns `true` if the database was recovered from a previous
    /// process. Note that database state is only guaranteed to be
    /// present up to the last call to `flush`.
    pub const fn was_recovered(&self) -> bool {
        self.was_recovered
    }
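
    // A minimal usage sketch for recovery detection (hypothetical caller,
    // not from this file):
    //
    //     let pc = PageCache::start(config)?;
    //     if pc.was_recovered() {
    //         // state from a previous process instance was recovered
    //     }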

    /// Generate a monotonic ID. Not guaranteed to be contiguous.
    /// The counter is written to disk every `idgen_persist_interval`
    /// generations, and recovery skips a full interval past the last
    /// persisted value so that handed-out IDs can never repeat.
    pub(crate) fn generate_id_inner(&self) -> Result<u64> {
        let ret = self.idgen.fetch_add(1, Release);

        trace!("generating ID {}", ret);

        let interval = self.config.idgen_persist_interval;
        let necessary_persists = ret / interval * interval;
        let mut persisted = self.idgen_persists.load(Acquire);

        while persisted < necessary_persists {
            let _mu = self.idgen_persist_mu.lock();
            persisted = self.idgen_persists.load(Acquire);
            if persisted < necessary_persists {
                // it's our responsibility to persist up to our ID
                trace!(
                    "persisting ID gen, as persist count {} \
                     is below necessary persists {}",
                    persisted,
                    necessary_persists
                );
                let guard = pin();
                let (key, current) = self.get_idgen(&guard)?;

                assert_eq!(current, persisted);

                let counter_update = Update::Counter(necessary_persists);

                let old = self.idgen_persists.swap(necessary_persists, Release);
                assert_eq!(old, persisted);

                if self
                    .cas_page(COUNTER_PID, key, counter_update, false, &guard)?
                    .is_err()
                {
                    // CAS failed: retry while still holding the mutex
                    continue;
                }

                // recovery adds an interval to the last persisted counter,
                // so this update must be durable before we hand out any
                // IDs above the persisted watermark
                iobuf::make_durable(&self.log.iobufs, key.last_lsn())?;
            }
        }

        Ok(ret)
    }
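
    // Worked example of the persistence arithmetic above (illustrative,
    // not from the original source): with an `idgen_persist_interval` of
    // 1_000_000 and a freshly generated `ret` of 2_500_017,
    // `necessary_persists` rounds down to 2_000_000, and the counter page
    // is only rewritten and flushed when the persisted watermark is below
    // that boundary, so at most one persist happens per interval.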

    /// Look up the page ID associated with an identifier in the
    /// `Meta` mapping.
    pub(crate) fn meta_pid_for_name(
        &self,
        name: &[u8],
        guard: &Guard,
    ) -> Result<PageId> {
        let m = self.get_meta(guard)?;
        if let Some(root) = m.get_root(name) {
            Ok(root)
        } else {
            Err(Error::CollectionNotFound(name.into()))
        }
    }

    /// Compare-and-swap the root page ID associated with a name
    /// in the `Meta` mapping.
    pub(crate) fn cas_root_in_meta<'g>(
        &self,
        name: &[u8],
        old: Option<PageId>,
        new: Option<PageId>,
        guard: &'g Guard,
    ) -> Result<std::result::Result<(), Option<PageId>>> {
        loop {
            let meta_view = self.get_meta(guard)?;

            let actual = meta_view.get_root(name);
            if actual != old {
                return Ok(Err(actual));
            }

            let mut new_meta = meta_view.deref().clone();
            if let Some(new) = new {
                new_meta.set_root(name.into(), new);
            } else {
                new_meta.del_root(name);
            }

            let new_meta_link = Update::Meta(new_meta);

            let res = self.cas_page(
                META_PID,
                meta_view.0,
                new_meta_link,
                false,
                guard,
            )?;

            match res {
                Ok(_worked) => return Ok(Ok(())),
                Err(Some((_current_pointer, _rejected))) => {}
                Err(None) => {
                    return Err(Error::ReportableBug(
                        "replacing the META page has failed because \
                         the pagecache does not think it currently exists."
                            .into(),
                    ));
                }
            }
        }
    }

    fn page_out(&self, to_evict: Vec<PageId>, guard: &Guard) -> Result<()> {
        let _measure = Measure::new(&M.page_out);
        for pid in to_evict {
            if pid == COUNTER_PID
                || pid == META_PID
                || pid == BATCH_MANIFEST_PID
            {
                // the system pages must never be paged out
                continue;
            }
            loop {
                if let Some(page_view) = self.inner.get(pid, guard) {
                    if page_view.is_free() {
                        // this page was freed; don't page it out
                        break;
                    }
                    let new_page = Owned::new(Page {
                        update: None,
                        cache_infos: page_view.cache_infos.clone(),
                    });

                    debug_delay();
                    if page_view
                        .entry
                        .compare_and_set(
                            page_view.read,
                            new_page,
                            SeqCst,
                            guard,
                        )
                        .is_ok()
                    {
                        unsafe {
                            guard.defer_destroy(page_view.read);
                        }

                        break;
                    }
                    // keep looping until the page-out CAS succeeds
                }
            }
        }
        Ok(())
    }

    fn pull(&self, pid: PageId, lsn: Lsn, pointer: DiskPtr) -> Result<Update> {
        use MessageKind::*;

        trace!("pulling pid {} lsn {} pointer {} from disk", pid, lsn, pointer);
        let _measure = Measure::new(&M.pull);

        let expected_segment_number: SegmentNumber = SegmentNumber(
            u64::try_from(lsn).unwrap()
                / u64::try_from(self.config.segment_size).unwrap(),
        );

        let (header, bytes) = match self.log.read(pid, lsn, pointer) {
            Ok(LogRead::Inline(header, buf, _len)) => {
                assert_eq!(
                    header.pid, pid,
                    "expected pid {} on pull of pointer {}, \
                     but got {} instead",
                    pid, pointer, header.pid
                );
                assert_eq!(
                    header.segment_number, expected_segment_number,
                    "expected segment number {:?} on pull of pointer {}, \
                     but got segment number {:?} instead",
                    expected_segment_number, pointer, header.segment_number
                );
                Ok((header, buf))
            }
            Ok(LogRead::Blob(header, buf, _blob_pointer, _inline_len)) => {
                assert_eq!(
                    header.pid, pid,
                    "expected pid {} on pull of pointer {}, \
                     but got {} instead",
                    pid, pointer, header.pid
                );
                assert_eq!(
                    header.segment_number, expected_segment_number,
                    "expected segment number {:?} on pull of pointer {}, \
                     but got segment number {:?} instead",
                    expected_segment_number, pointer, header.segment_number
                );

                Ok((header, buf))
            }
            Ok(other) => {
                debug!("read unexpected page: {:?}", other);
                Err(Error::corruption(Some(pointer)))
            }
            Err(e) => {
                debug!("failed to read page: {:?}", e);
                Err(e)
            }
        }?;

        // a `&mut &[u8]` lets deserialization incrementally consume
        // bytes without taking ownership of them
        let buf = &mut bytes.as_slice();

        let update_res = {
            let _deserialize_latency = Measure::new(&M.deserialize);

            match header.kind {
                Counter => u64::deserialize(buf).map(Update::Counter),
                BlobMeta | InlineMeta => {
                    Meta::deserialize(buf).map(Update::Meta)
                }
                BlobLink | InlineLink => {
                    Link::deserialize(buf).map(Update::Link)
                }
                BlobNode | InlineNode => {
                    Node::deserialize(buf).map(Update::Node)
                }
                Free => Ok(Update::Free),
                Corrupted | Canceled | Cap | BatchManifest => {
                    panic!("unexpected pull: {:?}", header.kind)
                }
            }
        };

        let update = update_res.expect("failed to deserialize data");

        // `pull` should never be called for a freed page
        if let Update::Free = update {
            Err(Error::ReportableBug(format!(
                "non-link/replace found in pull of pid {}",
                pid
            )))
        } else {
            Ok(update)
        }
    }

    fn load_snapshot(&mut self, snapshot: &Snapshot) -> Result<()> {
        let next_pid_to_allocate = snapshot.pt.len() as PageId;

        self.next_pid_to_allocate = Mutex::new(next_pid_to_allocate);

        debug!("load_snapshot loading pages from 0..{}", next_pid_to_allocate);
        for pid in 0..next_pid_to_allocate {
            let state = if let Some(state) =
                snapshot.pt.get(usize::try_from(pid).unwrap())
            {
                state
            } else {
                panic!(
                    "load_snapshot pid {} not found, despite being below the max pid {}",
                    pid, next_pid_to_allocate
                );
            };

            trace!("load_snapshot pid {} {:?}", pid, state);

            let mut cache_infos = Vec::default();

            let guard = pin();

            match *state {
                PageState::Present { base, ref frags } => {
                    cache_infos.push(CacheInfo {
                        lsn: base.0,
                        pointer: base.1,
                        log_size: base.2,
                        ts: 0,
                    });
                    for (lsn, pointer, sz) in frags {
                        let cache_info = CacheInfo {
                            lsn: *lsn,
                            pointer: *pointer,
                            log_size: *sz,
                            ts: 0,
                        };

                        cache_infos.push(cache_info);
                    }
                }
                PageState::Free(lsn, pointer) => {
                    // blow away any existing state
                    trace!("load_snapshot freeing pid {}", pid);
                    let cache_info = CacheInfo {
                        lsn,
                        pointer,
                        log_size: u64::try_from(MAX_MSG_HEADER_LEN).unwrap(),
                        ts: 0,
                    };
                    cache_infos.push(cache_info);
                    self.free.lock().push(pid);
                }
                _ => panic!("tried to load a {:?}", state),
            }

            // set up the new page
            trace!("installing page for pid {}", pid);

            let update = if pid == META_PID || pid == COUNTER_PID {
                let update =
                    self.pull(pid, cache_infos[0].lsn, cache_infos[0].pointer)?;
                Some(Box::new(update))
            } else if state.is_free() {
                Some(Box::new(Update::Free))
            } else {
                None
            };
            let page = Page { update, cache_infos };

            self.inner.insert(pid, page, &guard);
        }

        Ok(())
    }

    /// Produce a fuzzy snapshot of the current page table state,
    /// suitable for writing out as a recovery snapshot. Page states
    /// may be observed at slightly different moments, but each one
    /// is internally consistent.
    #[allow(unused)]
    fn take_fuzzy_snapshot(self) -> Snapshot {
        let stable_lsn_now: Lsn = self.log.stable_offset();

        // take the pid bound before iterating below, so that no page
        // that existed at this point is missed
        let pid_bound = *self.next_pid_to_allocate.lock();

        let pid_bound_usize = assert_usize(pid_bound);

        let mut page_states = Vec::<PageState>::with_capacity(pid_bound_usize);
        let guard = pin();
        for pid in 0..pid_bound {
            'inner: loop {
                if let Some(pg_view) = self.inner.get(pid, &guard) {
                    if pg_view.cache_infos.is_empty() {
                        // a concurrent allocation is still installing
                        // this page; wait for it to be written
                        std::thread::yield_now();
                    } else {
                        let page_state = pg_view.to_page_state();
                        page_states.push(page_state);
                        break 'inner;
                    }
                } else {
                    // the page is being allocated concurrently;
                    // wait for it to appear in the pagetable
                    std::thread::yield_now();
                }
            }
        }

        Snapshot {
            stable_lsn: Some(stable_lsn_now),
            active_segment: None,
            pt: page_states,
        }
    }
}