use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use futures::lock::Mutex;
use parking_lot::Mutex as PLMutex;
use tokio::select;
use tracing::{info, warn};
use super::manifest::*;
use super::{DeleteVector, DiskRowset, StorageOptions, StorageResult};
/// A single operation to be committed as part of one epoch.
///
/// Variants that add data (`AddRowSet`, `AddDV`) carry the in-memory payload
/// alongside the manifest entry: `commit_changes` stores the payload in the
/// `VersionManager` maps and writes only the entry to the manifest.
pub enum EpochOp {
/// Create a table (manifest entry only).
CreateTable(CreateTableEntry),
/// Drop a table (manifest entry only).
DropTable(DropTableEntry),
/// Register a new on-disk rowset together with its loaded handle.
AddRowSet((AddRowSetEntry, DiskRowset)),
/// Mark a rowset as deleted; physical removal is deferred to vacuum.
DeleteRowSet(DeleteRowsetEntry),
/// Register a new delete vector together with its in-memory payload.
AddDV((AddDVEntry, DeleteVector)),
/// Remove a delete vector from the snapshot.
DeleteDV(DeleteDVEntry),
}
impl std::fmt::Debug for EpochOp {
    /// Format the op as `EpochOp::Variant(entry)`, printing only the manifest
    /// entry and omitting the bulky `DiskRowset` / `DeleteVector` payloads.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the variant name and the entry to display, then render once.
        let (name, entry): (&str, &dyn std::fmt::Debug) = match self {
            Self::CreateTable(e) => ("EpochOp::CreateTable", e),
            Self::DropTable(e) => ("EpochOp::DropTable", e),
            Self::AddRowSet((e, _)) => ("EpochOp::AddRowSet", e),
            Self::DeleteRowSet(e) => ("EpochOp::DeleteRowSet", e),
            Self::AddDV((e, _)) => ("EpochOp::AddDV", e),
            Self::DeleteDV(e) => ("EpochOp::DeleteDV", e),
        };
        f.debug_tuple(name).field(entry).finish()
    }
}
/// The set of rowsets and delete vectors visible at one epoch.
///
/// Keyed by table id; a table with no rowsets (or no DVs) is dropped from the
/// corresponding map entirely, so lookups on such a table return `None`.
#[derive(Clone, Default)]
pub struct Snapshot {
    /// table id -> ids of rowsets visible in this snapshot.
    rowsets: HashMap<u32, HashSet<u32>>,
    /// table id -> rowset id -> ids of delete vectors attached to it.
    dvs: HashMap<u32, HashMap<u32, HashSet<u64>>>,
}

impl Snapshot {
    /// Record `rowset_id` as visible for `table_id`.
    pub fn add_rowset(&mut self, table_id: u32, rowset_id: u32) {
        self.rowsets.entry(table_id).or_default().insert(rowset_id);
    }

    /// Remove `rowset_id` from `table_id`, pruning the table entry when it
    /// becomes empty. Panics if the table has no rowsets registered.
    pub fn delete_rowset(&mut self, table_id: u32, rowset_id: u32) {
        let rowsets = self.rowsets.get_mut(&table_id).unwrap();
        rowsets.remove(&rowset_id);
        if rowsets.is_empty() {
            self.rowsets.remove(&table_id);
        }
    }

    /// Record delete vector `dv_id` on `(table_id, rowset_id)`.
    pub fn add_dv(&mut self, table_id: u32, rowset_id: u32, dv_id: u64) {
        self.dvs
            .entry(table_id)
            .or_default()
            .entry(rowset_id)
            .or_default()
            .insert(dv_id);
    }

    /// Remove delete vector `dv_id` from `(table_id, rowset_id)`, pruning
    /// empty inner maps. Panics if the table or rowset is not registered.
    pub fn delete_dv(&mut self, table_id: u32, rowset_id: u32, dv_id: u64) {
        let per_table = self.dvs.get_mut(&table_id).unwrap();
        let per_rowset = per_table.get_mut(&rowset_id).unwrap();
        per_rowset.remove(&dv_id);
        if per_rowset.is_empty() {
            per_table.remove(&rowset_id);
        }
        if per_table.is_empty() {
            self.dvs.remove(&table_id);
        }
    }

