// persy 0.2.0
//
// Transactional Persistence Engine
// Documentation
extern crate persy;


use persy::Persy;
use persy::Config;
use persy::PRes;
use persy::PersyError;
use std::fs;
use std::sync::{Mutex, Condvar, Arc};
use std::thread;


/// Creates a segment in one committed transaction, checks that it exists, then
/// drops it in a second committed transaction and checks that it is gone.
#[test]
fn test_create_drop_segment() {
    let path = "./cds.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // First transaction: create the segment and commit it.
        let mut create_tx = persy.begin().unwrap();
        persy.create_segment(&mut create_tx, "test").unwrap();
        let prepared = persy.prepare_commit(create_tx).unwrap();
        persy.commit(prepared).unwrap();
        assert!(persy.exists_segment("test").unwrap());

        // Second transaction: drop the segment and commit the drop.
        let mut drop_tx = persy.begin().unwrap();
        persy.drop_segment(&mut drop_tx, "test").unwrap();
        let prepared = persy.prepare_commit(drop_tx).unwrap();
        persy.commit(prepared).unwrap();
        assert!(!persy.exists_segment("test").unwrap());
    }
    // The `persy` handle went out of scope above, before the file is removed.
    fs::remove_file(path).unwrap();
}

/// Creates a segment and then tries to drop it inside the very same
/// transaction; the test asserts that the drop returns an error.
#[test]
fn test_create_drop_segment_same_tx() {
    let path = "./cdss.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();
        let mut tx = persy.begin().unwrap();

        // The freshly created segment must already be visible through the transaction.
        persy.create_segment(&mut tx, "test").unwrap();
        let visible_in_tx = persy.exists_segment_tx(&mut tx, "test").unwrap();
        assert!(visible_in_tx);

        // Dropping it in the same transaction is expected to be refused.
        let drop_outcome = persy.drop_segment(&mut tx, "test");
        assert!(drop_outcome.is_err());
        // The transaction is never committed; it simply goes out of scope here.
    }
    fs::remove_file(path).unwrap();
}

/// Commits a segment, then drops and re-creates it within a single second
/// transaction, checking the transaction-local visibility after each step and
/// the committed visibility at the end.
#[test]
fn test_create_drop_recreate_segment_same_tx() {
    let path = "./cdcs.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Initial committed creation.
        let mut create_tx = persy.begin().unwrap();
        persy.create_segment(&mut create_tx, "test").unwrap();
        assert!(persy.exists_segment_tx(&mut create_tx, "test").unwrap());
        let prepared = persy.prepare_commit(create_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Drop followed by re-create, both inside one transaction.
        let mut recreate_tx = persy.begin().unwrap();
        persy.drop_segment(&mut recreate_tx, "test").unwrap();
        let visible_after_drop = persy.exists_segment_tx(&mut recreate_tx, "test").unwrap();
        assert!(!visible_after_drop);
        persy.create_segment(&mut recreate_tx, "test").unwrap();
        let visible_after_recreate = persy.exists_segment_tx(&mut recreate_tx, "test").unwrap();
        assert!(visible_after_recreate);
        let prepared = persy.prepare_commit(recreate_tx).unwrap();
        persy.commit(prepared).unwrap();

        // After the commit the segment is visible outside any transaction.
        assert!(persy.exists_segment("test").unwrap());
    }
    fs::remove_file(path).unwrap();
}

/// Updates a record in one open transaction while another transaction drops
/// the record's segment and commits; the test asserts that preparing the
/// commit of the updating transaction then fails.
#[test]
fn test_update_record_of_dropped_segment_other_tx() {
    let path = "./urds.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Setup: a committed segment holding one record.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let payload = "some".as_bytes().to_vec();
        let rec = persy.insert_record(&mut setup_tx, "test", &payload).unwrap();
        let prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Pending update that stays uncommitted while the segment is dropped.
        let mut update_tx = persy.begin().unwrap();
        persy.update_record(&mut update_tx, "test", &rec, &payload).unwrap();

        // Concurrent transaction drops the segment and commits first.
        let mut drop_tx = persy.begin().unwrap();
        persy.drop_segment(&mut drop_tx, "test").unwrap();
        let prepared = persy.prepare_commit(drop_tx).unwrap();
        persy.commit(prepared).unwrap();

        let update_outcome = persy.prepare_commit(update_tx);
        assert!(update_outcome.is_err());
    }
    fs::remove_file(path).unwrap();
}

