#![doc(html_root_url = "https://docs.rs/graph_mvcc/0.2.0")]
#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(non_snake_case)]
use std::fmt::{self, Display};
use uuid::Uuid;
use std::hash::Hash;
use std::collections::BTreeSet;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet};
pub type TxResult<T> = Result<T, TxError>;
#[derive(Debug, PartialEq)]
pub enum TxError {
Abort,
DatabaseFailure,
NodeNotFound,
ElementNotFound,
Collision(String),
InvalidRecord,
TransactionLocked,
}
impl Display for TxError {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
TxError::Abort => write!(f, "Transaction Aborted"),
TxError::DatabaseFailure => write!(f, "Database failure"),
TxError::NodeNotFound => write!(f, "Node not found"),
TxError::ElementNotFound => write!(f, "Element not found"),
TxError::Collision(ref msg) => write!(f, "Collision: {}", msg),
TxError::InvalidRecord => write!(f, "Invalid record"),
TxError::TransactionLocked => write!(f, "Transaction locked"),
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum EdgeId {
String(String),
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct Edge {
pub id: EdgeId,
edgetype: String,
}
impl Edge {
fn new(typ: String) -> Self {
Edge {
id: EdgeId::String(Uuid::new_v4().to_string().chars().take(8).collect()),
edgetype: typ,
}
}
pub fn id(&self) -> &EdgeId {
&self.id
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum NodeId {
String(String),
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct Node {
id: NodeId,
}
impl Node {
fn new() -> Self {
Node {
id: NodeId::String(Uuid::new_v4().to_string().chars().take(8).collect()),
}
}
pub fn id(&self) -> &NodeId {
&self.id
}
}
#[derive(Debug, Clone)]
pub struct Graph {
nodes: HashMap<Node, HashSet<Edge>>,
adjacencylist: HashMap<Node, Vec<(Node, Edge)>>,
next_transaction_id: u32,
active_transactions: BTreeSet<u32>,
records: BTreeSet<BTreeMap<MVCC, u32>>,
}
impl Default for Graph {
fn default() -> Self {
Self::new()
}
}
impl Graph {
pub fn txid_current(self) -> u32 {
self.next_transaction_id
}
pub fn new() -> Self {
Self {
nodes: HashMap::new(),
adjacencylist: HashMap::new(),
next_transaction_id : 0,
active_transactions : BTreeSet::new(),
records : BTreeSet::new(),
}
}
pub fn add_node(&mut self, t: &mut TransactionId) -> Node {
if t.snapshot.is_none() {
t.snapshot = Some(self.create_snapshot(t));
}
let minted_node = Node::new();
let node = minted_node.clone();
self.nodes.entry(minted_node).or_insert_with(HashSet::new);
t.read_locks.insert((node.id().clone(), "NODE_CREATION".to_string()));
node
}
pub fn add_edge(&mut self, t: &mut TransactionId, from: &Node, to: &Node, edge_type: String) -> TxResult<()> {
if t.snapshot.is_none() {
t.snapshot = Some(self.create_snapshot(t));
}
if self.has_collision_excluding_destination(from, to, &edge_type) {
return Err(TxError::Collision(format!("edge type '{}' already exists for source node", edge_type)));
}
t.read_locks.insert((from.id().clone(), edge_type.clone()));
t.read_locks.insert((to.id().clone(), edge_type.clone()));
let minted_edge = Edge::new(edge_type);
self.set_directed_edge(from, to, minted_edge.clone());
self.set_directed_edge(to, from, minted_edge);
Ok(())
}
pub fn set_directed_edge(&mut self, from: &Node, to: &Node, edge: Edge) {
let src_edge_dst = self.adjacencylist
.entry(from.clone()).or_default();
src_edge_dst.push((to.clone(), edge));
}
pub fn get_nodes_internal(&self, t: &mut TransactionId, origin: &Node, search_path: Vec<String>) -> Vec<Node> {
for edge_type in &search_path {
t.read_locks.insert((origin.id().clone(), edge_type.clone()));
}
if t.snapshot.is_none() {
t.snapshot = Some(self.create_snapshot(t));
}
self.traverse_with_snapshot(t, origin, search_path)
}
fn traverse_with_snapshot(&self, t: &TransactionId, origin: &Node, search_path: Vec<String>) -> Vec<Node> {
let type_path = TypePath {
graph: self,
current_node: Some(origin.clone()),
type_list: search_path,
path_list: VecDeque::new(),
};
type_path.into_iter().collect()
}
}
#[derive(Debug, Ord, Eq, PartialEq, PartialOrd, Clone)]
pub enum MVCC {
TransactionCreationId,
TransactionExpirationId,
TransactionExpired,
AddElementToTransaction,
DeleteElementFromTransaction,
ElementId,
}
#[derive(Debug, Clone)]
pub struct TransactionId {
pub txid: u32,
pub rollback_actions: BTreeSet<BTreeMap<MVCC, u32>>,
pub read_locks: HashSet<(NodeId, String)>, pub snapshot: Option<BTreeSet<BTreeMap<MVCC, u32>>>, }
impl TransactionId {
pub fn new(txid: u32) -> Self {
TransactionId {
txid,
rollback_actions: BTreeSet::new(),
read_locks: HashSet::new(),
snapshot: None,
}
}
}
pub struct TypePath<'graph> {
graph: &'graph Graph,
current_node: Option<Node>,
type_list: Vec<String>,
path_list: VecDeque<Node>,
}
impl<'graph> Iterator for TypePath<'graph> {
type Item = Node;
fn next(&mut self) -> Option<Node> {
if let Some(node) = self.current_node.take() {
let edge_list = self.graph.adjacencylist.get(&node)?;
if let Some(current_type) = self.type_list.pop() {
if let Some((node,edge)) = edge_list.iter().next() {
if edge.edgetype == current_type {
self.path_list.push_back(node.clone());
self.current_node = Some(node.clone());
return Some(node.clone());
}
}
}
}
self.current_node = None;
None
}
}
impl Graph {
pub fn start_transaction(&mut self) -> TransactionId {
self.next_transaction_id += 1;
self.active_transactions.insert(self.next_transaction_id);
TransactionId::new(self.next_transaction_id)
}
pub fn set_transaction_expiration(&mut self, pos: u32, n:u32) {
let mut i:u32 = 0;
for item in &self.records {
if i == pos {
let mut updated_item = item.clone();
updated_item.insert(MVCC::TransactionExpired, n);
break;
} else {
i+=1;
}
}
}
pub fn add_record(&mut self, t: &mut TransactionId, record: &mut BTreeMap<MVCC, u32>) {
record.insert(MVCC::TransactionCreationId, t.txid);
record.insert(MVCC::TransactionExpirationId, 0);
let mut action:BTreeMap<MVCC, u32> = BTreeMap::new();
action.insert(MVCC::DeleteElementFromTransaction, self.records.len() as u32);
t.rollback_actions.insert(action);
self.records.insert(record.clone());
}
pub fn delete_record(&mut self, t: &mut TransactionId, id: u32) -> TxResult<()> {
let mut records_to_update = Vec::new();
for (i, record) in self.records.iter().enumerate() {
if let Some(element_id) = record.get(&MVCC::ElementId) {
if self.record_is_visible(t, &record) && element_id == &id {
if self.row_is_locked(&record) {
return Err(TxError::TransactionLocked);
} else {
records_to_update.push((i, record.clone()));
}
}
}
}
if records_to_update.is_empty() {
return Err(TxError::ElementNotFound);
}
for (i, mut record) in records_to_update {
record.insert(MVCC::TransactionExpirationId, t.txid);
let mut new_rec: BTreeMap<MVCC,u32> = BTreeMap::new();
new_rec.insert(MVCC::AddElementToTransaction, i as u32);
t.rollback_actions.insert(new_rec);
self.records.replace(record);
}
Ok(())
}
fn record_is_visible(&self, t: &TransactionId, record: &BTreeMap<MVCC, u32>) -> bool {
if let Some(creation_id) = record.get(&MVCC::TransactionCreationId) {
if self.active_transactions.contains(creation_id) && creation_id != &t.txid {
return false;
}
}
if let Some(expiration_id) = record.get(&MVCC::TransactionExpirationId) {
if expiration_id != &0 {
if !self.active_transactions.contains(expiration_id) ||
record.get(&MVCC::TransactionCreationId) == Some(&t.txid) {
return false;
}
}
}
true
}
fn row_is_locked(&self, record: &BTreeMap<MVCC, u32>) -> bool {
if let Some(expiration_id) = record.get(&MVCC::TransactionExpirationId) {
expiration_id != &0 && self.active_transactions.contains(expiration_id)
} else {
false
}
}
pub fn update_record(&mut self, t: &mut TransactionId, id:u32, _num:String) -> TxResult<()> {
self.delete_record(t, id)?;
let mut new_modification_version: BTreeMap<MVCC,u32> = BTreeMap::new();
new_modification_version.insert(MVCC::ElementId, id);
self.add_record(t, &mut new_modification_version);
Ok(())
}
fn create_snapshot(&self, t: &TransactionId) -> BTreeSet<BTreeMap<MVCC, u32>> {
let mut visible_modifications = BTreeSet::new();
for records in self.records.iter() {
if self.record_is_visible(t, records) {
visible_modifications.insert(records.clone());
}
}
visible_modifications
}
pub fn commit_transaction(&mut self, t: &TransactionId) -> TxResult<()> {
if self.has_read_lock_conflicts(t) {
let _ = self.rollback_transaction(t);
return Err(TxError::Abort);
}
self.active_transactions.remove(&t.txid);
Ok(())
}
pub fn abort_transaction(&mut self, t: &TransactionId) -> TxResult<()> {
self.rollback_transaction(t)
}
fn has_read_lock_conflicts(&self, t: &TransactionId) -> bool {
for (node_id, edge_type) in &t.read_locks {
if self.has_conflicting_write(t, node_id, edge_type) {
return true;
}
}
false
}
fn has_conflicting_write(&self, t: &TransactionId, node_id: &NodeId, edge_type: &str) -> bool {
for record in &self.records {
if let (Some(&creation_id), Some(&expiration_id)) =
(record.get(&MVCC::TransactionCreationId), record.get(&MVCC::TransactionExpirationId)) {
if creation_id == t.txid {
continue;
}
if creation_id > t.txid && expiration_id == 0 {
return true;
}
}
}
false
}
fn rollback_transaction(&mut self, t: &TransactionId) -> TxResult<()> {
for action in t.rollback_actions.iter().rev() {
let mut map = action.iter();
if let Some((action_type, action_position)) = map.next() {
let pos:u32 = *action_position;
match action_type {
&MVCC::AddElementToTransaction =>
self.set_transaction_expiration(pos, 0),
&MVCC::DeleteElementFromTransaction =>
self.set_transaction_expiration(pos, t.txid),
_ => return Err(TxError::InvalidRecord)
}
}
}
self.active_transactions.remove(&t.txid);
Ok(())
}
}
pub trait IGraph {
fn start_transaction(&mut self) -> TransactionId;
fn commit_transaction(&mut self, transaction_id: TransactionId) -> TxResult<()>;
fn abort_transaction(&mut self, transaction_id: TransactionId) -> TxResult<()>;
fn add_node(&mut self, transaction_id: Option<TransactionId>) -> TxResult<NodeId>;
fn add_edge(&mut self, transaction_id: Option<TransactionId>, src: NodeId, dst: NodeId, edge_type: String) -> TxResult<()>;
fn get_nodes(&mut self, transaction_id: Option<TransactionId>, origin: NodeId, search_path: Vec<String>) -> TxResult<Vec<NodeId>>;
}
impl IGraph for Graph {
fn start_transaction(&mut self) -> TransactionId {
self.start_transaction()
}
fn commit_transaction(&mut self, transaction_id: TransactionId) -> TxResult<()> {
self.commit_transaction(&transaction_id)
}
fn abort_transaction(&mut self, transaction_id: TransactionId) -> TxResult<()> {
self.abort_transaction(&transaction_id)
}
fn add_node(&mut self, transaction_id: Option<TransactionId>) -> TxResult<NodeId> {
match transaction_id {
Some(mut txid) => {
let node = self.add_node(&mut txid);
Ok(node.id().clone())
},
None => {
let mut temp_txid = self.start_transaction();
let node = self.add_node(&mut temp_txid);
let node_id = node.id().clone();
self.commit_transaction(&temp_txid)?;
Ok(node_id)
}
}
}
fn add_edge(&mut self, transaction_id: Option<TransactionId>, src: NodeId, dst: NodeId, edge_type: String) -> TxResult<()> {
let src_node = self.find_node_by_id(&src).ok_or(TxError::NodeNotFound)?;
let dst_node = self.find_node_by_id(&dst).ok_or(TxError::NodeNotFound)?;
match transaction_id {
Some(mut txid) => {
self.add_edge(&mut txid, &src_node, &dst_node, edge_type)
},
None => {
let mut temp_txid = self.start_transaction();
self.add_edge(&mut temp_txid, &src_node, &dst_node, edge_type)?;
self.commit_transaction(&temp_txid)
}
}
}
fn get_nodes(&mut self, transaction_id: Option<TransactionId>, origin: NodeId, search_path: Vec<String>) -> TxResult<Vec<NodeId>> {
let origin_node = self.find_node_by_id(&origin).ok_or(TxError::NodeNotFound)?;
match transaction_id {
Some(mut txid) => {
let nodes = self.get_nodes_internal(&mut txid, &origin_node, search_path);
Ok(nodes.into_iter().map(|node| node.id().clone()).collect())
},
None => {
let mut temp_txid = self.start_transaction();
let nodes = self.get_nodes_internal(&mut temp_txid, &origin_node, search_path);
let node_ids: Vec<NodeId> = nodes.into_iter().map(|node| node.id().clone()).collect();
self.commit_transaction(&temp_txid)?;
Ok(node_ids)
}
}
}
}
impl Graph {
fn find_node_by_id(&self, node_id: &NodeId) -> Option<Node> {
for node in self.nodes.keys() {
if node.id() == node_id {
return Some(node.clone());
}
}
None
}
fn has_collision(&self, _txid: &TransactionId, node: &Node, edge_type: &str) -> bool {
self.has_collision_detailed(node, edge_type)
}
fn has_collision_detailed(&self, node: &Node, edge_type: &str) -> bool {
if let Some(edges) = self.adjacencylist.get(node) {
edges.iter().any(|(_, edge)| edge.edgetype == edge_type)
} else {
false
}
}
fn has_collision_in_snapshot(&self, t: &TransactionId, node: &Node, edge_type: &str) -> bool {
self.has_collision_detailed(node, edge_type)
}
fn has_undirected_edge(&self, from: &Node, to: &Node, edge_type: &str) -> bool {
if let Some(edges) = self.adjacencylist.get(from) {
if edges.iter().any(|(dest, edge)| dest == to && edge.edgetype == edge_type) {
return true;
}
}
if let Some(edges) = self.adjacencylist.get(to) {
if edges.iter().any(|(dest, edge)| dest == from && edge.edgetype == edge_type) {
return true;
}
}
false
}
fn has_collision_excluding_destination(&self, from: &Node, to: &Node, edge_type: &str) -> bool {
if let Some(edges) = self.adjacencylist.get(from) {
edges.iter().any(|(dest, edge)| dest != to && edge.edgetype == edge_type)
} else {
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_start_transaction() {
let mut graph = Graph::new();
let tx1 = graph.start_transaction();
let tx2 = graph.start_transaction();
assert_eq!(tx2.txid, tx1.txid + 1);
}
#[test]
fn test_add_node_with_transaction() {
let mut graph = Graph::new();
let mut tx = graph.start_transaction();
let node = graph.add_node(&mut tx);
assert!(matches!(node.id(), NodeId::String(_)));
assert!(tx.read_locks.contains(&(node.id().clone(), "NODE_CREATION".to_string())));
}
#[test]
fn test_add_node_without_transaction() {
let mut graph = Graph::new();
let node_id: NodeId = IGraph::add_node(&mut graph, None).unwrap();
assert!(matches!(node_id, NodeId::String(_)));
}
#[test]
fn test_add_edge_with_transaction() {
let mut graph = Graph::new();
let mut tx = graph.start_transaction();
let node1 = graph.add_node(&mut tx);
let node2 = graph.add_node(&mut tx);
let result = graph.add_edge(&mut tx, &node1, &node2, "CONNECTS".to_string());
assert!(result.is_ok());
assert!(tx.read_locks.contains(&(node1.id().clone(), "CONNECTS".to_string())));
assert!(tx.read_locks.contains(&(node2.id().clone(), "CONNECTS".to_string())));
}
#[test]
fn test_edge_collision_detection() {
let mut graph = Graph::new();
let mut tx = graph.start_transaction();
let node1 = graph.add_node(&mut tx);
let node2 = graph.add_node(&mut tx);
let node3 = graph.add_node(&mut tx);
let result1 = graph.add_edge(&mut tx, &node1, &node2, "SAME_TYPE".to_string());
assert!(result1.is_ok());
let result2 = graph.add_edge(&mut tx, &node1, &node3, "SAME_TYPE".to_string());
assert!(matches!(result2, Err(TxError::Collision(_))));
}
#[test]
fn test_edge_no_collision_different_types() {
let mut graph = Graph::new();
let mut tx = graph.start_transaction();
let node1 = graph.add_node(&mut tx);
let node2 = graph.add_node(&mut tx);
let node3 = graph.add_node(&mut tx);
let result1 = graph.add_edge(&mut tx, &node1, &node2, "TYPE_A".to_string());
let result2 = graph.add_edge(&mut tx, &node1, &node3, "TYPE_B".to_string());
assert!(result1.is_ok());
assert!(result2.is_ok());
assert!(tx.read_locks.len() > 0);
}
#[test]
fn test_transaction_commit() {
let mut graph = Graph::new();
let tx = graph.start_transaction();
let tx_id = tx.txid;
assert!(graph.active_transactions.contains(&tx_id));
let result = graph.commit_transaction(&tx);
assert!(result.is_ok());
assert!(!graph.active_transactions.contains(&tx_id));
}
#[test]
fn test_transaction_abort() {
let mut graph = Graph::new();
let tx = graph.start_transaction();
let tx_id = tx.txid;
assert!(graph.active_transactions.contains(&tx_id));
let _ = graph.abort_transaction(&tx);
assert!(!graph.active_transactions.contains(&tx_id));
}
#[test]
fn test_igraph_interface() {
let mut graph = Graph::new();
let tx = IGraph::start_transaction(&mut graph);
let node_id: NodeId = IGraph::add_node(&mut graph, Some(tx.clone())).unwrap();
assert!(matches!(node_id, NodeId::String(_)));
let result = IGraph::commit_transaction(&mut graph, tx);
assert!(result.is_ok());
}
#[test]
fn test_snapshot_isolation() {
let mut graph = Graph::new();
let mut tx1 = graph.start_transaction();
let mut tx2 = graph.start_transaction();
let node = graph.add_node(&mut tx1);
assert!(tx1.snapshot.is_some());
let _node2 = graph.add_node(&mut tx2);
assert!(tx2.snapshot.is_some());
}
}