use std::collections::{HashMap, HashSet};
use std::io::{Read, Seek, Write};
#[cfg(feature = "zstd")]
use zstd::block::{compress, decompress};
use super::*;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// A consistent view of the recovered page table, used to avoid
/// rescanning the entire log on startup.
pub struct Snapshot<R> {
    /// The highest log sequence number that has been applied to this
    /// snapshot.
    pub max_lsn: Lsn,
    /// The log id of the last message applied.
    pub last_lid: LogId,
    /// One past the highest page id ever observed, so fresh
    /// allocations don't collide with recovered pages.
    pub max_pid: PageId,
    /// The page table: each known page's last-seen state and frag
    /// coordinates.
    pub pt: HashMap<PageId, PageState>,
    /// Per-segment sets of (page, lsn) pairs whose frags in that
    /// segment were superseded by later writes.
    pub replacements: HashMap<SegmentId, HashSet<(PageId, Lsn)>>,
    /// Pages that were last seen in the freed state.
    pub free: HashSet<PageId>,
    /// Materializer-specific recovery state, if any was produced
    /// while replaying the log.
    pub recovery: Option<R>,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// The last-known durable state of a single page.
pub enum PageState {
    /// The page is live, with one or more (lsn, ptr) frag locations,
    /// ordered base-first.
    Present(Vec<(Lsn, DiskPtr)>),
    /// The page was allocated but has no data frags yet.
    Allocated(Lsn, DiskPtr),
    /// The page was freed; its location records the free message.
    Free(Lsn, DiskPtr),
}
impl PageState {
    /// Record an additional (lsn, ptr) frag coordinate for this page.
    ///
    /// An `Allocated` page is promoted to `Present` containing just the
    /// new item. Pushing onto a `Free` page is a logic error and panics.
    fn push(&mut self, item: (Lsn, DiskPtr)) {
        match *self {
            PageState::Present(ref mut items) => items.push(item),
            PageState::Allocated(..) => {
                *self = PageState::Present(vec![item]);
            }
            PageState::Free(..) => {
                panic!("pushed items to a PageState::Free")
            }
        }
    }

    /// Iterate over every (lsn, ptr) coordinate held by this state.
    /// Coordinates are copied out, so the iterator owns its items.
    pub fn iter(&self) -> Box<dyn Iterator<Item = (Lsn, DiskPtr)>> {
        match *self {
            PageState::Present(ref items) => {
                Box::new(items.to_vec().into_iter())
            }
            PageState::Allocated(lsn, ptr)
            | PageState::Free(lsn, ptr) => {
                Box::new(std::iter::once((lsn, ptr)))
            }
        }
    }

