extern crate persy;
use persy::Persy;
use persy::Config;
use persy::PRes;
use persy::PersyError;
use std::fs;
use std::sync::{Mutex, Condvar, Arc};
use std::thread;
#[test]
fn test_create_drop_segment() {
{
Persy::create("./cds.persy").unwrap();
}
{
let persy = Persy::open("./cds.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
assert!(persy.exists_segment("test").unwrap());
let mut tx = persy.begin().unwrap();
persy.drop_segment(&mut tx, "test").unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
assert!(!persy.exists_segment("test").unwrap());
}
fs::remove_file("./cds.persy").unwrap();
}
#[test]
fn test_create_drop_segment_same_tx() {
{
Persy::create("./cdss.persy").unwrap();
}
{
let persy = Persy::open("./cdss.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
assert!(persy.exists_segment_tx(&mut tx, "test").unwrap());
let err = persy.drop_segment(&mut tx, "test");
assert!(err.is_err());
}
fs::remove_file("./cdss.persy").unwrap();
}
#[test]
fn test_create_drop_recreate_segment_same_tx() {
{
Persy::create("./cdcs.persy").unwrap();
}
{
let persy = Persy::open("./cdcs.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
assert!(persy.exists_segment_tx(&mut tx, "test").unwrap());
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
persy.drop_segment(&mut tx, "test").unwrap();
assert!(!persy.exists_segment_tx(&mut tx, "test").unwrap());
persy.create_segment(&mut tx, "test").unwrap();
assert!(persy.exists_segment_tx(&mut tx, "test").unwrap());
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
assert!(persy.exists_segment("test").unwrap());
}
fs::remove_file("./cdcs.persy").unwrap();
}
#[test]
fn test_update_record_of_dropped_segment_other_tx() {
{
Persy::create("./urds.persy").unwrap();
}
{
let persy = Persy::open("./urds.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let bytes = String::from("some").into_bytes();
let rec = persy.insert_record(&mut tx, "test", &bytes).unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
persy.update_record(&mut tx, "test", &rec, &bytes).unwrap();
let mut tx1 = persy.begin().unwrap();
persy.drop_segment(&mut tx1, "test").unwrap();
let finalizer = persy.prepare_commit(tx1).unwrap();
persy.commit(finalizer).unwrap();
let finalizer = persy.prepare_commit(tx);
assert!(finalizer.is_err());
}
fs::remove_file("./urds.persy").unwrap();
}
#[test]
fn test_update_record_of_dropped_recreated_segment_other_tx() {
{
Persy::create("./urdcs.persy").unwrap();
}
{
let persy = Persy::open("./urdcs.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let bytes = String::from("some").into_bytes();
let rec = persy.insert_record(&mut tx, "test", &bytes).unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
persy.update_record(&mut tx, "test", &rec, &bytes).unwrap();
let mut tx1 = persy.begin().unwrap();
persy.drop_segment(&mut tx1, "test").unwrap();
persy.create_segment(&mut tx1, "test").unwrap();
let finalizer = persy.prepare_commit(tx1).unwrap();
persy.commit(finalizer).unwrap();
let finalizer = persy.prepare_commit(tx);
assert!(finalizer.is_err());
}
fs::remove_file("./urdcs.persy").unwrap();
}
#[test]
fn test_delete_record_of_dropped_segment_other_tx() {
{
Persy::create("./drds.persy").unwrap();
}
{
let persy = Persy::open("./drds.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let bytes = String::from("some").into_bytes();
let rec = persy.insert_record(&mut tx, "test", &bytes).unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
persy.delete_record(&mut tx, "test", &rec).unwrap();
let mut tx1 = persy.begin().unwrap();
persy.drop_segment(&mut tx1, "test").unwrap();
let finalizer = persy.prepare_commit(tx1).unwrap();
persy.commit(finalizer).unwrap();
let finalizer = persy.prepare_commit(tx);
assert!(finalizer.is_err());
}
fs::remove_file("./drds.persy").unwrap();
}
#[test]
fn test_delete_record_of_dropped_created_segment_other_tx() {
{
Persy::create("./drdcs.persy").unwrap();
}
{
let persy = Persy::open("./drdcs.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let bytes = String::from("some").into_bytes();
let rec = persy.insert_record(&mut tx, "test", &bytes).unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
persy.delete_record(&mut tx, "test", &rec).unwrap();
let mut tx1 = persy.begin().unwrap();
persy.drop_segment(&mut tx1, "test").unwrap();
persy.create_segment(&mut tx1, "test").unwrap();
let finalizer = persy.prepare_commit(tx1).unwrap();
persy.commit(finalizer).unwrap();
let finalizer = persy.prepare_commit(tx);
assert!(finalizer.is_err());
}
fs::remove_file("./drdcs.persy").unwrap();
}
#[test]
fn test_insert_record_of_dropped_recreated_segment_other_tx() {
{
Persy::create("./irdcs.persy").unwrap();
}
{
let persy = Persy::open("./irdcs.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let bytes = String::from("some").into_bytes();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
persy.insert_record(&mut tx, "test", &bytes).unwrap();
let mut tx1 = persy.begin().unwrap();
persy.drop_segment(&mut tx1, "test").unwrap();
persy.create_segment(&mut tx1, "test").unwrap();
let finalizer = persy.prepare_commit(tx1).unwrap();
persy.commit(finalizer).unwrap();
let finalizer = persy.prepare_commit(tx);
assert!(finalizer.is_err());
}
fs::remove_file("./irdcs.persy").unwrap();
}
#[test]
fn test_insert_record_of_dropped_segment_other_tx() {
{
Persy::create("./irds.persy").unwrap();
}
{
let persy = Persy::open("./irds.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let bytes = String::from("some").into_bytes();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
persy.insert_record(&mut tx, "test", &bytes).unwrap();
let mut tx1 = persy.begin().unwrap();
persy.drop_segment(&mut tx1, "test").unwrap();
let finalizer = persy.prepare_commit(tx1).unwrap();
persy.commit(finalizer).unwrap();
let finalizer = persy.prepare_commit(tx);
assert!(finalizer.is_err());
}
fs::remove_file("./irds.persy").unwrap();
}
#[test]
fn test_record_of_drop_segment_same_tx() {
{
Persy::create("./rds.persy").unwrap();
}
{
let persy = Persy::open("./rds.persy", Config::new()).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
let bytes = String::from("some").into_bytes();
let id = persy.insert_record(&mut tx, "test", &bytes).expect("insert record works");
persy.drop_segment(&mut tx, "test").unwrap();
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
let mut tx = persy.begin().unwrap();
persy.create_segment(&mut tx, "test").unwrap();
assert_eq!(persy.update_record(&mut tx, "test", &id, &String::from("none").into_bytes()),
Err(PersyError::RecordNotFound));
assert_eq!(persy.read_record_tx(&mut tx, "test", &id), Ok(None));
let finalizer = persy.prepare_commit(tx).unwrap();
persy.commit(finalizer).unwrap();
}
fs::remove_file("./rds.persy").unwrap();
}
pub struct CountDown {
lock: Mutex<u64>,
cond: Condvar,
}
impl CountDown {
pub fn new(count: u64) -> CountDown {
CountDown {
lock: Mutex::new(count),
cond: Condvar::new(),
}
}
pub fn wait(&self) -> PRes<bool> {
let guard = self.lock.lock()?;
if *guard != 0 {
let _ = self.cond.wait(guard)?;
}
Ok(true)
}
pub fn count_down(&self) -> PRes<()> {
let mut count = self.lock.lock()?;
*count = (*count) - 1;
if *count == 0 {
self.cond.notify_all();
}
Ok(())
}
}
#[test()]
pub fn test_councurrent_double_create() {
Persy::create("./concurrent_segment_double_create.persy").unwrap();
let persy = Persy::open("concurrent_segment_double_create.persy", Config::new()).unwrap();
let both_create = Arc::new(CountDown::new(2));
let end = Arc::new(CountDown::new(2));
for _ in [0; 2].iter() {
let both_create_moved = both_create.clone();
let end_moved = end.clone();
let persy = persy.clone();
thread::spawn(move || {
let mut tx = persy.begin().expect("error on transaction begin");
persy.create_segment(&mut tx, "def").expect("error on segment creation");
both_create_moved.count_down().expect("lock not panic");
both_create_moved.wait().expect("thread wait the other");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
end_moved.count_down().expect("lock not panic");
});
}
end.wait().expect("threas finisced");
assert!(persy.exists_segment("def").unwrap());
fs::remove_file("./concurrent_segment_double_create.persy").unwrap();
}
#[test()]
pub fn test_councurrent_double_drop() {
Persy::create("./concurrent_segment_double_drop.persy").unwrap();
let persy = Persy::open("concurrent_segment_double_drop.persy", Config::new()).unwrap();
let mut tx = persy.begin().expect("error on transaction begin");
persy.create_segment(&mut tx, "def").expect("error on segment creation");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
let both_create = Arc::new(CountDown::new(2));
let end = Arc::new(CountDown::new(2));
for _ in [0; 2].iter() {
let both_create_moved = both_create.clone();
let end_moved = end.clone();
let persy = persy.clone();
thread::spawn(move || {
let mut tx = persy.begin().expect("error on transaction begin");
persy.drop_segment(&mut tx, "def").expect("error on segment creation");
both_create_moved.count_down().expect("lock not panic");
both_create_moved.wait().expect("thread wait the other");
let fin = persy.prepare_commit(tx).expect("error on commit prepare");
persy.commit(fin).expect("error on commit");
end_moved.count_down().expect("lock not panic");
});
}
end.wait().expect("threas finisced");
assert!(!persy.exists_segment("def").unwrap());
fs::remove_file("./concurrent_segment_double_drop.persy").unwrap();
}