    /// Delete-vector ids attached to `(table_id, rowset_id)`, if any.
    pub fn get_dvs_of(&self, table_id: u32, rowset_id: u32) -> Option<&HashSet<u64>> {
        self.dvs.get(&table_id)?.get(&rowset_id)
    }

    /// Rowset ids visible for `table_id`, if any.
    pub fn get_rowsets_of(&self, table_id: u32) -> Option<&HashSet<u32>> {
        self.rowsets.get(&table_id)
    }
}
/// Bookkeeping state of the version manager, guarded by a non-async mutex.
#[derive(Default)]
pub struct VersionManagerInner {
/// Epoch number -> snapshot of storage state at that epoch.
status: HashMap<u64, Arc<Snapshot>>,
/// (table id, rowset id) -> loaded on-disk rowset handle.
rowsets: HashMap<(u32, u32), Arc<DiskRowset>>,
/// (table id, dv id) -> delete vector.
dvs: HashMap<(u32, u64), Arc<DeleteVector>>,
/// Epoch number -> number of live `Version` handles pinning it.
ref_cnt: HashMap<u64, usize>,
/// Epoch number -> (table id, rowset id) pairs deleted at that epoch,
/// awaiting physical removal by the vacuum.
rowset_deletion_to_apply: HashMap<u64, Vec<(u32, u32)>>,
/// The current (latest committed) epoch number.
epoch: u64,
}
/// Tracks epochs of the storage state: commits new epochs to the manifest,
/// hands out pinned `Version` snapshots, and vacuums unreferenced rowsets.
pub struct VersionManager {
/// Shared mutable bookkeeping; short critical sections only.
inner: Arc<PLMutex<VersionManagerInner>>,
/// Manifest log; an async mutex because appends are awaited while held,
/// which also serializes concurrent commits.
manifest: Mutex<Manifest>,
/// Sender used to wake the vacuum loop when an epoch becomes unpinned.
tx: tokio::sync::mpsc::UnboundedSender<()>,
/// Receiver parked here until `run` takes it (hence the `Option`).
rx: PLMutex<Option<tokio::sync::mpsc::UnboundedReceiver<()>>>,
/// Storage configuration (data path, disk-operation toggle).
storage_options: Arc<StorageOptions>,
}
impl VersionManager {
    /// Create a version manager on top of the given `manifest`.
    ///
    /// The unbounded channel created here connects dropped [`Version`]s
    /// (which may unpin an epoch) to the vacuum loop in [`Self::run`].
    pub fn new(manifest: Manifest, storage_options: Arc<StorageOptions>) -> Self {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        Self {
            manifest: Mutex::new(manifest),
            inner: Arc::new(PLMutex::new(VersionManagerInner::default())),
            tx,
            // Stash the receiver until `run` takes it out.
            rx: PLMutex::new(Some(rx)),
            storage_options,
        }
    }

