use std::{
ops::{Deref, DerefMut, Range, RangeInclusive},
time::SystemTime,
};
use bilrost::Message;
use smallvec::SmallVec;
use splinter_rs::Splinter;
use crate::core::{
LogId, PageCount, PageIdx, SegmentId, commit_hash::CommitHash, logref::LogRef, lsn::LSN,
pageset::PageSet,
};
/// Metadata describing a single commit, addressed by a log id and an LSN.
///
/// The struct derives `Message`, and each field carries a `#[bilrost(N)]` tag.
/// NOTE(review): the tag numbers presumably must stay stable for compatibility
/// with already-encoded data — confirm before renumbering or reusing one.
#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
pub struct Commit {
/// Identifier of the log associated with this commit.
#[bilrost(1)]
pub log: LogId,
/// LSN of this commit; combined with `log` it forms the commit's `LogRef`.
#[bilrost(2)]
pub lsn: LSN,
/// Page count recorded with this commit.
#[bilrost(3)]
pub page_count: PageCount,
/// Optional commit hash; left as `None` by `Commit::new`.
#[bilrost(4)]
pub commit_hash: Option<CommitHash>,
/// Optional segment index attached to this commit; left as `None` by `Commit::new`.
#[bilrost(5)]
pub segment_idx: Option<SegmentIdx>,
/// Checkpoint timestamp; the commit counts as a checkpoint exactly when this is `Some`.
#[bilrost(6)]
pub checkpointed_at: Option<SystemTime>,
}
impl Commit {
    /// Creates a commit for `log` at `lsn` with the given `page_count`.
    ///
    /// The commit hash, segment index and checkpoint timestamp all start out unset.
    pub fn new(log: LogId, lsn: LSN, page_count: PageCount) -> Self {
        let (commit_hash, segment_idx, checkpointed_at) = (None, None, None);
        Self { log, lsn, page_count, commit_hash, segment_idx, checkpointed_at }
    }

    /// Returns this commit with its log id replaced by `log`.
    pub fn with_log_id(mut self, log: LogId) -> Self {
        self.log = log;
        self
    }

    /// Returns this commit with its LSN replaced by `lsn`.
    pub fn with_lsn(mut self, lsn: LSN) -> Self {
        self.lsn = lsn;
        self
    }

    /// Returns this commit with its commit hash replaced (pass `None` to clear it).
    pub fn with_commit_hash(mut self, commit_hash: Option<CommitHash>) -> Self {
        self.commit_hash = commit_hash;
        self
    }

    /// Returns this commit with its segment index replaced (pass `None` to clear it).
    pub fn with_segment_idx(mut self, segment_idx: Option<SegmentIdx>) -> Self {
        self.segment_idx = segment_idx;
        self
    }

    /// Returns this commit with its checkpoint timestamp replaced (pass `None` to clear it).
    pub fn with_checkpointed_at(mut self, checkpointed_at: Option<SystemTime>) -> Self {
        self.checkpointed_at = checkpointed_at;
        self
    }

    /// Borrows the id of the log associated with this commit.
    pub fn log(&self) -> &LogId {
        &self.log
    }

    /// Returns the commit's LSN.
    pub fn lsn(&self) -> LSN {
        self.lsn
    }

    /// Builds a `LogRef` from a clone of the log id and the commit's LSN.
    pub fn logref(&self) -> LogRef {
        let log = self.log.clone();
        LogRef::new(log, self.lsn)
    }

    /// Returns the page count recorded with this commit.
    pub fn page_count(&self) -> PageCount {
        self.page_count
    }

    /// Borrows the commit hash, if one is set.
    pub fn commit_hash(&self) -> Option<&CommitHash> {
        self.commit_hash.as_ref()
    }

    /// Borrows the segment index, if one is set.
    pub fn segment_idx(&self) -> Option<&SegmentIdx> {
        self.segment_idx.as_ref()
    }

    /// Borrows the checkpoint timestamp, if one is set.
    pub fn checkpointed_at(&self) -> Option<&SystemTime> {
        self.checkpointed_at.as_ref()
    }

