1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use std::collections::BTreeMap;
use std::fmt::{self, Debug};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;

use bincode::{Infinite, deserialize, serialize};
use serde::de::DeserializeOwned;
use serde::Serialize;
use coco::epoch::Ptr;

use super::*;

mod page_cache;

pub use self::page_cache::PageCache;

/// A user of a `PageCache` needs to provide a `Materializer` which
/// handles the merging of page fragments.
pub trait Materializer {
    /// The possibly fragmented page, written to log storage sequentially, and
    /// read in parallel from multiple locations on disk when serving
    /// a request to read the page. These will be merged to a single version
    /// at read time, and possibly cached.
    type PageFrag;

    /// The state returned by a call to `PageCache::recover`, as
    /// described by `Materializer::recover`
    type Recovery;

    /// Used to compress long chains of partial pages into a condensed form
    /// during compaction.
    fn merge(&self, &[&Self::PageFrag]) -> Self::PageFrag;

    /// Used to feed custom recovery information back to a higher-level abstraction
    /// during startup. For example, a B-Link tree must know what the current
    /// root node is before it can start serving requests.
    fn recover(&self, &Self::PageFrag) -> Option<Self::Recovery>;
}

/// Points to either a memory location or a disk location to page-in data from.
#[derive(Debug, Clone, PartialEq)]
pub enum CacheEntry<M: Send + Sync> {
    /// A cache item that contains the most recent fully-merged page state, also in secondary
    /// storage.
    MergedResident(M, LogID),
    /// A cache item that is in memory, and also in secondary storage.
    Resident(M, LogID),
    /// A cache item that is present in secondary storage.
    PartialFlush(LogID),
    /// A cache item that is present in secondary storage, and is the base segment
    /// of a page.
    Flush(LogID),
}

/// A wrapper struct for a pointer to a (possibly invalid, hence inaccessible)
/// `PageFrag` used for applying updates atomically to shared pages.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CasKey<P>
    where P: 'static + Send + Sync
{
    ptr: *const ds::stack::Node<CacheEntry<P>>,
    tag: usize,
}

impl<'s, P> From<Ptr<'s, ds::stack::Node<CacheEntry<P>>>> for CasKey<P>
    where P: 'static + Send + Sync
{
    fn from(ptr: Ptr<'s, ds::stack::Node<CacheEntry<P>>>) -> CasKey<P> {
        CasKey {
            ptr: ptr.as_raw(),
            tag: ptr.tag(),
        }
    }
}

impl<'s, P> Into<Ptr<'s, ds::stack::Node<CacheEntry<P>>>> for CasKey<P>
    where P: 'static + Send + Sync
{
    fn into(self) -> Ptr<'s, ds::stack::Node<CacheEntry<P>>> {
        unsafe { Ptr::from_raw(self.ptr).with_tag(self.tag) }
    }
}

/// `LoggedUpdate` is for writing blocks of `Update`'s to disk
/// sequentially, to reduce IO during page reads.
#[serde(bound(deserialize = ""))]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub(super) struct LoggedUpdate<PageFrag>
    where PageFrag: Serialize + DeserializeOwned
{
    pid: PageID,
    update: Update<PageFrag>,
}

#[serde(bound(deserialize = ""))]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
enum Update<PageFrag>
    where PageFrag: DeserializeOwned + Serialize
{
    Append(PageFrag),
    Compact(PageFrag),
    Del,
    Alloc,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct Snapshot<R> {
    pub max_lid: LogID,
    pub max_pid: PageID,
    pub pt: BTreeMap<PageID, Vec<LogID>>,
    pub free: Vec<PageID>,
    pub recovery: Option<R>,
}

impl<R> Default for Snapshot<R> {
    fn default() -> Snapshot<R> {
        Snapshot {
            max_lid: 0,
            max_pid: 0,
            pt: BTreeMap::new(),
            free: vec![],
            recovery: None,
        }
    }
}

struct PidDropper(PageID, Arc<Stack<PageID>>);

impl Drop for PidDropper {
    fn drop(&mut self) {
        self.1.push(self.0);
    }
}

struct LidDropper(Vec<LogID>, Config);

impl Drop for LidDropper {
    fn drop(&mut self) {
        let cached_f = self.1.cached_file();
        let mut f = cached_f.borrow_mut();
        for lid in &self.0 {
            if let Err(_e) = log::punch_hole(&mut f, *lid) {
            #[cfg(feature = "log")]
                error!("failed to punch hole in log: {}", _e);
            }
        }
    }
}