#![allow(missing_docs)]
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use itertools::Itertools;
use thiserror::Error;
use crate::dag_walk;
use crate::op_store::{OpStore, OperationId};
use crate::operation::Operation;
#[derive(Debug, Error, PartialEq, Eq)]
pub enum OpHeadResolutionError<E> {
#[error("Operation log has no heads")]
NoHeads,
#[error("Op resolution error: {0}")]
Err(#[from] E),
}
pub trait OpHeadsStoreLock<'a> {
fn promote_new_op(&self, new_op: &Operation);
}
pub trait OpHeadsStore: Send + Sync + Debug {
fn name(&self) -> &str;
fn add_op_head(&self, id: &OperationId);
fn remove_op_head(&self, id: &OperationId);
fn get_op_heads(&self) -> Vec<OperationId>;
fn lock<'a>(&'a self) -> Box<dyn OpHeadsStoreLock<'a> + 'a>;
fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> {
let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
let op_heads = dag_walk::heads(
op_heads,
|op: &Operation| op.id().clone(),
|op: &Operation| op.parents(),
);
let op_head_ids_after: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
for removed_op_head in op_head_ids_before.difference(&op_head_ids_after) {
self.remove_op_head(removed_op_head);
}
op_heads.into_iter().collect()
}
}
pub fn resolve_op_heads<E>(
op_heads_store: &dyn OpHeadsStore,
op_store: &Arc<dyn OpStore>,
resolver: impl FnOnce(Vec<Operation>) -> Result<Operation, E>,
) -> Result<Operation, OpHeadResolutionError<E>> {
let mut op_heads = op_heads_store.get_op_heads();
if op_heads.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_heads.len() == 1 {
let operation_id = op_heads.pop().unwrap();
let operation = op_store.read_operation(&operation_id).unwrap();
return Ok(Operation::new(op_store.clone(), operation_id, operation));
}
let lock = op_heads_store.lock();
let op_head_ids = op_heads_store.get_op_heads();
if op_head_ids.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_head_ids.len() == 1 {
let op_head_id = op_head_ids[0].clone();
let op_head = op_store.read_operation(&op_head_id).unwrap();
return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
}
let op_heads = op_head_ids
.iter()
.map(|op_id: &OperationId| {
let data = op_store.read_operation(op_id).unwrap();
Operation::new(op_store.clone(), op_id.clone(), data)
})
.collect_vec();
let mut op_heads = op_heads_store.handle_ancestor_ops(op_heads);
if op_heads.len() == 1 {
return Ok(op_heads.pop().unwrap());
}
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
match resolver(op_heads) {
Ok(new_op) => {
lock.promote_new_op(&new_op);
Ok(new_op)
}
Err(e) => Err(OpHeadResolutionError::Err(e)),
}
}