    /// Commit a batch of operations as one new epoch and return its number.
    ///
    /// The ops are folded into a copy of the current snapshot and translated
    /// into manifest entries; the entries are persisted with
    /// `manifest.append` before the new epoch becomes visible.
    ///
    /// Locking: the async `manifest` lock is held for the whole call, which
    /// serializes concurrent commits; the `inner` lock is taken twice and is
    /// never held across an `.await`.
    pub async fn commit_changes(&self, ops: Vec<EpochOp>) -> StorageResult<u64> {
        let mut manifest = self.manifest.lock().await;
        let mut snapshot;
        let mut entries;
        let current_epoch;
        let mut rowset_deletion_to_apply = vec![];
        {
            let mut inner = self.inner.lock();
            current_epoch = inner.epoch;
            // Start from a clone of the current snapshot (empty at epoch 0).
            // Fix: this line previously read `.get(¤t_epoch)` — a
            // mis-encoded `&current_epoch` (`&curr` eaten as the HTML entity
            // `&curren;`), which does not compile.
            snapshot = inner
                .status
                .get(&current_epoch)
                .map(|x| x.as_ref().clone())
                .unwrap_or_default();
            entries = Vec::with_capacity(ops.len());
            for op in ops {
                match op {
                    EpochOp::CreateTable(entry) => {
                        entries.push(ManifestOperation::CreateTable(entry))
                    }
                    EpochOp::DropTable(entry) => entries.push(ManifestOperation::DropTable(entry)),
                    // Adds store the payload in `inner` and log only the entry.
                    EpochOp::AddRowSet((entry, rowset)) => {
                        inner
                            .rowsets
                            .insert((entry.table_id.table_id, entry.rowset_id), Arc::new(rowset));
                        snapshot.add_rowset(entry.table_id.table_id, entry.rowset_id);
                        entries.push(ManifestOperation::AddRowSet(entry));
                    }
                    // Deletions are only recorded; physical removal happens in
                    // vacuum once no pinned version can still see the rowset.
                    EpochOp::DeleteRowSet(entry) => {
                        rowset_deletion_to_apply.push((entry.table_id.table_id, entry.rowset_id));
                        snapshot.delete_rowset(entry.table_id.table_id, entry.rowset_id);
                        entries.push(ManifestOperation::DeleteRowSet(entry));
                    }
                    EpochOp::AddDV((entry, dv)) => {
                        inner
                            .dvs
                            .insert((entry.table_id.table_id, entry.dv_id), Arc::new(dv));
                        snapshot.add_dv(entry.table_id.table_id, entry.rowset_id, entry.dv_id);
                        entries.push(ManifestOperation::AddDV(entry));
                    }
                    EpochOp::DeleteDV(entry) => {
                        snapshot.delete_dv(entry.table_id.table_id, entry.rowset_id, entry.dv_id);
                        entries.push(ManifestOperation::DeleteDV(entry));
                    }
                }
            }
        }
        // Persist the entries before publishing the new snapshot.
        manifest.append(&entries).await?;
        let mut inner = self.inner.lock();
        // The manifest lock serializes commits, so no other commit can have
        // advanced the epoch while we were awaiting the append.
        assert_eq!(inner.epoch, current_epoch);
        inner.epoch += 1;
        let epoch = inner.epoch;
        inner.status.insert(epoch, Arc::new(snapshot));
        inner
            .rowset_deletion_to_apply
            .insert(epoch, rowset_deletion_to_apply);
        Ok(epoch)
    }

    /// Pin the current epoch and return a handle to its snapshot.
    ///
    /// The epoch's reference count keeps its rowsets safe from vacuum until
    /// the returned [`Version`] is dropped.
    pub fn pin(&self) -> Arc<Version> {
        let mut inner = self.inner.lock();
        let epoch = inner.epoch;
        *inner.ref_cnt.entry(epoch).or_default() += 1;
        Arc::new(Version {
            epoch,
            snapshot: inner.status.get(&epoch).unwrap().clone(),
            inner: self.inner.clone(),
            tx: self.tx.clone(),
        })
    }

    /// Look up a loaded rowset. Panics if it is not registered.
    pub fn get_rowset(&self, table_id: u32, rowset_id: u32) -> Arc<DiskRowset> {
        let inner = self.inner.lock();
        inner.rowsets.get(&(table_id, rowset_id)).unwrap().clone()
    }

    /// Look up a delete vector. Panics if it is not registered.
    pub fn get_dv(&self, table_id: u32, dv_id: u64) -> Arc<DeleteVector> {
        let inner = self.inner.lock();
        inner.dvs.get(&(table_id, dv_id)).unwrap().clone()
    }