/// Updates a record in one open transaction while another transaction drops
/// and re-creates the record's segment and commits; the test asserts that
/// preparing the commit of the updating transaction then fails.
#[test]
fn test_update_record_of_dropped_recreated_segment_other_tx() {
    let path = "./urdcs.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Setup: a committed segment holding one record.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let payload = "some".as_bytes().to_vec();
        let rec = persy.insert_record(&mut setup_tx, "test", &payload).unwrap();
        let prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Pending update that stays uncommitted while the segment is replaced.
        let mut update_tx = persy.begin().unwrap();
        persy.update_record(&mut update_tx, "test", &rec, &payload).unwrap();

        // Concurrent transaction drops and re-creates the segment, then commits.
        let mut recreate_tx = persy.begin().unwrap();
        persy.drop_segment(&mut recreate_tx, "test").unwrap();
        persy.create_segment(&mut recreate_tx, "test").unwrap();
        let prepared = persy.prepare_commit(recreate_tx).unwrap();
        persy.commit(prepared).unwrap();

        let update_outcome = persy.prepare_commit(update_tx);
        assert!(update_outcome.is_err());
    }
    fs::remove_file(path).unwrap();
}


/// Deletes a record in one open transaction while another transaction drops
/// the record's segment and commits; the test asserts that preparing the
/// commit of the deleting transaction then fails.
#[test]
fn test_delete_record_of_dropped_segment_other_tx() {
    let path = "./drds.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Setup: a committed segment holding one record.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let payload = "some".as_bytes().to_vec();
        let rec = persy.insert_record(&mut setup_tx, "test", &payload).unwrap();
        let prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Pending delete that stays uncommitted while the segment is dropped.
        let mut delete_tx = persy.begin().unwrap();
        persy.delete_record(&mut delete_tx, "test", &rec).unwrap();

        // Concurrent transaction drops the segment and commits first.
        let mut drop_tx = persy.begin().unwrap();
        persy.drop_segment(&mut drop_tx, "test").unwrap();
        let prepared = persy.prepare_commit(drop_tx).unwrap();
        persy.commit(prepared).unwrap();

        let delete_outcome = persy.prepare_commit(delete_tx);
        assert!(delete_outcome.is_err());
    }
    fs::remove_file(path).unwrap();
}

/// Deletes a record in one open transaction while another transaction drops
/// and re-creates the record's segment and commits; the test asserts that
/// preparing the commit of the deleting transaction then fails.
#[test]
fn test_delete_record_of_dropped_created_segment_other_tx() {
    let path = "./drdcs.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Setup: a committed segment holding one record.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let payload = "some".as_bytes().to_vec();
        let rec = persy.insert_record(&mut setup_tx, "test", &payload).unwrap();
        let prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Pending delete that stays uncommitted while the segment is replaced.
        let mut delete_tx = persy.begin().unwrap();
        persy.delete_record(&mut delete_tx, "test", &rec).unwrap();

        // Concurrent transaction drops and re-creates the segment, then commits.
        let mut recreate_tx = persy.begin().unwrap();
        persy.drop_segment(&mut recreate_tx, "test").unwrap();
        persy.create_segment(&mut recreate_tx, "test").unwrap();
        let prepared = persy.prepare_commit(recreate_tx).unwrap();
        persy.commit(prepared).unwrap();

        let delete_outcome = persy.prepare_commit(delete_tx);
        assert!(delete_outcome.is_err());
    }
    fs::remove_file(path).unwrap();
}