    /// Returns `true` when a checkpoint timestamp is present.
    pub fn is_checkpoint(&self) -> bool {
        self.checkpointed_at.is_some()
    }
}
/// Index for one segment: its id, the set of pages it holds, and one entry per frame.
///
/// The frame entries are walked in order by `iter_frames` to derive each frame's
/// byte range and page range.
#[derive(Debug, Clone, Message, PartialEq, Eq)]
pub struct SegmentIdx {
/// Id of the segment this index describes.
#[bilrost(1)]
pub sid: SegmentId,
/// Set of pages contained in the segment.
#[bilrost(2)]
pub pageset: PageSet,
/// Per-frame index entries, in frame order (byte offsets are accumulated from
/// the first entry onwards).
#[bilrost(3)]
pub frames: SmallVec<[SegmentFrameIdx; 1]>,
}
impl SegmentIdx {
    /// Creates an index for segment `sid` covering `pageset`, with no frame entries yet.
    pub fn new(sid: SegmentId, pageset: PageSet) -> Self {
        let frames = SmallVec::new();
        Self { sid, pageset, frames }
    }

    /// Returns this index with its frame entries replaced by `frames`.
    pub fn with_frames(mut self, frames: SmallVec<[SegmentFrameIdx; 1]>) -> Self {
        self.frames = frames;
        self
    }

    /// Borrows the segment id.
    pub fn sid(&self) -> &SegmentId {
        &self.sid
    }

    /// Borrows the set of pages contained in the segment.
    pub fn pageset(&self) -> &PageSet {
        &self.pageset
    }

    /// Iterates over the frames whose page range is accepted by `filter`.
    ///
    /// Frames are walked in order while tracking a running byte offset (starting
    /// at 0) and the first page of the next frame (starting at the first page in
    /// the pageset, or `PageIdx::FIRST` when the pageset is empty). Each frame
    /// spans `frame_size` bytes and the pages up to and including its
    /// `last_pageidx`. `filter` sees that inclusive page range; for accepted
    /// frames the yielded `SegmentRangeRef` carries the frame's byte range and
    /// the intersection of its page range with the segment's pageset.
    pub fn iter_frames(
        &self,
        mut filter: impl FnMut(&RangeInclusive<PageIdx>) -> bool,
    ) -> impl Iterator<Item = SegmentRangeRef> {
        let start_page = self.pageset.iter().next().unwrap_or(PageIdx::FIRST);

        // The cursor holds (byte offset of the next frame, first page of the next frame).
        let spans = self.frames.iter().scan((0usize, start_page), |cursor, frame| {
            let (byte_start, page_start) = *cursor;
            let byte_end = byte_start + frame.frame_size;
            *cursor = (byte_end, frame.last_pageidx.saturating_next());
            Some((byte_start..byte_end, page_start..=frame.last_pageidx))
        });

        spans
            // Filter before building the pageset so rejected frames cost nothing extra.
            .filter(move |span| filter(&span.1))
            .map(|(bytes, pages)| {
                let page_span = pages.start().to_u32()..=pages.end().to_u32();
                let in_frame = Splinter::from(page_span) & self.pageset.splinter();
                let pageset: PageSet = in_frame.into();
                SegmentRangeRef { sid: self.sid.clone(), bytes, pageset }
            })
    }

    /// Returns the frame containing `pageidx`, or `None` when the page is not in
    /// the segment's pageset (or no frame's page range covers it).
    pub fn frame_for_pageidx(&self, pageidx: PageIdx) -> Option<SegmentRangeRef> {
        if self.pageset.contains(pageidx) {
            self.iter_frames(|pages| pages.contains(&pageidx)).next()
        } else {
            None
        }
    }
}
/// Lets a `SegmentIdx` be used wherever a `&PageSet` is expected by
/// dereferencing to its `pageset` field.
impl Deref for SegmentIdx {
    type Target = PageSet;