    /// True if the page was last seen in the `Free` state.
    fn is_free(&self) -> bool {
        match *self {
            PageState::Free(..) => true,
            _ => false,
        }
    }
}
impl<R> Default for Snapshot<R> {
fn default() -> Snapshot<R> {
Snapshot {
max_lsn: 0,
last_lid: 0,
max_pid: 0,
pt: HashMap::new(),
replacements: HashMap::new(),
free: HashSet::new(),
recovery: None,
}
}
}
impl<R> Snapshot<R> {
    /// Integrate a single logged message (read at `lsn` / `disk_ptr`)
    /// into this snapshot, updating the page table, the free set, the
    /// per-segment replacement bookkeeping, and any materializer
    /// recovery state.
    ///
    /// Returns `Err(Error::Corruption { .. })` when the buffer cannot
    /// be deserialized as a `LoggedUpdate`.
    fn apply<P>(
        &mut self,
        materializer: &dyn Materializer<PageFrag = P, Recovery = R>,
        lsn: Lsn,
        disk_ptr: DiskPtr,
        bytes: &[u8],
        config: &Config,
    ) -> Result<(), ()>
    where
        P: 'static
            + Debug
            + Clone
            + Serialize
            + DeserializeOwned
            + Send
            + Sync,
        R: Debug + Clone + Serialize + DeserializeOwned + Send,
    {
        trace!(
            "trying to deserialize buf for ptr {} lsn {}",
            disk_ptr,
            lsn
        );

        // Bail out with a Corruption error pointing at the offending
        // disk location if this message cannot be decoded.
        let prepend = match deserialize::<LoggedUpdate<P>>(bytes) {
            Ok(prepend) => prepend,
            Err(e) => {
                error!(
                    "failed to deserialize buffer for item in log: lsn {} \
                     ptr {}: {:?}",
                    lsn, disk_ptr, e
                );
                return Err(Error::Corruption { at: disk_ptr });
            }
        };

        let pid = prepend.pid;

        // Track one past the highest pid ever seen so post-recovery
        // allocations don't collide with recovered pages.
        if pid >= self.max_pid {
            self.max_pid = pid + 1;
        }

        let io_buf_size = config.io_buf_size;

        // Index of the segment that this message landed in.
        let replaced_at_idx =
            disk_ptr.lid() as SegmentId / io_buf_size;

        match prepend.update {
            Update::Append(partial_page) => {
                // Appends only apply to pages we've already seen an
                // allocation or compaction for.
                if let Some(lids) = self.pt.get_mut(&pid) {
                    trace!(
                        "append of pid {} at lid {} lsn {}",
                        pid,
                        disk_ptr,
                        lsn
                    );

                    if lids.is_free() {
                        trace!(
                            "we have not yet encountered an \
                             allocation of this page, skipping push"
                        );
                        return Ok(());
                    }

                    if let Some(r) = materializer.recover(&partial_page) {
                        self.recovery = Some(r);
                    }

                    lids.push((lsn, disk_ptr));
                }
                self.free.remove(&pid);
            }
            Update::Compact(partial_page) => {
                trace!(
                    "compact of pid {} at ptr {} lsn {}",
                    pid,
                    disk_ptr,
                    lsn
                );

                if let Some(r) = materializer.recover(&partial_page) {
                    self.recovery = Some(r);
                }

                // A compact supersedes every previous location of the
                // page before installing the single new coordinate.
                self.replace_pid(
                    pid,
                    replaced_at_idx,
                    lsn,
                    io_buf_size,
                    config,
                );
                self.pt.insert(
                    pid,
                    PageState::Present(vec![(lsn, disk_ptr)]),
                );
                self.free.remove(&pid);
            }
            Update::Allocate => {
                trace!(
                    "allocate of pid {} at ptr {} lsn {}",
                    pid,
                    disk_ptr,
                    lsn
                );

                self.replace_pid(
                    pid,
                    replaced_at_idx,
                    lsn,
                    io_buf_size,
                    config,
                );
                self.pt
                    .insert(pid, PageState::Allocated(lsn, disk_ptr));
                self.free.remove(&pid);
            }
            Update::Free => {
                trace!(
                    "free of pid {} at ptr {} lsn {}",
                    pid,
                    disk_ptr,
                    lsn
                );

                self.replace_pid(
                    pid,
                    replaced_at_idx,
                    lsn,
                    io_buf_size,
                    config,
                );
                self.pt.insert(pid, PageState::Free(lsn, disk_ptr));
                self.free.insert(pid);
            }
        }

        Ok(())
    }

