use crate::{error::PRes, id::RecRef, journal::JournalId, transaction::FreedPage};
use std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
sync::Mutex,
};
/// Identifier of a snapshot, taken from the registry's `snapshot_sequence` counter.
pub type SnapshotId = u64;
/// One record state attached to a snapshot: which record, and the `pos`/`version`
/// pair that `Snapshots::read` hands back for it as a `RecordVersion`.
#[derive(Clone, Debug)]
pub struct SnapshotEntry {
/// Reference of the record; used as the key of the per-record version map.
id: RecRef,
/// Position value copied into `RecordVersion::pos`
/// (presumably the record's location in storage -- confirm against callers).
pos: u64,
/// Version value copied into `RecordVersion::version`.
version: u16,
}
impl SnapshotEntry {
pub fn new(id: &RecRef, pos: u64, version: u16) -> SnapshotEntry {
SnapshotEntry {
id: id.clone(),
pos,
version,
}
}
}
/// Bookkeeping for one active snapshot held in `InternalSnapshots::active_snapshots`.
#[derive(Clone, Debug)]
pub struct Snapshot {
/// Id assigned from the sequence counter when the snapshot was created.
snapshot_id: SnapshotId,
/// `Some` for snapshots created by `Snapshots::snapshot`, `None` for those
/// created by `Snapshots::read_snapshot`.
journal_id: Option<JournalId>,
/// Record entries supplied when the snapshot was created (empty for read snapshots).
entries: Vec<SnapshotEntry>,
/// Freed pages supplied when the snapshot was created; returned to the caller
/// when the snapshot is cleared.
freed_pages: Vec<FreedPage>,
/// Starts at 1 when no other snapshot is active and at 2 otherwise;
/// decremented by `Snapshots::release`.
reference_count: u32,
}
/// A record state as stored in the per-record version list and returned by
/// `Snapshots::read`.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordVersion {
/// Id of the snapshot that registered this version.
snapshot_id: SnapshotId,
/// Position value taken from the originating `SnapshotEntry`.
pub pos: u64,
/// Version value taken from the originating `SnapshotEntry`.
pub version: u16,
}
/// State guarded by the mutex inside `Snapshots`.
pub struct InternalSnapshots {
/// Per-record version lists; each list is kept ordered by snapshot id using `search`.
mapping: HashMap<RecRef, Vec<RecordVersion>>,
/// Active snapshots, kept ordered by snapshot id using `search`.
active_snapshots: Vec<Snapshot>,
/// Next snapshot id to hand out; also the `top` pivot passed to `search`.
snapshot_sequence: u64,
}
/// Registry of active snapshots and the record versions they registered.
/// All state lives behind a single mutex.
pub struct Snapshots {
/// Guards every read and write of the snapshot state.
lock: Mutex<InternalSnapshots>,
}
/// Compares two ids relative to a pivot `top`.
///
/// Ids strictly greater than `top` sort before ids less than or equal to `top`;
/// two ids on the same side of `top` compare by their natural order.
///
/// Returns the ordering of `value` relative to `value1`.
pub fn search(value: u64, value1: u64, top: u64) -> Ordering {
    match (value > top, value1 > top) {
        // Only `value` lies above the pivot: it sorts first.
        (true, false) => Ordering::Less,
        // Only `value1` lies above the pivot: `value` sorts after it.
        (false, true) => Ordering::Greater,
        // Same side of the pivot: plain numeric comparison.
        _ => value.cmp(&value1),
    }
}
impl Default for Snapshots {
fn default() -> Snapshots {
Self::new()
}
}
impl Snapshots {
pub fn new() -> Snapshots {
Snapshots {
lock: Mutex::new(InternalSnapshots {
mapping: HashMap::new(),
active_snapshots: Vec::new(),
snapshot_sequence: 0,
}),
}
}
pub fn read_snapshot(&self) -> PRes<SnapshotId> {
let mut lock = self.lock.lock()?;
let snapshot_id = lock.snapshot_sequence;
lock.snapshot_sequence += 1;
let snapshot_sequence = lock.snapshot_sequence;
let reference_count = if lock.active_snapshots.is_empty() { 1 } else { 2 };
let snapshot = Snapshot {
snapshot_id,
journal_id: None,
entries: Vec::new(),
freed_pages: Vec::new(),
reference_count,
};
if let Err(index) = lock
.active_snapshots
.binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence))
{
lock.active_snapshots.insert(index, snapshot);
}
Ok(snapshot_id)
}
pub fn snapshot(
&self,
entries: Vec<SnapshotEntry>,
freed_pages: Vec<FreedPage>,
journal_id: JournalId,
) -> PRes<SnapshotId> {
let mut lock = self.lock.lock()?;
let snapshot_id = lock.snapshot_sequence;
lock.snapshot_sequence += 1;
let snapshot_sequence = lock.snapshot_sequence;
for entry in &entries {
let to_add = RecordVersion {
snapshot_id,
pos: entry.pos,
version: entry.version,
};
match lock.mapping.entry(entry.id.clone()) {
Entry::Occupied(mut v) => {
let vec = v.get_mut();
if let Err(index) = vec.binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence))
{
vec.insert(index, to_add);
}
}
Entry::Vacant(e) => {
let mut v = Vec::new();
v.push(to_add);
e.insert(v);
}
}
}
let reference_count = if lock.active_snapshots.is_empty() { 1 } else { 2 };
let snapshot = Snapshot {
snapshot_id,
journal_id: Some(journal_id),
entries,
freed_pages,
reference_count,
};
if let Err(index) = lock
.active_snapshots
.binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence))
{
lock.active_snapshots.insert(index, snapshot);
}
Ok(snapshot_id)
}
pub fn read(&self, snapshot_id: SnapshotId, id: &RecRef) -> PRes<Option<RecordVersion>> {
let lock = self.lock.lock()?;
let snapshot_sequence = lock.snapshot_sequence;
Ok(if let Some(v) = lock.mapping.get(id) {
let index = match v.binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence)) {
Ok(index) => index,
Err(index) => index,
};
v.get(index).cloned()
} else {
None
})
}
fn clear_from(&self, snapshot_id: SnapshotId) -> PRes<(Vec<FreedPage>, Vec<JournalId>)> {
let mut lock = self.lock.lock()?;
let snapshot_sequence = lock.snapshot_sequence;
let mut free_pages = Vec::new();
let mut journal_ids = Vec::new();
if let Ok(index) = lock
.active_snapshots
.binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence))
{
for tx in lock.active_snapshots.split_off(index) {
if let Some(id) = tx.journal_id {
journal_ids.push(id.clone());
}
for record in tx.entries {
if let Some(v) = lock.mapping.get_mut(&record.id) {
if let Ok(index) = v.binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence))
{
v.split_off(index);
}
}
}
free_pages.extend(tx.freed_pages);
}
}
Ok((free_pages, journal_ids))
}
pub fn release(&self, snapshot_id: SnapshotId) -> PRes<(Vec<FreedPage>, Vec<JournalId>)> {
let mut clear_id = None;
{
let mut lock = self.lock.lock()?;
let snapshot_sequence = lock.snapshot_sequence;
if let Ok(index) = lock
.active_snapshots
.binary_search_by(|n| search(n.snapshot_id, snapshot_id, snapshot_sequence))
{
let mut loop_index = index;
while let Some(snap) = lock.active_snapshots.get_mut(loop_index) {
snap.reference_count -= 1;
if snap.reference_count > 0 {
break;
}
clear_id = Some(snap.snapshot_id);
loop_index += 1;
}
}
}
if let Some(c_id) = clear_id {
self.clear_from(c_id)
} else {
Ok((Vec::new(), Vec::new()))
}
}
}
// Unit tests for the wrap-aware comparator and the snapshot registry.
#[cfg(test)]
mod tests {
use super::{search, RecordVersion, SnapshotEntry, Snapshots};
use crate::{id::RecRef, journal::JournalId, transaction::FreedPage};
use std::cmp::Ordering;
// `search(value, value1, top)`: ids above `top` sort before ids at or below it.
#[test]
fn test_search() {
// Both at or below `top`: natural order.
assert_eq!(search(10, 20, 40), Ordering::Less);
assert_eq!(search(20, 10, 40), Ordering::Greater);
// Exactly one side above `top`: the one above sorts first.
assert_eq!(search(10, 30, 20), Ordering::Greater);
assert_eq!(search(30, 10, 20), Ordering::Less);
// Boundary cases around `top` itself.
assert_eq!(search(20, 20, 20), Ordering::Equal);
assert_eq!(search(20, 19, 20), Ordering::Greater);
assert_eq!(search(20, 21, 20), Ordering::Greater);
assert_eq!(search(21, 21, 20), Ordering::Equal);
assert_eq!(search(19, 19, 20), Ordering::Equal);
}
// A cleared snapshot leaves nothing behind; a later snapshot is readable at its
// own id and at the id before it, but not at the id after it.
#[test]
fn add_and_read() {
let snap = Snapshots::new();
let mut records = Vec::new();
for x in 0..3 {
records.push(SnapshotEntry {
id: RecRef::new(10, x),
pos: x as u64,
version: x as u16,
});
}
let freed_pages: Vec<_> = records.iter().map(|x| FreedPage::new(x.pos)).collect();
// First snapshot takes id 0 (the sequence starts at 0) and is cleared right away.
let tx = snap
.snapshot(records.clone(), freed_pages.clone(), JournalId::new(0, 0))
.unwrap();
snap.clear_from(tx).unwrap();
// Second snapshot takes id 1, which the expected values below rely on.
let tx = snap.snapshot(records, freed_pages, JournalId::new(0, 0)).unwrap();
assert_eq!(
snap.read(tx - 1, &RecRef::new(10, 2)).unwrap(),
Some(RecordVersion {
snapshot_id: 1,
pos: 2,
version: 2,
})
);
assert_eq!(
snap.read(tx - 1, &RecRef::new(10, 1)).unwrap(),
Some(RecordVersion {
snapshot_id: 1,
pos: 1,
version: 1,
})
);
assert_eq!(
snap.read(tx, &RecRef::new(10, 2)).unwrap(),
Some(RecordVersion {
snapshot_id: 1,
pos: 2,
version: 2,
})
);
// Reading past the newest snapshot, or an unknown record, yields nothing.
assert_eq!(snap.read(tx + 1, &RecRef::new(10, 2)).unwrap(), None);
assert_eq!(snap.read(tx + 1, &RecRef::new(10, 10)).unwrap(), None);
}
// Five snapshots, where snapshot `t` registers every record except record `t`:
// reading record `t` at snapshot `t` falls through to the next snapshot's version.
#[test]
fn add_and_read_mutliple_tx() {
let snap = Snapshots::new();
let mut txs = Vec::new();
for t in 0..5 {
let mut records = Vec::new();
for x in 0..10 {
if x != t {
records.push(SnapshotEntry {
id: RecRef::new(10, x),
pos: (10 * t + x) as u64,
version: (10 * t + x) as u16,
});
}
}
let freed_pages: Vec<_> = records.iter().map(|x| FreedPage::new(x.pos)).collect();
txs.push(snap.snapshot(records, freed_pages, JournalId::new(0, 0)).unwrap());
}
assert_eq!(
snap.read(txs[2], &RecRef::new(10, 2)).unwrap(),
Some(RecordVersion {
snapshot_id: txs[3],
pos: 32,
version: 32,
})
);
assert_eq!(
snap.read(txs[3], &RecRef::new(10, 3)).unwrap(),
Some(RecordVersion {
snapshot_id: txs[4],
pos: 43,
version: 43,
})
);
}
// Record versions survive while an earlier snapshot is still held, and disappear
// once that earlier snapshot is released too.
#[test]
fn test_snapshot_reference_count() {
let snap = Snapshots::new();
let mut records = Vec::new();
for x in 0..3 {
records.push(SnapshotEntry {
id: RecRef::new(10, x),
pos: x as u64,
version: x as u16,
});
}
// Ids are handed out in order: `first` = 0, `tx` = 1, `last` = 2.
let first = snap.read_snapshot().unwrap();
let freed_pages: Vec<_> = records.iter().map(|x| FreedPage::new(x.pos)).collect();
let tx = snap
.snapshot(records.clone(), freed_pages, JournalId::new(0, 0))
.unwrap();
let last = snap.read_snapshot().unwrap();
// Releasing `tx` alone must keep its versions readable from `first`.
snap.release(tx).unwrap();
assert_eq!(
snap.read(first, &RecRef::new(10, 2)).unwrap(),
Some(RecordVersion {
snapshot_id: 1,
pos: 2,
version: 2,
})
);
// With `first` released as well, the versions registered by `tx` are gone.
snap.release(first).unwrap();
assert_eq!(snap.read(last, &RecRef::new(10, 2)).unwrap(), None);
}
}