persy 0.2.0

Transactional Persistence Engine
Documentation
extern crate persy;


use persy::{Persy, PRes, Config};
use std::fs;
use std::sync::{Mutex, Condvar, Arc};
use std::thread;

/// A minimal count-down latch built from a `Mutex`-protected counter and a
/// `Condvar`.
///
/// The counter starts at the value given to [`CountDown::new`]; each call to
/// [`CountDown::count_down`] decrements it, and [`CountDown::wait`] blocks
/// until it reaches zero.
pub struct CountDown {
    /// Remaining number of `count_down` calls before waiters are released.
    lock: Mutex<u64>,
    /// Signalled when the counter reaches zero.
    cond: Condvar,
}

impl CountDown {
    /// Creates a latch that releases its waiters after `count` calls to
    /// `count_down`.
    pub fn new(count: u64) -> CountDown {
        CountDown {
            lock: Mutex::new(count),
            cond: Condvar::new(),
        }
    }

    /// Blocks the calling thread until the counter is zero, then returns
    /// `Ok(true)`.
    ///
    /// Returns an error if the internal mutex has been poisoned.
    pub fn wait(&self) -> PRes<bool> {
        let mut guard = self.lock.lock()?;
        // A condition variable may wake up spuriously, so the predicate has to
        // be re-checked after every wake-up instead of only once.
        while *guard != 0 {
            guard = self.cond.wait(guard)?;
        }
        Ok(true)
    }

    /// Decrements the counter and wakes every waiter once it reaches zero.
    ///
    /// Calling this when the counter is already zero leaves it at zero instead
    /// of underflowing.
    ///
    /// Returns an error if the internal mutex has been poisoned.
    pub fn count_down(&self) -> PRes<()> {
        let mut count = self.lock.lock()?;
        *count = count.saturating_sub(1);
        if *count == 0 {
            self.cond.notify_all();
        }
        Ok(())
    }
}


/// Creating a new Persy file must succeed; the file is removed afterwards.
#[test]
fn create() {
    let path = "./file.persy";
    Persy::create(path).unwrap();
    fs::remove_file(path).unwrap();
}

/// Creating the same file twice must succeed the first time and fail the
/// second time.
#[test]
fn fail_double_create() {
    let path = "./file2.persy";

    let first = Persy::create(path);
    assert!(first.is_ok());
    drop(first);

    let second = Persy::create(path);
    // Clean up before asserting so a failed assertion does not leave the file behind.
    fs::remove_file(path).unwrap();
    assert!(second.is_err());
}

/// A file that was just created must be openable with the default config.
#[test]
fn create_open() {
    let path = "./open.persy";

    Persy::create(path).unwrap();
    let opened = Persy::open(path, Config::new());
    assert!(opened.is_ok());
    // Release the handle before deleting the backing file.
    drop(opened);

    fs::remove_file(path).unwrap();
}



/// A record inserted in a transaction that is then rolled back must not be
/// readable afterwards.
#[test]
fn test_rollback() {
    let path = "./rollback.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // First transaction: only creates the "test" segment and commits.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Second transaction: insert a record, then roll back.
        let mut tx = persy.begin().unwrap();
        let bytes = String::from("something").into_bytes();
        let id = persy.insert_record(&mut tx, "test", &bytes).unwrap();
        persy.rollback(tx).unwrap();

        let read_after = persy.read_record("test", &id).unwrap();
        assert!(read_after.is_none());
    }
    fs::remove_file(path).unwrap();
}

/// A record inserted in a transaction that is prepared and then rolled back
/// via `rollback_prepared` must not be readable afterwards.
#[test]
fn test_rollback_precommit() {
    let path = "./rollback_pre.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // First transaction: only creates the "test" segment and commits.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let setup_prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(setup_prepared).unwrap();

        // Second transaction: insert, prepare, then roll the prepared state back.
        let mut tx = persy.begin().unwrap();
        let bytes = String::from("something").into_bytes();
        let id = persy.insert_record(&mut tx, "test", &bytes).unwrap();
        let prepared = persy.prepare_commit(tx).unwrap();
        persy.rollback_prepared(prepared).unwrap();

        let read_after = persy.read_record("test", &id).unwrap();
        assert!(read_after.is_none());
    }
    fs::remove_file(path).unwrap();
}