    /// Collect the rowsets that can be physically removed and drop their
    /// in-memory handles, returning the `(table_id, rowset_id)` pairs.
    ///
    /// A deletion recorded at epoch `e` can be applied once `e` is no greater
    /// than the minimum pinned epoch (or the current epoch when nothing is
    /// pinned): no live `Version` can still observe those rowsets.
    pub async fn find_vacuum(self: &Arc<Self>) -> StorageResult<Vec<(u32, u32)>> {
        let mut inner = self.inner.lock();
        let min_pinned_epoch = inner.ref_cnt.keys().min().cloned();
        let vacuum_epoch = min_pinned_epoch.unwrap_or(inner.epoch);
        let can_apply = |epoch, vacuum_epoch| epoch <= vacuum_epoch;
        let mut deletions = vec![];
        for (epoch, deletion) in &inner.rowset_deletion_to_apply {
            if can_apply(*epoch, vacuum_epoch) {
                deletions.extend(deletion.iter().cloned());
            }
        }
        inner
            .rowset_deletion_to_apply
            .retain(|k, _| !can_apply(*k, vacuum_epoch));
        for deletion in &deletions {
            if let Some(rowset) = inner.rowsets.remove(deletion) {
                // The handle must be uniquely owned here; a remaining
                // reference would mean the epoch accounting is broken.
                match Arc::try_unwrap(rowset) {
                    Ok(rowset) => drop(rowset),
                    Err(_) => panic!("rowset {:?} is still being used", deletion),
                }
            } else {
                // Fix: corrected the "dectected" typo in this log message.
                warn!("duplicated deletion detected, but we can't solve this issue for now -- see https://github.com/risinglightdb/risinglight/issues/566 for more information.");
            }
        }
        Ok(deletions)
    }

    /// Run one vacuum pass: remove the on-disk directories of all rowsets
    /// that [`Self::find_vacuum`] reports as reclaimable.
    pub async fn do_vacuum(self: &Arc<Self>) -> StorageResult<()> {
        let deletions = self.find_vacuum().await?;
        for (table_id, rowset_id) in deletions {
            let path = self
                .storage_options
                .path
                .join(format!("{}_{}", table_id, rowset_id));
            info!("vacuum {}_{}", table_id, rowset_id);
            if !self.storage_options.disable_all_disk_operation {
                tokio::fs::remove_dir_all(path).await?;
            }
        }
        Ok(())
    }

    /// Background loop: vacuum whenever a dropped `Version` signals that an
    /// epoch became unpinned; exit when `stop` receives a message.
    pub async fn run(
        self: &Arc<Self>,
        mut stop: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) -> StorageResult<()> {
        // Take the receiver stashed by `new`; panics if `run` is called twice.
        let mut vacuum_notifier = self.rx.lock().take().unwrap();
        loop {
            select! {
                Some(_) = vacuum_notifier.recv() => self.do_vacuum().await?,
                Some(_) = stop.recv() => break
            }
        }
        Ok(())
    }
}
/// A pinned reference to the snapshot of one epoch.
///
/// While a `Version` is alive, its epoch's reference count in the version
/// manager stays positive, so vacuum will not reclaim rowsets that this
/// snapshot can still see. Dropping it unpins the epoch (see `Drop` below).
pub struct Version {
/// The epoch this handle pins.
pub epoch: u64,
/// The storage-state snapshot at `epoch`.
pub snapshot: Arc<Snapshot>,
/// Shared version-manager state; used on drop to decrement the pin count.
inner: Arc<PLMutex<VersionManagerInner>>,
/// Wakes the vacuum loop when dropping this handle unpins a past epoch.
tx: tokio::sync::mpsc::UnboundedSender<()>,
}
impl Drop for Version {
    /// Unpin this epoch. When its pin count reaches zero and it is no longer
    /// the current epoch, notify the vacuum loop that work may be available.
    fn drop(&mut self) {
        let mut inner = self.inner.lock();
        let cnt = inner
            .ref_cnt
            .get_mut(&self.epoch)
            .expect("epoch not registered!");
        *cnt -= 1;
        if *cnt != 0 {
            // Other pins remain; nothing to clean up yet.
            return;
        }
        inner.ref_cnt.remove(&self.epoch).unwrap();
        // The current epoch is never vacuumed: new pins may still arrive.
        if self.epoch != inner.epoch {
            self.tx.send(()).unwrap();
        }
    }
}