sled/pagecache/mod.rs

1//! `pagecache` is a lock-free pagecache and log for building high-performance
2//! databases.
3#![allow(unsafe_code)]
4
5pub mod constants;
6pub mod logger;
7
8mod blob_io;
9mod disk_pointer;
10mod header;
11mod iobuf;
12mod iterator;
13mod pagetable;
14#[cfg(any(all(not(unix), not(windows)), miri))]
15mod parallel_io_polyfill;
16#[cfg(all(unix, not(miri)))]
17mod parallel_io_unix;
18#[cfg(all(windows, not(miri)))]
19mod parallel_io_windows;
20mod reservation;
21mod segment;
22mod snapshot;
23
24use std::{collections::BinaryHeap, ops::Deref};
25
26use crate::*;
27
28#[cfg(any(all(not(unix), not(windows)), miri))]
29use parallel_io_polyfill::{pread_exact, pread_exact_or_eof, pwrite_all};
30
31#[cfg(all(unix, not(miri)))]
32use parallel_io_unix::{pread_exact, pread_exact_or_eof, pwrite_all};
33
34#[cfg(all(windows, not(miri)))]
35use parallel_io_windows::{pread_exact, pread_exact_or_eof, pwrite_all};
36
37use self::{
38    blob_io::{gc_blobs, read_blob, remove_blob, write_blob},
39    constants::{
40        BATCH_MANIFEST_PID, COUNTER_PID, META_PID,
41        PAGE_CONSOLIDATION_THRESHOLD, SEGMENT_CLEANUP_THRESHOLD,
42    },
43    header::Header,
44    iobuf::{roll_iobuf, IoBuf, IoBufs},
45    iterator::{raw_segment_iter_from, LogIter},
46    pagetable::PageTable,
47    segment::{SegmentAccountant, SegmentCleaner, SegmentOp},
48};
49
50pub(crate) use self::{
51    logger::{
52        read_message, read_segment_header, MessageHeader, SegmentHeader,
53        SegmentNumber,
54    },
55    reservation::Reservation,
56    snapshot::{read_snapshot_or_default, PageState, Snapshot},
57};
58
59pub use self::{
60    constants::{
61        MAX_MSG_HEADER_LEN, MAX_SPACE_AMPLIFICATION, MINIMUM_ITEMS_PER_SEGMENT,
62        SEG_HEADER_LEN,
63    },
64    disk_pointer::DiskPtr,
65    logger::{Log, LogRead},
66};
67
/// The ID of a segment. This equals its `LogOffset` (or the offset of any
/// item contained inside it) divided by the configured `segment_size`.
pub type SegmentId = usize;
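// For example (illustrative only; `segment_size` is configurable): with a
// segment_size of 8 MiB, the `LogOffset` 20_971_520 (20 MiB) lies in
// `SegmentId` 2, because 20 MiB / 8 MiB rounds down to 2.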
71
72/// A file offset in the database log.
73pub type LogOffset = u64;
74
/// A pointer to a blob, represented as an `Lsn`.
76pub type BlobPointer = Lsn;
77
78/// The logical sequence number of an item in the database log.
79pub type Lsn = i64;
80
81/// A page identifier.
82pub type PageId = u64;
83
84/// Uses a non-varint `Lsn` to mark offsets.
85#[derive(Default, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Debug)]
86#[repr(transparent)]
87pub struct BatchManifest(pub Lsn);
88
89/// A buffer with an associated offset. Useful for
90/// batching many reads over a file segment.
91#[derive(Debug)]
92pub struct BasedBuf {
93    pub buf: Vec<u8>,
94    pub offset: LogOffset,
95}
96
97/// A byte used to disambiguate log message types
98#[derive(Clone, Copy, PartialEq, Eq, Debug)]
99#[repr(u8)]
100pub enum MessageKind {
101    /// The EVIL_BYTE is written as a canary to help
102    /// detect torn writes.
103    Corrupted = 0,
104    /// Indicates that the following buffer corresponds
105    /// to a reservation for an in-memory operation that
106    /// failed to complete. It should be skipped during
107    /// recovery.
108    Canceled = 1,
109    /// Indicates that the following buffer is used
110    /// as padding to fill out the rest of the segment
111    /// before sealing it.
112    Cap = 2,
113    /// Indicates that the following buffer contains
114    /// an Lsn for the last write in an atomic writebatch.
115    BatchManifest = 3,
116    /// Indicates that this page was freed from the pagetable.
117    Free = 4,
118    /// Indicates that the last persisted ID was at least
119    /// this high.
120    Counter = 5,
121    /// The meta page, stored inline
122    InlineMeta = 6,
    /// The meta page, stored in an external blob
124    BlobMeta = 7,
125    /// A consolidated page replacement, stored inline
126    InlineNode = 8,
    /// A consolidated page replacement, stored in an external blob
128    BlobNode = 9,
129    /// A partial page update, stored inline
130    InlineLink = 10,
    /// A partial page update, stored in an external blob
132    BlobLink = 11,
133}
134
135impl MessageKind {
136    pub(crate) const fn into(self) -> u8 {
137        self as u8
138    }
139}
140
141impl From<u8> for MessageKind {
142    fn from(byte: u8) -> Self {
143        use MessageKind::*;
144        match byte {
145            0 => Corrupted,
146            1 => Canceled,
147            2 => Cap,
148            3 => BatchManifest,
149            4 => Free,
150            5 => Counter,
151            6 => InlineMeta,
152            7 => BlobMeta,
153            8 => InlineNode,
154            9 => BlobNode,
155            10 => InlineLink,
156            11 => BlobLink,
157            other => {
158                debug!("encountered unexpected message kind byte {}", other);
159                Corrupted
160            }
161        }
162    }
163}
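
// A minimal round-trip sanity check sketching the mapping above (the module
// and test names are illustrative, not part of the crate's test suite):
// every known discriminant should survive conversion to `u8` and back.
#[cfg(test)]
mod message_kind_round_trip {
    use super::MessageKind;

    #[test]
    fn u8_round_trip() {
        for byte in 0_u8..=11 {
            // `MessageKind::into` is the inherent `as u8` conversion above.
            let kind = MessageKind::from(byte);
            assert_eq!(kind.into(), byte);
        }
    }
}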
164
165/// The high-level types of stored information
166/// about pages and their mutations
167#[derive(Clone, Copy, Debug, PartialEq, Eq)]
168pub enum LogKind {
169    /// Persisted data containing a page replacement
170    Replace,
171    /// Persisted immutable update
172    Link,
173    /// Freeing of a page
174    Free,
175    /// Some state indicating this should be skipped
176    Skip,
177    /// Unexpected corruption
178    Corrupted,
179}
180
181fn log_kind_from_update(update: &Update) -> LogKind {
182    match update {
183        Update::Free => LogKind::Free,
184        Update::Link(..) => LogKind::Link,
185        Update::Node(..) | Update::Counter(..) | Update::Meta(..) => {
186            LogKind::Replace
187        }
188    }
189}
190
191impl From<MessageKind> for LogKind {
192    fn from(kind: MessageKind) -> Self {
193        match kind {
194            MessageKind::Free => LogKind::Free,
195            MessageKind::InlineNode
196            | MessageKind::Counter
197            | MessageKind::BlobNode
198            | MessageKind::InlineMeta
199            | MessageKind::BlobMeta => LogKind::Replace,
200            MessageKind::InlineLink | MessageKind::BlobLink => LogKind::Link,
201            MessageKind::Canceled
202            | MessageKind::Cap
203            | MessageKind::BatchManifest => LogKind::Skip,
204            other => {
205                debug!("encountered unexpected message kind byte {:?}", other);
206                LogKind::Corrupted
207            }
208        }
209    }
210}
211
212fn assert_usize<T>(from: T) -> usize
213where
214    usize: TryFrom<T, Error = std::num::TryFromIntError>,
215{
216    usize::try_from(from).expect("lost data cast while converting to usize")
217}
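
// For example, `assert_usize(5_u64)` returns `5`; on a 32-bit target a value
// above `usize::MAX` would hit the `expect` above and panic instead of
// silently truncating.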
218
219// TODO remove this when atomic fetch_max stabilizes in #48655
220fn bump_atomic_lsn(atomic_lsn: &AtomicLsn, to: Lsn) {
221    let mut current = atomic_lsn.load(Acquire);
222    loop {
223        if current >= to {
224            return;
225        }
226        let last = atomic_lsn.compare_and_swap(current, to, SeqCst);
227        if last == current {
228            // we succeeded.
229            return;
230        }
231        current = last;
232    }
233}
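
// Once `fetch_max` can be used here (per the TODO above), the CAS loop
// collapses to a single call. Sketch, assuming `AtomicLsn` exposes the
// standard atomic `fetch_max` API:
//
//     atomic_lsn.fetch_max(to, SeqCst);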
234
235use std::convert::{TryFrom, TryInto};
236
237#[inline]
238pub(crate) fn lsn_to_arr(number: Lsn) -> [u8; 8] {
239    number.to_le_bytes()
240}
241
242#[inline]
243pub(crate) fn arr_to_lsn(arr: &[u8]) -> Lsn {
244    Lsn::from_le_bytes(arr.try_into().unwrap())
245}
246
247#[inline]
248pub(crate) fn u64_to_arr(number: u64) -> [u8; 8] {
249    number.to_le_bytes()
250}
251
252#[inline]
253pub(crate) fn arr_to_u32(arr: &[u8]) -> u32 {
254    u32::from_le_bytes(arr.try_into().unwrap())
255}
256
257#[inline]
258pub(crate) fn u32_to_arr(number: u32) -> [u8; 4] {
259    number.to_le_bytes()
260}
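
// Round-trip sanity for the helpers above (illustrative): for any `x: Lsn`
// and `y: u32`, `arr_to_lsn(&lsn_to_arr(x)) == x` and
// `arr_to_u32(&u32_to_arr(y)) == y`, since both directions use
// little-endian byte order.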
261
262#[allow(clippy::needless_pass_by_value)]
263pub(crate) fn maybe_decompress(in_buf: Vec<u8>) -> std::io::Result<Vec<u8>> {
264    #[cfg(feature = "compression")]
265    {
266        use zstd::stream::decode_all;
267
268        let scootable_in_buf = &mut &*in_buf;
269        let _ivec_varint = u64::deserialize(scootable_in_buf)
270            .expect("this had to be serialized with an extra length frame");
271        let _measure = Measure::new(&M.decompress);
272        let out_buf = decode_all(scootable_in_buf).expect(
273            "failed to decompress data. \
274             This is not expected, please open an issue on \
275             https://github.com/spacejam/sled so we can \
276             fix this critical issue ASAP. Thank you :)",
277        );
278
279        Ok(out_buf)
280    }
281
282    #[cfg(not(feature = "compression"))]
283    Ok(in_buf)
284}
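
// Note on the framing above: under the "compression" feature, buffers passed
// to `maybe_decompress` must begin with an extra length frame (deserialized
// and discarded above) followed by the zstd-compressed payload; without the
// feature, the buffer is returned unchanged.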
285
286#[derive(Debug, Clone, Copy)]
287pub struct NodeView<'g>(pub(crate) PageView<'g>);
288
289impl<'g> Deref for NodeView<'g> {
290    type Target = Node;
291    fn deref(&self) -> &Node {
292        self.0.as_node()
293    }
294}
295
296unsafe impl<'g> Send for NodeView<'g> {}
297unsafe impl<'g> Sync for NodeView<'g> {}
298
299#[derive(Debug, Clone, Copy)]
300pub struct MetaView<'g>(PageView<'g>);
301
302impl<'g> Deref for MetaView<'g> {
303    type Target = Meta;
304    fn deref(&self) -> &Meta {
305        self.0.as_meta()
306    }
307}
308
309unsafe impl<'g> Send for MetaView<'g> {}
310unsafe impl<'g> Sync for MetaView<'g> {}
311
312#[derive(Debug, Clone, Copy)]
313pub struct PageView<'g> {
314    pub(crate) read: Shared<'g, Page>,
315    pub(crate) entry: &'g Atomic<Page>,
316}
317
318unsafe impl<'g> Send for PageView<'g> {}
319unsafe impl<'g> Sync for PageView<'g> {}
320
321impl<'g> Deref for PageView<'g> {
322    type Target = Page;
323
324    fn deref(&self) -> &Page {
325        unsafe { self.read.deref() }
326    }
327}
328
329#[derive(Clone, Copy, Debug, Eq, PartialEq)]
330pub struct CacheInfo {
331    pub ts: u64,
332    pub lsn: Lsn,
333    pub pointer: DiskPtr,
334    pub log_size: u64,
335}
336
337#[cfg(test)]
338impl quickcheck::Arbitrary for CacheInfo {
339    fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> CacheInfo {
340        use rand::Rng;
341
342        CacheInfo {
343            ts: g.gen(),
344            lsn: g.gen(),
345            pointer: DiskPtr::arbitrary(g),
346            log_size: g.gen(),
347        }
348    }
349}
350
/// An `Update` denotes a state, or a change in a sequence of updates,
/// of which a page consists.
353#[derive(Clone, Debug, PartialEq)]
354pub(crate) enum Update {
355    Link(Link),
356    Node(Node),
357    Free,
358    Counter(u64),
359    Meta(Meta),
360}
361
362impl Update {
363    fn as_node(&self) -> &Node {
364        match self {
365            Update::Node(node) => node,
366            other => panic!("called as_node on non-Node: {:?}", other),
367        }
368    }
369
370    fn as_node_mut(&mut self) -> &mut Node {
371        match self {
372            Update::Node(node) => node,
373            other => panic!("called as_node_mut on non-Node: {:?}", other),
374        }
375    }
376
377    fn as_link(&self) -> &Link {
378        match self {
379            Update::Link(link) => link,
380            other => panic!("called as_link on non-Link: {:?}", other),
381        }
382    }
383
384    pub(crate) fn as_meta(&self) -> &Meta {
385        if let Update::Meta(meta) = self {
386            meta
387        } else {
388            panic!("called as_meta on {:?}", self)
389        }
390    }
391
392    pub(crate) fn as_counter(&self) -> u64 {
393        if let Update::Counter(counter) = self {
394            *counter
395        } else {
396            panic!("called as_counter on {:?}", self)
397        }
398    }
399
    fn is_free(&self) -> bool {
        matches!(self, Update::Free)
    }
407}
408
/// Ensures that any operations that are written to disk between the
/// creation of this guard and its destruction will be recovered
/// atomically. When `seal_batch` is called, it records in an earlier
/// reservation where the stable tip must be in order to perform
/// recovery. If that point is beyond where the system successfully
/// wrote before crashing, recovery will stop immediately, before
/// any of the atomic batch can be partially recovered.
///
/// `seal_batch` must be called to complete the atomic batch operation.
///
/// If this guard is dropped without calling `seal_batch`, the atomic
/// recovery guarantee will not take effect.
421#[derive(Debug)]
422pub struct RecoveryGuard<'a> {
423    batch_res: Reservation<'a>,
424}
425
426impl<'a> RecoveryGuard<'a> {
427    /// Writes the last LSN for a batch into an earlier
428    /// reservation, releasing it.
429    pub(crate) fn seal_batch(self) -> Result<()> {
430        let max_reserved =
431            self.batch_res.log.iobufs.max_reserved_lsn.load(Acquire);
432        self.batch_res.mark_writebatch(max_reserved).map(|_| ())
433    }
434}
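
// A hedged usage sketch of the batch protocol described above (the variable
// names and the writes in the middle are illustrative, not taken from the
// crate's call sites):
//
//     let recovery_guard = pagecache.pin_log(&guard)?;
//     // ... perform the batched writes that must recover atomically ...
//     recovery_guard.seal_batch()?;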
435
436/// A page consists of a sequence of state transformations
/// with associated storage parameters like disk position, LSN, and timestamp.
438#[derive(Debug, Clone)]
439pub struct Page {
440    pub(crate) update: Option<Box<Update>>,
441    pub(crate) cache_infos: Vec<CacheInfo>,
442}
443
444impl Page {
445    pub(crate) fn to_page_state(&self) -> PageState {
446        let base = &self.cache_infos[0];
447        if self.is_free() {
448            PageState::Free(base.lsn, base.pointer)
449        } else {
450            let mut frags: Vec<(Lsn, DiskPtr, u64)> = vec![];
451
452            for cache_info in self.cache_infos.iter().skip(1) {
453                frags.push((
454                    cache_info.lsn,
455                    cache_info.pointer,
456                    cache_info.log_size,
457                ));
458            }
459
460            PageState::Present {
461                base: (base.lsn, base.pointer, base.log_size),
462                frags,
463            }
464        }
465    }
466
467    pub(crate) fn as_node(&self) -> &Node {
468        self.update.as_ref().unwrap().as_node()
469    }
470
471    pub(crate) fn as_meta(&self) -> &Meta {
472        self.update.as_ref().unwrap().as_meta()
473    }
474
475    pub(crate) fn as_counter(&self) -> u64 {
476        self.update.as_ref().unwrap().as_counter()
477    }
478
479    pub(crate) fn is_free(&self) -> bool {
480        self.update.as_ref().map_or(false, |u| u.is_free())
481            || self.cache_infos.is_empty()
482    }
483
484    pub(crate) fn last_lsn(&self) -> Lsn {
485        self.cache_infos.last().map(|ci| ci.lsn).unwrap()
486    }
487
488    pub(crate) fn log_size(&self) -> u64 {
489        self.cache_infos.iter().map(|ci| ci.log_size).sum()
490    }
491
492    fn ts(&self) -> u64 {
493        self.cache_infos.last().map_or(0, |ci| ci.ts)
494    }
495
496    fn lone_blob(&self) -> Option<DiskPtr> {
497        if self.cache_infos.len() == 1 && self.cache_infos[0].pointer.is_blob()
498        {
499            Some(self.cache_infos[0].pointer)
500        } else {
501            None
502        }
503    }
504}
505
/// A lock-free pagecache which supports fragmented pages, built from
/// chains of links, for dramatically improving write throughput.
508pub struct PageCache {
509    pub(crate) config: RunningConfig,
510    inner: PageTable,
511    next_pid_to_allocate: Mutex<PageId>,
512    free: Arc<Mutex<BinaryHeap<PageId>>>,
513    #[doc(hidden)]
514    pub log: Log,
515    lru: Lru,
516    idgen: Arc<AtomicU64>,
517    idgen_persists: Arc<AtomicU64>,
518    idgen_persist_mu: Arc<Mutex<()>>,
519    was_recovered: bool,
520}
521
522unsafe impl Send for PageCache {}
523
524unsafe impl Sync for PageCache {}
525
526impl Debug for PageCache {
527    fn fmt(
528        &self,
529        f: &mut fmt::Formatter<'_>,
530    ) -> std::result::Result<(), fmt::Error> {
531        f.write_str(&*format!(
532            "PageCache {{ max: {:?} free: {:?} }}\n",
533            *self.next_pid_to_allocate.lock(),
534            self.free
535        ))
536    }
537}
538
539#[cfg(feature = "event_log")]
540impl Drop for PageCache {
541    fn drop(&mut self) {
542        use std::collections::HashMap;
543
544        trace!("dropping pagecache");
545
546        // we can't as easily assert recovery
547        // invariants across failpoints for now
548        if self.log.iobufs.config.global_error().is_ok() {
549            let mut pages_before_restart = HashMap::new();
550
551            let guard = pin();
552
553            self.config.event_log.meta_before_restart(
554                self.get_meta(&guard)
555                    .expect("should get meta under test")
556                    .deref()
557                    .clone(),
558            );
559
560            for pid in 0..*self.next_pid_to_allocate.lock() {
561                let pte = if let Some(pte) = self.inner.get(pid, &guard) {
562                    pte
563                } else {
564                    continue;
565                };
566                let pointers =
567                    pte.cache_infos.iter().map(|ci| ci.pointer).collect();
568                pages_before_restart.insert(pid, pointers);
569            }
570
571            self.config.event_log.pages_before_restart(pages_before_restart);
572        }
573
574        trace!("pagecache dropped");
575    }
576}
577
578impl PageCache {
579    /// Instantiate a new `PageCache`.
580    pub(crate) fn start(config: RunningConfig) -> Result<Self> {
581        trace!("starting pagecache");
582
583        config.reset_global_error();
584
585        // try to pull any existing snapshot off disk, and
586        // apply any new data to it to "catch-up" the
587        // snapshot before loading it.
588        let snapshot = read_snapshot_or_default(&config)?;
589
590        #[cfg(feature = "testing")]
591        {
592            // these checks are in place to catch non-idempotent
593            // recovery which could trigger feedback loops and
594            // emergent behavior.
595            trace!(
596                "\n\n~~~~ regenerating snapshot for idempotency test ~~~~\n"
597            );
598
599            let snapshot2 = read_snapshot_or_default(&config)
600                .expect("second read snapshot");
601            assert_eq!(
602                snapshot.active_segment, snapshot2.active_segment,
603                "snapshot active_segment diverged across recoveries.\n\n \
604                first: {:?}\n\n
605                second: {:?}\n\n",
606                snapshot, snapshot2
607            );
608            assert_eq!(
609                snapshot.stable_lsn, snapshot2.stable_lsn,
610                "snapshot stable_lsn diverged across recoveries.\n\n \
611                first: {:?}\n\n
612                second: {:?}\n\n",
613                snapshot, snapshot2
614            );
615            for (pid, (p1, p2)) in
616                snapshot.pt.iter().zip(snapshot2.pt.iter()).enumerate()
617            {
618                assert_eq!(
619                    p1, p2,
620                    "snapshot pid {} diverged across recoveries.\n\n \
621                first: {:?}\n\n
622                second: {:?}\n\n",
623                    pid, p1, p2
624                );
625            }
626            assert_eq!(
627                snapshot.pt.len(),
628                snapshot2.pt.len(),
629                "snapshots number of pages diverged across recoveries.\n\n \
630                first: {:?}\n\n
631                second: {:?}\n\n",
632                snapshot.pt,
633                snapshot2.pt
634            );
635            assert_eq!(
636                snapshot, snapshot2,
637                "snapshots diverged across recoveries.\n\n \
638                first: {:?}\n\n
639                second: {:?}\n\n",
640                snapshot, snapshot2
641            );
642        }
643
644        let _measure = Measure::new(&M.start_pagecache);
645
646        let cache_capacity = config.cache_capacity;
647        let lru = Lru::new(cache_capacity);
648
649        let mut pc = Self {
650            config: config.clone(),
651            inner: PageTable::default(),
652            next_pid_to_allocate: Mutex::new(0),
653            free: Arc::new(Mutex::new(BinaryHeap::new())),
654            log: Log::start(config, &snapshot)?,
655            lru,
656            idgen_persist_mu: Arc::new(Mutex::new(())),
657            idgen: Arc::new(AtomicU64::new(0)),
658            idgen_persists: Arc::new(AtomicU64::new(0)),
659            was_recovered: false,
660        };
661
662        // now we read it back in
663        pc.load_snapshot(&snapshot)?;
664
665        #[cfg(feature = "testing")]
666        {
667            use std::collections::HashMap;
668
669            // NB this must be before idgen/meta are initialized
670            // because they may cas_page on initial page-in.
671            let guard = pin();
672
673            let mut pages_after_restart = HashMap::new();
674
675            for pid in 0..*pc.next_pid_to_allocate.lock() {
676                let pte = if let Some(pte) = pc.inner.get(pid, &guard) {
677                    pte
678                } else {
679                    continue;
680                };
681                let pointers =
682                    pte.cache_infos.iter().map(|ci| ci.pointer).collect();
683                pages_after_restart.insert(pid, pointers);
684            }
685
686            pc.config.event_log.pages_after_restart(pages_after_restart);
687        }
688
689        let mut was_recovered = true;
690
691        {
692            // subscope required because pc.begin() borrows pc
693
694            let guard = pin();
695
696            if let Err(Error::ReportableBug(..)) = pc.get_meta(&guard) {
697                // set up meta
698                was_recovered = false;
699
700                let meta_update = Update::Meta(Meta::default());
701
702                let (meta_id, _) = pc.allocate_inner(meta_update, &guard)?;
703
704                assert_eq!(
705                    meta_id, META_PID,
706                    "we expect the meta page to have pid {}, but it had pid {} instead",
707                    META_PID, meta_id,
708                );
709            }
710
711            if let Err(Error::ReportableBug(..)) = pc.get_idgen(&guard) {
712                // set up idgen
713                was_recovered = false;
714
715                let counter_update = Update::Counter(0);
716
717                let (counter_id, _) =
718                    pc.allocate_inner(counter_update, &guard)?;
719
720                assert_eq!(
721                    counter_id, COUNTER_PID,
722                    "we expect the counter to have pid {}, but it had pid {} instead",
723                    COUNTER_PID, counter_id,
724                );
725            }
726
727            let (_, counter) = pc.get_idgen(&guard)?;
728            let idgen_recovery = if was_recovered {
729                counter + (2 * pc.config.idgen_persist_interval)
730            } else {
731                0
732            };
733            let idgen_persists = counter / pc.config.idgen_persist_interval
734                * pc.config.idgen_persist_interval;
735
736            pc.idgen.store(idgen_recovery, Release);
737            pc.idgen_persists.store(idgen_persists, Release);
738        }
739
740        pc.was_recovered = was_recovered;
741
742        #[cfg(feature = "event_log")]
743        {
744            let guard = pin();
745
746            pc.config.event_log.meta_after_restart(
747                pc.get_meta(&guard)
748                    .expect("should be able to get meta under test")
749                    .deref()
750                    .clone(),
751            );
752        }
753
754        trace!("pagecache started");
755
756        Ok(pc)
757    }
758
759    /// Flushes any pending IO buffers to disk to ensure durability.
760    /// Returns the number of bytes written during this call.
761    pub(crate) fn flush(&self) -> Result<usize> {
762        self.log.flush()
763    }
764
765    /// Create a new page, trying to reuse old freed pages if possible
766    /// to maximize underlying `PageTable` pointer density. Returns
767    /// the page ID and its pointer for use in future atomic `replace`
768    /// and `link` operations.
769    pub(crate) fn allocate<'g>(
770        &self,
771        new: Node,
772        guard: &'g Guard,
773    ) -> Result<(PageId, PageView<'g>)> {
774        self.allocate_inner(Update::Node(new), guard)
775    }
776
777    fn allocate_inner<'g>(
778        &self,
779        new: Update,
780        guard: &'g Guard,
781    ) -> Result<(PageId, PageView<'g>)> {
782        let mut allocation_serializer;
783
784        let free_opt = self.free.lock().pop();
785
786        let (pid, page_view) = if let Some(pid) = free_opt {
787            trace!("re-allocating pid {}", pid);
788
789            let page_view = match self.inner.get(pid, guard) {
790                None => panic!(
791                    "expected to find existing stack \
792                     for re-allocated pid {}",
793                    pid
794                ),
795                Some(p) => p,
796            };
797            assert!(
798                page_view.is_free(),
799                "failed to re-allocate pid {} which \
800                 contained unexpected state {:?}",
801                pid,
802                page_view,
803            );
804            (pid, page_view)
805        } else {
806            // we need to hold the allocation mutex because
807            // we have to maintain the invariant that our
808            // recoverable allocated pages will be contiguous.
809            // If we did not hold this mutex, it would be
810            // possible (especially under high thread counts)
811            // to persist pages non-monotonically to disk,
812            // which would break our recovery invariants.
            // While we could relax that invariant, since it is
            // stricter than necessary, keeping it lets us flag
            // corruption and bugs during testing much more easily.
817            allocation_serializer = self.next_pid_to_allocate.lock();
818            let pid = *allocation_serializer;
819            *allocation_serializer += 1;
820
821            trace!("allocating pid {} for the first time", pid);
822
823            let new_page = Page { update: None, cache_infos: Vec::default() };
824
825            let page_view = self.inner.insert(pid, new_page, guard);
826
827            (pid, page_view)
828        };
829
830        let new_pointer = self
831            .cas_page(pid, page_view, new, false, guard)?
832            .unwrap_or_else(|e| {
833                panic!(
834                    "should always be able to install \
835                     a new page during allocation, but \
836                     failed for pid {}: {:?}",
837                    pid, e
838                )
839            });
840
841        Ok((pid, new_pointer))
842    }
843
844    /// Attempt to opportunistically rewrite data from a Draining
845    /// segment of the file to help with space amplification.
846    /// Returns Ok(true) if we had the opportunity to attempt to
847    /// move a page. Returns Ok(false) if there were no pages
848    /// to GC. Returns an Err if we encountered an IO problem
849    /// while performing this GC.
850    #[cfg(all(
851        not(miri),
852        any(
853            windows,
854            target_os = "linux",
855            target_os = "macos",
856            target_os = "dragonfly",
857            target_os = "freebsd",
858            target_os = "openbsd",
859            target_os = "netbsd",
860        )
861    ))]
862    pub(crate) fn attempt_gc(&self) -> Result<bool> {
863        let guard = pin();
864        let cc = concurrency_control::read();
865        let to_clean = self.log.iobufs.segment_cleaner.pop();
866        let ret = if let Some((pid_to_clean, segment_to_clean)) = to_clean {
867            self.rewrite_page(pid_to_clean, segment_to_clean, &guard)
868                .map(|_| true)
869        } else {
870            Ok(false)
871        };
872        drop(cc);
873        guard.flush();
874        ret
875    }
876
877    /// Initiate an atomic sequence of writes to the
878    /// underlying log. Returns a `RecoveryGuard` which,
879    /// when dropped, will record the current max reserved
880    /// LSN into an earlier log reservation. During recovery,
881    /// when we hit this early atomic LSN marker, if the
882    /// specified LSN is beyond the contiguous tip of the log,
883    /// we immediately halt recovery, preventing the recovery
884    /// of partial transactions or write batches. This is
885    /// a relatively low-level primitive that can be used
886    /// to facilitate transactions and write batches when
887    /// combined with a concurrency control system in another
888    /// component.
889    pub(crate) fn pin_log(&self, guard: &Guard) -> Result<RecoveryGuard<'_>> {
890        // HACK: we are rolling the io buffer before AND
891        // after taking out the reservation pin to avoid
892        // a deadlock where the batch reservation causes
893        // writes to fail to flush to disk. in the future,
894        // this may be addressed in a nicer way by representing
895        // transactions with a begin and end message, rather
896        // than a single beginning message that needs to
897        // be held until we know the final batch LSN.
898        self.log.roll_iobuf()?;
899
900        let batch_res = self.log.reserve(
901            LogKind::Skip,
902            BATCH_MANIFEST_PID,
903            &BatchManifest::default(),
904            guard,
905        )?;
906
907        iobuf::maybe_seal_and_write_iobuf(
908            &self.log.iobufs,
909            &batch_res.iobuf,
910            batch_res.iobuf.get_header(),
911            false,
912        )?;
913
914        Ok(RecoveryGuard { batch_res })
915    }
916
917    #[doc(hidden)]
918    #[cfg(feature = "failpoints")]
919    #[cfg(all(
920        not(miri),
921        any(
922            windows,
923            target_os = "linux",
924            target_os = "macos",
925            target_os = "dragonfly",
926            target_os = "freebsd",
927            target_os = "openbsd",
928            target_os = "netbsd",
929        )
930    ))]
931    pub(crate) fn set_failpoint(&self, e: Error) {
932        if let Error::FailPoint = e {
933            self.config.set_global_error(e);
934
935            // wake up any waiting threads
936            // so they don't stall forever
937            let intervals = self.log.iobufs.intervals.lock();
938
939            // having held the mutex makes this linearized
940            // with the notify below.
941            drop(intervals);
942
943            let _notified = self.log.iobufs.interval_updated.notify_all();
944        }
945    }
946
947    /// Free a particular page.
948    pub(crate) fn free<'g>(
949        &self,
950        pid: PageId,
951        old: PageView<'g>,
952        guard: &'g Guard,
953    ) -> Result<CasResult<'g, ()>> {
954        trace!("attempting to free pid {}", pid);
955
956        if pid == COUNTER_PID || pid == META_PID || pid == BATCH_MANIFEST_PID {
957            return Err(Error::Unsupported(
                "you are not able to free the first \
                 few pages, which are reserved \
                 for internal system purposes"
961                    .into(),
962            ));
963        }
964
965        let new_pointer =
966            self.cas_page(pid, old, Update::Free, false, guard)?;
967
968        if new_pointer.is_ok() {
969            let free = self.free.clone();
970            guard.defer(move || {
971                let mut free = free.lock();
972                // panic if we double-freed a page
973                if free.iter().any(|e| e == &pid) {
974                    panic!("pid {} was double-freed", pid);
975                }
976
977                free.push(pid);
978            });
979        }
980
981        Ok(new_pointer.map_err(|o| o.map(|(pointer, _)| (pointer, ()))))
982    }
983
984    /// Try to atomically add a `PageLink` to the page.
985    /// Returns `Ok(new_key)` if the operation was successful. Returns
986    /// `Err(None)` if the page no longer exists. Returns
987    /// `Err(Some(actual_key))` if the atomic link fails.
988    pub(crate) fn link<'g>(
989        &'g self,
990        pid: PageId,
991        mut old: PageView<'g>,
992        new: Link,
993        guard: &'g Guard,
994    ) -> Result<CasResult<'g, Link>> {
995        let _measure = Measure::new(&M.link_page);
996
997        trace!("linking pid {} with {:?}", pid, new);
998
999        // A failure injector that fails links randomly
1000        // during test to ensure interleaving coverage.
1001        #[cfg(any(test, feature = "lock_free_delays"))]
1002        {
1003            use std::cell::RefCell;
1004            use std::time::{SystemTime, UNIX_EPOCH};
1005
1006            thread_local! {
1007                pub static COUNT: RefCell<u32> = RefCell::new(1);
1008            }
1009
1010            let time_now =
1011                SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
1012
1013            #[allow(clippy::cast_possible_truncation)]
1014            let fail_seed = std::cmp::max(3, time_now.as_nanos() as u32 % 128);
1015
1016            let inject_failure = COUNT.with(|c| {
1017                let mut cr = c.borrow_mut();
1018                *cr += 1;
1019                *cr % fail_seed == 0
1020            });
1021
1022            if inject_failure {
1023                debug!(
1024                    "injecting a randomized failure in the link of pid {}",
1025                    pid
1026                );
1027                if let Some(current_pointer) = self.get(pid, guard)? {
1028                    return Ok(Err(Some((current_pointer.0, new))));
1029                } else {
1030                    return Ok(Err(None));
1031                }
1032            }
1033        }
1034
1035        let mut node: Node = old.as_node().clone();
1036        node.apply(&new);
1037
1038        // see if we should short-circuit replace
1039        if old.cache_infos.len() >= PAGE_CONSOLIDATION_THRESHOLD {
1040            let short_circuit = self.replace(pid, old, node, guard)?;
1041            return Ok(short_circuit.map_err(|a| a.map(|b| (b.0, new))));
1042        }
1043
1044        let mut new_page = Some(Owned::new(Page {
1045            update: Some(Box::new(Update::Node(node))),
1046            cache_infos: Vec::default(),
1047        }));
1048
1049        loop {
1050            // TODO handle replacement on threshold here instead
1051
1052            let log_reservation =
1053                self.log.reserve(LogKind::Link, pid, &new, guard)?;
1054            let lsn = log_reservation.lsn();
1055            let pointer = log_reservation.pointer();
1056
1057            // NB the setting of the timestamp is quite
1058            // correctness-critical! We use the ts to
1059            // ensure that fundamentally new data causes
1060            // high-level link and replace operations
1061            // to fail when the data in the pagecache
1062            // actually changes. When we just rewrite
1063            // the page for the purposes of moving it
1064            // to a new location on disk, however, we
1065            // don't want to cause threads that are
1066            // basing the correctness of their new
1067            // writes on the unchanged state to fail.
1068            // Here, we bump it by 1, to signal that
1069            // the underlying state is fundamentally
1070            // changing.
1071            let ts = old.ts() + 1;
1072
1073            let cache_info = CacheInfo {
1074                lsn,
1075                pointer,
1076                ts,
1077                log_size: log_reservation.reservation_len() as u64,
1078            };
1079
1080            let mut new_cache_infos =
1081                Vec::with_capacity(old.cache_infos.len() + 1);
1082            new_cache_infos.extend_from_slice(&old.cache_infos);
1083            new_cache_infos.push(cache_info);
1084
1085            let mut page_ptr = new_page.take().unwrap();
1086            page_ptr.cache_infos = new_cache_infos;
1087
1088            debug_delay();
1089            let result =
1090                old.entry.compare_and_set(old.read, page_ptr, SeqCst, guard);
1091
1092            match result {
1093                Ok(new_shared) => {
1094                    trace!("link of pid {} succeeded", pid);
1095
1096                    unsafe {
1097                        guard.defer_destroy(old.read);
1098                    }
1099
1100                    assert_ne!(old.last_lsn(), 0);
1101
1102                    self.log.iobufs.sa_mark_link(pid, cache_info, guard);
1103
1104                    // NB complete must happen AFTER calls to SA, because
1105                    // when the iobuf's n_writers hits 0, we may transition
1106                    // the segment to inactive, resulting in a race otherwise.
1107                    // FIXME can result in deadlock if a node that holds SA
1108                    // is waiting to acquire a new reservation blocked by this?
1109                    log_reservation.complete()?;
1110
1111                    // possibly evict an item now that our cache has grown
1112                    let total_page_size =
1113                        unsafe { new_shared.deref().log_size() };
1114                    let to_evict =
1115                        self.lru.accessed(pid, total_page_size, guard);
1116                    trace!(
1117                        "accessed pid {} -> paging out pids {:?}",
1118                        pid,
1119                        to_evict
1120                    );
1121                    if !to_evict.is_empty() {
1122                        self.page_out(to_evict, guard)?;
1123                    }
1124
1125                    old.read = new_shared;
1126
1127                    return Ok(Ok(old));
1128                }
1129                Err(cas_error) => {
1130                    log_reservation.abort()?;
1131                    let actual = cas_error.current;
1132                    let actual_ts = unsafe { actual.deref().ts() };
1133                    if actual_ts == old.ts() {
1134                        trace!(
1135                            "link of pid {} failed due to movement, retrying",
1136                            pid
1137                        );
1138                        new_page = Some(cas_error.new);
1139
1140                        old.read = actual;
1141                    } else {
1142                        trace!("link of pid {} failed due to new update", pid);
1143                        let mut page_view = old;
1144                        page_view.read = actual;
1145                        return Ok(Err(Some((page_view, new))));
1146                    }
1147                }
1148            }
1149        }
1150    }
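
    // Caller-side sketch of the CAS protocol shared by `link` and `replace`
    // (illustrative): `Ok(Ok(view))` means the swap landed; `Ok(Err(Some((
    // actual, rejected))))` means another thread won and the caller may retry
    // against `actual`; `Ok(Err(None))` means the page no longer exists.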
1151
    /// Replace an existing page with a different set of `PageLink`s.
1153    /// Returns `Ok(new_key)` if the operation was successful. Returns
1154    /// `Err(None)` if the page no longer exists. Returns
1155    /// `Err(Some(actual_key))` if the atomic swap fails.
1156    pub(crate) fn replace<'g>(
1157        &self,
1158        pid: PageId,
1159        old: PageView<'g>,
1160        new: Node,
1161        guard: &'g Guard,
1162    ) -> Result<CasResult<'g, Node>> {
1163        let _measure = Measure::new(&M.replace_page);
1164
1165        trace!("replacing pid {} with {:?}", pid, new);
1166
1167        // A failure injector that fails replace calls randomly
1168        // during test to ensure interleaving coverage.
1169        #[cfg(any(test, feature = "lock_free_delays"))]
1170        {
1171            use std::cell::RefCell;
1172            use std::time::{SystemTime, UNIX_EPOCH};
1173
1174            thread_local! {
1175                pub static COUNT: RefCell<u32> = RefCell::new(1);
1176            }
1177
1178            let time_now =
1179                SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
1180
1181            #[allow(clippy::cast_possible_truncation)]
1182            let fail_seed = std::cmp::max(3, time_now.as_nanos() as u32 % 128);
1183
1184            let inject_failure = COUNT.with(|c| {
1185                let mut cr = c.borrow_mut();
1186                *cr += 1;
1187                *cr % fail_seed == 0
1188            });
1189
1190            if inject_failure {
1191                debug!(
1192                    "injecting a randomized failure in the replace of pid {}",
1193                    pid
1194                );
1195                if let Some(current_pointer) = self.get(pid, guard)? {
1196                    return Ok(Err(Some((current_pointer.0, new))));
1197                } else {
1198                    return Ok(Err(None));
1199                }
1200            }
1201        }
1202
1203        let result =
1204            self.cas_page(pid, old, Update::Node(new), false, guard)?;
1205
1206        if let Some((pid_to_clean, segment_to_clean)) =
1207            self.log.iobufs.segment_cleaner.pop()
1208        {
1209            self.rewrite_page(pid_to_clean, segment_to_clean, guard)?;
1210        }
1211
1212        Ok(result.map_err(|fail| {
1213            let (pointer, shared) = fail.unwrap();
1214            if let Update::Node(rejected_new) = shared {
1215                Some((pointer, rejected_new))
1216            } else {
1217                unreachable!();
1218            }
1219        }))
1220    }
1221
1222    // rewrite a page so we can reuse the segment that it is
1223    // (at least partially) located in. This happens when a
1224    // segment has had enough resident page replacements moved
1225    // away to trigger the `segment_cleanup_threshold`.
1226    fn rewrite_page(
1227        &self,
1228        pid: PageId,
1229        segment_to_purge: LogOffset,
1230        guard: &Guard,
1231    ) -> Result<()> {
1232        let _measure = Measure::new(&M.rewrite_page);
1233
1234        trace!("rewriting pid {}", pid);
1235
1236        let purge_segment_id =
1237            segment_to_purge / self.config.segment_size as u64;
1238
1239        loop {
1240            let page_view = if let Some(page_view) = self.inner.get(pid, guard)
1241            {
1242                page_view
1243            } else {
1244                panic!("rewriting pid {} failed (no longer exists)", pid);
1245            };
1246
1247            let already_moved = !unsafe { page_view.read.deref() }
1248                .cache_infos
1249                .iter()
1250                .any(|ce| {
1251                    ce.pointer.lid() / self.config.segment_size as u64
1252                        == purge_segment_id
1253                });
1254            if already_moved {
1255                return Ok(());
1256            }
1257
1258            // if the page is just a single blob pointer, rewrite it.
1259            if let Some(disk_pointer) = page_view.lone_blob() {
1260                trace!("rewriting blob with pid {}", pid);
1261                let blob_pointer = disk_pointer.blob().1;
1262
1263                let log_reservation =
1264                    self.log.rewrite_blob_pointer(pid, blob_pointer, guard)?;
1265
1266                let cache_info = CacheInfo {
1267                    ts: page_view.ts(),
1268                    lsn: log_reservation.lsn,
1269                    pointer: log_reservation.pointer,
1270                    log_size: u64::try_from(log_reservation.reservation_len())
1271                        .unwrap(),
1272                };
1273
1274                let new_page = Owned::new(Page {
1275                    update: page_view.update.clone(),
1276                    cache_infos: vec![cache_info],
1277                });
1278
1279                debug_delay();
1280                let result = page_view.entry.compare_and_set(
1281                    page_view.read,
1282                    new_page,
1283                    SeqCst,
1284                    guard,
1285                );
1286
1287                if let Ok(new_shared) = result {
1288                    unsafe {
1289                        guard.defer_destroy(page_view.read);
1290                    }
1291
1292                    let lsn = log_reservation.lsn();
1293
1294                    self.log.iobufs.sa_mark_replace(
1295                        pid,
1296                        lsn,
1297                        &page_view.cache_infos,
1298                        cache_info,
1299                        guard,
1300                    )?;
1301
1302                    // NB complete must happen AFTER calls to SA, because
1303                    // when the iobuf's n_writers hits 0, we may transition
1304                    // the segment to inactive, resulting in a race otherwise.
1305                    let _pointer = log_reservation.complete()?;
1306
1307                    // possibly evict an item now that our cache has grown
1308                    let total_page_size =
1309                        unsafe { new_shared.deref().log_size() };
1310                    let to_evict =
1311                        self.lru.accessed(pid, total_page_size, guard);
1312                    trace!(
1313                        "accessed pid {} -> paging out pids {:?}",
1314                        pid,
1315                        to_evict
1316                    );
1317                    if !to_evict.is_empty() {
1318                        self.page_out(to_evict, guard)?;
1319                    }
1320
1321                    trace!("rewriting pid {} succeeded", pid);
1322
1323                    return Ok(());
1324                } else {
1325                    let _pointer = log_reservation.abort()?;
1326
1327                    trace!("rewriting pid {} failed", pid);
1328                }
1329            } else {
1330                trace!("rewriting page with pid {}", pid);
1331
1332                // page-in whole page with a get
1333                let (key, update): (_, Update) = if pid == META_PID {
1334                    let meta_view = self.get_meta(guard)?;
1335                    (meta_view.0, Update::Meta(meta_view.deref().clone()))
1336                } else if pid == COUNTER_PID {
1337                    let (key, counter) = self.get_idgen(guard)?;
1338                    (key, Update::Counter(counter))
1339                } else if let Some(node_view) = self.get(pid, guard)? {
1340                    (node_view.0, Update::Node(node_view.deref().clone()))
1341                } else {
1342                    let page_view = match self.inner.get(pid, guard) {
                        None => panic!("expected page {} to be present during rewrite", pid),
1344                        Some(p) => p,
1345                    };
1346
1347                    if page_view.is_free() {
1348                        (page_view, Update::Free)
1349                    } else {
1350                        debug!(
1351                            "when rewriting pid {} \
1352                             we encountered a rewritten \
1353                             node with a link {:?} that \
1354                             we previously witnessed a Free \
1355                             for (PageCache::get returned None), \
1356                             assuming we can just return now since \
1357                             the Free was replace'd",
1358                            pid, page_view.update
1359                        );
1360                        return Ok(());
1361                    }
1362                };
1363
1364                let res = self.cas_page(pid, key, update, true, guard).map(
1365                    |res| {
1366                        trace!(
1367                            "rewriting pid {} success: {}",
1368                            pid,
1369                            res.is_ok()
1370                        );
1371                        res
1372                    },
1373                )?;
1374                if res.is_ok() {
1375                    return Ok(());
1376                }
1377            }
1378        }
1379    }
1380
1381    /// Traverses all files and calculates their total physical
1382    /// size, then traverses all pages and calculates their
1383    /// total logical size, then divides the physical size
1384    /// by the logical size.
1385    #[allow(clippy::cast_precision_loss)]
1386    #[allow(clippy::float_arithmetic)]
1387    #[doc(hidden)]
1388    pub(crate) fn space_amplification(&self) -> Result<f64> {
1389        let on_disk_bytes = self.size_on_disk()? as f64;
1390        let logical_size = (self.logical_size_of_all_pages()?
1391            + self.config.segment_size as u64)
1392            as f64;
1393
1394        Ok(on_disk_bytes / logical_size)
1395    }
1396
1397    pub(crate) fn size_on_disk(&self) -> Result<u64> {
1398        let mut size = self.config.file.metadata()?.len();
1399
1400        let stable = self.config.blob_path(0);
1401        let blob_dir = stable.parent().expect(
1402            "should be able to determine the parent for the blob directory",
1403        );
1404        let blob_files = std::fs::read_dir(blob_dir)?;
1405
1406        for blob_file in blob_files {
1407            let blob_file = if let Ok(bf) = blob_file {
1408                bf
1409            } else {
1410                continue;
1411            };
1412
1413            // it's possible the blob file was removed lazily
1414            // in the background and no longer exists
1415            #[cfg(not(miri))]
1416            {
1417                size += blob_file.metadata().map(|m| m.len()).unwrap_or(0);
1418            }
1419
1420            // workaround to avoid missing `dirfd` shim
1421            #[cfg(miri)]
1422            {
1423                size += std::fs::metadata(blob_file.path())
1424                    .map(|m| m.len())
1425                    .unwrap_or(0);
1426            }
1427        }
1428
1429        Ok(size)
1430    }
1431
1432    fn logical_size_of_all_pages(&self) -> Result<u64> {
1433        let guard = pin();
1434        let meta_size = self.get_meta(&guard)?.rss();
1435        let idgen_size = std::mem::size_of::<u64>() as u64;
1436
1437        let mut ret = meta_size + idgen_size;
1438        let min_pid = COUNTER_PID + 1;
1439        let next_pid_to_allocate = *self.next_pid_to_allocate.lock();
1440        for pid in min_pid..next_pid_to_allocate {
1441            if let Some(node_cell) = self.get(pid, &guard)? {
1442                ret += node_cell.rss();
1443            }
1444        }
1445        Ok(ret)
1446    }
1447
1448    fn cas_page<'g>(
1449        &self,
1450        pid: PageId,
1451        mut old: PageView<'g>,
1452        update: Update,
1453        is_rewrite: bool,
1454        guard: &'g Guard,
1455    ) -> Result<CasResult<'g, Update>> {
1456        trace!(
1457            "cas_page called on pid {} to {:?} with old ts {:?}",
1458            pid,
1459            update,
1460            old.ts()
1461        );
1462
1463        let log_kind = log_kind_from_update(&update);
1464        trace!("cas_page on pid {} has log kind: {:?}", pid, log_kind);
1465
1466        let mut new_page = Some(Owned::new(Page {
1467            update: Some(Box::new(update)),
1468            cache_infos: Vec::default(),
1469        }));
1470
1471        loop {
1472            let mut page_ptr = new_page.take().unwrap();
1473            let log_reservation = match &**page_ptr.update.as_ref().unwrap() {
1474                Update::Counter(ref c) => {
1475                    self.log.reserve(log_kind, pid, c, guard)?
1476                }
1477                Update::Meta(ref m) => {
1478                    self.log.reserve(log_kind, pid, m, guard)?
1479                }
1480                Update::Free => self.log.reserve(log_kind, pid, &(), guard)?,
1481                Update::Node(ref node) => {
1482                    self.log.reserve(log_kind, pid, node, guard)?
1483                }
1484                other => {
1485                    panic!("non-replacement used in cas_page: {:?}", other)
1486                }
1487            };
1488            let lsn = log_reservation.lsn();
1489            let new_pointer = log_reservation.pointer();
1490
1491            // NB the setting of the timestamp is quite
1492            // correctness-critical! We use the ts to
1493            // ensure that fundamentally new data causes
1494            // high-level link and replace operations
1495            // to fail when the data in the pagecache
1496            // actually changes. When we just rewrite
1497            // the page for the purposes of moving it
1498            // to a new location on disk, however, we
1499            // don't want to cause threads that are
1500            // basing the correctness of their new
1501            // writes on the unchanged state to fail.
1502            // Here, we only bump it up by 1 if the
1503            // update represents a fundamental change
1504            // that SHOULD cause CAS failures.
1508            let ts = if is_rewrite { old.ts() } else { old.ts() + 1 };
1509
1510            let cache_info = CacheInfo {
1511                ts,
1512                lsn,
1513                pointer: new_pointer,
1514                log_size: u64::try_from(log_reservation.reservation_len())
1515                    .unwrap(),
1516            };
1517
1518            page_ptr.cache_infos = vec![cache_info];
1519
1520            debug_delay();
1521            let result =
1522                old.entry.compare_and_set(old.read, page_ptr, SeqCst, guard);
1523
1524            match result {
1525                Ok(new_shared) => {
1526                    unsafe {
1527                        guard.defer_destroy(old.read);
1528                    }
1529
1530                    trace!("cas_page succeeded on pid {}", pid);
1531                    self.log.iobufs.sa_mark_replace(
1532                        pid,
1533                        lsn,
1534                        &old.cache_infos,
1535                        cache_info,
1536                        guard,
1537                    )?;
1538
1539                    // NB complete must happen AFTER calls to SA, because
1540                    // when the iobuf's n_writers hits 0, we may transition
1541                    // the segment to inactive, resulting in a race otherwise.
1542                    let _pointer = log_reservation.complete()?;
1543
1544                    // possibly evict an item now that our cache has grown
1545                    let total_page_size =
1546                        unsafe { new_shared.deref().log_size() };
1547                    let to_evict =
1548                        self.lru.accessed(pid, total_page_size, guard);
1549                    trace!(
1550                        "accessed pid {} -> paging out pids {:?}",
1551                        pid,
1552                        to_evict
1553                    );
1554                    if !to_evict.is_empty() {
1555                        self.page_out(to_evict, guard)?;
1556                    }
1557
1558                    return Ok(Ok(PageView {
1559                        read: new_shared,
1560                        entry: old.entry,
1561                    }));
1562                }
1563                Err(cas_error) => {
1564                    trace!("cas_page failed on pid {}", pid);
1565                    let _pointer = log_reservation.abort()?;
1566
1567                    let current: Shared<'_, _> = cas_error.current;
1568                    let actual_ts = unsafe { current.deref().ts() };
1569
1570                    let mut returned_update: Owned<_> = cas_error.new;
1571
1572                    if actual_ts != old.ts() || is_rewrite {
1573                        return Ok(Err(Some((
1574                            PageView { read: current, entry: old.entry },
1575                            *returned_update.update.take().unwrap(),
1576                        ))));
1577                    }
1578                    trace!(
1579                        "retrying CAS on pid {} with same ts of {}",
1580                        pid,
1581                        old.ts()
1582                    );
1583                    old.read = current;
1584                    new_page = Some(returned_update);
1585                }
1586            } // match cas result
1587        } // loop
1588    }
1589
1590    /// Retrieve the current meta page
1591    pub(crate) fn get_meta<'g>(
1592        &self,
1593        guard: &'g Guard,
1594    ) -> Result<MetaView<'g>> {
1595        trace!("getting page iter for META");
1596
1597        let page_view = match self.inner.get(META_PID, guard) {
1598            None => {
1599                return Err(Error::ReportableBug(
1600                    "failed to retrieve META page \
1601                     which should always be present"
1602                        .into(),
1603                ));
1604            }
1605            Some(p) => p,
1606        };
1607
1608        if page_view.update.is_some() {
1609            Ok(MetaView(page_view))
1610        } else {
1611            Err(Error::ReportableBug(
1612                "failed to retrieve META page \
1613                 which should always be present"
1614                    .into(),
1615            ))
1616        }
1617    }
1618
1619    /// Retrieve the current persisted IDGEN value
1620    pub(crate) fn get_idgen<'g>(
1621        &self,
1622        guard: &'g Guard,
1623    ) -> Result<(PageView<'g>, u64)> {
1624        trace!("getting page iter for idgen");
1625
1626        let page_view = match self.inner.get(COUNTER_PID, guard) {
1627            None => {
1628                return Err(Error::ReportableBug(
1629                    "failed to retrieve counter page \
1630                     which should always be present"
1631                        .into(),
1632                ));
1633            }
1634            Some(p) => p,
1635        };
1636
1637        if page_view.update.is_some() {
1638            let counter = page_view.as_counter();
1639            Ok((page_view, counter))
1640        } else {
1641            Err(Error::ReportableBug(
1642                "failed to retrieve counter page \
1643                 which should always be present"
1644                    .into(),
1645            ))
1646        }
1647    }
1648
1649    /// Try to retrieve a page by its logical ID.
1650    pub(crate) fn get<'g>(
1651        &self,
1652        pid: PageId,
1653        guard: &'g Guard,
1654    ) -> Result<Option<NodeView<'g>>> {
1655        trace!("getting page iterator for pid {}", pid);
1656        let _measure = Measure::new(&M.get_page);
1657
1658        if pid == COUNTER_PID || pid == META_PID || pid == BATCH_MANIFEST_PID {
1659            return Err(Error::Unsupported(
1660                "you are not able to iterate over \
1661                 the first couple pages, which are \
1662                 reserved for storing metadata and \
1663                 monotonic ID generator info"
1664                    .into(),
1665            ));
1666        }
1667
1668        let mut last_attempted_cache_info = None;
1669        let mut last_err = None;
1670        let mut page_view;
1671
1672        let mut updates: Vec<Update> = loop {
1673            // we loop here because if the page we want to
1674            // pull has been moved, we want to retry. but if we fail to
1675            // pull the same cache info twice in a row, we return the error.
1676            page_view = match self.inner.get(pid, guard) {
1677                None => return Ok(None),
1678                Some(p) => p,
1679            };
1680
1681            if page_view.is_free() {
1682                return Ok(None);
1683            }
1684
1685            if page_view.update.is_some() {
1686                // possibly evict an item now that our cache has grown
1687                let total_page_size = page_view.log_size();
1688                let to_evict = self.lru.accessed(pid, total_page_size, guard);
1689                trace!(
1690                    "accessed pid {} -> paging out pids {:?}",
1691                    pid,
1692                    to_evict
1693                );
1694                if !to_evict.is_empty() {
1695                    self.page_out(to_evict, guard)?;
1696                }
1697                return Ok(Some(NodeView(page_view)));
1698            }
1699
1700            trace!(
1701                "pulling pid {} view {:?} deref {:?}",
1702                pid,
1703                page_view,
1704                page_view.deref()
1705            );
1706            if page_view.cache_infos.first()
1707                == last_attempted_cache_info.as_ref()
1708            {
1709                return Err(last_err.unwrap());
1710            } else {
1711                last_attempted_cache_info =
1712                    page_view.cache_infos.first().copied();
1713            }
1714
1715            // need to page-in
1716            let updates_result: Result<Vec<Update>> = page_view
1717                .cache_infos
1718                .iter()
1719                .map(|ci| self.pull(pid, ci.lsn, ci.pointer))
1720                .collect();
1721
1722            last_err = if let Ok(updates) = updates_result {
1723                break updates;
1724            } else {
1725                Some(updates_result.unwrap_err())
1726            };
1727        };
1728
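        // the first pulled update is the page's base; the remaining
        // updates are links that are applied on top of it in order to
        // materialize the full in-memory node.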
1729        let (base_slice, links) = updates.split_at_mut(1);
1730
1731        let base: &mut Node = base_slice[0].as_node_mut();
1732
1733        for link_update in links {
1734            let link: &Link = link_update.as_link();
1735            base.apply(link);
1736        }
1737
1738        updates.truncate(1);
1739        let base = updates.pop().unwrap();
1740
1741        let page = Owned::new(Page {
1742            update: Some(Box::new(base)),
1743            cache_infos: page_view.cache_infos.clone(),
1744        });
1745
1746        debug_delay();
1747        let result = page_view.entry.compare_and_set(
1748            page_view.read,
1749            page,
1750            SeqCst,
1751            guard,
1752        );
1753
1754        if let Ok(new_shared) = result {
1755            trace!("fix-up for pid {} succeeded", pid);
1756
1757            unsafe {
1758                guard.defer_destroy(page_view.read);
1759            }
1760
1761            // possibly evict an item now that our cache has grown
1762            let total_page_size = unsafe { new_shared.deref().log_size() };
1763            let to_evict = self.lru.accessed(pid, total_page_size, guard);
1764            trace!("accessed pid {} -> paging out pids {:?}", pid, to_evict);
1765            if !to_evict.is_empty() {
1766                self.page_out(to_evict, guard)?;
1767            }
1768
1769            let mut page_view = page_view;
1770            page_view.read = new_shared;
1771
1772            Ok(Some(NodeView(page_view)))
1773        } else {
1774            trace!("fix-up for pid {} failed", pid);
1775
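            // another thread raced us, either installing its own fix-up
            // or a newer update; retry the whole lookup from scratch.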
1776            self.get(pid, guard)
1777        }
1778    }
1779
1780    /// Returns `true` if the database was
1781    /// recovered from a previous process.
1782    /// Note that database state is only
1783    /// guaranteed to be present up to the
1784    /// last call to `flush`! Otherwise state
1785    /// is synced to disk periodically if the
1786    /// `sync_every_ms` configuration option
1787    /// is set to `Some(number_of_ms_between_syncs)`
1788    /// or if the IO buffer gets filled to
1789    /// capacity before being rotated.
1790    pub const fn was_recovered(&self) -> bool {
1791        self.was_recovered
1792    }
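
    // Illustrative sketch (not part of this module): at the public `sled`
    // API level, `was_recovered` is typically consulted to decide whether
    // to run one-time initialization, keeping in mind that only state up
    // to the last `flush` is guaranteed to survive a crash:
    //
    //     let db = sled::open("my.db")?;
    //     if !db.was_recovered() {
    //         db.insert("schema_version", vec![1_u8])?;
    //         db.flush()?;
    //     }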
1793
1794    /// Generate a monotonic ID. Not guaranteed to be
1795    /// contiguous. Written to disk every `idgen_persist_interval`
1796    /// operations, followed by a blocking flush. During recovery, we
1797    /// take the last recovered generated ID and add 2x
1798    /// the `idgen_persist_interval` to it. While persisting, if the
1799    /// previous persisted counter wasn't synced to disk yet, we will do
1800    /// a blocking flush to fsync the latest counter, ensuring
1801    /// that we will never give out the same counter twice.
1802    pub(crate) fn generate_id_inner(&self) -> Result<u64> {
1803        let ret = self.idgen.fetch_add(1, Release);
1804
1805        trace!("generating ID {}", ret);
1806
1807        let interval = self.config.idgen_persist_interval;
1808        let necessary_persists = ret / interval * interval;
1809        let mut persisted = self.idgen_persists.load(Acquire);
1810
1811        while persisted < necessary_persists {
1812            let _mu = self.idgen_persist_mu.lock();
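            // double-check under the mutex: another thread may have already
            // persisted past our requirement while we were waiting for it.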
1813            persisted = self.idgen_persists.load(Acquire);
1814            if persisted < necessary_persists {
1815                // it's our responsibility to persist up to our ID
1816                trace!(
1817                    "persisting ID gen, as persist count {} \
1818                    is below necessary persists {}",
1819                    persisted,
1820                    necessary_persists
1821                );
1822                let guard = pin();
1823                let (key, current) = self.get_idgen(&guard)?;
1824
1825                assert_eq!(current, persisted);
1826
1827                let counter_update = Update::Counter(necessary_persists);
1828
1829                let old = self.idgen_persists.swap(necessary_persists, Release);
1830                assert_eq!(old, persisted);
1831
1832                if self
1833                    .cas_page(COUNTER_PID, key, counter_update, false, &guard)?
1834                    .is_err()
1835                {
1836                    // CAS failed
1837                    continue;
1838                }
1839
1840                // during recovery we add 2x the interval. we only
1841                // need to block if the last one wasn't stable yet.
1842                // we only call make_durable instead of make_stable
1843                // because we took out the initial reservation
1844                // outside of a writebatch (guaranteed by using the reader
1845                // concurrency control) and it's possible we
1846                // could cyclically wait if the reservation for
1847                // a replacement happened inside a writebatch.
1848                iobuf::make_durable(&self.log.iobufs, key.last_lsn())?;
1849            }
1850        }
1851
1852        Ok(ret)
1853    }
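
    // Worked example (illustrative numbers): with an `idgen_persist_interval`
    // of 1_000_000, handing out ID 3_456_789 requires
    // `necessary_persists = 3_456_789 / 1_000_000 * 1_000_000 = 3_000_000`
    // to be durable, so every ID we return is below `persisted + interval`.
    // Recovery then restarts the counter 2x the interval past the recovered
    // value, safely above anything that could have been handed out.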
1854
1855    /// Look up a `PageId` for a given identifier in the `Meta`
1856    /// mapping. This is pretty cheap, but in some cases
1857    /// you may prefer to maintain your own atomic references
1858    /// to collection roots instead of relying on this. See
1859    /// sled's `Tree` root tracking for an example of
1860    /// avoiding this in a lock-free way that handles
1861    /// various race conditions.
1862    pub(crate) fn meta_pid_for_name(
1863        &self,
1864        name: &[u8],
1865        guard: &Guard,
1866    ) -> Result<PageId> {
1867        let m = self.get_meta(guard)?;
1868        if let Some(root) = m.get_root(name) {
1869            Ok(root)
1870        } else {
1871            Err(Error::CollectionNotFound(name.into()))
1872        }
1873    }
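
    // Illustrative sketch (hypothetical caller): resolving a collection's
    // root page before descending into it, treating a missing name as
    // "collection does not exist yet":
    //
    //     let root_pid = match pc.meta_pid_for_name(b"my_tree", &guard) {
    //         Ok(pid) => pid,
    //         Err(Error::CollectionNotFound(_)) => todo!("allocate a new root"),
    //         Err(other) => return Err(other),
    //     };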
1874
1875    /// Compare-and-swap the `Meta` mapping for a given
1876    /// identifier.
1877    pub(crate) fn cas_root_in_meta<'g>(
1878        &self,
1879        name: &[u8],
1880        old: Option<PageId>,
1881        new: Option<PageId>,
1882        guard: &'g Guard,
1883    ) -> Result<std::result::Result<(), Option<PageId>>> {
1884        loop {
1885            let meta_view = self.get_meta(guard)?;
1886
1887            let actual = meta_view.get_root(name);
1888            if actual != old {
1889                return Ok(Err(actual));
1890            }
1891
1892            let mut new_meta = meta_view.deref().clone();
1893            if let Some(new) = new {
1894                new_meta.set_root(name.into(), new);
1895            } else {
1896                new_meta.del_root(name);
1897            }
1898
1899            let new_meta_link = Update::Meta(new_meta);
1900
1901            let res = self.cas_page(
1902                META_PID,
1903                meta_view.0,
1904                new_meta_link,
1905                false,
1906                guard,
1907            )?;
1908
1909            match res {
1910                Ok(_worked) => return Ok(Ok(())),
1911                Err(Some((_current_pointer, _rejected))) => {}
1912                Err(None) => {
1913                    return Err(Error::ReportableBug(
1914                        "replacing the META page has failed because \
1915                         the pagecache does not think it currently exists."
1916                            .into(),
1917                    ));
1918                }
1919            }
1920        }
1921    }
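
    // Illustrative sketch (hypothetical caller): installing a freshly
    // allocated root under a name, where `None -> Some(pid)` means
    // "create", `Some -> Some` means "replace", and `Some -> None`
    // means "drop":
    //
    //     match pc.cas_root_in_meta(b"my_tree", None, Some(new_root_pid), &guard)? {
    //         Ok(()) => { /* our root is now registered under the name */ }
    //         Err(actual) => { /* lost the race; `actual` holds the current mapping */ }
    //     }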
1922
1923    fn page_out(&self, to_evict: Vec<PageId>, guard: &Guard) -> Result<()> {
1924        let _measure = Measure::new(&M.page_out);
1925        for pid in to_evict {
1926            if pid == COUNTER_PID
1927                || pid == META_PID
1928                || pid == BATCH_MANIFEST_PID
1929            {
1930                // should not page these suckas out
1931                continue;
1932            }
1933            loop {
1934                if let Some(page_view) = self.inner.get(pid, guard) {
1935                    if page_view.is_free() {
1936                        // don't page-out Freed suckas
1937                        break;
1938                    }
1939                    let new_page = Owned::new(Page {
1940                        update: None,
1941                        cache_infos: page_view.cache_infos.clone(),
1942                    });
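                    // paging out keeps the on-disk `cache_infos` but drops
                    // the materialized `update`, so a later `get` will
                    // re-`pull` this page from the log on demand.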
1943
1944                    debug_delay();
1945                    if page_view
1946                        .entry
1947                        .compare_and_set(
1948                            page_view.read,
1949                            new_page,
1950                            SeqCst,
1951                            guard,
1952                        )
1953                        .is_ok()
1954                    {
1955                        unsafe {
1956                            guard.defer_destroy(page_view.read);
1957                        }
1958
1959                        break;
1960                    }
1961                    // keep looping until we page this sucka out
1962                }
1963            }
1964        }
1965        Ok(())
1966    }
1967
1968    fn pull(&self, pid: PageId, lsn: Lsn, pointer: DiskPtr) -> Result<Update> {
1969        use MessageKind::*;
1970
1971        trace!("pulling pid {} lsn {} pointer {} from disk", pid, lsn, pointer);
1972        let _measure = Measure::new(&M.pull);
1973
1974        let expected_segment_number: SegmentNumber = SegmentNumber(
1975            u64::try_from(lsn).unwrap()
1976                / u64::try_from(self.config.segment_size).unwrap(),
1977        );
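        // e.g. (illustrative numbers): with an 8 MiB segment_size, an lsn
        // of 25_000_000 is expected to live in segment number
        // 25_000_000 / 8_388_608 == 2.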
1978
1979        let (header, bytes) = match self.log.read(pid, lsn, pointer) {
1980            Ok(LogRead::Inline(header, buf, _len)) => {
1981                assert_eq!(
1982                    header.pid, pid,
1983                    "expected pid {} on pull of pointer {}, \
1984                     but got {} instead",
1985                    pid, pointer, header.pid
1986                );
1987                assert_eq!(
1988                    header.segment_number, expected_segment_number,
1989                    "expected segment number {:?} on pull of pointer {}, \
1990                     but got segment number {:?} instead",
1991                    expected_segment_number, pointer, header.segment_number
1992                );
1993                Ok((header, buf))
1994            }
1995            Ok(LogRead::Blob(header, buf, _blob_pointer, _inline_len)) => {
1996                assert_eq!(
1997                    header.pid, pid,
1998                    "expected pid {} on pull of pointer {}, \
1999                     but got {} instead",
2000                    pid, pointer, header.pid
2001                );
2002                assert_eq!(
2003                    header.segment_number, expected_segment_number,
2004                    "expected segment number {:?} on pull of pointer {}, \
2005                     but got segment number {:?} instead",
2006                    expected_segment_number, pointer, header.segment_number
2007                );
2008
2009                Ok((header, buf))
2010            }
2011            Ok(other) => {
2012                debug!("read unexpected page: {:?}", other);
2013                Err(Error::corruption(Some(pointer)))
2014            }
2015            Err(e) => {
2016                debug!("failed to read page: {:?}", e);
2017                Err(e)
2018            }
2019        }?;
2020
2021        // We create this &mut &[u8] to assist the `Serializer`
2022        // implementation that incrementally consumes bytes
2023        // without taking ownership of them.
2024        let buf = &mut bytes.as_slice();
2025
2026        let update_res = {
2027            let _deserialize_latency = Measure::new(&M.deserialize);
2028
2029            match header.kind {
2030                Counter => u64::deserialize(buf).map(Update::Counter),
2031                BlobMeta | InlineMeta => {
2032                    Meta::deserialize(buf).map(Update::Meta)
2033                }
2034                BlobLink | InlineLink => {
2035                    Link::deserialize(buf).map(Update::Link)
2036                }
2037                BlobNode | InlineNode => {
2038                    Node::deserialize(buf).map(Update::Node)
2039                }
2040                Free => Ok(Update::Free),
2041                Corrupted | Canceled | Cap | BatchManifest => {
2042                    panic!("unexpected pull: {:?}", header.kind)
2043                }
2044            }
2045        };
2046
2047        let update = update_res.expect("failed to deserialize data");
2048
2049        // TODO this feels racy, test it better?
2050        if let Update::Free = update {
2051            Err(Error::ReportableBug(format!(
2052                "non-link/replace found in pull of pid {}",
2053                pid
2054            )))
2055        } else {
2056            Ok(update)
2057        }
2058    }
2059
2060    fn load_snapshot(&mut self, snapshot: &Snapshot) -> Result<()> {
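        // the snapshot's page table is dense (indexed by pid), so its
        // length is also the next pid that should be handed out.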
2061        let next_pid_to_allocate = snapshot.pt.len() as PageId;
2062
2063        self.next_pid_to_allocate = Mutex::new(next_pid_to_allocate);
2064
2065        debug!("load_snapshot loading pages from 0..{}", next_pid_to_allocate);
2066        for pid in 0..next_pid_to_allocate {
2067            let state = if let Some(state) =
2068                snapshot.pt.get(usize::try_from(pid).unwrap())
2069            {
2070                state
2071            } else {
2072                panic!(
2073                    "load_snapshot pid {} not found, despite being below the max pid {}",
2074                    pid, next_pid_to_allocate
2075                );
2076            };
2077
2078            trace!("load_snapshot pid {} {:?}", pid, state);
2079
2080            let mut cache_infos = Vec::default();
2081
2082            let guard = pin();
2083
2084            match *state {
2085                PageState::Present { base, ref frags } => {
2086                    cache_infos.push(CacheInfo {
2087                        lsn: base.0,
2088                        pointer: base.1,
2089                        log_size: base.2,
2090                        ts: 0,
2091                    });
2092                    for (lsn, pointer, sz) in frags {
2093                        let cache_info = CacheInfo {
2094                            lsn: *lsn,
2095                            pointer: *pointer,
2096                            log_size: *sz,
2097                            ts: 0,
2098                        };
2099
2100                        cache_infos.push(cache_info);
2101                    }
2102                }
2103                PageState::Free(lsn, pointer) => {
2104                    // blow away any existing state
2105                    trace!("load_snapshot freeing pid {}", pid);
2106                    let cache_info = CacheInfo {
2107                        lsn,
2108                        pointer,
2109                        log_size: u64::try_from(MAX_MSG_HEADER_LEN).unwrap(),
2110                        ts: 0,
2111                    };
2112                    cache_infos.push(cache_info);
2113                    self.free.lock().push(pid);
2114                }
2115                _ => panic!("tried to load a {:?}", state),
2116            }
2117
2118            // Set up new page
2119            trace!("installing page for pid {}", pid);
2120
2121            let update = if pid == META_PID || pid == COUNTER_PID {
2122                let update =
2123                    self.pull(pid, cache_infos[0].lsn, cache_infos[0].pointer)?;
2124                Some(Box::new(update))
2125            } else if state.is_free() {
2126                Some(Box::new(Update::Free))
2127            } else {
2128                None
2129            };
2130            let page = Page { update, cache_infos };
2131
2132            self.inner.insert(pid, page, &guard);
2133        }
2134
2135        Ok(())
2136    }
2137
2138    /// A snapshot is used to recover the page table at a point in time.
2139    ///
2140    /// This is called a fuzzy snapshot because while
2141    /// we are taking it, ongoing inserts into the
2142    /// database keep bumping up the Lsn. The recorded
2143    /// `stable_lsn` therefore captures the state of the
2144    /// world at the moment the snapshot was started.
2145    #[allow(unused)]
2146    fn take_fuzzy_snapshot(self) -> Snapshot {
2147        let stable_lsn_now: Lsn = self.log.stable_offset();
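        // writes that land after this point may give pages Lsns beyond
        // `stable_lsn_now`; that is the "fuzzy" part. the recorded
        // `stable_lsn` below anchors the snapshot in time.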
2148
2149        // This determines how many pages we will include in the snapshot.
2150        let pid_bound = *self.next_pid_to_allocate.lock();
2151
2152        let pid_bound_usize = assert_usize(pid_bound);
2153
2154        let mut page_states = Vec::<PageState>::with_capacity(pid_bound_usize);
2155        let guard = pin();
2156        for pid in 0..pid_bound {
2157            'inner: loop {
2158                if let Some(pg_view) = self.inner.get(pid, &guard) {
2159                    if pg_view.cache_infos.is_empty() {
2160                        // there is a benign race with the thread
2161                        // that is allocating this page. the allocating
2162                        // thread has not yet written the new page to disk,
2163                        // and it does not yet have any storage tracking
2164                        // information.
2165                        std::thread::yield_now();
2166                    } else {
2167                        let page_state = pg_view.to_page_state();
2168                        page_states.push(page_state);
2169                        break 'inner;
2170                    }
2171                } else {
2172                    // there is a benign race with the thread
2173                    // that bumped the next_pid_to_allocate
2174                    // counter above. it has not yet
2175                    // installed the page that it is allocating.
2176                    std::thread::yield_now();
2177                }
2178            }
2179        }
2180
2181        Snapshot {
2182            stable_lsn: Some(stable_lsn_now),
2183            active_segment: None,
2184            pt: page_states,
2185        }
2186    }
2187}