use crate::runtime::l0::L0Buffer;
use crate::runtime::wal::WriteAheadLog;
use parking_lot::RwLock;
use std::sync::Arc;
/// Zero-sized marker used purely for reference counting.
///
/// `L0Manager` keeps one `Arc<PinToken>` per current generation and every
/// `SnapshotView` produced by `pin_snapshot` holds a clone of it, so an
/// `Arc::strong_count` above 1 means at least one snapshot still pins that
/// generation. The private unit field prevents construction outside this module.
#[derive(Debug)]
pub struct PinToken(());
/// A pinned view over the L0 buffers, as captured by `L0Manager::pin_snapshot`.
///
/// Cloning a view clones the `Arc` handles (including the pin), so every clone
/// keeps the generation pinned until it is dropped.
#[derive(Clone)]
pub struct SnapshotView {
/// The buffer that was the manager's current buffer when the view was taken.
pub main: Arc<RwLock<L0Buffer>>,
/// The buffers that were queued in the manager's pending-flush list at that time.
pub extra: Vec<Arc<RwLock<L0Buffer>>>,
/// Clone of the manager's pin token for the generation this view was taken from.
pin: Arc<PinToken>,
/// `current_version` of `main`, read at the moment the view was taken.
pub started_at_version: u64,
/// Optional storage manager handle; `pin_snapshot` leaves it as `None`.
/// NOTE(review): presumably filled in by callers that also pin on-disk state — confirm.
pub pinned_storage: Option<Arc<crate::storage::manager::StorageManager>>,
}
impl std::fmt::Debug for SnapshotView {
    /// Writes a compact summary of the view: how many extra generations it
    /// carries, the strong count of its pin token and the version it started at.
    /// The buffers themselves are never locked or printed, hence the
    /// non-exhaustive finish.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let generations = self.extra.len();
        let pins = Arc::strong_count(&self.pin);

        let mut summary = f.debug_struct("SnapshotView");
        summary.field("extra_generations", &generations);
        summary.field("pins", &pins);
        summary.field("started_at_version", &self.started_at_version);
        summary.finish_non_exhaustive()
    }
}
/// Owns the current L0 buffer, the list of buffers awaiting flush, and the pin
/// token that tracks whether snapshots reference the current generation.
pub struct L0Manager {
/// The buffer handed out by `get_current`; replaced wholesale on rotation or freeze.
current: RwLock<Arc<RwLock<L0Buffer>>>,
/// Buffers that have been rotated out and not yet removed by `complete_flush`.
pending_flush: RwLock<Vec<Arc<RwLock<L0Buffer>>>>,
/// Pin token for the current generation; replaced with a fresh one whenever
/// `current` is replaced, so the new generation starts with no snapshot pins.
current_pin: RwLock<Arc<PinToken>>,
}
impl L0Manager {
pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
let l0 = L0Buffer::new(start_version, wal);
Self {
current: RwLock::new(Arc::new(RwLock::new(l0))),
pending_flush: RwLock::new(Vec::new()),
current_pin: RwLock::new(Arc::new(PinToken(()))),
}
}
pub fn from_snapshot(
current: Arc<RwLock<L0Buffer>>,
pending_flush: Vec<Arc<RwLock<L0Buffer>>>,
) -> Self {
Self {
current: RwLock::new(current),
pending_flush: RwLock::new(pending_flush),
current_pin: RwLock::new(Arc::new(PinToken(()))),
}
}
pub fn get_current(&self) -> Arc<RwLock<L0Buffer>> {
self.current.read().clone()
}
pub fn get_all_readable(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
let current = self.get_current();
let pending = self.pending_flush.read().clone();
let mut all = vec![current];
all.extend(pending);
all
}
pub fn get_pending_flush(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
self.pending_flush.read().clone()
}
pub fn rotate(
&self,
next_version: u64,
new_wal: Option<Arc<WriteAheadLog>>,
) -> Arc<RwLock<L0Buffer>> {
let mut guard = self.current.write();
let old_l0 = guard.clone();
let new_l0 = L0Buffer::new(next_version, new_wal);
*guard = Arc::new(RwLock::new(new_l0));
*self.current_pin.write() = Arc::new(PinToken(()));
old_l0
}
pub fn begin_flush(
&self,
next_version: u64,
new_wal: Option<Arc<WriteAheadLog>>,
) -> Arc<RwLock<L0Buffer>> {
let old_l0 = self.rotate(next_version, new_wal);
self.pending_flush.write().push(old_l0.clone());
old_l0
}
pub fn complete_flush(&self, l0: &Arc<RwLock<L0Buffer>>) {
let mut pending = self.pending_flush.write();
pending.retain(|x| !Arc::ptr_eq(x, l0));
}
pub fn snapshot_isolated(
&self,
next_version: u64,
new_wal: Option<Arc<WriteAheadLog>>,
) -> (Arc<RwLock<L0Buffer>>, Vec<Arc<RwLock<L0Buffer>>>) {
let pending = self.pending_flush.read().clone();
let frozen = self.rotate(next_version, new_wal);
self.pending_flush.write().push(frozen.clone());
(frozen, pending)
}
pub fn pin_snapshot(&self) -> SnapshotView {
let current_guard = self.current.read();
let main = current_guard.clone();
let pin = self.current_pin.read().clone();
let started_at_version = main.read().current_version;
let extra = self.pending_flush.read().clone();
drop(current_guard);
SnapshotView {
main,
extra,
pin,
started_at_version,
pinned_storage: None,
}
}
pub fn is_current_pinned(&self) -> bool {
Arc::strong_count(&self.current_pin.read()) > 1
}
pub fn freeze_current_for_snapshot(&self) {
let mut guard = self.current.write();
let frozen = guard.clone();
let mut new_buf = frozen.read().clone();
new_buf.wal = frozen.write().wal.take();
*guard = Arc::new(RwLock::new(new_buf));
*self.current_pin.write() = Arc::new(PinToken(()));
}
pub fn min_pending_wal_lsn(&self) -> Option<u64> {
let pending = self.pending_flush.read();
if pending.is_empty() {
return None;
}
pending
.iter()
.map(|l0_arc| {
let l0 = l0_arc.read();
l0.wal_lsn_at_flush
})
.min()
}
}
#[cfg(test)]
mod snapshot_tests {
    use super::*;
    use crate::runtime::QueryContext;
    use crate::runtime::l0_visibility::lookup_vertex_prop;
    use uni_common::core::id::Vid;
    use uni_common::{Properties, Value};

