use super::{BranchId, Commit, CommitId, NO_COMMIT};
use crate::basic::persistent_btree::{EMPTY_ROOT, NodeId, PersistentBTree};
use crate::common::ende::{KeyEnDeOrdered, ValueEnDe};
use crate::common::error::{Result, VsdbError};
use crate::{Mapx, MapxOrd, Orphan};
use ruc::{RucResult, pnk};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::marker::PhantomData;
use std::ops::Bound;
use std::time::{SystemTime, UNIX_EPOCH};
/// Persisted bookkeeping for a single branch.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BranchState {
    /// Human-readable branch name (also the key in `branch_names`).
    name: String,
    /// Id of the branch's head commit, or `NO_COMMIT` when it has none yet.
    head: CommitId,
    /// Root of the branch's working tree; differs from the head commit's
    /// root while there are uncommitted changes.
    dirty_root: NodeId,
}
/// A versioned key/value map with branches and commits, backed by one
/// shared [`PersistentBTree`].
#[derive(Debug, Clone)]
pub struct VerMap<K, V> {
    /// Tree storage holding every commit root and every branch working root.
    tree: PersistentBTree,
    /// Commit id → commit record.
    commits: MapxOrd<u64, Commit>,
    /// Branch id → branch state.
    branches: MapxOrd<u64, BranchState>,
    /// Branch name → branch id.
    branch_names: Mapx<String, u64>,
    /// Id that the next created commit will receive.
    next_commit: Orphan<u64>,
    /// Id that the next created branch will receive.
    next_branch: Orphan<u64>,
    /// Id of the branch currently designated as "main".
    main_branch: Orphan<u64>,
    /// Set while commit reference counts are being modified; `gc` rebuilds
    /// the counts when it finds this flag set.
    gc_dirty: Orphan<bool>,
    /// Ties the key/value types to the map without storing them.
    _phantom: PhantomData<(K, V)>,
}
impl<K, V> Serialize for VerMap<K, V> {
    /// Serialises the eight persistent fields as an 8-tuple, in declaration
    /// order (`_phantom` is not written).
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // A tuple of references serialises as `serialize_tuple(8)` followed by
        // one element per member, which is exactly the hand-written form.
        let fields = (
            &self.tree,
            &self.commits,
            &self.branches,
            &self.branch_names,
            &self.next_commit,
            &self.next_branch,
            &self.main_branch,
            &self.gc_dirty,
        );
        fields.serialize(serializer)
    }
}
impl<'de, K, V> Deserialize<'de> for VerMap<K, V> {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct Vis<K, V>(PhantomData<(K, V)>);
impl<'de, K, V> serde::de::Visitor<'de> for Vis<K, V> {
type Value = VerMap<K, V>;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("VerMap")
}
fn visit_seq<A: serde::de::SeqAccess<'de>>(
self,
mut seq: A,
) -> std::result::Result<VerMap<K, V>, A::Error> {
let tree = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
let commits = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
let branches = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(2, &self))?;
let branch_names = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
let next_commit = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(4, &self))?;
let next_branch = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(5, &self))?;
let main_branch = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(6, &self))?;
let gc_dirty = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(7, &self))?;
let mut m = VerMap {
tree,
commits,
branches,
branch_names,
next_commit,
next_branch,
main_branch,
gc_dirty,
_phantom: PhantomData,
};
m.rebuild_tree_ref_counts();
Ok(m)
}
}
deserializer.deserialize_tuple(8, Vis(PhantomData))
}
}
impl<K, V> VerMap<K, V> {
    /// Hands the tree the full list of live roots — every commit root followed
    /// by every non-empty branch working root — so it can rebuild its node
    /// reference counts.
    fn rebuild_tree_ref_counts(&mut self) {
        let roots: Vec<NodeId> = self
            .commits
            .iter()
            .map(|(_, commit)| commit.root)
            .chain(
                self.branches
                    .iter()
                    .map(|(_, state)| state.dirty_root)
                    .filter(|root| *root != EMPTY_ROOT),
            )
            .collect();
        self.tree.rebuild_ref_counts(&roots);
    }
}
impl<K, V> Default for VerMap<K, V>
where
K: KeyEnDeOrdered,
V: ValueEnDe,
{
fn default() -> Self {
Self::new()
}
}
impl<K, V> VerMap<K, V>
where
K: KeyEnDeOrdered,
V: ValueEnDe,
{
/// Creates an empty map whose initial branch is named `"main"`.
pub fn new() -> Self {
    let default_name = "main";
    Self::new_with_main(default_name)
}
/// Returns the instance id of the underlying tree, which doubles as the
/// id of this map.
pub fn instance_id(&self) -> u64 {
    let tree = &self.tree;
    tree.instance_id()
}
/// Stores this map's metadata under its instance id and returns that id.
///
/// # Errors
/// Propagates any error from `save_instance_meta`.
pub fn save_meta(&self) -> Result<u64> {
    let instance = self.instance_id();
    crate::common::save_instance_meta(instance, self)?;
    Ok(instance)
}
/// Loads a map from the metadata stored under `instance_id`.
///
/// # Errors
/// Returns whatever `load_instance_meta` reports.
pub fn from_meta(instance_id: u64) -> Result<Self> {
    let loaded = crate::common::load_instance_meta(instance_id);
    loaded
}
/// Creates an empty map whose initial (and main) branch is called `name`.
///
/// The initial branch gets id 1, has no head commit and an empty working
/// root. Commit ids start at 1 and the next branch id is 2.
pub fn new_with_main(name: &str) -> Self {
    let main_id: BranchId = 1;

    let mut branches: MapxOrd<u64, BranchState> = MapxOrd::new();
    let mut branch_names: Mapx<String, u64> = Mapx::new();

    branches.insert(
        &main_id,
        &BranchState {
            name: name.into(),
            head: NO_COMMIT,
            dirty_root: EMPTY_ROOT,
        },
    );
    branch_names.insert(&name.to_string(), &main_id);

    Self {
        tree: PersistentBTree::new(),
        commits: MapxOrd::new(),
        branches,
        branch_names,
        next_commit: Orphan::new(1),
        next_branch: Orphan::new(main_id + 1),
        main_branch: Orphan::new(main_id),
        gc_dirty: Orphan::new(false),
        _phantom: PhantomData,
    }
}
/// Looks up the state of branch `id`.
///
/// # Errors
/// `VsdbError::BranchNotFound` when no such branch exists.
fn get_branch(&self, id: BranchId) -> Result<BranchState> {
    match self.branches.get(&id) {
        Some(state) => Ok(state),
        None => Err(VsdbError::BranchNotFound { branch_id: id }),
    }
}
/// Looks up commit `id`.
///
/// # Errors
/// `VsdbError::CommitNotFound` when no such commit exists.
fn get_commit_inner(&self, id: CommitId) -> Result<Commit> {
    match self.commits.get(&id) {
        Some(commit) => Ok(commit),
        None => Err(VsdbError::CommitNotFound { commit_id: id }),
    }
}
/// Returns the id of the branch currently designated as main.
pub fn main_branch(&self) -> BranchId {
    let id = self.main_branch.get_value();
    id
}
/// Designates `branch` as the main branch.
///
/// # Errors
/// `VsdbError::BranchNotFound` when `branch` does not exist; the current
/// main branch is left untouched in that case.
pub fn set_main_branch(&mut self, branch: BranchId) -> Result<()> {
    // Existence check only; the state itself is not needed.
    let _exists = self.get_branch(branch)?;
    *self.main_branch.get_mut() = branch;
    Ok(())
}
/// Creates a branch called `name` that starts from the current state of
/// `source_branch` (same head commit, same working root) and returns its id.
///
/// # Errors
/// * `VsdbError::BranchAlreadyExists` when `name` is already taken.
/// * `VsdbError::BranchNotFound` when `source_branch` does not exist.
pub fn create_branch(
    &mut self,
    name: &str,
    source_branch: BranchId,
) -> Result<BranchId> {
    let key = name.to_string();
    if self.branch_names.contains_key(&key) {
        return Err(VsdbError::BranchAlreadyExists { name: key });
    }

    let BranchState {
        head, dirty_root, ..
    } = self.get_branch(source_branch)?;

    let new_id = self.next_branch.get_value();
    *self.next_branch.get_mut() = new_id + 1;

    let state = BranchState {
        name: key.clone(),
        head,
        dirty_root,
    };
    self.branches.insert(&new_id, &state);
    self.branch_names.insert(&key, &new_id);

    // The new branch is one more holder of both the head commit and the
    // working root it shares with the source.
    self.increment_ref(head);
    self.tree.acquire_node(dirty_root);
    Ok(new_id)
}
pub fn delete_branch(&mut self, branch: BranchId) -> Result<()> {
if branch == self.main_branch.get_value() {
return Err(VsdbError::CannotDeleteMainBranch);
}
let state = self.get_branch(branch)?;
let dead_head = state.head;
let dead_dirty = state.dirty_root;
self.branch_names.remove(&state.name);
self.branches.remove(&branch);
self.tree.release_node(dead_dirty);
self.decrement_ref(dead_head);
Ok(())
}
/// Returns `(id, name)` for every branch, in the iteration order of the
/// branch table.
pub fn list_branches(&self) -> Vec<(BranchId, String)> {
    let mut listing = Vec::new();
    for (id, state) in self.branches.iter() {
        listing.push((id, state.name));
    }
    listing
}
/// Returns the id of the branch called `name`, if there is one.
pub fn branch_id(&self, name: &str) -> Option<BranchId> {
    let key = name.to_string();
    self.branch_names.get(&key)
}
/// Returns the name of `branch`, or `None` when the branch does not exist.
pub fn branch_name(&self, branch: BranchId) -> Option<String> {
    let state = self.branches.get(&branch)?;
    Some(state.name)
}
/// Reports whether the working root of `branch` differs from the root of
/// its head commit (or from the empty root when it has no commits).
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch, or
/// `VsdbError::CommitNotFound` when the head commit record is missing.
pub fn has_uncommitted(&self, branch: BranchId) -> Result<bool> {
    let state = self.get_branch(branch)?;
    let committed_root = if state.head != NO_COMMIT {
        self.get_commit_inner(state.head)?.root
    } else {
        EMPTY_ROOT
    };
    Ok(committed_root != state.dirty_root)
}
/// Reads `key` from the working tree of `branch`.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
///
/// # Panics
/// Via `pnk!` when the stored bytes cannot be decoded as `V`.
pub fn get(&self, branch: BranchId, key: &K) -> Result<Option<V>> {
    let root = self.get_branch(branch)?.dirty_root;
    let encoded = self.tree.get(root, &key.to_bytes());
    Ok(encoded.map(|bytes| pnk!(V::decode(&bytes))))
}
/// Reads `key` as of commit `commit_id`.
///
/// # Errors
/// `VsdbError::CommitNotFound` for an unknown commit.
///
/// # Panics
/// Via `pnk!` when the stored bytes cannot be decoded as `V`.
pub fn get_at_commit(&self, commit_id: CommitId, key: &K) -> Result<Option<V>> {
    let root = self.get_commit_inner(commit_id)?.root;
    let encoded = self.tree.get(root, &key.to_bytes());
    Ok(encoded.map(|bytes| pnk!(V::decode(&bytes))))
}
/// Reports whether `key` is present in the working tree of `branch`.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn contains_key(&self, branch: BranchId, key: &K) -> Result<bool> {
    let root = self.get_branch(branch)?.dirty_root;
    let present = self.tree.contains_key(root, &key.to_bytes());
    Ok(present)
}
/// Iterates over every decoded `(key, value)` pair in the working tree of
/// `branch`.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
///
/// # Panics
/// The returned iterator panics (via `pnk!`) on an entry that fails to decode.
pub fn iter(&self, branch: BranchId) -> Result<impl Iterator<Item = (K, V)> + '_> {
    let root = self.get_branch(branch)?.dirty_root;
    let raw_entries = self.tree.iter(root);
    Ok(raw_entries.map(|(raw_key, raw_value)| {
        (pnk!(K::from_slice(&raw_key)), pnk!(V::decode(&raw_value)))
    }))
}
/// Iterates over the decoded entries of `branch` whose keys fall between
/// `lo` and `hi`.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
///
/// # Panics
/// The returned iterator panics (via `pnk!`) on an entry that fails to decode.
pub fn range(
    &self,
    branch: BranchId,
    lo: Bound<&K>,
    hi: Bound<&K>,
) -> Result<impl Iterator<Item = (K, V)> + '_> {
    let root = self.get_branch(branch)?.dirty_root;

    // Encode both bounds to raw key bytes, keeping their inclusive/exclusive kind.
    let lower = lo.map(|key| key.to_bytes());
    let upper = hi.map(|key| key.to_bytes());

    let raw_entries = self.tree.range(
        root,
        lower.as_ref().map(|bytes| bytes.as_slice()),
        upper.as_ref().map(|bytes| bytes.as_slice()),
    );
    Ok(raw_entries.map(|(raw_key, raw_value)| {
        (pnk!(K::from_slice(&raw_key)), pnk!(V::decode(&raw_value)))
    }))
}
/// Iterates over every decoded `(key, value)` pair as of commit `commit_id`.
///
/// # Errors
/// `VsdbError::CommitNotFound` for an unknown commit.
///
/// # Panics
/// The returned iterator panics (via `pnk!`) on an entry that fails to decode.
pub fn iter_at_commit(
    &self,
    commit_id: CommitId,
) -> Result<impl Iterator<Item = (K, V)> + '_> {
    let root = self.get_commit_inner(commit_id)?.root;
    let raw_entries = self.tree.iter(root);
    Ok(raw_entries.map(|(raw_key, raw_value)| {
        (pnk!(K::from_slice(&raw_key)), pnk!(V::decode(&raw_value)))
    }))
}
/// Iterates over the decoded entries of commit `commit_id` whose keys fall
/// between `lo` and `hi`.
///
/// # Errors
/// `VsdbError::CommitNotFound` for an unknown commit.
///
/// # Panics
/// The returned iterator panics (via `pnk!`) on an entry that fails to decode.
pub fn range_at_commit(
    &self,
    commit_id: CommitId,
    lo: Bound<&K>,
    hi: Bound<&K>,
) -> Result<impl Iterator<Item = (K, V)> + '_> {
    let root = self.get_commit_inner(commit_id)?.root;

    // Encode both bounds to raw key bytes, keeping their inclusive/exclusive kind.
    let lower = lo.map(|key| key.to_bytes());
    let upper = hi.map(|key| key.to_bytes());

    let raw_entries = self.tree.range(
        root,
        lower.as_ref().map(|bytes| bytes.as_slice()),
        upper.as_ref().map(|bytes| bytes.as_slice()),
    );
    Ok(raw_entries.map(|(raw_key, raw_value)| {
        (pnk!(K::from_slice(&raw_key)), pnk!(V::decode(&raw_value)))
    }))
}
/// Iterates over the undecoded `(key bytes, value bytes)` pairs in the
/// working tree of `branch`.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn raw_iter(
    &self,
    branch: BranchId,
) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)> + '_> {
    let root = self.get_branch(branch)?.dirty_root;
    Ok(self.tree.iter(root))
}
/// Iterates over the undecoded `(key bytes, value bytes)` pairs as of
/// commit `commit_id`.
///
/// # Errors
/// `VsdbError::CommitNotFound` for an unknown commit.
pub fn raw_iter_at_commit(
    &self,
    commit_id: CommitId,
) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)> + '_> {
    let root = self.get_commit_inner(commit_id)?.root;
    Ok(self.tree.iter(root))
}
/// Reports whether `key` is present as of commit `commit_id`.
///
/// # Errors
/// `VsdbError::CommitNotFound` for an unknown commit.
pub fn contains_key_at_commit(&self, commit_id: CommitId, key: &K) -> Result<bool> {
    let root = self.get_commit_inner(commit_id)?.root;
    let present = self.tree.contains_key(root, &key.to_bytes());
    Ok(present)
}
/// Writes `key` → `value` into the working tree of `branch` (uncommitted).
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn insert(&mut self, branch: BranchId, key: &K, value: &V) -> Result<()> {
    let mut state = self.get_branch(branch)?;
    let previous_root = state.dirty_root;
    let updated_root = self
        .tree
        .insert(previous_root, &key.to_bytes(), &value.encode());

    // Take the hold on the new root before giving up the one on the old root.
    self.tree.acquire_node(updated_root);
    self.tree.release_node(previous_root);

    state.dirty_root = updated_root;
    self.branches.insert(&branch, &state);
    Ok(())
}
/// Removes `key` from the working tree of `branch` (uncommitted).
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn remove(&mut self, branch: BranchId, key: &K) -> Result<()> {
    let mut state = self.get_branch(branch)?;
    let previous_root = state.dirty_root;
    let updated_root = self.tree.remove(previous_root, &key.to_bytes());

    // Take the hold on the new root before giving up the one on the old root.
    self.tree.acquire_node(updated_root);
    self.tree.release_node(previous_root);

    state.dirty_root = updated_root;
    self.branches.insert(&branch, &state);
    Ok(())
}
/// Records the current working tree of `branch` as a new commit and makes
/// it the branch head. Returns the new commit's id.
///
/// The new commit's only parent is the previous head (none when the branch
/// had no commits). It starts with `ref_count` 1, for the branch head that
/// now points at it.
///
/// `gc_dirty` is raised for the duration of the bookkeeping and then put
/// back to the value it had on entry, so a dirty mark left behind by an
/// earlier interrupted operation is not erased by a later successful commit
/// and still triggers a rebuild in `gc`.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn commit(&mut self, branch: BranchId) -> Result<CommitId> {
    let state = self.get_branch(branch)?;

    let was_dirty = self.gc_dirty.get_value();
    *self.gc_dirty.get_mut() = true;

    let id = self.next_commit.get_value();
    *self.next_commit.get_mut() = id + 1;

    let parents = if state.head == NO_COMMIT {
        Vec::new()
    } else {
        vec![state.head]
    };
    let commit = Commit {
        id,
        root: state.dirty_root,
        parents,
        timestamp_us: now_us(),
        ref_count: 1,
    };
    self.commits.insert(&id, &commit);
    // The commit holds its own reference on the root, in addition to the one
    // the branch already holds as its working root.
    self.tree.acquire_node(state.dirty_root);

    let new_state = BranchState { head: id, ..state };
    self.branches.insert(&branch, &new_state);

    *self.gc_dirty.get_mut() = was_dirty;
    Ok(id)
}
/// Throws away uncommitted changes on `branch` by resetting its working
/// root to the head commit's root (or to the empty root when the branch has
/// no commits).
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch, or
/// `VsdbError::CommitNotFound` when the head commit record is missing.
pub fn discard(&mut self, branch: BranchId) -> Result<()> {
    let state = self.get_branch(branch)?;
    let abandoned_root = state.dirty_root;

    let committed_root = if state.head != NO_COMMIT {
        self.get_commit_inner(state.head)?.root
    } else {
        EMPTY_ROOT
    };
    let reset_state = BranchState {
        dirty_root: committed_root,
        ..state
    };

    // Acquire first so the root stays held even when both ids are the same.
    self.tree.acquire_node(committed_root);
    self.tree.release_node(abandoned_root);
    self.branches.insert(&branch, &reset_state);
    Ok(())
}
/// Moves the head of `branch` back to `target` and resets the working root
/// to that commit's root, discarding any uncommitted changes.
///
/// `target` must be reachable from the current head through parent links.
/// The ancestry check is skipped when the branch has no head commit or when
/// `target` already is the head.
///
/// `gc_dirty` is raised while references are being moved and then put back
/// to the value it had on entry, so a dirty mark left by an earlier
/// interrupted operation survives and still triggers a rebuild in `gc`.
///
/// # Errors
/// * `VsdbError::BranchNotFound` / `VsdbError::CommitNotFound` for unknown ids.
/// * `VsdbError::Other` when `target` is not an ancestor of the head.
pub fn rollback_to(&mut self, branch: BranchId, target: CommitId) -> Result<()> {
    let state = self.get_branch(branch)?;
    // Fetched once: proves the target exists and supplies its root below.
    let commit = self.get_commit_inner(target)?;

    if state.head != NO_COMMIT && target != state.head {
        // Depth-first walk over parent links, starting at the current head.
        let mut stack = vec![state.head];
        let mut visited = HashSet::new();
        let mut found = false;
        while let Some(cur) = stack.pop() {
            if cur == NO_COMMIT || !visited.insert(cur) {
                continue;
            }
            if cur == target {
                found = true;
                break;
            }
            if let Some(c) = self.commits.get(&cur) {
                stack.extend_from_slice(&c.parents);
            }
        }
        if !found {
            return Err(VsdbError::Other {
                detail: "target commit is not an ancestor of this branch's head"
                    .into(),
            });
        }
    }

    let was_dirty = self.gc_dirty.get_value();
    *self.gc_dirty.get_mut() = true;

    let old_head = state.head;
    let old_dirty = state.dirty_root;
    let new_state = BranchState {
        name: state.name,
        head: target,
        dirty_root: commit.root,
    };
    self.branches.insert(&branch, &new_state);

    // Acquire the new references before releasing the old ones, so nothing
    // shared between the two is dropped in between.
    self.tree.acquire_node(commit.root);
    self.tree.release_node(old_dirty);
    self.increment_ref(target);
    self.decrement_ref(old_head);

    *self.gc_dirty.get_mut() = was_dirty;
    Ok(())
}
/// Merges the head of `source` into `target` and returns the id of the
/// commit that `target` points at afterwards.
///
/// * When `target` has no commits, it is simply pointed at the head of
///   `source` and that commit's id is returned; no new commit is created.
/// * Otherwise a three-way merge is run over the roots of the common
///   ancestor (the empty root when there is none), the source head and the
///   target head. A new commit with parents `[target head, source head]` is
///   created and becomes the head of `target`.
///
/// `gc_dirty` is raised while references are being moved and, on success,
/// put back to the value it had on entry, so a dirty mark left by an earlier
/// interrupted operation survives and still triggers a rebuild in `gc`.
///
/// # Errors
/// * `VsdbError::Other` when `source == target` or `source` has no commits.
/// * `VsdbError::UncommittedChanges` when either branch has uncommitted work.
/// * `VsdbError::BranchNotFound` / `VsdbError::CommitNotFound` for unknown ids.
pub fn merge(&mut self, source: BranchId, target: BranchId) -> Result<CommitId> {
    if source == target {
        return Err(VsdbError::Other {
            detail: "cannot merge a branch into itself".into(),
        });
    }
    if self.has_uncommitted(source)? {
        return Err(VsdbError::UncommittedChanges { branch_id: source });
    }
    if self.has_uncommitted(target)? {
        return Err(VsdbError::UncommittedChanges { branch_id: target });
    }

    let src = self.get_branch(source)?;
    let tgt = self.get_branch(target)?;
    if src.head == NO_COMMIT {
        return Err(VsdbError::Other {
            detail: format!("source branch {source} has no commits"),
        });
    }

    let was_dirty = self.gc_dirty.get_value();
    *self.gc_dirty.get_mut() = true;

    if tgt.head == NO_COMMIT {
        // Target has no history of its own: adopt the source head directly.
        let src_commit = self.get_commit_inner(src.head)?;
        let new_state = BranchState {
            head: src.head,
            dirty_root: src_commit.root,
            ..tgt
        };
        self.branches.insert(&target, &new_state);
        self.increment_ref(src.head);
        self.tree.acquire_node(src_commit.root);
        self.tree.release_node(tgt.dirty_root);
        *self.gc_dirty.get_mut() = was_dirty;
        return Ok(src.head);
    }

    let src_commit = self.get_commit_inner(src.head)?;
    let tgt_commit = self.get_commit_inner(tgt.head)?;
    let ancestor_id = self.find_common_ancestor(src.head, tgt.head);
    let ancestor_root = match ancestor_id {
        Some(aid) => self.get_commit_inner(aid)?.root,
        None => EMPTY_ROOT,
    };
    let merged_root = super::merge::three_way_merge(
        &mut self.tree,
        ancestor_root,
        src_commit.root,
        tgt_commit.root,
    );

    let id = self.next_commit.get_value();
    *self.next_commit.get_mut() = id + 1;
    let commit = Commit {
        id,
        root: merged_root,
        parents: vec![tgt.head, src.head],
        timestamp_us: now_us(),
        ref_count: 1,
    };
    self.commits.insert(&id, &commit);

    let new_state = BranchState {
        head: id,
        dirty_root: merged_root,
        ..tgt
    };
    self.branches.insert(&target, &new_state);

    // Two holders of the merged root: the new commit and the branch's
    // working root. The branch's previous working root is released.
    self.tree.acquire_node(merged_root);
    self.tree.acquire_node(merged_root);
    self.tree.release_node(tgt.dirty_root);
    // The merge commit is a new child of the source head. The target head's
    // count is unchanged: the branch's reference is replaced by the merge
    // commit's parent link.
    self.increment_ref(src.head);

    *self.gc_dirty.get_mut() = was_dirty;
    Ok(id)
}
fn find_common_ancestor(&self, a: CommitId, b: CommitId) -> Option<CommitId> {
let mut visited_a = HashSet::new();
let mut visited_b = HashSet::new();
let mut queue_a = vec![a];
let mut queue_b = vec![b];
loop {
if queue_a.is_empty() && queue_b.is_empty() {
return None;
}
let drain_end = queue_a.len();
for i in 0..drain_end {
let id = queue_a[i];
if id == NO_COMMIT || !visited_a.insert(id) {
continue;
}
if visited_b.contains(&id) {
return Some(id);
}
if let Some(c) = self.commits.get(&id) {
queue_a.extend_from_slice(&c.parents);
}
}
queue_a.drain(..drain_end);
let drain_end = queue_b.len();
for i in 0..drain_end {
let id = queue_b[i];
if id == NO_COMMIT || !visited_b.insert(id) {
continue;
}
if visited_a.contains(&id) {
return Some(id);
}
if let Some(c) = self.commits.get(&id) {
queue_b.extend_from_slice(&c.parents);
}
}
queue_b.drain(..drain_end);
}
}
/// Public wrapper around the common-ancestor search: returns a commit
/// reachable from both `a` and `b`, or `None` when there is none.
pub fn fork_point(&self, a: CommitId, b: CommitId) -> Option<CommitId> {
    let ancestor = self.find_common_ancestor(a, b);
    ancestor
}
/// Counts the first-parent steps needed to get from `from` to `ancestor`.
///
/// Returns `Some(0)` when the two ids are equal, and `None` when the
/// first-parent chain runs out (or hits a missing commit) before reaching
/// `ancestor`.
pub fn commit_distance(&self, from: CommitId, ancestor: CommitId) -> Option<u64> {
    let mut steps = 0u64;
    let mut cursor = from;
    loop {
        if cursor == ancestor {
            return Some(steps);
        }
        if cursor == NO_COMMIT {
            return None;
        }
        let commit = self.commits.get(&cursor)?;
        cursor = commit.parents.first().copied().unwrap_or(NO_COMMIT);
        steps += 1;
    }
}
/// Returns the record of commit `commit_id`, or `None` when it is unknown.
pub fn get_commit(&self, commit_id: CommitId) -> Option<Commit> {
    let record = self.commits.get(&commit_id);
    record
}
/// Returns the head commit of `branch`, or `None` when the branch has no
/// commits (or its head record is missing).
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn head_commit(&self, branch: BranchId) -> Result<Option<Commit>> {
    let head = self.get_branch(branch)?.head;
    let found = if head != NO_COMMIT {
        self.commits.get(&head)
    } else {
        None
    };
    Ok(found)
}
/// Returns the first-parent history of `branch`, newest commit first.
///
/// The walk stops at the first commit without parents, or early if a commit
/// record is missing.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn log(&self, branch: BranchId) -> Result<Vec<Commit>> {
    let mut history = Vec::new();
    let mut cursor = self.get_branch(branch)?.head;
    while cursor != NO_COMMIT {
        let Some(commit) = self.commits.get(&cursor) else {
            break;
        };
        cursor = commit.parents.first().copied().unwrap_or(NO_COMMIT);
        history.push(commit);
    }
    Ok(history)
}
/// Computes the differences between the trees of commits `from` and `to`.
///
/// # Errors
/// `VsdbError::CommitNotFound` when either commit is unknown.
pub fn diff_commits(
    &self,
    from: CommitId,
    to: CommitId,
) -> Result<Vec<super::diff::DiffEntry>> {
    let old_root = self.get_commit_inner(from)?.root;
    let new_root = self.get_commit_inner(to)?.root;
    let entries = super::diff::diff_roots(&self.tree, old_root, new_root);
    Ok(entries)
}
/// Computes the differences between the head commit of `branch` (the empty
/// tree when it has no commits) and its current working tree.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch, or
/// `VsdbError::CommitNotFound` when the head commit record is missing.
pub fn diff_uncommitted(
    &self,
    branch: BranchId,
) -> Result<Vec<super::diff::DiffEntry>> {
    let state = self.get_branch(branch)?;
    let committed_root = if state.head != NO_COMMIT {
        self.get_commit_inner(state.head)?.root
    } else {
        EMPTY_ROOT
    };
    let entries =
        super::diff::diff_roots(&self.tree, committed_root, state.dirty_root);
    Ok(entries)
}
/// Garbage-collects the tree.
///
/// Commit reference counts are rebuilt first when the dirty flag is set or
/// when any commit has a count of zero. The tree is then collected against
/// every commit root plus every non-empty branch working root.
pub fn gc(&mut self) {
    let needs_rebuild = self.gc_dirty.get_value()
        || self.commits.iter().any(|(_, commit)| commit.ref_count == 0);
    if needs_rebuild {
        self.rebuild_ref_counts();
    }

    let live_roots: Vec<NodeId> = self
        .commits
        .iter()
        .map(|(_, commit)| commit.root)
        .chain(
            self.branches
                .iter()
                .map(|(_, state)| state.dirty_root)
                .filter(|root| *root != EMPTY_ROOT),
        )
        .collect();
    self.tree.gc(&live_roots);
}
/// Returns a read-only handle bound to branch `id`.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn branch(&self, id: BranchId) -> Result<super::handle::Branch<'_, K, V>> {
    // Existence check only; the handle looks the state up again on use.
    let _exists = self.get_branch(id)?;
    let handle = super::handle::Branch { map: self, id };
    Ok(handle)
}
/// Returns a mutable handle bound to branch `id`.
///
/// # Errors
/// `VsdbError::BranchNotFound` for an unknown branch.
pub fn branch_mut(
    &mut self,
    id: BranchId,
) -> Result<super::handle::BranchMut<'_, K, V>> {
    // Existence check only; the state itself is not needed here.
    let _exists = self.get_branch(id)?;
    let handle = super::handle::BranchMut { map: self, id };
    Ok(handle)
}
/// Returns a read-only handle bound to the main branch.
pub fn main(&self) -> super::handle::Branch<'_, K, V> {
    let id = self.main_branch();
    super::handle::Branch { map: self, id }
}
/// Returns a mutable handle bound to the main branch.
pub fn main_mut(&mut self) -> super::handle::BranchMut<'_, K, V> {
    // Read the id first: the handle takes the exclusive borrow of `self`.
    let main_id = self.main_branch();
    super::handle::BranchMut {
        map: self,
        id: main_id,
    }
}
/// Adds one to the reference count of `commit_id`.
///
/// Does nothing for `NO_COMMIT` or for an id with no commit record.
fn increment_ref(&mut self, commit_id: CommitId) {
    if commit_id == NO_COMMIT {
        return;
    }
    let Some(mut commit) = self.commits.get(&commit_id) else {
        return;
    };
    commit.ref_count += 1;
    self.commits.insert(&commit_id, &commit);
}
/// Subtracts one from the reference count of `commit_id`, cascading to its
/// parents whenever a count reaches zero.
///
/// A commit whose count reaches zero has its tree root released and its
/// record removed; each of its parents is then decremented in turn.
/// `NO_COMMIT` and ids without a record are skipped.
///
/// `gc_dirty` is raised for the duration of the cascade and then put back to
/// the value it had on entry. Clearing it unconditionally would erase a
/// dirty mark set by the caller, or left by an earlier interrupted
/// operation, and `gc` would then skip the rebuild.
fn decrement_ref(&mut self, commit_id: CommitId) {
    if commit_id == NO_COMMIT {
        return;
    }

    let was_dirty = self.gc_dirty.get_value();
    *self.gc_dirty.get_mut() = true;

    // Explicit work list instead of recursion, so long parent chains do not
    // grow the call stack.
    let mut work = vec![commit_id];
    while let Some(id) = work.pop() {
        if id == NO_COMMIT {
            continue;
        }
        let Some(mut commit) = self.commits.get(&id) else {
            continue;
        };
        commit.ref_count = commit.ref_count.saturating_sub(1);
        if commit.ref_count == 0 {
            self.tree.release_node(commit.root);
            self.commits.remove(&id);
            work.extend(commit.parents);
        } else {
            self.commits.insert(&id, &commit);
        }
    }

    *self.gc_dirty.get_mut() = was_dirty;
}
/// Recomputes every commit's reference count from scratch and removes the
/// commits that no branch can reach.
///
/// A commit's count is the number of branch heads pointing at it plus the
/// number of reachable commits listing it as a parent. Clears `gc_dirty`
/// when done.
fn rebuild_ref_counts(&mut self) {
    let mut incoming: HashMap<CommitId, u32> = HashMap::new();
    let mut pending: Vec<CommitId> = Vec::new();

    // Seed with the branch heads; each head counts as one reference.
    for (_, state) in self.branches.iter() {
        if state.head == NO_COMMIT {
            continue;
        }
        *incoming.entry(state.head).or_insert(0) += 1;
        pending.push(state.head);
    }

    // Walk parent links; a commit contributes to its parents only the first
    // time it is visited.
    let mut live = HashSet::new();
    while let Some(id) = pending.pop() {
        if !live.insert(id) {
            continue;
        }
        let Some(commit) = self.commits.get(&id) else {
            continue;
        };
        for parent in commit.parents.iter().copied() {
            if parent == NO_COMMIT {
                continue;
            }
            *incoming.entry(parent).or_insert(0) += 1;
            pending.push(parent);
        }
    }

    // Write back only the counts that changed.
    for &id in &live {
        if let Some(mut commit) = self.commits.get(&id) {
            let expected = incoming.get(&id).copied().unwrap_or(0);
            if commit.ref_count != expected {
                commit.ref_count = expected;
                self.commits.insert(&id, &commit);
            }
        }
    }

    // Collect ids first: the table cannot be mutated while it is iterated.
    let unreachable: Vec<u64> = self
        .commits
        .iter()
        .map(|(id, _)| id)
        .filter(|id| !live.contains(id))
        .collect();
    for id in unreachable {
        self.commits.remove(&id);
    }

    *self.gc_dirty.get_mut() = false;
}
}
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// Yields 0 when the system clock reads earlier than the epoch. A count that
/// does not fit in `u64` saturates to `u64::MAX` instead of being silently
/// truncated by an `as` cast.
fn now_us() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}