    fn deref(&self) -> &PageSet {
        &self.pageset
    }
}
impl DerefMut for SegmentIdx {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.pageset
}
}
/// Index entry for one frame of a segment.
///
/// An entry stores only the frame's size and its last page; the frame's start
/// offset and first page are derived from the preceding entries when walking the
/// frame list (see `SegmentIdx::iter_frames`).
#[derive(Debug, Clone, Message, PartialEq, Eq, Default)]
pub struct SegmentFrameIdx {
/// Size of the frame; summed across frames to produce byte ranges.
#[bilrost(1)]
frame_size: usize,
/// Last page index covered by the frame (inclusive).
#[bilrost(2)]
last_pageidx: PageIdx,
}
impl SegmentFrameIdx {
pub fn new(frame_size: usize, last_pageidx: PageIdx) -> Self {
Self { frame_size, last_pageidx }
}
pub fn frame_size(&self) -> usize {
self.frame_size
}
pub fn last_pageidx(&self) -> PageIdx {
self.last_pageidx
}
}
/// A byte range within a segment together with the set of pages found in that range.
#[derive(Clone, PartialEq, Eq)]
pub struct SegmentRangeRef {
/// Id of the segment the range refers to.
pub sid: SegmentId,
/// Half-open byte range within the segment.
pub bytes: Range<usize>,
/// Pages contained in `bytes`.
pub pageset: PageSet,
}
/// Compact debug output: prints the number of pages in the pageset (as `pages`)
/// rather than the pageset itself.
impl std::fmt::Debug for SegmentRangeRef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SegmentRangeRef");
        out.field("sid", &self.sid);
        out.field("bytes", &self.bytes);
        out.field("pages", &self.pageset.cardinality());
        out.finish()
    }
}
impl SegmentRangeRef {
    /// Returns the length of the byte range (`end - start`).
    pub fn size(&self) -> usize {
        let (start, end) = (self.bytes.start, self.bytes.end);
        end - start
    }

    /// Merges two ranges of the same segment whose byte ranges touch end-to-start.
    ///
    /// The operands may be given in either order. On success the result spans
    /// both byte ranges and holds the union of both pagesets.
    ///
    /// # Errors
    ///
    /// Returns both inputs unchanged, in their original order, when they belong
    /// to different segments or their byte ranges are not adjacent.
    // The error variant deliberately hands both operands back by value.
    #[allow(clippy::result_large_err)]
    pub fn coalesce(self, other: Self) -> Result<Self, (Self, Self)> {
        if self.sid != other.sid {
            return Err((self, other));
        }

        // Work out which operand comes first in the segment's byte order.
        let self_precedes = self.bytes.end == other.bytes.start;
        let other_precedes = other.bytes.end == self.bytes.start;
        let (head, tail) = match (self_precedes, other_precedes) {
            (true, _) => (self, other),
            (false, true) => (other, self),
            (false, false) => return Err((self, other)),
        };

        let Self { sid, bytes: head_bytes, pageset: head_pages } = head;
        let Self { bytes: tail_bytes, pageset: tail_pages, .. } = tail;

        let head_splinter: Splinter = head_pages.into();
        let tail_splinter: Splinter = tail_pages.into();
        let merged = head_splinter | tail_splinter;

        Ok(Self {
            sid,
            bytes: head_bytes.start..tail_bytes.end,
            pageset: merged.into(),
        })
    }
}
#[cfg(test)]
mod tests {
    use crate::pageidx;

    use super::*;