/// An update performed in a transaction is visible inside that transaction,
/// and after the transaction is rolled back the previously committed content
/// is read again.
#[test]
fn test_rollback_update() {
    let path = "./rollback_update.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Create the segment and the initial record in one committed transaction.
        let mut tx = persy.begin().unwrap();
        persy.create_segment(&mut tx, "test").unwrap();
        let bytes = String::from("something").into_bytes();
        let id = persy.insert_record(&mut tx, "test", &bytes).unwrap();
        match persy.read_record_tx(&mut tx, "test", &id).unwrap() {
            Some(read) => assert_eq!(bytes, read),
            None => assert!(false),
        }
        let prepared = persy.prepare_commit(tx).unwrap();
        persy.commit(prepared).unwrap();

        // Update the record in a second transaction and check the in-transaction view.
        let mut tx1 = persy.begin().unwrap();
        let bytes_1 = String::from("something2").into_bytes();
        persy.update_record(&mut tx1, "test", &id, &bytes_1).unwrap();
        match persy.read_record_tx(&mut tx1, "test", &id).unwrap() {
            Some(val) => assert_eq!(val, bytes_1),
            None => assert!(false),
        }
        persy.rollback(tx1).unwrap();

        // After the rollback the original content must be read.
        match persy.read_record("test", &id).unwrap() {
            Some(val) => assert_eq!(val, bytes),
            None => assert!(false),
        }
    }
    fs::remove_file(path).unwrap();
}

/// Two threads each insert one record into the same segment in their own
/// transaction; afterwards a scan must return exactly two records with the
/// inserted content.
#[test]
pub fn councurrent_create() {
    Persy::create("./concurrent_create.persy").unwrap();
    let persy = Persy::open("concurrent_create.persy", Config::new()).unwrap();
    let mut tx = persy.begin().expect("error on transactoin begin");
    persy.create_segment(&mut tx, "def").expect("error on segment creation");
    let fin = persy.prepare_commit(tx).expect("error on commit prepare");
    persy.commit(fin).expect("error on commit");

    let count = Arc::new(CountDown::new(2));

    let workers: Vec<_> = (0..2)
        .map(|_| {
            let count = count.clone();
            let persy = persy.clone();
            thread::spawn(move || {
                let mut tx = persy.begin().expect("error on transaction begin");
                let val = String::from("aaa").into_bytes();
                persy.insert_record(&mut tx, "def", &val).expect("error on insert value");
                let fin = persy.prepare_commit(tx).expect("error on commit prepare");
                persy.commit(fin).expect("error on commit");
                count.count_down().expect("lock not panic");
            })
        })
        .collect();

    // Join the workers before waiting on the latch: a worker that panics never
    // calls `count_down`, which would otherwise leave this test blocked forever
    // in `wait` instead of failing.
    for worker in workers {
        worker.join().expect("worker thread panicked");
    }
    count.wait().expect("threas not finisced");

    let val = String::from("aaa").into_bytes();
    let mut cc = 0;
    for cur in persy.scan_records("def").expect("error on scan") {
        assert_eq!(cur.content, val);
        cc += 1;
    }
    assert_eq!(cc, 2);
    fs::remove_file("./concurrent_create.persy").unwrap();
}



/// One transaction updates a record while another thread deletes the same
/// record and commits; preparing the commit of the updating transaction must
/// then fail.
#[test]
pub fn councurrent_update_removed() {
    Persy::create("./concurrent_update_remove.persy").unwrap();
    let persy = Persy::open("concurrent_update_remove.persy", Config::new()).unwrap();
    let mut tx = persy.begin().expect("error on transactoin begin");
    persy.create_segment(&mut tx, "def").expect("error on segment creation");
    let fin = persy.prepare_commit(tx).expect("error on commit prepare");
    persy.commit(fin).expect("error on commit");

    let mut tx = persy.begin().expect("error on transaction begin");
    let val = String::from("aaa").into_bytes();
    let id = persy.insert_record(&mut tx, "def", &val).expect("error on inser value");
    let fin = persy.prepare_commit(tx).expect("error on commit prepare");
    persy.commit(fin).expect("error on commit");

    // Start the update but leave the transaction open while the worker deletes
    // the same record.
    let mut tx = persy.begin().expect("error on transaction begin");
    let val = String::from("cccc").into_bytes();
    persy.update_record(&mut tx, "def", &id, &val).expect("error on update value");

    let count = Arc::new(CountDown::new(1));

    let worker = {
        let count = count.clone();
        let persy = persy.clone();
        let id = id.clone();
        thread::spawn(move || {
            let mut tx = persy.begin().expect("error on transaction begin");
            persy.delete_record(&mut tx, "def", &id).expect("error on delete value");
            let fin = persy.prepare_commit(tx).expect("error on commit prepare");
            persy.commit(fin).expect("error on commit");
            count.count_down().expect("lock not panic");
        })
    };

    // Join the worker before waiting on the latch: a worker that panics never
    // calls `count_down`, which would otherwise leave this test blocked forever
    // in `wait` instead of failing.
    worker.join().expect("worker thread panicked");
    count.wait().expect("threas not finisced");

    let fin = persy.prepare_commit(tx);
    assert!(fin.is_err());
    fs::remove_file("./concurrent_update_remove.persy").unwrap();
}