    /// Remove `pid`'s previous page-table entry and record, per
    /// segment, that its old frag locations were replaced at
    /// `replaced_at_lsn`. Blobs referenced by replaced `Present`
    /// coordinates are removed from disk (best-effort).
    fn replace_pid(
        &mut self,
        pid: PageId,
        replaced_at_idx: usize,
        replaced_at_lsn: Lsn,
        io_buf_size: usize,
        config: &Config,
    ) {
        match self.pt.remove(&pid) {
            Some(PageState::Present(coords)) => {
                for (_lsn, ptr) in &coords {
                    let idx = ptr.lid() as SegmentId / io_buf_size;
                    if replaced_at_idx == idx {
                        // NOTE(review): a replacement within the same
                        // segment returns from the whole function,
                        // skipping both the remaining coordinates and
                        // the blob cleanup below — confirm intended.
                        return;
                    }
                    self.replacements
                        .entry(idx)
                        .or_insert_with(HashSet::new)
                        .insert((pid, replaced_at_lsn));
                }

                if coords.len() > 1 {
                    // Remove blob files referenced by the superseded
                    // coordinates.
                    let blob_ptrs = coords
                        .iter()
                        .filter(|(_, ptr)| ptr.is_blob())
                        .map(|(_, ptr)| ptr.blob().1);

                    for blob_ptr in blob_ptrs {
                        trace!(
                            "removing blob while advancing \
                             snapshot: {}",
                            blob_ptr,
                        );
                        // Best-effort: a failed removal is ignored.
                        let _ = remove_blob(blob_ptr, config);
                    }
                }
            }
            Some(PageState::Allocated(_lsn, ptr))
            | Some(PageState::Free(_lsn, ptr)) => {
                let idx = ptr.lid() as SegmentId / io_buf_size;
                if replaced_at_idx == idx {
                    // Replaced within the same segment: nothing to
                    // account for.
                    return;
                }
                self.replacements
                    .entry(idx)
                    .or_insert_with(HashSet::new)
                    .insert((pid, replaced_at_lsn));
            }
            None => {
                // First sighting of this page; nothing to replace.
            }
        }
    }
}
/// Replay every log message from `iter` with an lsn above
/// `snapshot.max_lsn` into `snapshot`, persist the advanced snapshot
/// to disk, and return it.
pub(super) fn advance_snapshot<PM, P, R>(
    iter: LogIter,
    mut snapshot: Snapshot<R>,
    config: &Config,
) -> Result<Snapshot<R>, ()>
where
    PM: Materializer<Recovery = R, PageFrag = P>,
    P: 'static
        + Debug
        + Clone
        + Serialize
        + DeserializeOwned
        + Send
        + Sync,
    R: Debug + Clone + Serialize + DeserializeOwned + Send,
{
    let start = clock();

    trace!("building on top of old snapshot: {:?}", snapshot);

    // The materializer is seeded with the recovery state carried over
    // from the base snapshot.
    let materializer = PM::new(config.clone(), &snapshot.recovery);

    let io_buf_size = config.io_buf_size;

    for (lsn, ptr, bytes) in iter {
        trace!(
            "in advance_snapshot looking at item with lsn {} ptr {}",
            lsn,
            ptr
        );

        // Skip messages the base snapshot already accounts for.
        if lsn <= snapshot.max_lsn {
            trace!(
                "continuing in advance_snapshot, lsn {} ptr {} max_lsn {}",
                lsn,
                ptr,
                snapshot.max_lsn
            );
            continue;
        }

        assert!(lsn > snapshot.max_lsn);
        snapshot.max_lsn = lsn;
        snapshot.last_lid = ptr.lid();

        // Drop any replacement bookkeeping previously recorded for the
        // segment this message lives in.
        let segment_idx = ptr.lid() as SegmentId / io_buf_size;
        snapshot.replacements.remove(&segment_idx);

        // Only reconstruct page state when the materializer is not the
        // null materializer.
        if !PM::is_null() {
            if let Err(e) = snapshot.apply(
                &materializer,
                lsn,
                ptr,
                &*bytes,
                config,
            ) {
                // Stop replaying at the first corrupt message rather
                // than failing the whole recovery.
                error!(
                    "encountered error while reading log message: {}",
                    e
                );
                break;
            }
        }
    }

    write_snapshot(config, &snapshot)?;

    trace!("generated new snapshot: {:?}", snapshot);

    M.advance_snapshot.measure(clock() - start);

    Ok(snapshot)
}
/// Load the most recent on-disk snapshot (or start from an empty one)
/// and advance it over any log entries written after it was taken.
pub fn read_snapshot_or_default<PM, P, R>(
    config: &Config,
) -> Result<Snapshot<R>, ()>
where
    PM: Materializer<Recovery = R, PageFrag = P>,
    P: 'static
        + Debug
        + Clone
        + Serialize
        + DeserializeOwned
        + Send
        + Sync,
    R: Debug + Clone + Serialize + DeserializeOwned + Send,
{
    // Fall back to an empty snapshot when none is on disk.
    let base = read_snapshot(config)?.unwrap_or_else(Snapshot::default);

    // Replay everything logged after the snapshot's high-water mark.
    let iter = raw_segment_iter_from(base.max_lsn, config)?;

    advance_snapshot::<PM, P, R>(iter, base, config)
}
/// Read and validate the most recent snapshot file, returning
/// `Ok(None)` when no usable snapshot exists (missing, truncated, or
/// crc mismatch).
///
/// File layout: payload ++ 8-byte pre-compression length ++ 8-byte
/// crc64 of the payload, both trailer words in native byte order
/// (matching `write_snapshot`).
fn read_snapshot<R>(
    config: &Config,
) -> std::io::Result<Option<Snapshot<R>>>
where
    R: Debug + Clone + Serialize + DeserializeOwned + Send,
{
    let mut candidates = config.get_snapshot_files()?;
    if candidates.is_empty() {
        info!("no previous snapshot found");
        return Ok(None);
    }

    // Snapshot filenames embed the max lsn in fixed-width hex, so the
    // lexicographically greatest path is the most recent snapshot.
    candidates.sort();
    let path = candidates.pop().unwrap();

    let mut f = std::fs::OpenOptions::new().read(true).open(&path)?;

    // A valid file must be larger than its 16-byte trailer.
    if f.metadata()?.len() <= 16 {
        warn!("empty/corrupt snapshot file found");
        return Ok(None);
    }

    let mut buf = vec![];
    f.read_to_end(&mut buf)?;
    let len = buf.len();

    // Split the trailer off the already-read buffer instead of
    // re-seeking through the file; errors propagate instead of
    // panicking on I/O.
    let trailer = buf.split_off(len - 16);

    let mut len_expected_bytes = [0u8; 8];
    len_expected_bytes.copy_from_slice(&trailer[..8]);

    let mut crc_expected_bytes = [0u8; 8];
    crc_expected_bytes.copy_from_slice(&trailer[8..]);

    let crc_expected = u64::from_ne_bytes(crc_expected_bytes);
    let crc_actual = crc64(&*buf);

    if crc_expected != crc_actual {
        error!("crc for snapshot file {:?} failed!", path);
        return Ok(None);
    }

    #[cfg(feature = "zstd")]
    let bytes = if config.use_compression {
        let len_expected = u64::from_ne_bytes(len_expected_bytes);
        decompress(&*buf, len_expected as usize).unwrap()
    } else {
        buf
    };

    #[cfg(not(feature = "zstd"))]
    let bytes = buf;

    // A snapshot that fails to deserialize is treated the same as a
    // missing one.
    Ok(deserialize::<Snapshot<R>>(&*bytes).ok())
}
/// Serialize `snapshot` (compressing when enabled), write it to a
/// temporary "in motion" file, atomically rename it into place after
/// syncing, and remove any older snapshot files.
///
/// Trailer layout matches `read_snapshot`: 8-byte pre-compression
/// length followed by 8-byte crc64 of the written payload, both in
/// native byte order.
pub(crate) fn write_snapshot<R>(
    config: &Config,
    snapshot: &Snapshot<R>,
) -> Result<(), ()>
where
    R: Debug + Clone + Serialize + DeserializeOwned + Send,
{
    // Serializing our own Snapshot type is infallible by invariant.
    let raw_bytes = serialize(&snapshot).unwrap();
    let decompressed_len = raw_bytes.len();

    #[cfg(feature = "zstd")]
    let bytes = if config.use_compression {
        compress(&*raw_bytes, config.compression_factor).unwrap()
    } else {
        raw_bytes
    };

    #[cfg(not(feature = "zstd"))]
    let bytes = raw_bytes;

    // Safe byte conversion of the trailer words (replaces the old
    // unsafe transmutes; native byte order is unchanged).
    let crc_bytes: [u8; 8] = crc64(&*bytes).to_ne_bytes();
    let len_bytes: [u8; 8] = (decompressed_len as u64).to_ne_bytes();

    // Write to an "in motion" file first, then rename it into its
    // final name once fully synced, so readers never see a torn file.
    let path_1_suffix =
        format!("snap.{:016X}.in___motion", snapshot.max_lsn);
    let mut path_1 = config.snapshot_prefix();
    path_1.push(path_1_suffix);

    let path_2_suffix = format!("snap.{:016X}", snapshot.max_lsn);
    let mut path_2 = config.snapshot_prefix();
    path_2.push(path_2_suffix);

    // Best-effort: the directory may already exist.
    let _res = std::fs::create_dir_all(path_1.parent().unwrap());

    let mut f = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .open(&path_1)?;

    maybe_fail!("snap write");
    f.write_all(&*bytes)?;
    maybe_fail!("snap write len");
    f.write_all(&len_bytes)?;
    maybe_fail!("snap write crc");
    f.write_all(&crc_bytes)?;
    f.sync_all()?;
    maybe_fail!("snap write post");

    trace!("wrote snapshot to {}", path_1.to_string_lossy());

    maybe_fail!("snap write mv");
    std::fs::rename(path_1, &path_2)?;
    maybe_fail!("snap write mv post");

    trace!("renamed snapshot to {}", path_2.to_string_lossy());

    // Remove every snapshot file other than the one just written.
    let candidates = config.get_snapshot_files()?;
    for path in candidates {
        let path_str = path.file_name().unwrap().to_str().unwrap();
        if !path_2.to_string_lossy().ends_with(&*path_str) {
            debug!("removing old snapshot file {:?}", path);

            maybe_fail!("snap write rm old");

            // Best-effort: a concurrent snapshot writer may have
            // already removed it.
            if let Err(_e) = std::fs::remove_file(&path) {
                warn!(
                    "failed to remove old snapshot file, maybe snapshot race? {}",
                    _e
                );
            }
        }
    }

    Ok(())
}