/// Inserts a record in one open transaction while another transaction drops
/// and re-creates the target segment and commits; the test asserts that
/// preparing the commit of the inserting transaction then fails.
#[test]
fn test_insert_record_of_dropped_recreated_segment_other_tx() {
    let path = "./irdcs.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Setup: a committed, empty segment.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let payload = "some".as_bytes().to_vec();
        let prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Pending insert that stays uncommitted while the segment is replaced.
        let mut insert_tx = persy.begin().unwrap();
        persy.insert_record(&mut insert_tx, "test", &payload).unwrap();

        // Concurrent transaction drops and re-creates the segment, then commits.
        let mut recreate_tx = persy.begin().unwrap();
        persy.drop_segment(&mut recreate_tx, "test").unwrap();
        persy.create_segment(&mut recreate_tx, "test").unwrap();
        let prepared = persy.prepare_commit(recreate_tx).unwrap();
        persy.commit(prepared).unwrap();

        let insert_outcome = persy.prepare_commit(insert_tx);
        assert!(insert_outcome.is_err());
    }
    fs::remove_file(path).unwrap();
}

/// Inserts a record in one open transaction while another transaction drops
/// the target segment and commits; the test asserts that preparing the commit
/// of the inserting transaction then fails.
#[test]
fn test_insert_record_of_dropped_segment_other_tx() {
    let path = "./irds.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Setup: a committed, empty segment.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let payload = "some".as_bytes().to_vec();
        let prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Pending insert that stays uncommitted while the segment is dropped.
        let mut insert_tx = persy.begin().unwrap();
        persy.insert_record(&mut insert_tx, "test", &payload).unwrap();

        // Concurrent transaction drops the segment and commits first.
        let mut drop_tx = persy.begin().unwrap();
        persy.drop_segment(&mut drop_tx, "test").unwrap();
        let prepared = persy.prepare_commit(drop_tx).unwrap();
        persy.commit(prepared).unwrap();

        let insert_outcome = persy.prepare_commit(insert_tx);
        assert!(insert_outcome.is_err());
    }
    fs::remove_file(path).unwrap();
}

/// Inserts a record and drops its segment in the same committed transaction,
/// then re-creates the segment in a new transaction and checks that the old
/// record id can neither be updated (`RecordNotFound`) nor read (`None`).
#[test]
fn test_record_of_drop_segment_same_tx() {
    let path = "./rds.persy";
    Persy::create(path).unwrap();
    {
        let persy = Persy::open(path, Config::new()).unwrap();

        // Setup: a committed, empty segment.
        let mut setup_tx = persy.begin().unwrap();
        persy.create_segment(&mut setup_tx, "test").unwrap();
        let prepared = persy.prepare_commit(setup_tx).unwrap();
        persy.commit(prepared).unwrap();

        // Insert a record and drop the whole segment within one transaction.
        let mut drop_tx = persy.begin().unwrap();
        let payload = "some".as_bytes().to_vec();
        let id = persy
            .insert_record(&mut drop_tx, "test", &payload)
            .expect("insert record works");
        persy.drop_segment(&mut drop_tx, "test").unwrap();
        let prepared = persy.prepare_commit(drop_tx).unwrap();
        persy.commit(prepared).unwrap();

        // A segment re-created under the same name must not expose the old record.
        let mut recreate_tx = persy.begin().unwrap();
        persy.create_segment(&mut recreate_tx, "test").unwrap();
        let replacement = "none".as_bytes().to_vec();
        let update_outcome = persy.update_record(&mut recreate_tx, "test", &id, &replacement);
        assert_eq!(update_outcome, Err(PersyError::RecordNotFound));
        let read_outcome = persy.read_record_tx(&mut recreate_tx, "test", &id);
        assert_eq!(read_outcome, Ok(None));

        let prepared = persy.prepare_commit(recreate_tx).unwrap();
        persy.commit(prepared).unwrap();
    }
    fs::remove_file(path).unwrap();
}

/// A simple countdown latch: threads calling [`CountDown::wait`] block until
/// [`CountDown::count_down`] has been called as many times as the initial count.
pub struct CountDown {
    /// Number of `count_down` calls still outstanding.
    lock: Mutex<u64>,
    /// Notified once the outstanding count reaches zero.
    cond: Condvar,
}

impl CountDown {
    /// Builds a latch that opens after `count` calls to `count_down`.
    pub fn new(count: u64) -> CountDown {
        CountDown {
            lock: Mutex::new(count),
            cond: Condvar::new(),
        }
    }