    /// Builds an index over pages 5..=25 split into three frames:
    /// 100 bytes ending at page 10, 200 bytes ending at page 20 and
    /// 150 bytes ending at page 25.
    fn three_frame_index(sid: SegmentId) -> SegmentIdx {
        let frames: SmallVec<[SegmentFrameIdx; 1]> = [
            SegmentFrameIdx { frame_size: 100, last_pageidx: pageidx!(10) },
            SegmentFrameIdx { frame_size: 200, last_pageidx: pageidx!(20) },
            SegmentFrameIdx { frame_size: 150, last_pageidx: pageidx!(25) },
        ]
        .into_iter()
        .collect();

        SegmentIdx {
            sid,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(25)),
            frames,
        }
    }

    /// Builds an index with an empty pageset and no frames.
    fn empty_index() -> SegmentIdx {
        SegmentIdx {
            sid: SegmentId::random(),
            pageset: PageSet::EMPTY,
            frames: SmallVec::new(),
        }
    }

    /// Asserts that coalescing is refused and both inputs come back unchanged, in order.
    fn assert_not_coalesced(frame1: SegmentRangeRef, frame2: SegmentRangeRef) {
        let result = frame1.clone().coalesce(frame2.clone());
        assert!(result.is_err());
        let (f1, f2) = result.unwrap_err();
        assert_eq!(f1, frame1);
        assert_eq!(f2, frame2);
    }

    #[test]
    fn test_frame_for_pageidx() {
        let sid = SegmentId::random();
        let segment_idx = three_frame_index(sid.clone());

        let first = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 0..100,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
        };
        let second = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 100..300,
            pageset: PageSet::from_range(pageidx!(11)..=pageidx!(20)),
        };
        let third = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 300..450,
            pageset: PageSet::from_range(pageidx!(21)..=pageidx!(25)),
        };

        // Probe just outside the pageset and at both edges of the frames.
        let cases = [
            (pageidx!(4), None),
            (pageidx!(5), Some(first.clone())),
            (pageidx!(10), Some(first)),
            (pageidx!(11), Some(second.clone())),
            (pageidx!(20), Some(second)),
            (pageidx!(25), Some(third)),
            (pageidx!(26), None),
        ];

        for (pageidx, expected) in cases {
            assert_eq!(
                segment_idx.frame_for_pageidx(pageidx),
                expected,
                "wrong frame for pageidx {pageidx}"
            );
        }
    }

    #[test]
    fn test_frame_for_pageidx_empty_frames() {
        let segment_idx = empty_index();
        let result = segment_idx.frame_for_pageidx(pageidx!(1));
        assert!(result.is_none());
    }

    #[test]
    fn test_segment_range_ref_coalesce_adjacent() {
        let sid = SegmentId::random();
        let frame1 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 0..100,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
        };
        let frame2 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 100..200,
            pageset: PageSet::from_range(pageidx!(11)..=pageidx!(20)),
        };

        // Coalescing must give the same result regardless of operand order.
        let orderings = [(frame1.clone(), frame2.clone()), (frame2, frame1)];
        for (lhs, rhs) in orderings {
            let result = lhs.coalesce(rhs).unwrap();
            assert_eq!(result.bytes, 0..200);
            assert_eq!(
                result.pageset,
                PageSet::from_range(pageidx!(5)..=pageidx!(20))
            );
        }
    }

    #[test]
    fn test_segment_range_ref_coalesce_non_adjacent() {
        let sid = SegmentId::random();
        let frame1 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 0..100,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
        };
        let frame2 = SegmentRangeRef {
            sid: sid.clone(),
            bytes: 150..250,
            pageset: PageSet::from_range(pageidx!(20)..=pageidx!(30)),
        };

        assert_not_coalesced(frame1, frame2);
    }

    #[test]
    fn test_segment_range_ref_coalesce_diff_segment() {
        let frame1 = SegmentRangeRef {
            sid: SegmentId::random(),
            bytes: 0..100,
            pageset: PageSet::from_range(pageidx!(5)..=pageidx!(10)),
        };
        let frame2 = SegmentRangeRef {
            sid: SegmentId::random(),
            bytes: 100..200,
            pageset: PageSet::from_range(pageidx!(11)..=pageidx!(20)),
        };

        assert_not_coalesced(frame1, frame2);
    }

    #[test]
    fn test_iter_frames_no_filter() {
        let segment_idx = three_frame_index(SegmentId::random());

        let all_frames: Vec<_> = segment_idx.iter_frames(|_| true).collect();
        assert_eq!(all_frames.len(), 3);

        let expected = [
            (0..100, PageSet::from_range(pageidx!(5)..=pageidx!(10))),
            (100..300, PageSet::from_range(pageidx!(11)..=pageidx!(20))),
            (300..450, PageSet::from_range(pageidx!(21)..=pageidx!(25))),
        ];
        for (frame, (bytes, pageset)) in all_frames.iter().zip(expected) {
            assert_eq!(frame.bytes, bytes);
            assert_eq!(frame.pageset, pageset);
        }
    }

    #[test]
    fn test_iter_frames_with_filter() {
        let segment_idx = three_frame_index(SegmentId::random());

        let filtered_frames: Vec<_> = segment_idx
            .iter_frames(|pages| pages.contains(&pageidx!(15)))
            .collect();

        assert_eq!(filtered_frames.len(), 1);
        let only = &filtered_frames[0];
        assert_eq!(only.bytes, 100..300);
        assert_eq!(
            only.pageset,
            PageSet::from_range(pageidx!(11)..=pageidx!(20))
        );
    }

    #[test]
    fn test_iter_frames_empty() {
        let segment_idx = empty_index();
        let frames: Vec<_> = segment_idx.iter_frames(|_| true).collect();
        assert_eq!(frames.len(), 0);
    }
}