extern crate persy;
use persy::{Persy, PRes, Config};
use std::fs;
use std::sync::{Mutex, Condvar, Arc};
use std::thread;
pub struct CountDown {
lock: Mutex<u64>,
cond: Condvar,
}
impl CountDown {
pub fn new(count: u64) -> CountDown {
CountDown {
lock: Mutex::new(count),
cond: Condvar::new(),
}
}
pub fn wait(&self) -> PRes<bool> {
let guard = self.lock.lock()?;
if *guard != 0 {
let _ = self.cond.wait(guard)?;
}
Ok(true)
}
pub fn count_down(&self) -> PRes<()> {
let mut count = self.lock.lock()?;
*count = (*count) - 1;
if *count == 0 {
self.cond.notify_all();
}
Ok(())
}
}
#[test]
fn create() {
{
Persy::create("./file.persy").unwrap();
}
fs::remove_file("./file.persy").unwrap();
}
#[test]
fn fail_double_create() {
{
let res = Persy::create("./file2.persy");
assert!(!res.is_err());
}
let res = Persy::create("./file2.persy");
fs::remove_file("./file2.persy").unwrap();
assert!(res.is_err());
}
#[test]
fn create_open() {
{
Persy::create("./open.persy").unwrap();
let open = Persy::open("./open.persy", Config::new());
assert!(!open.is_err());
}
fs::remove_file("./open.persy").unwrap();
}
#[test]
fn test_rollback() {
{
Persy::create("./rollback.persy").unwrap();
}
{
let persy = Persy::open("./rollback.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
let rec_data: String = "something".into();
let bytes = rec_data.into_bytes();
let id = persy.insert_record(&mut tx, "test", &bytes).unwrap();
persy.rollback(tx).unwrap();
let read_after = persy.read_record("test", &id).unwrap();
if let Some(_) = read_after {
assert!(false);
} else {
assert!(true);
}
}
fs::remove_file("./rollback.persy").unwrap();
}
#[test]
fn test_rollback_precommit() {
{
Persy::create("./rollback_pre.persy").unwrap();
}
{
let persy = Persy::open("./rollback_pre.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
let rec_data: String = "something".into();
let bytes = rec_data.into_bytes();
let id = persy.insert_record(&mut tx, "test", &bytes).unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.rollback_prepared(finalizer).unwrap();
let read_after = persy.read_record("test", &id).unwrap();
if let Some(_) = read_after {
assert!(false);
} else {
assert!(true);
}
}
fs::remove_file("./rollback_pre.persy").unwrap();
}
#[test]
fn test_rollback_update() {
{
Persy::create("./rollback_update.persy").unwrap();
}
{
let persy = Persy::open("./rollback_update.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let rec_data: String = "something".into();
let bytes = rec_data.into_bytes();
let id = persy.insert_record(&mut tx, "test", &bytes).unwrap();
let read_opt = persy.read_record_tx(&mut tx, "test", &id).unwrap();
if let Some(read) = read_opt {
assert_eq!(bytes, read);
} else {
assert!(false);
}
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx1 = persy.begin().unwrap();
let rec_data_1: String = "something2".into();
let bytes_1 = rec_data_1.into_bytes();
persy.update_record(&mut tx1, "test", &id, &bytes_1).unwrap();
let read_after = persy.read_record_tx(&mut tx1, "test", &id).unwrap();
if let Some(val) = read_after {
assert_eq!(val, bytes_1);
} else {
assert!(false);
}
persy.rollback(tx1).unwrap();
let read_after = persy.read_record("test", &id).unwrap();
if let Some(val) = read_after {
assert_eq!(val, bytes);
} else {
assert!(false);
}
}
fs::remove_file("./rollback_update.persy").unwrap();
}
#[test()]
pub fn councurrent_create() {
Persy::create("./concurrent_create.persy").unwrap();
let persy = Persy::open("concurrent_create.persy", Config::new()).unwrap();
let mut tx = persy.begin().expect("error on transactoin begin");
persy.create_segment(&mut tx, "def").expect("error on segment creation");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
let count = Arc::new(CountDown::new(2));
for _ in &[1, 2] {
let count = count.clone();
let persy = persy.clone();
thread::spawn(move || {
let mut tx = persy.begin().expect("error on transaction begin");
let val = String::from("aaa").into_bytes();
persy.insert_record(&mut tx, "def", &val).expect("error on insert value");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
count.count_down().expect("lock not panic");
});
}
count.wait().expect("threas not finisced");
let val = String::from("aaa").into_bytes();
let mut cc = 0;
for cur in persy.scan_records("def").expect("error on scan") {
assert_eq!(cur.content, val);
cc += 1;
}
assert_eq!(cc, 2);
fs::remove_file("./concurrent_create.persy").unwrap();
}
#[test()]
pub fn councurrent_update_removed() {
Persy::create("./concurrent_update_remove.persy").unwrap();
let persy = Persy::open("concurrent_update_remove.persy", Config::new()).unwrap();
let mut tx = persy.begin().expect("error on transactoin begin");
persy.create_segment(&mut tx, "def").expect("error on segment creation");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
let mut tx = persy.begin().expect("error on transaction begin");
let val = String::from("aaa").into_bytes();
let id = persy.insert_record(&mut tx, "def", &val).expect("error on inser value");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
let mut tx = persy.begin().expect("error on transaction begin");
let val = String::from("cccc").into_bytes();
persy.update_record(&mut tx, "def", &id, &val).expect("error on update value");
let count = Arc::new(CountDown::new(1));
{
let count = count.clone();
let persy = persy.clone();
let id = id.clone();
thread::spawn(move || {
let mut tx = persy.begin().expect("error on transaction begin");
persy.delete_record(&mut tx, "def", &id).expect("error on delete value");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
count.count_down().expect("lock not panic");
});
}
count.wait().expect("threas not finisced");
let fin = persy.prepare_commit(tx);
assert!(fin.is_err());
fs::remove_file("./concurrent_update_remove.persy").unwrap();
}
#[test()]
pub fn test_recover_prepared_tx() {
Persy::create("./test_recover_prepared.persy").unwrap();
let id;
let val;
{
let persy = Persy::open("./test_recover_prepared.persy", Config::new()).unwrap();
let mut tx = persy.begin().expect("error on transactoin begin");
persy.create_segment(&mut tx, "def").expect("error on segment creation");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
let mut tx = persy.begin().expect("error on transaction begin");
val = String::from("aaa").into_bytes();
id = persy.insert_record(&mut tx, "def", &val).expect("error on insert value");
persy.prepare_commit(tx).expect("error on commit prepare");
}
{
let persy = Persy::open("./test_recover_prepared.persy", Config::new()).unwrap();
assert_eq!(persy.read_record("def", &id).expect("error reading record"),
Some(val));
}
fs::remove_file("./test_recover_prepared.persy").unwrap();
}
#[test()]
pub fn test_recover_stale_tx() {
Persy::create("./test_recover_stale.persy").unwrap();
let id;
{
let persy = Persy::open("./test_recover_stale.persy", Config::new()).unwrap();
let mut tx = persy.begin().expect("error on transactoin begin");
persy.create_segment(&mut tx, "def").expect("error on segment creation");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
let mut tx = persy.begin().expect("error on transaction begin");
let val = String::from("aaa").into_bytes();
id = persy.insert_record(&mut tx, "def", &val).expect("error on insert value");
}
{
let persy = Persy::open("./test_recover_stale.persy", Config::new()).unwrap();
assert_eq!(persy.read_record("def", &id).expect("error reading record"),
None);
}
fs::remove_file("./test_recover_stale.persy").unwrap();
}
#[test()]
pub fn test_multiple_open_tx_close() {
Persy::create("./multiple_open_tx_close.persy").unwrap();
{
let persy = Persy::open("./multiple_open_tx_close.persy", Config::new()).unwrap();
let mut tx = persy.begin().expect("error on transactoin begin");
persy.create_segment(&mut tx, "def").expect("error on segment creation");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
}
for ite in 1..10 {
let persy = Persy::open("./multiple_open_tx_close.persy", Config::new()).unwrap();
let mut tx = persy.begin().expect("error on transaction begin");
let val = String::from("aaa").into_bytes();
persy.insert_record(&mut tx, "def", &val).expect("error on insert value");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
let mut counter = 0;
for _ in persy.scan_records("def").expect("read persisten records ") {
counter += 1;
}
assert_eq!(ite, counter);
}
fs::remove_file("./multiple_open_tx_close.persy").unwrap();
}