    /// Blocks the calling thread until the counter reaches zero.
    ///
    /// Returns `Ok(true)` once the latch is open, or an error if the internal
    /// mutex was poisoned.
    pub fn wait(&self) -> PRes<bool> {
        let mut remaining = self.lock.lock()?;
        // Condition variables may wake up spuriously, so the counter has to be
        // re-checked in a loop rather than with a single `if`.
        while *remaining != 0 {
            remaining = self.cond.wait(remaining)?;
        }
        Ok(true)
    }

    /// Decrements the counter and wakes every waiter once it reaches zero.
    ///
    /// Calling this more often than the initial count leaves the counter at
    /// zero instead of underflowing.
    pub fn count_down(&self) -> PRes<()> {
        let mut remaining = self.lock.lock()?;
        *remaining = (*remaining).saturating_sub(1);
        if *remaining == 0 {
            self.cond.notify_all();
        }
        Ok(())
    }
}

/// Two threads each create the segment `def` in their own transaction, wait
/// for each other so that both creations are pending at the same time, and
/// then commit; afterwards the segment must exist.
///
/// The workers are joined through their `JoinHandle`s so that a panic raised
/// in a worker after the rendezvous fails the test instead of leaving the main
/// thread blocked on a latch that would never be released.
#[test]
pub fn test_councurrent_double_create() {
    let path = "./concurrent_segment_double_create.persy";
    Persy::create(path).unwrap();
    let persy = Persy::open(path, Config::new()).unwrap();
    let both_create = Arc::new(CountDown::new(2));

    // Spawn both workers before joining any of them, otherwise the rendezvous
    // below could never complete.
    let workers: Vec<_> = (0..2)
        .map(|_| {
            let both_create_moved = both_create.clone();
            let persy = persy.clone();
            thread::spawn(move || {
                let mut tx = persy.begin().expect("error on transaction begin");
                persy.create_segment(&mut tx, "def").expect("error on segment creation");
                // Rendezvous: do not commit until the other worker has created too.
                both_create_moved.count_down().expect("lock not panic");
                both_create_moved.wait().expect("thread wait the other");
                let fin = persy.prepare_commit(tx).expect("error on commit prepare");
                persy.commit(fin).expect("error on commit");
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("worker thread panicked");
    }

    assert!(persy.exists_segment("def").unwrap());
    fs::remove_file(path).unwrap();
}

/// After committing the segment `def`, two threads each drop it in their own
/// transaction, wait for each other so that both drops are pending at the same
/// time, and then commit; afterwards the segment must no longer exist.
///
/// The workers are joined through their `JoinHandle`s so that a panic raised
/// in a worker after the rendezvous fails the test instead of leaving the main
/// thread blocked on a latch that would never be released.
#[test]
pub fn test_councurrent_double_drop() {
    let path = "./concurrent_segment_double_drop.persy";
    Persy::create(path).unwrap();
    let persy = Persy::open(path, Config::new()).unwrap();

    // Setup: commit the segment that both workers will try to drop.
    let mut tx = persy.begin().expect("error on transaction begin");
    persy.create_segment(&mut tx, "def").expect("error on segment creation");
    let fin = persy.prepare_commit(tx).expect("error on commit prepare");
    persy.commit(fin).expect("error on commit");

    let both_drop = Arc::new(CountDown::new(2));

    // Spawn both workers before joining any of them, otherwise the rendezvous
    // below could never complete.
    let workers: Vec<_> = (0..2)
        .map(|_| {
            let both_drop_moved = both_drop.clone();
            let persy = persy.clone();
            thread::spawn(move || {
                let mut tx = persy.begin().expect("error on transaction begin");
                persy.drop_segment(&mut tx, "def").expect("error on segment drop");
                // Rendezvous: do not commit until the other worker has dropped too.
                both_drop_moved.count_down().expect("lock not panic");
                both_drop_moved.wait().expect("thread wait the other");
                let fin = persy.prepare_commit(tx).expect("error on commit prepare");
                persy.commit(fin).expect("error on commit");
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("worker thread panicked");
    }

    assert!(!persy.exists_segment("def").unwrap());
    fs::remove_file(path).unwrap();
}