use crate::core::universe::UniverseSnapshot;
use crate::core::{SharedUniverse, Type, TypeId, TypeUniverse};
use super::session::{
CommitError, CommitResult, Conflict, ConflictReason, IsolationLevel, RollbackError,
};
use im::HashMap as ImHashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// A set of local type definitions layered over a parent universe or branch.
///
/// Local insertions are kept in `local_types` until `commit` publishes them
/// to the parent; `rollback` discards them.
pub struct Branch {
/// What this branch was created from. Lookups fall back to it and `commit`
/// writes into it.
parent: BranchParent,
/// Isolation level requested at creation. It selects `universe` in `new`;
/// no other code visible in this file reads it.
isolation_level: IsolationLevel,
/// Universe handle returned by `universe()` and passed to
/// `create_snapshot` by `checkpoint()`.
universe: SharedUniverse,
/// Types inserted on this branch and not yet published to the parent.
local_types: RwLock<ImHashMap<TypeId, Arc<Type>>>,
/// Snapshot stored by the most recent `checkpoint()` call, if any.
/// `rollback` only checks that one is present.
checkpoint: RwLock<Option<UniverseSnapshot>>,
/// Set to `true` at the end of a successful `commit`.
committed: RwLock<bool>,
/// Snapshot produced from the parent when the branch was created; `commit`
/// consults it when looking for concurrent modifications.
base_snapshot: UniverseSnapshot,
}
/// The thing a `Branch` was created from and commits back into.
enum BranchParent {
/// The branch sits directly on a universe. This is the only variant
/// constructed by code visible in this file (`Branch::new`).
Universe(SharedUniverse),
/// The branch is nested inside another branch.
/// NOTE(review): no constructor visible here creates this variant.
Branch(Arc<RwLock<Branch>>),
}
impl Branch {
/// Creates a branch whose parent is the universe `parent`.
///
/// A base snapshot is produced from `parent` first. The branch's own
/// universe handle then depends on `isolation_level`:
/// - `Full`: a brand-new `TypeUniverse`;
/// - `SharedRead` / `Snapshot`: a clone of the `parent` handle.
///
/// The new branch starts with no local types, no checkpoint, and is not
/// committed.
pub async fn new(parent: SharedUniverse, isolation_level: IsolationLevel) -> Self {
    let base_snapshot = create_snapshot(&parent).await;

    let universe: SharedUniverse = match isolation_level {
        IsolationLevel::SharedRead | IsolationLevel::Snapshot => parent.clone(),
        IsolationLevel::Full => Arc::new(TypeUniverse::new()),
    };

    let local_types = RwLock::new(ImHashMap::new());
    let checkpoint = RwLock::new(None);
    let committed = RwLock::new(false);

    Self {
        parent: BranchParent::Universe(parent),
        isolation_level,
        universe,
        local_types,
        checkpoint,
        committed,
        base_snapshot,
    }
}
/// Returns the universe handle selected for this branch in `new`.
pub fn universe(&self) -> &SharedUniverse {
&self.universe
}
/// Looks up `id`, preferring this branch's local types.
///
/// When the type is not local, a universe parent is queried. A branch
/// parent is *not* consulted here and yields `None` (contrast with
/// `get_parent_type`, which does descend into a parent branch).
pub async fn get_type(&self, id: TypeId) -> Option<Arc<Type>> {
    // Scope the read guard so it is released before the parent is queried.
    let local_hit = {
        let overlay = self.local_types.read().await;
        overlay.get(&id).cloned()
    };

    match (local_hit, &self.parent) {
        (Some(found), _) => Some(found),
        (None, BranchParent::Universe(u)) => u.get_type(id),
        (None, BranchParent::Branch(_)) => None,
    }
}
/// Records `typ` under `id` in this branch's local types, replacing any
/// local entry already stored for that id. The parent is not touched.
pub async fn insert_type(&self, id: TypeId, typ: Arc<Type>) {
    self.local_types.write().await.insert(id, typ);
}
/// Produces a snapshot from this branch's universe and stores it as the
/// current checkpoint, replacing any earlier one.
pub async fn checkpoint(&self) {
    let captured = create_snapshot(&self.universe).await;
    let mut slot = self.checkpoint.write().await;
    *slot = Some(captured);
}
/// Discards every local type if a checkpoint has been recorded.
///
/// NOTE(review): the stored snapshot is only checked for presence; its
/// contents are not used, so changes made before the checkpoint are
/// discarded as well. The checkpoint itself is left in place.
///
/// # Errors
/// Returns `RollbackError::NothingToRollback` when `checkpoint()` has never
/// been called on this branch.
pub async fn rollback(&self) -> Result<(), RollbackError> {
    let has_checkpoint = self.checkpoint.read().await.is_some();
    if !has_checkpoint {
        return Err(RollbackError::NothingToRollback);
    }

    let mut overlay = self.local_types.write().await;
    *overlay = ImHashMap::new();
    Ok(())
}
pub async fn commit(&self) -> Result<CommitResult, CommitError> {
if *self.committed.read().await {
return Err(CommitError::AlreadyCommitted);
}
let local_types = self.local_types.read().await.clone();
let mut conflicts = Vec::new();
for (type_id, local_type) in local_types.iter() {
if let Some(parent_type) = self.get_parent_type(*type_id).await {
if let Some(base_type) = self.base_snapshot.types.get(type_id) {
if !Arc::ptr_eq(&parent_type, &Arc::new(base_type.clone())) {
conflicts.push(Conflict {
type_id: *type_id,
reason: ConflictReason::ConcurrentModification,
});
continue;
}
}
if !types_compatible(&local_type, &parent_type) {
conflicts.push(Conflict {
type_id: *type_id,
reason: ConflictReason::TypeMismatch,
});
}
}
}
if !conflicts.is_empty() {
return Err(CommitError::ValidationFailed(
conflicts
.iter()
.map(|c| format!("{:?}", c.reason))
.collect(),
));
}
match &self.parent {
BranchParent::Universe(u) => {
for (type_id, typ) in local_types.iter() {
u.insert_type(*type_id, typ.clone());
}
}
BranchParent::Branch(b) => {
let parent_branch = b.read().await;
for (type_id, typ) in local_types.iter() {
parent_branch.insert_type(*type_id, typ.clone()).await;
}
}
}
*self.committed.write().await = true;
Ok(CommitResult {
types_added: local_types.len(),
types_modified: 0, conflicts,
})
}
/// Looks up `id` in the parent only, ignoring this branch's local types.
///
/// A universe parent is queried directly; a branch parent is read-locked and
/// asked through its own `get_type`.
async fn get_parent_type(&self, id: TypeId) -> Option<Arc<Type>> {
    let parent_branch = match &self.parent {
        BranchParent::Universe(u) => return u.get_type(id),
        BranchParent::Branch(b) => b,
    };
    parent_branch.read().await.get_type(id).await
}
/// Reports whether a `commit` on this branch has completed successfully.
pub async fn is_committed(&self) -> bool {
    let flag = self.committed.read().await;
    *flag
}
/// Returns how many types are currently held in this branch's local map.
pub async fn local_changes(&self) -> usize {
    let overlay = self.local_types.read().await;
    overlay.len()
}
}
/// Produces the snapshot used for a branch's base state and for checkpoints.
///
/// NOTE(review): this is currently a placeholder. The `_universe` argument is
/// ignored and `UniverseSnapshot::empty()` is returned on every call, so the
/// result does not depend on the universe's contents.
async fn create_snapshot(_universe: &SharedUniverse) -> UniverseSnapshot {
UniverseSnapshot::empty()
}
/// Returns `true` when `a` and `b` share an id or share a fingerprint.
///
/// NOTE(review): equal ids alone are sufficient, so two types looked up under
/// the same id are treated as compatible regardless of fingerprint.
fn types_compatible(a: &Type, b: &Type) -> bool {
    if a.id == b.id {
        return true;
    }
    a.fingerprint == b.fingerprint
}
pub struct BranchManager {
branches: RwLock<im::HashMap<super::session::SessionId, Arc<RwLock<Branch>>>>,
max_branches: usize,
}
impl BranchManager {
pub fn new(max_branches: usize) -> Self {
Self {
branches: RwLock::new(im::HashMap::new()),
max_branches,
}
}
/// Creates a branch on top of `parent` with the given isolation level and
/// registers it under `session_id`, replacing any branch already registered
/// for that session.
///
/// # Errors
/// Returns `BranchError::MaxBranchesReached` when the manager already holds
/// `max_branches` branches.
pub async fn create_branch(
    &self,
    session_id: super::session::SessionId,
    parent: SharedUniverse,
    isolation: IsolationLevel,
) -> Result<Arc<RwLock<Branch>>, BranchError> {
    // Cheap pre-check under the read lock so a full manager rejects the
    // request without building a branch first.
    if self.branches.read().await.len() >= self.max_branches {
        return Err(BranchError::MaxBranchesReached);
    }

    // Built without holding the registry lock so other callers are not
    // blocked while the branch is being created.
    let branch = Arc::new(RwLock::new(Branch::new(parent, isolation).await));

    let mut branches = self.branches.write().await;
    // Re-check under the write lock: another caller may have registered a
    // branch after the pre-check, and the limit must not be exceeded.
    if branches.len() >= self.max_branches {
        return Err(BranchError::MaxBranchesReached);
    }
    branches.insert(session_id, Arc::clone(&branch));
    Ok(branch)
}
/// Returns a shared handle to the branch registered for `session_id`, or
/// `None` if there is none.
pub async fn get_branch(
    &self,
    session_id: super::session::SessionId,
) -> Option<Arc<RwLock<Branch>>> {
    let registry = self.branches.read().await;
    registry.get(&session_id).map(Arc::clone)
}
/// Unregisters the branch for `session_id`; does nothing if none is
/// registered.
pub async fn remove_branch(&self, session_id: super::session::SessionId) {
    self.branches.write().await.remove(&session_id);
}
/// Returns the number of branches currently registered.
pub async fn active_branch_count(&self) -> usize {
    let registry = self.branches.read().await;
    registry.len()
}
}
/// Errors reported by branch management.
///
/// Implements `Display` and `std::error::Error` so values can be propagated
/// with `?` into boxed or wrapped error types, and `PartialEq`/`Eq` so callers
/// and tests can compare them directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BranchError {
    /// The manager already holds its configured maximum number of branches.
    MaxBranchesReached,
    /// NOTE(review): not produced by any code visible in this file;
    /// presumably signals that a requested parent branch does not exist.
    ParentNotFound,
    /// NOTE(review): not produced by any code visible in this file;
    /// presumably signals an isolation level that cannot be honoured.
    InvalidIsolationLevel,
}

impl std::fmt::Display for BranchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::MaxBranchesReached => "maximum number of branches reached",
            Self::ParentNotFound => "parent branch not found",
            Self::InvalidIsolationLevel => "invalid isolation level",
        };
        f.write_str(message)
    }
}

impl std::error::Error for BranchError {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::TypeUniverse;

    /// A freshly created branch reports that it has not been committed.
    #[tokio::test]
    async fn test_branch_creation() {
        let parent = Arc::new(TypeUniverse::new());
        let fresh = Branch::new(parent, IsolationLevel::Full).await;

        let committed = fresh.is_committed().await;
        assert!(!committed);
    }

    /// Inserting a single type shows up as exactly one local change.
    #[tokio::test]
    async fn test_branch_local_changes() {
        let parent = Arc::new(TypeUniverse::new());
        let branch = Branch::new(parent, IsolationLevel::Full).await;

        let id = TypeId(1000);
        let kind = crate::core::TypeKind::Primitive(crate::core::PrimitiveType::Int);
        let int_type = Arc::new(Type::new(id, kind));
        branch.insert_type(id, int_type).await;

        assert_eq!(branch.local_changes().await, 1);
    }
}