/// A transaction that was prepared but never committed before the handle was
/// dropped must have its inserted record readable after the file is reopened.
#[test]
pub fn test_recover_prepared_tx() {
    let path = "./test_recover_prepared.persy";
    Persy::create(path).unwrap();

    let (id, val) = {
        let persy = Persy::open(path, Config::new()).unwrap();
        let mut setup_tx = persy.begin().expect("error on transactoin begin");
        persy.create_segment(&mut setup_tx, "def").expect("error on segment creation");
        let fin = persy.prepare_commit(setup_tx).expect("error on commit prepare");
        persy.commit(fin).expect("error on commit");

        let mut tx = persy.begin().expect("error on transaction begin");
        let content = String::from("aaa").into_bytes();
        let rec_id = persy.insert_record(&mut tx, "def", &content).expect("error on insert value");
        // Prepare only: the result is discarded and never committed.
        persy.prepare_commit(tx).expect("error on commit prepare");
        (rec_id, content)
    };

    {
        let reopened = Persy::open(path, Config::new()).unwrap();
        let found = reopened.read_record("def", &id).expect("error reading record");
        assert_eq!(found, Some(val));
    }

    fs::remove_file(path).unwrap();
}


/// A transaction that was neither prepared nor committed before the handle
/// was dropped must leave no readable record after the file is reopened.
#[test]
pub fn test_recover_stale_tx() {
    let path = "./test_recover_stale.persy";
    Persy::create(path).unwrap();

    let id = {
        let persy = Persy::open(path, Config::new()).unwrap();
        let mut setup_tx = persy.begin().expect("error on transactoin begin");
        persy.create_segment(&mut setup_tx, "def").expect("error on segment creation");
        let fin = persy.prepare_commit(setup_tx).expect("error on commit prepare");
        persy.commit(fin).expect("error on commit");

        // Insert inside a transaction that is simply dropped at the end of this scope.
        let mut tx = persy.begin().expect("error on transaction begin");
        let val = String::from("aaa").into_bytes();
        let rec_id = persy.insert_record(&mut tx, "def", &val).expect("error on insert value");
        rec_id
    };

    {
        let reopened = Persy::open(path, Config::new()).unwrap();
        let found = reopened.read_record("def", &id).expect("error reading record");
        assert_eq!(found, None);
    }

    fs::remove_file(path).unwrap();
}

/// Reopens the same file nine times, committing one insert per open, and
/// checks after each commit that a scan returns as many records as there
/// have been iterations so far.
#[test]
pub fn test_multiple_open_tx_close() {
    let path = "./multiple_open_tx_close.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();
        let mut tx = persy.begin().expect("error on transactoin begin");
        persy.create_segment(&mut tx, "def").expect("error on segment creation");
        let fin = persy.prepare_commit(tx).expect("error on commit prepare");
        persy.commit(fin).expect("error on commit");
    }

    for expected in 1..10 {
        let persy = Persy::open(path, Config::new()).unwrap();

        let mut tx = persy.begin().expect("error on transaction begin");
        let payload = String::from("aaa").into_bytes();
        persy.insert_record(&mut tx, "def", &payload).expect("error on insert value");
        let fin = persy.prepare_commit(tx).expect("error on commit prepare");
        persy.commit(fin).expect("error on commit");

        // Count every record currently stored in the segment.
        let found = persy
            .scan_records("def")
            .expect("read persisten records ")
            .into_iter()
            .count();
        assert_eq!(expected, found);
    }

    fs::remove_file(path).unwrap();
}