mod helpers;
use helpers::{create_and_drop, create_and_drop_with_config, CountDown};
use persy::{Config, OpenError, OpenOptions, Persy, ValueMode};
use std::fs;
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::Arc;
use std::thread;
use tempfile::{tempfile, Builder, NamedTempFile};
/// Expects `truncate(true)` to fail with `OpenError::AlreadyInUse` while another handle
/// keeps the same file open, and the record written at creation time to stay readable.
#[test]
fn openoptions_truncate_locked() {
    let path = "./target/truncate-locked.persy";
    {
        // Create the file and seed it with one record; the creating handle is dropped
        // at the end of this scope.
        let _persy_create = OpenOptions::new()
            .create_new(true)
            .prepare_with(|persy: &Persy| {
                let mut tx = persy.begin()?;
                tx.create_segment("seg")?;
                let data = b"hello";
                tx.insert("seg", &data[..])?;
                let prepared = tx.prepare()?;
                prepared.commit()?;
                Ok(())
            })
            .open(path)
            .unwrap();
    }
    // This handle stays alive for the rest of the test, including the truncate attempt.
    let persy_lock = OpenOptions::new().open(path).unwrap();
    let error = OpenOptions::new()
        .truncate(true)
        .open(path)
        .map(drop)
        .unwrap_err();
    match &*error {
        OpenError::AlreadyInUse(_) => (),
        // The message names the variant that is actually expected here.
        otherwise => panic!("error is {:?} and not an AlreadyInUse error", otherwise),
    }
    // The failed truncate must not have removed the seeded record.
    let any_hello = persy_lock
        .scan("seg")
        .unwrap()
        .any(|(_, content)| content == b"hello");
    assert!(any_hello);
    fs::remove_file(path).unwrap();
}
/// Creates a new file, releases it, then opens the same path again with `create(true)`.
#[test]
fn openoptions_create() {
    let path = "./target/create.persy";
    // Drop the creating handle immediately so the file is free for the second open.
    drop(OpenOptions::new().create_new(true).open(path).unwrap());
    let _reopened = OpenOptions::new().create(true).open(path).unwrap();
    fs::remove_file(path).unwrap();
}
/// Expects a second `create_new(true)` on an existing path to fail with
/// `OpenError::AlreadyExists`.
#[test]
fn openoptions_create_new() {
    let path = "./target/create-new.persy";
    // Kept alive until the end of the test, so the file exists during the second attempt.
    let _persy_create = OpenOptions::new().create_new(true).open(path).unwrap();
    let error = OpenOptions::new()
        .create_new(true)
        .open(path)
        .map(drop)
        .unwrap_err();
    match &*error {
        OpenError::AlreadyExists => (),
        // The message names the variant that is actually expected here.
        otherwise => panic!("error is {:?} and not an AlreadyExists error", otherwise),
    }
    fs::remove_file(path).unwrap();
}
/// Creates a file, releases it, and checks that default `OpenOptions` can open it again.
#[test]
fn openoptions_open() {
    let path = "./target/open.persy";
    // The creating handle is dropped before the plain open below.
    drop(OpenOptions::new().create_new(true).open(path).unwrap());
    let _reopened = OpenOptions::new().open(path).unwrap();
    fs::remove_file(path).unwrap();
}
/// Opens an instance through `OpenOptions::memory` and commits one segment creation
/// plus one empty record insert against it.
#[test]
fn openoptions_memory() {
    let in_memory = OpenOptions::new().memory().unwrap();
    let mut tx = in_memory.begin().unwrap();
    tx.create_segment("one").unwrap();
    tx.insert("one", "".as_bytes()).unwrap();
    let prepared = tx.prepare().unwrap();
    prepared.commit().unwrap();
}
/// Checks when the `prepare_with` callback runs: the test expects it NOT to run when an
/// existing file is simply opened, and to run when the file is opened with `truncate(true)`.
#[test]
fn openoptions_prepare() {
    let path = "./target/prepare.persy";
    {
        let _persy_create = OpenOptions::new().create_new(true).open(path).unwrap();
    }
    {
        // If the callback were invoked here the test would panic.
        let _persy_prepare = OpenOptions::new()
            .prepare_with(|_persy: &Persy| panic!("prepare has been called"))
            .open(path)
            .unwrap();
    }
    // Shared flag flipped by the callback so the test can observe that it ran.
    let has_been_initialized = Arc::new(AtomicBool::new(false));
    let _persy_open = OpenOptions::new()
        .truncate(true)
        .prepare_with({
            let has_been_initialized = Arc::clone(&has_been_initialized);
            move |_persy: &Persy| {
                has_been_initialized.store(true, Relaxed);
                Ok(())
            }
        })
        .open(path)
        .unwrap();
    assert!(
        has_been_initialized.load(Relaxed),
        "prepare callback was not invoked on truncate"
    );
    fs::remove_file(path).unwrap();
}
/// Creates a storage directly on an anonymous temporary file.
#[test]
fn create() {
    let backing = tempfile().unwrap();
    Persy::create_from_file(backing).unwrap();
}
/// Expects a second `Persy::open` on the same path to fail while the first handle is alive.
#[test]
fn lock_double_open() {
    let path = "./target/file_dd.persy";
    {
        Persy::create(path).unwrap();
        // `first` stays in scope while the second open is attempted.
        let first = Persy::open(path, Config::new());
        assert!(first.is_ok());
        let second = Persy::open(path, Config::new());
        assert!(second.is_err());
    }
    fs::remove_file(path).unwrap();
}
/// Expects `Persy::create` to fail when the target file already exists.
#[test]
fn fail_double_create() {
    let path = "./target/file2.persy";
    {
        let first = Persy::create(path);
        assert!(first.is_ok());
    }
    let second = Persy::create(path);
    // The file is removed before asserting, so a failing assertion does not leave it behind.
    fs::remove_file(path).unwrap();
    assert!(second.is_err());
}
/// Creates a storage on a named temporary file and opens it again through a fresh handle.
#[test]
fn create_open() {
    let file = Builder::new()
        .prefix("open")
        .suffix(".persy")
        .tempfile()
        .expect("expect temp file creation");
    let for_create = file.reopen().unwrap();
    Persy::create_from_file(for_create).unwrap();
    let for_open = file.reopen().unwrap();
    let opened = Persy::open_from_file(for_open, Config::new());
    assert!(opened.is_ok());
}
/// Inserts a record, rolls the transaction back before `prepare`, and expects the
/// record not to be readable afterwards.
#[test]
fn test_rollback() {
    create_and_drop("rollback", |persy| {
        let mut tx = persy.begin().unwrap();
        tx.create_segment("test").unwrap();
        let finalizer = tx.prepare().unwrap();
        finalizer.commit().unwrap();

        let mut tx = persy.begin().unwrap();
        let rec_data: String = "something".into();
        let bytes = rec_data.into_bytes();
        let id = tx.insert("test", &bytes).unwrap();
        tx.rollback().unwrap();

        let read_after = persy.read("test", &id).unwrap();
        assert!(
            read_after.is_none(),
            "record inserted in a rolled back transaction is still readable"
        );
    });
}
/// Inserts a record, prepares the transaction, rolls it back from the prepared state,
/// and expects the record not to be readable afterwards.
#[test]
fn test_rollback_precommit() {
    create_and_drop("rollback_pre", |persy| {
        let mut tx = persy.begin().unwrap();
        tx.create_segment("test").unwrap();
        let finalizer = tx.prepare().unwrap();
        finalizer.commit().unwrap();

        let mut tx = persy.begin().unwrap();
        let rec_data: String = "something".into();
        let bytes = rec_data.into_bytes();
        let id = tx.insert("test", &bytes).unwrap();
        let finalizer = tx.prepare().unwrap();
        finalizer.rollback().unwrap();

        let read_after = persy.read("test", &id).unwrap();
        assert!(
            read_after.is_none(),
            "record inserted in a rolled back prepared transaction is still readable"
        );
    });
}
/// Updates a committed record inside a transaction, checks the new value is visible
/// through that transaction, rolls back, and expects the original value to be read again.
#[test]
fn test_rollback_update() {
    create_and_drop("rollback_update", |persy| {
        let mut tx = persy.begin().unwrap();
        tx.create_segment("test").unwrap();
        let rec_data: String = "something".into();
        let bytes = rec_data.into_bytes();
        let id = tx.insert("test", &bytes).unwrap();
        // The record must be readable through the transaction that inserted it.
        let read = tx
            .read("test", &id)
            .unwrap()
            .expect("inserted record is not readable inside its own transaction");
        assert_eq!(bytes, read);
        let finalizer = tx.prepare().unwrap();
        finalizer.commit().unwrap();

        let mut tx1 = persy.begin().unwrap();
        let rec_data_1: String = "something2".into();
        let bytes_1 = rec_data_1.into_bytes();
        tx1.update("test", &id, &bytes_1).unwrap();
        // Inside the updating transaction the new value is expected.
        let val = tx1
            .read("test", &id)
            .unwrap()
            .expect("updated record is not readable inside its own transaction");
        assert_eq!(val, bytes_1);
        tx1.rollback().unwrap();

        // After the rollback the committed value is expected again.
        let val = persy
            .read("test", &id)
            .unwrap()
            .expect("committed record is not readable after the update rollback");
        assert_eq!(val, bytes);
    });
}
/// Spawns two threads that each commit one insert into the same segment, waits on a
/// `CountDown` for both, then expects exactly two records with the inserted content.
#[test]
pub fn concurrent_create() {
    create_and_drop("rollback_create", |persy| {
        let mut setup = persy.begin().expect("error on transaction begin");
        setup.create_segment("def").expect("error on segment creation");
        let setup_fin = setup.prepare().expect("error on commit prepare");
        setup_fin.commit().expect("error on commit");

        let latch = Arc::new(CountDown::new(2));
        for _ in 0..2 {
            let latch = Arc::clone(&latch);
            let persy = persy.clone();
            thread::spawn(move || {
                let mut tx = persy.begin().expect("error on transaction begin");
                let payload = b"aaa".to_vec();
                tx.insert("def", &payload).expect("error on insert value");
                let fin = tx.prepare().expect("error on commit prepare");
                fin.commit().expect("error on commit");
                latch.count_down().expect("lock not panic");
            });
        }
        latch.wait().expect("threads not finisced");

        let expected = b"aaa".to_vec();
        let mut found = 0;
        let records = persy.scan("def").expect("error on scan");
        for (_, content) in records {
            assert_eq!(content, expected);
            found += 1;
        }
        assert_eq!(found, 2);
    });
}
/// Starts an update on a record, lets another thread delete and commit that record,
/// then expects `prepare` of the pending update to return an error.
#[test]
pub fn concurrent_update_removed() {
    create_and_drop("concurrent_update_remove", |persy| {
        let mut setup = persy.begin().expect("error on transaction begin");
        setup.create_segment("def").expect("error on segment creation");
        let setup_fin = setup.prepare().expect("error on commit prepare");
        setup_fin.commit().expect("error on commit");

        let mut seed = persy.begin().expect("error on transaction begin");
        let initial = b"aaa".to_vec();
        let id = seed.insert("def", &initial).expect("error on insert value");
        let seed_fin = seed.prepare().expect("error on commit prepare");
        seed_fin.commit().expect("error on commit");

        // This update stays un-prepared while the other thread deletes the record.
        let mut updater = persy.begin().expect("error on transaction begin");
        let changed = b"cccc".to_vec();
        updater.update("def", &id, &changed).expect("error on update value");

        let latch = Arc::new(CountDown::new(1));
        {
            let latch = Arc::clone(&latch);
            let persy = persy.clone();
            let id = id.clone();
            thread::spawn(move || {
                let mut remover = persy.begin().expect("error on transaction begin");
                remover.delete("def", &id).expect("error on delete value");
                let fin = remover.prepare().expect("error on commit prepare");
                fin.commit().expect("error on commit");
                latch.count_down().expect("lock not panic");
            });
        }
        latch.wait().expect("threads not finisced");

        let outcome = updater.prepare();
        assert!(outcome.is_err());
    });
}
/// Leaves a transaction prepared but never committed, reopens the file with a recover
/// callback that returns `false` for every transaction, and expects the inserted record
/// to be absent.
#[test]
// The result of the last `prepare()` is intentionally discarded without commit/rollback.
#[allow(unused_must_use)]
pub fn test_rollback_prepared_tx() {
    let path = "./target/test_recover_rollback_prepared.persy";
    Persy::create(path).unwrap();
    let id;
    {
        let persy = Persy::open(path, Config::new()).unwrap();
        let mut tx = persy.begin().expect("error on transaction begin");
        tx.create_segment("def").expect("error on segment creation");
        let fin = tx.prepare().expect("error on commit prepare");
        fin.commit().expect("error on commit");

        let mut tx = persy.begin().expect("error on transaction begin");
        // The payload is only needed for the insert, so it lives in this scope.
        let val = String::from("aaa").into_bytes();
        id = tx.insert("def", &val).expect("error on insert value");
        tx.prepare().expect("error on commit prepare");
    }
    {
        let persy = Persy::open_with_recover(path, Config::new(), |_| false).unwrap();
        assert_eq!(persy.read("def", &id).expect("error reading record"), None);
    }
    fs::remove_file(path).unwrap();
}
/// Prepares a transaction, discards the result without committing, reopens the file
/// with a plain `Persy::open`, and expects the inserted record to be absent.
#[test]
#[allow(unused_must_use)]
pub fn test_autorollback_lost_finalize() {
    let path = "./target/test_auto_rollback.persy";
    Persy::create(path).unwrap();
    let id = {
        let persy = Persy::open(path, Config::new()).unwrap();
        let mut setup = persy.begin().expect("error on transaction begin");
        setup.create_segment("def").expect("error on segment creation");
        let setup_fin = setup.prepare().expect("error on commit prepare");
        setup_fin.commit().expect("error on commit");

        let mut tx = persy.begin().expect("error on transaction begin");
        let payload = b"aaa".to_vec();
        let inserted = tx.insert("def", &payload).expect("error on insert value");
        // The prepared state is dropped here without commit or rollback.
        tx.prepare().expect("error on commit prepare");
        inserted
    };
    {
        let persy = Persy::open(path, Config::new()).unwrap();
        let found = persy.read("def", &id).expect("error reading record");
        assert_eq!(found, None);
    }
    fs::remove_file(path).unwrap();
}
/// Drops a transaction that was never prepared, reopens the file, and expects the
/// record inserted by that transaction to be absent.
#[test]
pub fn test_recover_stale_tx() {
    let path = "./target/test_recover_stale.persy";
    Persy::create(path).unwrap();
    let id = {
        let persy = Persy::open(path, Config::new()).unwrap();
        let mut setup = persy.begin().expect("error on transaction begin");
        setup.create_segment("def").expect("error on segment creation");
        let setup_fin = setup.prepare().expect("error on commit prepare");
        setup_fin.commit().expect("error on commit");

        // This transaction is left open and dropped at the end of the scope.
        let mut stale = persy.begin().expect("error on transaction begin");
        let payload = b"aaa".to_vec();
        let inserted = stale.insert("def", &payload).expect("error on insert value");
        inserted
    };
    {
        let persy = Persy::open(path, Config::new()).unwrap();
        let found = persy.read("def", &id).expect("error reading record");
        assert_eq!(found, None);
    }
    fs::remove_file(path).unwrap();
}
/// Reopens the same file nine times, committing one insert per open, and checks after
/// each commit that the scanned record count equals the iteration number.
#[test]
pub fn test_multiple_open_tx_close() {
    let file = Builder::new()
        .prefix("multiple_open_tx_close")
        .suffix(".persy")
        .tempfile()
        .expect("expect temp file creation");
    Persy::create_from_file(file.reopen().unwrap()).unwrap();
    {
        let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap();
        let mut setup = persy.begin().expect("error on transaction begin");
        setup.create_segment("def").expect("error on segment creation");
        let setup_fin = setup.prepare().expect("error on commit prepare");
        setup_fin.commit().expect("error on commit");
    }
    for round in 1..10 {
        let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap();
        let mut tx = persy.begin().expect("error on transaction begin");
        let payload = b"aaa".to_vec();
        tx.insert("def", &payload).expect("error on insert value");
        let fin = tx.prepare().expect("error on commit prepare");
        fin.commit().expect("error on commit");

        let stored = persy.scan("def").expect("read persistent records ").count();
        assert_eq!(round, stored);
    }
}
/// Repeatedly opens the file through the recover API, asserts the number of listed
/// transactions stays at or below 10, commits 20 inserts in one transaction, and checks
/// the running record total.
#[test]
pub fn test_multiple_open_close_restore_tx() {
    let file = Builder::new()
        .prefix("multiple_open_restore_close_tx")
        .suffix(".persy")
        .tempfile()
        .expect("expect temp file creation");
    Persy::create_from_file(file.reopen().unwrap()).unwrap();
    {
        let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap();
        let mut setup = persy.begin().expect("error on transaction begin");
        setup.create_segment("def").expect("error on segment creation");
        let setup_fin = setup.prepare().expect("error on commit prepare");
        setup_fin.commit().expect("error on commit");
    }
    for round in 1..20 {
        let handle = file.reopen().unwrap();
        let recover = OpenOptions::new().recover_file(handle).unwrap();
        assert!(recover.list_transactions().len() <= 10);
        let persy = recover.finalize().expect("open correctly");

        // All 20 inserts of this round go into a single transaction.
        let mut tx = persy.begin().expect("error on transaction begin");
        for _ in 0..20 {
            let payload = b"aaa".to_vec();
            tx.insert("def", &payload).expect("error on insert value");
        }
        let fin = tx.prepare().expect("error on commit prepare");
        fin.commit().expect("error on commit");

        let stored = persy.scan("def").expect("read persistent records ").count();
        assert_eq!(round * 20, stored);
    }
}
/// Repeatedly opens the file through the recover API, asserts the number of listed
/// transactions stays at or below 60, commits 80 single-insert transactions, and checks
/// the running record total.
#[test]
pub fn test_open_close_restore_multiple_tx() {
    let file = Builder::new()
        .prefix("open_restore_close_multiple_tx")
        .suffix(".persy")
        .tempfile()
        .expect("expect temp file creation");
    Persy::create_from_file(file.reopen().unwrap()).unwrap();
    {
        let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap();
        let mut setup = persy.begin().expect("error on transaction begin");
        setup.create_segment("def").expect("error on segment creation");
        let setup_fin = setup.prepare().expect("error on commit prepare");
        setup_fin.commit().expect("error on commit");
    }
    for round in 1..20 {
        let handle = file.reopen().unwrap();
        let recover = OpenOptions::new().recover_file(handle).unwrap();
        assert!(recover.list_transactions().len() <= 60);
        let persy = recover.finalize().expect("open correctly");

        // Each of the 80 inserts of this round gets its own transaction.
        for _ in 0..80 {
            let mut tx = persy.begin().expect("error on transaction begin");
            let payload = b"aaa".to_vec();
            tx.insert("def", &payload).expect("error on insert value");
            let fin = tx.prepare().expect("error on commit prepare");
            fin.commit().expect("error on commit");
        }

        let stored = persy.scan("def").expect("read persistent records ").count();
        assert_eq!(round * 80, stored);
    }
}
/// Commits an insert through a transaction configured with `set_background_sync(true)`
/// and expects the record to be readable right after the commit returns.
#[cfg(feature = "background_ops")]
#[test]
pub fn test_background_sync() {
    use persy::TransactionConfig;
    create_and_drop("background_sync", |persy| {
        let mut setup = persy.begin().unwrap();
        setup.create_segment("test").unwrap();
        setup.prepare().unwrap().commit().unwrap();

        let tx_config = TransactionConfig::new().set_background_sync(true);
        let mut tx = persy.begin_with(tx_config).unwrap();
        let payload = String::from("something").into_bytes();
        let id = tx.insert("test", &payload).unwrap();
        tx.prepare().unwrap().commit().unwrap();

        let stored = persy.read("test", &id).unwrap();
        assert!(stored.is_some());
    });
}
/// Runs record and index operations (insert, put, update, read, get, delete, remove,
/// drop) against an instance whose cache size is set to 0.
#[test]
fn test_operations_no_cache() {
    let mut config = Config::new();
    config.change_cache_size(0);
    create_and_drop_with_config("operations_no_cache", config, |persy| {
        let original = String::from("something").into_bytes();
        let replacement = String::from("other").into_bytes();

        // Segment, index and the record that will be updated later.
        let mut setup = persy.begin().unwrap();
        setup.create_segment("test").unwrap();
        setup.create_index::<u8, u8>("test", ValueMode::Cluster).unwrap();
        let updated_id = setup.insert("test", &original).unwrap();
        let setup_fin = setup.prepare().unwrap();
        setup_fin.commit().unwrap();

        // One insert, one index entry and one update in the same transaction.
        let mut write = persy.begin().unwrap();
        let inserted_id = write.insert("test", &original).unwrap();
        write.put::<u8, u8>("test", 10, 20).unwrap();
        write.update("test", &updated_id, &replacement).unwrap();
        let write_fin = write.prepare().unwrap();
        write_fin.commit().unwrap();

        let inserted = persy.read("test", &inserted_id).unwrap();
        assert!(inserted.is_some());
        let updated = persy.read("test", &updated_id).unwrap();
        assert_eq!(updated, Some(replacement));
        let mut index_hit = persy.get::<u8, u8>("test", &10).unwrap();
        assert!(index_hit.next().is_some());

        // Remove both records and the index entry.
        let mut cleanup = persy.begin().unwrap();
        cleanup.delete("test", &inserted_id).unwrap();
        cleanup.delete("test", &updated_id).unwrap();
        cleanup.remove::<u8, u8>("test", 10, None).unwrap();
        let cleanup_fin = cleanup.prepare().unwrap();
        cleanup_fin.commit().unwrap();

        let inserted = persy.read("test", &inserted_id).unwrap();
        assert!(inserted.is_none());
        let mut index_miss = persy.get::<u8, u8>("test", &10).unwrap();
        assert!(index_miss.next().is_none());

        // Finally drop the segment and the index themselves.
        let mut teardown = persy.begin().unwrap();
        teardown.drop_segment("test").unwrap();
        teardown.drop_index("test").unwrap();
        let teardown_fin = teardown.prepare().unwrap();
        teardown_fin.commit().unwrap();
    });
}
/// Alternates delete and insert rounds, reopening the file for every round, and asserts
/// the file size stays below twice the size measured after the first insert round.
#[test]
pub fn test_not_overgrow_reopen() {
    let file = Builder::new()
        .prefix("not_overgrow_reopen")
        .suffix(".persy")
        .tempfile()
        .expect("expect temp file creation");
    // The panic message is only built on failure instead of on every run.
    Persy::create_from_file(file.reopen().expect("reopen"))
        .unwrap_or_else(|err| panic!("file '{:?}' do not exist: {:?}", file, err));
    do_ops(&file, |persy| {
        let mut tx = persy.begin().expect("transaction started");
        tx.create_segment("def").expect("create segment work");
        tx.create_index::<u32, u32>("def_index", ValueMode::Cluster)
            .expect("create index works");
        tx.prepare().expect("prepare works").commit().expect("commit works");
        insert_ops(persy);
    });
    // Baseline: size after the initial population; the limit is twice that.
    let size = file.as_file().metadata().expect("error on metadata").len();
    let max = size * 2;
    for i in 0..10 {
        do_ops(&file, delete_ops);
        let size = file.as_file().metadata().expect("error on metadata").len();
        assert!(size < max, " current size: {} max: {} iter:{}", size, max, i);
        do_ops(&file, insert_ops);
        let size = file.as_file().metadata().expect("error on metadata").len();
        assert!(size < max, " current size: {} max: {} iter:{}", size, max, i);
    }
}
/// Commits 100 transactions; each inserts one record into segment `def` and puts the
/// pair `(v, v)` into index `def_index`.
fn insert_ops(persy: &Persy) {
    (0..100).for_each(|key| {
        let mut tx = persy.begin().expect("error on transaction begin");
        let payload = b"aaa".to_vec();
        tx.insert("def", &payload).expect("error on insert value");
        tx.put::<u32, u32>("def_index", key, key).expect("error on index");
        let prepared = tx.prepare().expect("error on commit prepare");
        prepared.commit().expect("error on commit");
    });
}
/// Deletes every record found by scanning segment `def`, one transaction per record,
/// then removes the pairs `(v, v)` for `v` in `0..100` from index `def_index`, one
/// transaction per key.
fn delete_ops(persy: &Persy) {
    let cursor = persy.scan("def").expect("error on scan");
    for (id, _) in cursor {
        let mut tx = persy.begin().expect("error on transaction begin");
        // Report the operation that actually failed (this is a delete, not an insert).
        tx.delete("def", &id).expect("error on delete value");
        let fin = tx.prepare().expect("error on commit prepare");
        fin.commit().expect("error on commit");
    }
    for v in 0..100 {
        let mut tx = persy.begin().expect("error on transaction begin");
        tx.remove::<u32, u32>("def_index", v, Some(v)).expect("error on index");
        let fin = tx.prepare().expect("error on commit prepare");
        fin.commit().expect("error on commit");
    }
}
/// Opens a `Persy` instance on a fresh handle of `file`, passes it to `test`, and drops
/// the instance when `test` returns.
pub fn do_ops<F>(file: &NamedTempFile, test: F)
where
    F: FnOnce(&Persy),
{
    let handle = file.reopen().expect("reopen");
    let instance = Persy::open_from_file(handle, Config::new()).unwrap();
    test(&instance);
}
/// Alternates delete and insert rounds on a single open instance and asserts the file
/// size stays below 4/3 of the size measured after the first insert round.
#[test]
pub fn test_not_overgrow() {
    let file = Builder::new()
        .prefix("not_overgrow")
        .suffix(".persy")
        .tempfile()
        .expect("expect temp file creation");
    // The panic message is only built on failure instead of on every run.
    Persy::create_from_file(file.reopen().expect("reopen"))
        .unwrap_or_else(|err| panic!("file '{:?}' do not exist: {:?}", file, err));
    let persy = Persy::open_from_file(file.reopen().expect("reopen"), Config::new()).unwrap();
    let mut tx = persy.begin().expect("transaction started");
    tx.create_segment("def").expect("create segment work");
    tx.create_index::<u32, u32>("def_index", ValueMode::Cluster)
        .expect("create index works");
    tx.prepare().expect("prepare works").commit().expect("commit works");
    insert_ops(&persy);
    // Baseline: size after the initial population; the limit is that size plus a third.
    let size = file.as_file().metadata().expect("error on metadata").len();
    let max = size + size / 3;
    for i in 0..10 {
        delete_ops(&persy);
        let size = file.as_file().metadata().expect("error on metadata").len();
        assert!(size < max, " current size: {} max: {} iter:{}", size, max, i);
        insert_ops(&persy);
        let size = file.as_file().metadata().expect("error on metadata").len();
        assert!(size < max, " current size: {} max: {} iter:{}", size, max, i);
    }
}
/// Inserts records, then runs the bulk delete twice, reopening the file between every
/// step; the second delete runs after the first one has already been committed.
#[test]
pub fn test_delete_reopen() {
    let file = Builder::new()
        .prefix("delete_reopen")
        .suffix(".persy")
        .tempfile()
        .expect("expect temp file creation");
    // The panic message is only built on failure instead of on every run.
    Persy::create_from_file(file.reopen().expect("reopen"))
        .unwrap_or_else(|err| panic!("file '{:?}' do not exist: {:?}", file, err));
    do_ops(&file, |persy| {
        let mut tx = persy.begin().expect("transaction started");
        tx.create_segment("def").expect("create segment work");
        tx.prepare().expect("prepare works").commit().expect("commit works");
    });
    do_ops(&file, insert_data_ops);
    do_ops(&file, delete_data_ops);
    do_ops(&file, delete_data_ops);
}
/// Inserts 100 records into segment `def` inside one transaction and commits it.
fn insert_data_ops(persy: &Persy) {
    let mut tx = persy.begin().expect("error on transaction begin");
    (0..100).for_each(|_| {
        let payload = b"aaa".to_vec();
        tx.insert("def", &payload).expect("error on insert value");
    });
    let prepared = tx.prepare().expect("error on commit prepare");
    prepared.commit().expect("error on commit");
}
/// Deletes every record found by scanning segment `def` inside one transaction and
/// commits it.
fn delete_data_ops(persy: &Persy) {
    // The scan is created before the transaction begins, as in the original ordering.
    let cursor = persy.scan("def").expect("error on scan");
    let mut tx = persy.begin().expect("error on transaction begin");
    for (id, _) in cursor {
        // Report the operation that actually failed (this is a delete, not an insert).
        tx.delete("def", &id).expect("error on delete value");
    }
    let fin = tx.prepare().expect("error on commit prepare");
    fin.commit().expect("error on commit");
}