    /// Builds a property map containing a single `name` string entry.
    fn named(name: &str) -> Properties {
        let key = "name".to_string();
        let value = Value::String(name.to_string());
        let mut props = Properties::new();
        props.insert(key, value);
        props
    }

    /// Reads the `name` property of `vid` through `ctx`; anything other than a
    /// string value is reported as `None`.
    fn name_of(vid: Vid, ctx: &QueryContext) -> Option<String> {
        if let Some(Value::String(text)) = lookup_vertex_prop(vid, "name", Some(ctx)) {
            Some(text)
        } else {
            None
        }
    }

    /// Writes a `Node`-labelled vertex called `name` into the manager's current buffer.
    fn put_node(mgr: &L0Manager, vid: Vid, name: &str) {
        let labels = ["Node".to_string()];
        mgr.get_current()
            .write()
            .insert_vertex_with_labels(vid, named(name), &labels);
    }

    /// Query context over whatever the manager exposes right now.
    fn live_context(mgr: &L0Manager) -> QueryContext {
        QueryContext::new_with_pending(mgr.get_current(), None, mgr.get_pending_flush())
    }

    #[test]
    fn snapshot_isolated_from_later_writes() {
        let mgr = L0Manager::new(0, None);
        let alice = Vid::from(1_u64);
        let bob = Vid::from(2_u64);
        put_node(&mgr, alice, "alice");
        put_node(&mgr, bob, "bob");

        // Freeze the generation, then overwrite alice in the new one.
        let (frozen, pending) = mgr.snapshot_isolated(1, None);
        let snap = QueryContext::new_with_pending(frozen, None, pending);
        put_node(&mgr, alice, "alice2");

        // The frozen view still sees the original value...
        assert_eq!(name_of(alice, &snap).as_deref(), Some("alice"));

        // ...while a live view sees the overwrite and the untouched vertex.
        let latest = live_context(&mgr);
        assert_eq!(name_of(alice, &latest).as_deref(), Some("alice2"));
        assert_eq!(name_of(bob, &latest).as_deref(), Some("bob"));
    }

    #[test]
    fn pin_marks_current_generation() {
        let mgr = L0Manager::new(0, None);
        assert!(!mgr.is_current_pinned());

        let snap = mgr.pin_snapshot();
        assert!(mgr.is_current_pinned());

        drop(snap);
        assert!(
            !mgr.is_current_pinned(),
            "dropping the snapshot releases the pin"
        );
    }

    #[test]
    fn clone_freeze_isolates_pinned_snapshot() {
        let mgr = L0Manager::new(0, None);
        let alice = Vid::from(1_u64);
        put_node(&mgr, alice, "alice");

        let snap = mgr.pin_snapshot();
        assert!(mgr.is_current_pinned());

        mgr.freeze_current_for_snapshot();
        assert!(
            !mgr.is_current_pinned(),
            "the fresh generation starts unpinned"
        );

        // Writes after the freeze go to the cloned buffer only.
        put_node(&mgr, alice, "alice2");

        let snap_ctx =
            QueryContext::new_with_pending(snap.main.clone(), None, snap.extra.clone());
        assert_eq!(name_of(alice, &snap_ctx).as_deref(), Some("alice"));

        let latest = live_context(&mgr);
        assert_eq!(name_of(alice, &latest).as_deref(), Some("alice2"));
        drop(snap);
    }
}