use std::fmt::{self, Debug};
use std::sync::{Arc, RwLock};
use linked_hash_map::LinkedHashMap;
use super::wal::Wal;
use super::{Eid, EntityType, Id, Txid};
use base::IntoRef;
use error::{Error, Result};
use volume::{Arm, Armor, VolumeArmor, VolumeRef};
/// The kind of change an entity carries into a transaction.
///
/// The value is reported by `Transable::action` and is also passed to
/// `Trans::add_entity`, which records it in the transaction's `Wal` entry.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
pub enum Action {
/// The entity is new (presumably: created by this transaction).
New,
/// The entity is updated (presumably: it existed before this transaction).
Update,
/// The entity is deleted. `Trans::commit` refuses to commit such an entity
/// while any reference other than the transaction's own is still alive.
Delete,
}
/// An entity that can be registered with a `Trans` as a cohort.
///
/// `Trans::commit`, `Trans::complete_commit` and `Trans::abort` each call the
/// same-named method on every registered entity while holding that entity's
/// write lock.
pub trait Transable: Debug + Id + Send + Sync {
/// Returns the action this entity carries; `Trans::commit` inspects it to
/// detect `Action::Delete`.
fn action(&self) -> Action;
/// Commits this entity against the volume `vol`. An `Err` returned here is
/// propagated out of `Trans::commit` immediately.
fn commit(&mut self, vol: &VolumeRef) -> Result<()>;
/// Called by `Trans::complete_commit`; cannot fail.
fn complete_commit(&mut self);
/// Called by `Trans::abort`; cannot fail.
fn abort(&mut self);
}
/// Shared, reference-counted handle to a `Transable` trait object guarded by a
/// read-write lock.
pub type TransableRef = Arc<RwLock<Transable>>;
/// A single transaction: its id, the entities taking part in it and the `Wal`
/// that records them.
pub struct Trans {
// Id of this transaction; also used to create `wal`.
txid: Txid,
// Entities registered through `add_entity`, keyed by entity id. The first
// handle registered for an id is the one that is kept.
cohorts: LinkedHashMap<Eid, TransableRef>,
// The `Wal` record of this transaction; one entry is added per `add_entity`.
wal: Wal,
// Armor used to save `wal` and, on abort, to remove all of its arms.
wal_armor: VolumeArmor<Wal>,
}
impl Trans {
    /// Creates a transaction with id `txid` and no cohorts.
    ///
    /// A new `Wal` is created for `txid`, and the armor used to save it is
    /// built from `vol`.
    pub fn new(txid: Txid, vol: &VolumeRef) -> Self {
        let wal = Wal::new(txid);
        let wal_armor = VolumeArmor::new(vol);
        Trans {
            txid,
            cohorts: LinkedHashMap::new(),
            wal,
            wal_armor,
        }
    }

    /// Returns a clone of this transaction's `Wal`.
    #[inline]
    pub fn get_wal(&self) -> Wal {
        Wal::clone(&self.wal)
    }

    /// Saves the transaction's `Wal` in its current state through the armor.
    ///
    /// # Errors
    ///
    /// Returns whatever error the save reports.
    #[inline]
    pub fn begin_trans(&mut self) -> Result<()> {
        self.wal_armor.save_item(&mut self.wal)
    }

    /// Registers `entity` as a cohort of this transaction.
    ///
    /// An entry for `id` (with `action`, `ent_type` and `arm`) is first added
    /// to the `Wal`, and the `Wal` is saved. Only after a successful save is
    /// the entity stored in the cohort map. If `id` is already registered,
    /// the existing handle is kept and `entity` is dropped.
    ///
    /// # Errors
    ///
    /// If saving the `Wal` fails, the entry for `id` is removed from the
    /// in-memory `Wal` again and the save error is returned; the cohort map
    /// is left untouched.
    pub fn add_entity(
        &mut self,
        id: &Eid,
        entity: TransableRef,
        action: Action,
        ent_type: EntityType,
        arm: Arm,
    ) -> Result<()> {
        // Record the entry first so that the saved wal already contains it.
        self.wal.add_entry(id, action, ent_type, arm);

        let saved = self.wal_armor.save_item(&mut self.wal);
        if let Err(err) = saved {
            // Roll the in-memory wal back to match what was last saved.
            self.wal.remove_entry(id);
            return Err(err);
        }

        self.cohorts.entry(id.clone()).or_insert(entity);
        Ok(())
    }

    /// Commits every cohort against `vol`, in the cohort map's iteration
    /// order, and returns a clone of the `Wal`.
    ///
    /// # Errors
    ///
    /// * `Error::InUse` if a cohort whose action is `Action::Delete` is still
    ///   referenced by someone other than this transaction.
    /// * Any error returned by a cohort's own `commit`.
    ///
    /// In both cases the loop stops at the failing cohort; cohorts committed
    /// before it are not reverted by this method.
    ///
    /// # Panics
    ///
    /// Panics if a cohort's lock is poisoned.
    pub fn commit(&mut self, vol: &VolumeRef) -> Result<Wal> {
        debug!("commit tx#{}, cohorts: {}", self.txid, self.cohorts.len());

        for cohort in self.cohorts.values() {
            let mut guard = cohort.write().unwrap();

            let deleting = guard.action() == Action::Delete;
            if deleting {
                // The cohort map itself holds one strong reference, so any
                // count above one means another owner is still alive.
                let holders = Arc::strong_count(cohort);
                if holders > 1 {
                    error!(
                        "cannot delete entity in use (using: {})",
                        holders,
                    );
                    return Err(Error::InUse);
                }
            }

            guard.commit(vol)?;
        }

        Ok(self.get_wal())
    }

    /// Calls `complete_commit` on every cohort, then empties the cohort map.
    ///
    /// # Panics
    ///
    /// Panics if a cohort's lock is poisoned.
    pub fn complete_commit(&mut self) {
        for cohort in self.cohorts.values() {
            cohort.write().unwrap().complete_commit();
        }
        self.cohorts.clear();
    }

    /// Aborts the transaction.
    ///
    /// Every cohort's `abort` is called and the cohort map is emptied. The
    /// `Wal` is then asked to clean up against `vol`, and finally all arms
    /// stored under the `Wal`'s id are removed through the armor.
    ///
    /// # Errors
    ///
    /// Returns the error from the `Wal` clean-up (in which case the arms are
    /// not removed) or from the arm removal.
    ///
    /// # Panics
    ///
    /// Panics if a cohort's lock is poisoned.
    pub fn abort(&mut self, vol: &VolumeRef) -> Result<()> {
        for cohort in self.cohorts.values() {
            cohort.write().unwrap().abort();
        }
        self.cohorts.clear();

        self.wal.clean_aborted(vol)?;

        let wal_id = self.wal.id();
        self.wal_armor.remove_all_arms(wal_id)
    }
}
// Implements `IntoRef` for `Trans` using only the trait's provided items
// (the impl body is empty).
// NOTE(review): presumably this is what turns a `Trans` into a `TransRef`;
// confirm against the trait definition in `base`.
impl IntoRef for Trans {}
/// Debug output lists `txid`, `cohorts` and `wal`; the `wal_armor` field is
/// left out.
impl Debug for Trans {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Trans");
        out.field("txid", &self.txid);
        out.field("cohorts", &self.cohorts);
        out.field("wal", &self.wal);
        out.finish()
    }
}
/// Shared, reference-counted handle to a `Trans` guarded by a read-write lock.
pub type TransRef = Arc<RwLock<Trans>>;