use std::{
collections::BinaryHeap,
ops::Deref,
sync::{Arc, Mutex},
};
use crate::sync::{Owned, Shared};
#[cfg(feature = "rayon")]
use rayon::prelude::*;
use pagetable::PageTable;
use super::*;
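/// The raw epoch-guarded pointer to the head of a page's lock-free
/// stack of `CacheEntry` fragments.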
type PagePtrInner<'g, P> = Shared<'g, Node<CacheEntry<P>>>;
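/// A guarded pointer to a page's stack head, handed back to callers
/// and used as the compare-and-swap witness for `link` and `replace`.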
#[derive(Debug, Clone, PartialEq)]
pub struct PagePtr<'g, P>(PagePtrInner<'g, P>)
where
P: 'static + Send;
impl<'g, P> PagePtr<'g, P>
where
P: 'static + Send,
{
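    /// The null pointer used as the CAS witness for a page that has
    /// been allocated but never written to.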
pub fn allocated() -> PagePtr<'g, P> {
        PagePtr(Shared::null())
}
pub fn is_allocated(&self) -> bool {
self.0.is_null()
}
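    /// Dereference the head entry, which must be a `MergedResident`.
    ///
    /// # Safety
    ///
    /// The inner pointer must be non-null and still protected by the
    /// guard it was created under. Panics if the head entry is any
    /// other variant.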
unsafe fn deref_merged_resident(&self) -> &'g P
where
P: Debug,
{
match self.0.deref().deref() {
CacheEntry::MergedResident(m, ..) => m,
other => {
panic!("called deref_merged_resident on {:?}", other);
}
}
}
}
unsafe impl<'g, P> Send for PagePtr<'g, P> where P: Send {}
unsafe impl<'g, P> Sync for PagePtr<'g, P> where P: Send + Sync {}
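/// An entry in a page's in-memory stack. Each variant carries the lsn
/// and disk location of the update it corresponds to. `MergedResident`
/// and `Resident` also keep materialized data in memory, while
/// `PartialFlush` and `Flush` are paged-out markers that refer only to
/// the log, and `Free` is the tombstone left behind by `free`.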
#[derive(Debug, Clone, PartialEq)]
pub enum CacheEntry<M: Send> {
MergedResident(M, Lsn, DiskPtr),
Resident(M, Lsn, DiskPtr),
PartialFlush(Lsn, DiskPtr),
Flush(Lsn, DiskPtr),
Free(Lsn, DiskPtr),
}
impl<M: Send> CacheEntry<M> {
fn ptr(&self) -> DiskPtr {
use self::CacheEntry::*;
match self {
MergedResident(_, _, ptr)
| Resident(_, _, ptr)
| PartialFlush(_, ptr)
| Flush(_, ptr)
| Free(_, ptr) => *ptr,
}
}
fn ptr_ref_mut(&mut self) -> &mut DiskPtr {
use self::CacheEntry::*;
match self {
MergedResident(_, _, ptr)
| Resident(_, _, ptr)
| PartialFlush(_, ptr)
| Flush(_, ptr)
| Free(_, ptr) => ptr,
}
}
}
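/// The unit that is serialized into the log: a page id paired with the
/// update that was applied to it.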
#[serde(bound(deserialize = ""))]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub(super) struct LoggedUpdate<PageFrag>
where
PageFrag: Serialize + DeserializeOwned,
{
pub(super) pid: PageId,
pub(super) update: Update<PageFrag>,
}
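/// A logged modification to a page: appending a fragment, compacting
/// the page down to a single fragment, freeing it, or allocating it.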
#[serde(bound(deserialize = ""))]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub(super) enum Update<PageFrag>
where
PageFrag: DeserializeOwned + Serialize,
{
Append(PageFrag),
Compact(PageFrag),
Free,
Allocate,
}
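/// The possible results of a page read: materialized contents plus a
/// CAS witness, a freed or freshly allocated page, or `Unallocated`
/// when no page exists for the requested id.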
#[derive(Clone, Debug, PartialEq)]
pub enum PageGet<'a, PageFrag>
where
PageFrag: 'static + DeserializeOwned + Serialize + Send + Sync,
{
Materialized(&'a PageFrag, PagePtr<'a, PageFrag>),
Free(PagePtr<'a, PageFrag>),
Allocated,
Unallocated,
}
unsafe impl<'a, P> Send for PageGet<'a, P> where
P: DeserializeOwned + Serialize + Send + Sync
{}
unsafe impl<'a, P> Sync for PageGet<'a, P> where
P: DeserializeOwned + Serialize + Send + Sync
{}
impl<'a, P> PageGet<'a, P>
where
P: DeserializeOwned + Serialize + Send + Sync,
{
pub fn unwrap(self) -> (&'a P, PagePtr<'a, P>) {
match self {
PageGet::Materialized(pr, hptr) => (pr, hptr),
_ => panic!("unwrap called on non-Materialized"),
}
}
pub fn is_materialized(&self) -> bool {
match *self {
PageGet::Materialized(..) => true,
_ => false,
}
}
pub fn is_free(&self) -> bool {
match *self {
PageGet::Free(_) => true,
_ => false,
}
}
pub fn is_allocated(&self) -> bool {
match *self {
PageGet::Allocated => true,
_ => false,
}
}
pub fn is_unallocated(&self) -> bool {
match *self {
PageGet::Unallocated => true,
_ => false,
}
}
}
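/// A lock-free pagecache that stores pages as stacks of log-structured
/// fragments and materializes them on read through a `Materializer`.
///
/// A minimal usage sketch (not compiled here): `MyMaterializer`,
/// `MyFrag`, `MyRecovery`, and `config` below are placeholders for a
/// concrete `Materializer` implementation, its fragment and recovery
/// types, and a `Config`, none of which are defined in this module.
///
/// ```ignore
/// let pc: PageCache<MyMaterializer, MyFrag, MyRecovery> =
///     PageCache::start(config)?;
/// let guard = pin();
///
/// // allocate a page and give it initial contents
/// let pid = pc.allocate(&guard)?;
/// let key = pc.replace(pid, PagePtr::allocated(), MyFrag::default(), &guard)?;
///
/// // append another fragment, then read the merged page back
/// let key = pc.link(pid, key, MyFrag::default(), &guard)?;
/// let (merged, key) = pc.get(pid, &guard)?.unwrap();
/// ```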
pub struct PageCache<PM, P, R>
where
P: 'static + Send + Sync,
{
t: Arc<PM>,
config: Config,
inner: Arc<PageTable<Stack<CacheEntry<P>>>>,
max_pid: AtomicUsize,
free: Arc<Mutex<BinaryHeap<PageId>>>,
log: Log,
lru: Lru,
updates: AtomicUsize,
last_snapshot: Arc<Mutex<Option<Snapshot<R>>>>,
}
unsafe impl<PM, P, R> Send for PageCache<PM, P, R>
where
PM: Send + Sync,
P: 'static + Send + Sync,
R: Send + Sync,
{}
unsafe impl<PM, P, R> Sync for PageCache<PM, P, R>
where
PM: Send + Sync,
P: 'static + Send + Sync,
R: Send + Sync,
{}
impl<PM, P, R> Debug for PageCache<PM, P, R>
where
PM: Send + Sync,
P: Debug + Send + Sync,
R: Debug + Send + Sync,
{
fn fmt(
&self,
f: &mut fmt::Formatter<'_>,
) -> std::result::Result<(), fmt::Error> {
        writeln!(
            f,
            "PageCache {{ max: {:?} free: {:?} }}",
            self.max_pid.load(SeqCst),
            self.free
        )
}
}
impl<PM, P, R> PageCache<PM, P, R>
where
PM: Materializer<PageFrag = P, Recovery = R>,
PM: 'static + Send + Sync,
P: 'static
+ Debug
+ Clone
+ Serialize
+ DeserializeOwned
+ Send
+ Sync,
R: Debug + Clone + Serialize + DeserializeOwned + Send + Sync,
{
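    /// Start a `PageCache`: read the most recent snapshot (or a default
    /// one), construct the `Materializer` from the recovered state,
    /// start the log, and load the snapshot into the page table.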
pub fn start(config: Config) -> Result<PageCache<PM, P, R>, ()> {
let cache_capacity = config.cache_capacity;
let cache_shard_bits = config.cache_bits;
let lru = Lru::new(cache_capacity, cache_shard_bits);
let snapshot = read_snapshot_or_default::<PM, P, R>(&config)?;
let materializer =
Arc::new(PM::new(config.clone(), &snapshot.recovery));
let mut pc = PageCache {
t: materializer,
config: config.clone(),
inner: Arc::new(PageTable::default()),
max_pid: AtomicUsize::new(0),
free: Arc::new(Mutex::new(BinaryHeap::new())),
log: Log::start(config, snapshot.clone())?,
            lru,
updates: AtomicUsize::new(0),
last_snapshot: Arc::new(Mutex::new(Some(snapshot))),
};
pc.load_snapshot();
Ok(pc)
}
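    /// Flush any buffered writes in the underlying log to stable
    /// storage.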
pub fn flush(&self) -> Result<(), ()> {
self.log.flush()
}
pub fn recovered_state(&self) -> Option<R> {
let mu = match self.last_snapshot.lock() {
Ok(mu) => mu,
Err(_) => return None,
};
if let Some(ref snapshot) = *mu {
snapshot.recovery.clone()
} else {
None
}
}
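    /// Allocate a new page id, reusing one from the free list when
    /// possible, and install an empty stack for it. The page starts in
    /// the `Allocated` state and holds no data until it is first
    /// written via `replace` (or `link`, which falls through to
    /// `replace` for freshly allocated pages).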
pub fn allocate<'g>(
&self,
guard: &'g Guard,
) -> Result<PageId, ()> {
let pid = if let Some(pid) = self.free.lock().unwrap().pop() {
trace!("re-allocating pid {}", pid);
let p = self.inner.del(pid, guard).unwrap();
unsafe {
match p.deref().head(guard).deref().deref() {
CacheEntry::Free(..) => (),
_ => panic!("expected page {} to be Free", pid),
}
};
pid
} else {
let pid = self.max_pid.fetch_add(1, SeqCst);
trace!("allocating pid {}", pid);
pid
};
let new_stack = Stack::default();
let stack_ptr = Owned::new(new_stack).into_shared(guard);
self.inner
.cas(pid, Shared::null(), stack_ptr, guard)
.expect("allocating new page should never encounter existing data");
        self.cas_page(pid, Shared::null(), Update::Allocate, guard)
.map_err(|e| e.danger_cast())?;
Ok(pid)
}
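    /// Free a page, logging a `Free` tombstone and returning its id to
    /// the free list once the current epoch has passed. A deferred
    /// assertion catches double frees.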
pub fn free<'g>(
&self,
pid: PageId,
old: PagePtr<'g, P>,
guard: &'g Guard,
) -> Result<(), Option<PagePtr<'g, P>>> {
trace!("attempting to free pid {}", pid);
self.cas_page(pid, old.0, Update::Free, guard)?;
let free = self.free.clone();
guard.defer(move || {
let mut free = free.lock().unwrap();
for &e in free.iter() {
assert_ne!(e, pid, "page {} was double-freed", pid);
}
free.push(pid);
});
Ok(())
}
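    /// Try to atomically append a fragment to the page, logging it as
    /// an `Append` update. `old` must match the current stack head; on
    /// conflict the log reservation is aborted and the winning head is
    /// returned in the error. Pages that have only been allocated are
    /// handled by `replace` instead.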
pub fn link<'g>(
&self,
pid: PageId,
old: PagePtr<'g, P>,
new: P,
guard: &'g Guard,
) -> Result<PagePtr<'g, P>, Option<PagePtr<'g, P>>> {
if old.is_allocated() {
return self.replace(pid, old, new, guard);
}
let stack_ptr = match self.inner.get(pid, guard) {
None => return Err(Error::CasFailed(None)),
Some(s) => s,
};
let prepend: LoggedUpdate<P> = LoggedUpdate {
            pid,
update: Update::Append(new.clone()),
};
let bytes =
measure(&M.serialize, || serialize(&prepend).unwrap());
let log_reservation =
self.log.reserve(bytes).map_err(|e| e.danger_cast())?;
let lsn = log_reservation.lsn();
let ptr = log_reservation.ptr();
let cache_entry = CacheEntry::Resident(new, lsn, ptr);
debug_delay();
let result = unsafe {
stack_ptr.deref().cap(old.0, cache_entry, guard)
};
if result.is_err() {
log_reservation.abort().map_err(|e| e.danger_cast())?;
} else {
let to_clean = self.log.with_sa(|sa| {
sa.mark_link(pid, lsn, ptr);
sa.clean(None)
});
log_reservation
.complete()
.map_err(|e| e.danger_cast())?;
if let Some(to_clean) = to_clean {
let _ = self.rewrite_page(to_clean, guard);
}
let count = self.updates.fetch_add(1, SeqCst) + 1;
let should_snapshot =
count % self.config.snapshot_after_ops == 0;
if should_snapshot {
self.advance_snapshot()
.map_err(|e| e.danger_cast())?;
}
}
result
.map(|p| PagePtr(p))
.map_err(|e| Error::CasFailed(Some(PagePtr(e))))
}
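    /// Try to atomically replace the entire page with a single
    /// fragment, logging it as a `Compact` update.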
pub fn replace<'g>(
&self,
pid: PageId,
old: PagePtr<'g, P>,
new: P,
guard: &'g Guard,
) -> Result<PagePtr<'g, P>, Option<PagePtr<'g, P>>> {
trace!("replacing pid {}", pid);
let result =
self.cas_page(pid, old.0, Update::Compact(new), guard);
if result.is_ok() {
let to_clean = self.log.with_sa(|sa| sa.clean(Some(pid)));
if let Some(to_clean) = to_clean {
assert_ne!(pid, to_clean);
let _ = self.rewrite_page(to_clean, guard);
}
let count = self.updates.fetch_add(1, SeqCst) + 1;
let should_snapshot =
count % self.config.snapshot_after_ops == 0;
if should_snapshot {
self.advance_snapshot()
.map_err(|e| e.danger_cast())?;
}
}
result.map(|p| PagePtr(p))
}
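    /// Rewrite a page (or relocate a lone blob pointer) into the
    /// currently active log segment so that older segments can be
    /// reclaimed.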
fn rewrite_page<'g>(
&self,
pid: PageId,
guard: &'g Guard,
) -> Result<(), Option<PagePtr<'g, P>>> {
let _measure = Measure::new(&M.rewrite_page);
let stack_ptr = match self.inner.get(pid, guard) {
None => return Ok(()),
Some(s) => s,
};
debug_delay();
let head = unsafe { stack_ptr.deref().head(guard) };
let stack_iter = StackIter::from_ptr(head, guard);
let cache_entries: Vec<_> = stack_iter.collect();
if cache_entries.len() == 1
&& cache_entries[0].ptr().is_blob()
{
trace!("rewriting blob with pid {}", pid);
let blob_ptr = cache_entries[0].ptr().blob().1;
let log_reservation = self
.log
.reserve_blob(blob_ptr)
.map_err(|e| e.danger_cast())?;
let new_ptr = log_reservation.ptr();
let mut new_cache_entry = cache_entries[0].clone();
*new_cache_entry.ptr_ref_mut() = new_ptr.clone();
let node = node_from_frag_vec(vec![new_cache_entry])
.into_shared(guard);
debug_delay();
let result =
unsafe { stack_ptr.deref().cas(head, node, guard) };
if result.is_ok() {
let ptrs = ptrs_from_stack(head, guard);
let lsn = log_reservation.lsn();
self.log
.with_sa(|sa| {
sa.mark_replace(pid, lsn, ptrs, new_ptr)
}).map_err(|e| e.danger_cast())?;
log_reservation
.complete()
.map_err(|e| e.danger_cast())?;
} else {
log_reservation
.abort()
.map_err(|e| e.danger_cast())?;
}
result
.map(|_| ())
.map_err(|e| Error::CasFailed(Some(PagePtr(e))))
} else {
trace!("rewriting page with pid {}", pid);
let (key, update) = match self.get(pid, guard)? {
PageGet::Materialized(data, key) => {
(key, Update::Compact(data.clone()))
}
PageGet::Free(key) => (key, Update::Free),
PageGet::Allocated => {
(PagePtr(Shared::null()), Update::Allocate)
}
PageGet::Unallocated => {
panic!("get returned Unallocated");
}
};
self.cas_page(pid, key.0, update, guard).map(|_| ())?;
Ok(())
}
}
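    /// Swap a page's entire stack for a single new entry (or an empty
    /// stack, for `Allocate`), writing the update to the log first and
    /// aborting the reservation if the CAS loses.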
fn cas_page<'g>(
&self,
pid: PageId,
old: PagePtrInner<'g, P>,
new: Update<P>,
guard: &'g Guard,
) -> Result<PagePtrInner<'g, P>, Option<PagePtr<'g, P>>> {
let stack_ptr = match self.inner.get(pid, guard) {
None => {
trace!(
"early-returning from cas_page, no stack found"
);
return Err(Error::CasFailed(None));
}
Some(s) => s,
};
let replace: LoggedUpdate<P> = LoggedUpdate {
            pid,
update: new.clone(),
};
let bytes =
measure(&M.serialize, || serialize(&replace).unwrap());
let log_reservation =
self.log.reserve(bytes).map_err(|e| e.danger_cast())?;
let lsn = log_reservation.lsn();
let new_ptr = log_reservation.ptr();
let cache_entry = match new {
Update::Compact(m) => {
Some(CacheEntry::MergedResident(m, lsn, new_ptr))
}
Update::Free => Some(CacheEntry::Free(lsn, new_ptr)),
Update::Allocate => None,
Update::Append(_) => {
panic!("tried to cas a page using an Append")
}
};
let node = cache_entry
.map(|cache_entry| {
node_from_frag_vec(vec![cache_entry])
.into_shared(guard)
}).unwrap_or_else(|| Shared::null());
debug_delay();
let result =
unsafe { stack_ptr.deref().cas(old, node, guard) };
if result.is_ok() {
let ptrs = ptrs_from_stack(old, guard);
self.log
.with_sa(|sa| {
sa.mark_replace(pid, lsn, ptrs, new_ptr)
}).map_err(|e| e.danger_cast())?;
log_reservation
.complete()
.map_err(|e| e.danger_cast())?;
} else {
log_reservation.abort().map_err(|e| e.danger_cast())?;
}
result.map_err(|e| Error::CasFailed(Some(PagePtr(e))))
}
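    /// Read a page, paging it in from the log if necessary. Returns
    /// `Unallocated` when no stack exists for the id.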
pub fn get<'g>(
&self,
pid: PageId,
guard: &'g Guard,
) -> Result<PageGet<'g, PM::PageFrag>, Option<PagePtr<'g, P>>>
{
let stack_ptr = match self.inner.get(pid, guard) {
None => return Ok(PageGet::Unallocated),
Some(s) => s,
};
self.page_in(pid, stack_ptr, guard)
}
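    /// Drive `page_in_inner` until it produces a definite result; an
    /// inner result of `Ok(None)` means a concurrent writer changed
    /// the stack and the read must be retried.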
fn page_in<'g>(
&self,
pid: PageId,
stack_ptr: Shared<'g, Stack<CacheEntry<P>>>,
guard: &'g Guard,
) -> Result<PageGet<'g, PM::PageFrag>, Option<PagePtr<'g, P>>>
{
loop {
let inner_res =
self.page_in_inner(pid, stack_ptr, guard)?;
if let Some(res) = inner_res {
return Ok(res);
}
}
}
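    /// One attempt at materializing a page: walk the stack, pull any
    /// paged-out fragments from the log, merge everything through the
    /// `Materializer`, and then either consolidate the page or fix the
    /// stack up in place with a `MergedResident` head.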
fn page_in_inner<'g>(
&self,
pid: PageId,
stack_ptr: Shared<'g, Stack<CacheEntry<P>>>,
guard: &'g Guard,
) -> Result<
Option<PageGet<'g, PM::PageFrag>>,
Option<PagePtr<'g, P>>,
> {
let _measure = Measure::new(&M.page_in);
debug_delay();
let mut head = unsafe { stack_ptr.deref().head(guard) };
let mut stack_iter =
StackIter::from_ptr(head, guard).peekable();
        if let Some(CacheEntry::MergedResident(..)) = stack_iter.peek() {
let ptr = PagePtr(head);
let mr = unsafe { ptr.deref_merged_resident() };
return Ok(Some(PageGet::Materialized(mr, ptr)));
}
let mut to_merge = vec![];
let mut merged_resident = false;
let mut ptrs = vec![];
let mut fix_up_length = 0;
for cache_entry_ptr in stack_iter {
match *cache_entry_ptr {
CacheEntry::Resident(ref page_frag, lsn, ptr) => {
if !merged_resident {
to_merge.push(page_frag);
}
ptrs.push((lsn, ptr));
}
CacheEntry::MergedResident(
ref page_frag,
lsn,
ptr,
) => {
if !merged_resident {
to_merge.push(page_frag);
merged_resident = true;
fix_up_length = ptrs.len();
}
ptrs.push((lsn, ptr));
}
CacheEntry::PartialFlush(lsn, ptr)
| CacheEntry::Flush(lsn, ptr) => {
ptrs.push((lsn, ptr));
}
CacheEntry::Free(_, _) => {
return Ok(Some(PageGet::Free(PagePtr(head))));
}
}
}
if ptrs.is_empty() {
return Ok(Some(PageGet::Allocated));
}
        let mut fetched = Vec::with_capacity(ptrs.len());

        // Pull any paged-out fragments back from the log, in parallel
        // when the "rayon" feature is enabled.
        if !merged_resident {
            let to_pull = &ptrs[to_merge.len()..];

            #[cfg(feature = "rayon")]
            let pulled_res: Vec<_> = to_pull
                .par_iter()
                .map(|&(lsn, ptr)| self.rayon_pull(lsn, ptr))
                .collect();

            #[cfg(not(feature = "rayon"))]
            let pulled_res: Vec<_> = to_pull
                .iter()
                .map(|&(lsn, ptr)| self.pull(lsn, ptr))
                .collect();

            for res in pulled_res {
                let item = res.map_err(|e| e.danger_cast())?;
                fetched.push(item);
            }
        }
let combined: Vec<&P> = to_merge
.iter()
.cloned()
.chain(fetched.iter())
.rev()
.collect();
let merged =
measure(&M.merge_page, || self.t.merge(&*combined));
let size = self.t.size_in_bytes(&merged);
let to_evict = self.lru.accessed(pid, size);
trace!(
"accessed pid {} -> paging out pids {:?}",
pid,
to_evict
);
trace!("accessed page: {:?}", merged);
self.page_out(to_evict, guard)
.map_err(|e| e.danger_cast())?;
if ptrs.len() > self.config.page_consolidation_threshold {
trace!(
"consolidating pid {} with len {}!",
pid,
ptrs.len()
);
match self.cas_page(
pid,
head,
Update::Compact(merged),
guard,
) {
Ok(new_head) => head = new_head,
Err(Error::CasFailed(None)) => {
return Ok(Some(PageGet::Unallocated));
}
_ => {
return Ok(None);
}
}
} else {
trace!(
"fixing up pid {} with {} traversed frags",
pid,
fix_up_length
);
let mut new_entries = Vec::with_capacity(ptrs.len());
let (head_lsn, head_ptr) = ptrs.remove(0);
let head_entry = CacheEntry::MergedResident(
merged, head_lsn, head_ptr,
);
new_entries.push(head_entry);
let mut tail = if let Some((lsn, ptr)) = ptrs.pop() {
Some(CacheEntry::Flush(lsn, ptr))
} else {
None
};
for (lsn, ptr) in ptrs {
new_entries.push(CacheEntry::PartialFlush(lsn, ptr));
}
if let Some(tail) = tail.take() {
new_entries.push(tail);
}
let node = node_from_frag_vec(new_entries);
debug_delay();
let res = unsafe {
stack_ptr.deref().cas(
head,
node.into_shared(guard),
guard,
)
};
if let Ok(new_head) = res {
head = new_head;
} else {
return Ok(None);
}
}
let ret_ptr = PagePtr(head);
let mr = unsafe { ret_ptr.deref_merged_resident() };
Ok(Some(PageGet::Materialized(mr, ret_ptr)))
}
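    /// Evict pages chosen by the LRU: make their tail entry stable in
    /// the log, then swap each stack for one containing only
    /// `PartialFlush`/`Flush` markers that reference disk locations.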
fn page_out<'g>(
&self,
to_evict: Vec<PageId>,
guard: &'g Guard,
) -> Result<(), ()> {
let _measure = Measure::new(&M.page_out);
for pid in to_evict {
let stack_ptr = match self.inner.get(pid, guard) {
None => continue,
Some(s) => s,
};
debug_delay();
let head = unsafe { stack_ptr.deref().head(guard) };
let stack_iter = StackIter::from_ptr(head, guard);
let mut cache_entries: Vec<CacheEntry<P>> =
stack_iter.map(|ptr| (*ptr).clone()).collect();
let last_ce = match cache_entries.pop() {
                // an empty stack means there is nothing to evict for
                // this page; move on to the next candidate
                None => continue,
Some(c) => c,
};
let last = match last_ce {
CacheEntry::MergedResident(_, lsn, ptr)
| CacheEntry::Resident(_, lsn, ptr)
| CacheEntry::Flush(lsn, ptr) => {
self.log.make_stable(lsn)?;
CacheEntry::Flush(lsn, ptr)
}
CacheEntry::PartialFlush(_, _) => {
panic!("got PartialFlush at end of stack...")
}
                CacheEntry::Free(_, _) => {
                    // the page was already freed; nothing to page out,
                    // so move on to the next candidate
                    continue;
                }
};
let mut new_stack =
Vec::with_capacity(cache_entries.len() + 1);
for entry in cache_entries {
match entry {
                    CacheEntry::PartialFlush(lsn, ptr)
                    | CacheEntry::MergedResident(_, lsn, ptr)
                    | CacheEntry::Resident(_, lsn, ptr) => {
                        new_stack.push(CacheEntry::PartialFlush(lsn, ptr));
                    }
CacheEntry::Flush(_, _) => {
panic!("got Flush in middle of stack...")
}
CacheEntry::Free(_, _) => {
panic!(
"encountered a Free tombstone page in middle of stack..."
)
}
}
}
new_stack.push(last);
let node = node_from_frag_vec(new_stack);
debug_delay();
            // If this CAS fails, the page was updated or evicted
            // concurrently; it will be considered again on a later
            // eviction pass, so the failure is ignored.
            unsafe {
                let _ = stack_ptr.deref().cas(
                    head,
                    node.into_shared(guard),
                    guard,
                );
            }
}
Ok(())
}
#[cfg(feature = "rayon")]
fn rayon_pull<'g>(
&self,
lsn: Lsn,
ptr: DiskPtr,
) -> Result<P, Option<PagePtr<'g, P>>> {
self.pull(lsn, ptr).map_err(|e1| e1.danger_cast())
}
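    /// Read one logged update back from the given lsn and disk pointer
    /// and deserialize its page fragment.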
fn pull<'g>(
&self,
lsn: Lsn,
ptr: DiskPtr,
) -> Result<P, Option<PagePtr<'g, P>>> {
trace!("pulling lsn {} ptr {} from disk", lsn, ptr);
let _measure = Measure::new(&M.pull);
let bytes = match self.log.read(lsn, ptr).map_err(|_| ()) {
            Ok(LogRead::Inline(read_lsn, buf, _))
            | Ok(LogRead::Blob(read_lsn, buf, _)) => {
                assert_eq!(
                    read_lsn, lsn,
                    "expected lsn {} on pull of ptr {}, \
                     but got lsn {} instead",
                    lsn, ptr, read_lsn
                );
                Ok(buf)
            }
_other => Err(Error::Corruption { at: ptr }),
}?;
let logged_update = measure(&M.deserialize, || {
deserialize::<LoggedUpdate<P>>(&*bytes)
.expect("failed to deserialize data")
});
match logged_update.update {
Update::Compact(page_frag)
| Update::Append(page_frag) => Ok(page_frag),
            _ => Err(Error::ReportableBug(
                "non-append/compact found in pull".to_owned(),
            )),
}
}
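    /// Roll the on-disk snapshot forward over newly stable log
    /// segments, skipping the work if a previous snapshot attempt
    /// appears to still be in progress.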
fn advance_snapshot(&self) -> Result<(), ()> {
let snapshot_opt_res = self.last_snapshot.try_lock();
if snapshot_opt_res.is_err() {
warn!(
"snapshot skipped because previous attempt \
appears not to have completed"
);
return Ok(());
}
let mut snapshot_opt = snapshot_opt_res.unwrap();
let last_snapshot = snapshot_opt.take().expect(
"PageCache::advance_snapshot called before recovery",
);
if let Err(e) = self.log.flush() {
error!(
"failed to flush log during advance_snapshot: {}",
e
);
self.log.with_sa(|sa| sa.resume_rewriting());
*snapshot_opt = Some(last_snapshot);
return Err(e);
}
self.log.with_sa(|sa| sa.pause_rewriting());
let max_lsn = last_snapshot.max_lsn;
let start_lsn =
max_lsn - (max_lsn % self.config.io_buf_size as Lsn);
debug!(
"snapshot starting from offset {} to the segment containing ~{}",
last_snapshot.max_lsn,
self.log.stable_offset(),
);
let iter = self.log.iter_from(start_lsn);
let res = advance_snapshot::<PM, P, R>(
iter,
last_snapshot,
&self.config,
);
self.log.with_sa(|sa| sa.resume_rewriting());
match res {
Err(e) => {
*snapshot_opt = Some(Snapshot::default());
Err(e)
}
Ok(next_snapshot) => {
*snapshot_opt = Some(next_snapshot);
Ok(())
}
}
}
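    /// Rebuild the in-memory page table, `max_pid`, and free list from
    /// the recovered snapshot.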
fn load_snapshot(&mut self) {
let snapshot =
self.last_snapshot.try_lock().unwrap().clone().unwrap();
self.max_pid.store(snapshot.max_pid, SeqCst);
let mut snapshot_free = snapshot.free.clone();
for (pid, state) in &snapshot.pt {
trace!("load_snapshot page {} {:?}", pid, state);
let stack = Stack::default();
match state {
&PageState::Present(ref ptrs) => {
let (base_lsn, base_ptr) = ptrs[0];
stack.push(CacheEntry::Flush(base_lsn, base_ptr));
for &(lsn, ptr) in &ptrs[1..] {
stack
.push(CacheEntry::PartialFlush(lsn, ptr));
}
}
&PageState::Free(lsn, ptr) => {
trace!("load_snapshot freeing pid {}", *pid);
stack.push(CacheEntry::Free(lsn, ptr));
self.free.lock().unwrap().push(*pid);
                    snapshot_free.remove(pid);
}
&PageState::Allocated(_lsn, _ptr) => {
assert!(!snapshot.free.contains(pid));
}
}
let guard = pin();
let shared_stack = Owned::new(stack).into_shared(&guard);
self.inner
.cas(*pid, Shared::null(), shared_stack, &guard)
.unwrap();
}
        assert!(
            snapshot_free.is_empty(),
            "pages present in Snapshot free list ({:?}) \
             not found in recovered page table",
            snapshot_free
        );
}
}
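/// Collect the disk pointers referenced by every entry in a page's
/// stack.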
fn ptrs_from_stack<'g, P: Send + Sync>(
head_ptr: PagePtrInner<'g, P>,
guard: &'g Guard,
) -> Vec<DiskPtr> {
let stack_iter = StackIter::from_ptr(head_ptr, guard);
let mut ptrs = vec![];
for cache_entry_ptr in stack_iter {
match *cache_entry_ptr {
CacheEntry::Resident(_, _, ref ptr)
| CacheEntry::MergedResident(_, _, ref ptr)
| CacheEntry::PartialFlush(_, ref ptr)
| CacheEntry::Free(_, ref ptr)
| CacheEntry::Flush(_, ref ptr) => {
ptrs.push(*ptr);
}
}
}
ptrs
}