pub mod error;
mod redolog;
pub mod serializer;
use error::PrevaylerResult;
use redolog::ReDoLog;
use serializer::Serializer;
use async_std::path::Path;
/// Keeps a value of type `T` in memory together with the redo log that
/// records the transactions applied to it.
///
/// * `D` - transaction type that is serialized into the redo log.
/// * `T` - type of the data the transactions operate on.
/// * `S` - serializer handed to the redo log.
pub struct Prevayler<D, T, S> {
// Current in-memory state; handed out read-only by `query`.
data: T,
// Used to serialize transactions, append them to the log and take snapshots.
redo_log: ReDoLog<D, S>,
}
/// Collects the settings needed to create a `Prevayler`.
///
/// * `T` - type of the data to be kept in memory.
/// * `D` - transaction type.
/// * `S` - serializer type.
/// * `P` - type of the path given to the redo log.
pub struct PrevaylerBuilder<T, D, S, P> {
// Location passed to the redo log; must be set before building.
path: Option<P>,
// Serializer passed to the redo log; must be set before building.
serializer: Option<S>,
// Initial data; must be set before building.
data: Option<T>,
// Only ever `None` in this file and the sole field mentioning `D`;
// presumably it exists just to keep `D` in use as a type parameter — confirm.
_redolog: Option<ReDoLog<D, S>>,
// Forwarded unchanged to the redo log constructor (unit not visible here).
max_log_size: u64,
}
impl<T, D, S, P> PrevaylerBuilder<T, D, S, P> {
    /// Creates a builder with no path, serializer or data configured and a
    /// `max_log_size` of 64000.
    pub fn new() -> Self {
        PrevaylerBuilder {
            path: None,
            serializer: None,
            data: None,
            _redolog: None,
            max_log_size: 64000,
        }
    }

    /// Sets the path that is handed to the redo log when building.
    pub fn path(mut self, path: P) -> Self
    where
        P: AsRef<Path>,
    {
        self.path = Some(path);
        self
    }

    /// Sets the serializer that is handed to the redo log when building.
    pub fn serializer(mut self, serializer: S) -> Self
    where
        S: Serializer<D>,
    {
        self.serializer = Some(serializer);
        self
    }

    /// Replaces the default `max_log_size` (64000) that is handed to the
    /// redo log when building.
    pub fn max_log_size(mut self, max_log_size: u64) -> Self {
        self.max_log_size = max_log_size;
        self
    }

    /// Sets the initial data the prevayler will hold.
    pub fn data(mut self, data: T) -> Self {
        self.data = Some(data);
        self
    }

    /// Consumes the builder and creates a `Prevayler` through
    /// `Prevayler::new`.
    ///
    /// # Panics
    ///
    /// Panics if the path, the serializer or the data was never set.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Prevayler::new` reports.
    pub async fn build(self) -> PrevaylerResult<Prevayler<D, T, S>>
    where
        D: Transaction<T>,
        S: Serializer<D>,
        P: AsRef<Path>,
    {
        let (path, max_log_size, serializer, data) = self.into_parts();
        Prevayler::new(path, max_log_size, serializer, data).await
    }

    /// Consumes the builder and creates a `Prevayler` through
    /// `Prevayler::new_with_snapshot`, which additionally requires the
    /// serializer to handle the data type `T`.
    ///
    /// # Panics
    ///
    /// Panics if the path, the serializer or the data was never set.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Prevayler::new_with_snapshot` reports.
    pub async fn build_with_snapshots(self) -> PrevaylerResult<Prevayler<D, T, S>>
    where
        D: Transaction<T>,
        S: Serializer<D> + Serializer<T>,
        P: AsRef<Path>,
    {
        let (path, max_log_size, serializer, data) = self.into_parts();
        Prevayler::new_with_snapshot(path, max_log_size, serializer, data).await
    }

    /// Unwraps the mandatory settings in the order path, serializer, data,
    /// panicking with a descriptive message on the first one that is missing.
    fn into_parts(self) -> (P, u64, S, T) {
        (
            self.path.expect("You need to define a path"),
            self.max_log_size,
            self.serializer.expect("You need to define a serializer"),
            self.data.expect("You need to define a data to be prevailed"),
        )
    }
}

/// `Default` is the same empty builder that `PrevaylerBuilder::new` returns.
impl<T, D, S, P> Default for PrevaylerBuilder<T, D, S, P> {
    fn default() -> Self {
        Self::new()
    }
}
impl<D, T, S> Prevayler<D, T, S> {
    /// Creates the redo log through `ReDoLog::new` and stores it next to
    /// `data`.
    ///
    /// `data` is lent mutably to the redo log while the log is created, so
    /// the value that ends up stored is whatever the log left in it.
    async fn new<P>(path: P, max_log_size: u64, serializer: S, mut data: T) -> PrevaylerResult<Self>
    where
        D: Transaction<T>,
        S: Serializer<D>,
        P: AsRef<Path>,
    {
        let redo_log = ReDoLog::new(path, max_log_size, serializer, &mut data).await?;
        Ok(Self { data, redo_log })
    }

    /// Same as `new`, but the redo log is created through
    /// `ReDoLog::new_with_snapshot`, which requires the serializer to handle
    /// the data type `T` as well.
    async fn new_with_snapshot<P>(
        path: P,
        max_log_size: u64,
        serializer: S,
        mut data: T,
    ) -> PrevaylerResult<Self>
    where
        D: Transaction<T>,
        S: Serializer<D> + Serializer<T>,
        P: AsRef<Path>,
    {
        let redo_log =
            ReDoLog::new_with_snapshot(path, max_log_size, serializer, &mut data).await?;
        Ok(Self { data, redo_log })
    }

    /// Converts `transaction` into `D`, applies it to the data and appends
    /// it to the redo log.
    ///
    /// The transaction is applied in place before the log is written: if the
    /// log write fails, the error is returned with the data already changed.
    /// A serialization error is returned before the data is touched.
    pub async fn execute_transaction<TR>(&mut self, transaction: TR) -> PrevaylerResult<()>
    where
        TR: Into<D>,
        D: Transaction<T>,
        S: Serializer<D>,
    {
        let tx: D = transaction.into();
        // Serialize up front: `execute` consumes the transaction.
        let payload = self.redo_log.serialize(&tx)?;
        tx.execute(&mut self.data);
        self.redo_log.write_to_log(payload).await?;
        Ok(())
    }

    /// Applies `transaction` to the data, appends it to the redo log and
    /// returns the value produced by `execute_and_return`.
    ///
    /// The data is modified first; serialization and the log write happen
    /// afterwards, so an error from either is returned with the data already
    /// changed.
    pub async fn execute_transaction_with_query<TR, R>(
        &mut self,
        transaction: TR,
    ) -> PrevaylerResult<R>
    where
        TR: TransactionWithQuery<T, Output = R> + Into<D>,
        S: Serializer<D>,
    {
        let outcome = transaction.execute_and_return(&mut self.data);
        let tx: D = transaction.into();
        let payload = self.redo_log.serialize(&tx)?;
        self.redo_log.write_to_log(payload).await?;
        Ok(outcome)
    }

    /// Like `execute_transaction`, but runs the transaction on a clone of
    /// the data and only replaces the stored data once `execute` returned.
    ///
    /// If `execute` panics, the stored data is left as it was and nothing is
    /// written to the log.
    pub async fn execute_transaction_panic_safe<TR>(
        &mut self,
        transaction: TR,
    ) -> PrevaylerResult<()>
    where
        TR: Into<D>,
        D: Transaction<T>,
        S: Serializer<D>,
        T: Clone,
    {
        let tx: D = transaction.into();
        let payload = self.redo_log.serialize(&tx)?;
        // Work on a copy so a panic cannot leave `self.data` half-updated.
        let mut scratch = self.data.clone();
        tx.execute(&mut scratch);
        self.data = scratch;
        self.redo_log.write_to_log(payload).await?;
        Ok(())
    }

    /// Asks the redo log to take a snapshot of the current data.
    pub async fn snapshot(&mut self) -> PrevaylerResult<()>
    where
        S: Serializer<T>,
    {
        self.redo_log.snapshot(&self.data).await?;
        Ok(())
    }

    /// Returns a shared reference to the current data.
    pub fn query(&self) -> &T {
        &self.data
    }
}
/// A change that can be applied to data of type `T`.
pub trait Transaction<T> {
/// Applies the transaction to `data`, mutating it in place.
///
/// Takes `self` by value, so the transaction is consumed by the call.
fn execute(self, data: &mut T);
}
/// A change to data of type `T` that also hands a value back to the caller.
pub trait TransactionWithQuery<T> {
/// Type of the value returned by `execute_and_return`.
type Output;
/// Applies the transaction to `data` and returns the associated output.
///
/// Takes `&self`, so the transaction is still available after the call.
fn execute_and_return(&self, data: &mut T) -> Self::Output;
}
/// Every [`TransactionWithQuery`] can be used as a plain [`Transaction`]:
/// it is executed the same way and the value it returns is thrown away.
impl<T, D: TransactionWithQuery<T>> Transaction<T> for D {
    fn execute(self, data: &mut T) {
        // Only the side effect on `data` matters here.
        let _ = self.execute_and_return(data);
    }
}
#[cfg(test)]
mod tests {
use super::{
error::PrevaylerResult, serializer::JsonSerializer, Prevayler, PrevaylerBuilder,
Transaction, TransactionWithQuery,
};
use async_std::sync::Mutex;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::thread;
use temp_testdir::TempDir;
/// Test transaction whose `execute` overwrites the first tuple element
/// with `value`.
#[derive(Serialize, Deserialize)]
struct ChangeFirstElement {
value: u8,
}
/// Test transaction whose `execute` overwrites the second tuple element
/// with `value`.
#[derive(Serialize, Deserialize)]
struct ChangeSecondElement {
value: u8,
}
/// Test transaction whose `execute` adds `value` to the first tuple element.
#[derive(Serialize, Deserialize)]
struct AddToFirstElement {
value: u8,
}
/// Test transaction whose `execute` modifies the data and then panics.
/// Note that, despite the name, its impl adds `value` to the first element.
#[derive(Serialize, Deserialize)]
struct AddToSecondElementFailling {
value: u8,
}
/// Test transaction implementing `TransactionWithQuery`.
/// Note that, despite the name, its impl adds `value` to the first element
/// and returns that element's previous value.
#[derive(Serialize, Deserialize)]
struct AddToSecondElement {
value: u8,
}
/// Replaces the first element of the pair with the carried value.
impl Transaction<(u8, u8)> for ChangeFirstElement {
    fn execute(self, data: &mut (u8, u8)) {
        let Self { value } = self;
        data.0 = value;
    }
}
/// Replaces the second element of the pair with the carried value.
impl Transaction<(u8, u8)> for ChangeSecondElement {
    fn execute(self, data: &mut (u8, u8)) {
        let Self { value } = self;
        data.1 = value;
    }
}
/// Adds the carried value to the first element of the pair.
impl Transaction<(u8, u8)> for AddToFirstElement {
    fn execute(self, data: &mut (u8, u8)) {
        let (first, _) = data;
        *first += self.value;
    }
}
/// Adds the carried value to the first element and then panics, so tests
/// can observe a transaction that fails after it has modified the data.
impl Transaction<(u8, u8)> for AddToSecondElementFailling {
    fn execute(self, data: &mut (u8, u8)) {
        let (first, _) = data;
        *first += self.value;
        panic!("Fail");
    }
}
/// Adds the carried value to the first element of the pair and returns the
/// value that element had before the addition.
impl TransactionWithQuery<(u8, u8)> for AddToSecondElement {
    type Output = u8;

    fn execute_and_return(&self, data: &mut (u8, u8)) -> u8 {
        let previous = data.0;
        data.0 = previous + self.value;
        previous
    }
}
/// Wraps every test transaction in one serializable type; the tests use it
/// as the transaction parameter `D` of `Prevayler`.
#[derive(Serialize, Deserialize)]
enum Transactions {
ChangeFirstElement(ChangeFirstElement),
ChangeSecondElement(ChangeSecondElement),
AddToFirstElement(AddToFirstElement),
AddToSecondElementFailling(AddToSecondElementFailling),
AddToSecondElement(AddToSecondElement),
}
/// Executes whichever transaction the variant wraps.
impl Transaction<(u8, u8)> for Transactions {
    fn execute(self, data: &mut (u8, u8)) {
        match self {
            Self::ChangeFirstElement(inner) => inner.execute(data),
            Self::ChangeSecondElement(inner) => inner.execute(data),
            Self::AddToFirstElement(inner) => inner.execute(data),
            Self::AddToSecondElementFailling(inner) => inner.execute(data),
            Self::AddToSecondElement(inner) => inner.execute(data),
        }
    }
}
/// Wraps the transaction in its `Transactions` variant.
///
/// Implemented as `From` so that both `Transactions::from(..)` and `.into()`
/// are available (the latter through the standard blanket impl).
impl From<ChangeFirstElement> for Transactions {
    fn from(transaction: ChangeFirstElement) -> Self {
        Transactions::ChangeFirstElement(transaction)
    }
}
/// Wraps the transaction in its `Transactions` variant.
///
/// Implemented as `From` so that both `Transactions::from(..)` and `.into()`
/// are available (the latter through the standard blanket impl).
impl From<ChangeSecondElement> for Transactions {
    fn from(transaction: ChangeSecondElement) -> Self {
        Transactions::ChangeSecondElement(transaction)
    }
}
/// Wraps the transaction in its `Transactions` variant.
///
/// Implemented as `From` so that both `Transactions::from(..)` and `.into()`
/// are available (the latter through the standard blanket impl).
impl From<AddToFirstElement> for Transactions {
    fn from(transaction: AddToFirstElement) -> Self {
        Transactions::AddToFirstElement(transaction)
    }
}
/// Wraps the transaction in its `Transactions` variant.
///
/// Implemented as `From` so that both `Transactions::from(..)` and `.into()`
/// are available (the latter through the standard blanket impl).
impl From<AddToSecondElementFailling> for Transactions {
    fn from(transaction: AddToSecondElementFailling) -> Self {
        Transactions::AddToSecondElementFailling(transaction)
    }
}
/// Wraps the transaction in its `Transactions` variant.
///
/// Implemented as `From` so that both `Transactions::from(..)` and `.into()`
/// are available (the latter through the standard blanket impl).
impl From<AddToSecondElement> for Transactions {
    fn from(transaction: AddToSecondElement) -> Self {
        Transactions::AddToSecondElement(transaction)
    }
}
/// Two transactions executed one after the other are both visible through
/// `query`.
#[async_std::test]
async fn test_transaction() -> PrevaylerResult<()> {
    let dir = TempDir::default();
    let mut store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
        .path(&dir.as_os_str())
        .max_log_size(10)
        .serializer(JsonSerializer::new())
        .data((3, 4))
        .build()
        .await?;

    let set_first = ChangeFirstElement { value: 7 };
    store.execute_transaction(set_first).await?;
    let set_second = ChangeSecondElement { value: 32 };
    store.execute_transaction(set_second).await?;

    assert_eq!(&(7, 32), store.query());
    Ok(())
}
/// Two threads sharing the prevayler behind an async mutex can each execute
/// a transaction; both changes are visible afterwards.
#[async_std::test]
async fn test_multi_threading() -> PrevaylerResult<()> {
    let dir = TempDir::default();
    let store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
        .path(&dir.as_os_str())
        .max_log_size(10)
        .serializer(JsonSerializer::new())
        .data((3, 4))
        .build()
        .await?;
    let shared = Arc::new(Mutex::new(store));

    let for_first = Arc::clone(&shared);
    let first_worker = thread::spawn(move || {
        async_std::task::block_on(async {
            let mut guard = for_first.lock().await;
            guard
                .execute_transaction(ChangeFirstElement { value: 7 })
                .await
                .expect("Error executing transaction")
        });
    });

    let for_second = Arc::clone(&shared);
    let second_worker = thread::spawn(move || {
        async_std::task::block_on(async {
            let mut guard = for_second.lock().await;
            guard
                .execute_transaction(ChangeSecondElement { value: 32 })
                .await
                .expect("Error executing transaction")
        });
    });

    first_worker.join().unwrap();
    second_worker.join().unwrap();

    let guard = shared.lock().await;
    let state = guard.query();
    assert_eq!(7, state.0);
    assert_eq!(32, state.1);
    Ok(())
}
/// A transaction that panics inside `execute_transaction_panic_safe` makes
/// its thread fail but leaves the stored data untouched.
#[async_std::test]
async fn test_panic_in_execute_transaction_panic_safe() -> PrevaylerResult<()> {
    let dir = TempDir::default();
    let store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
        .path(&dir.as_os_str())
        .max_log_size(10)
        .serializer(JsonSerializer::new())
        .data((3, 4))
        .build()
        .await?;
    let shared = Arc::new(Mutex::new(store));

    let for_ok = Arc::clone(&shared);
    let ok_worker = thread::spawn(move || {
        async_std::task::block_on(async {
            let mut guard = for_ok.lock().await;
            guard
                .execute_transaction(ChangeFirstElement { value: 7 })
                .await
                .expect("Error executing transaction")
        });
    });

    let for_failing = Arc::clone(&shared);
    let failing_worker = thread::spawn(move || {
        async_std::task::block_on(async {
            let mut guard = for_failing.lock().await;
            guard
                .execute_transaction_panic_safe(AddToSecondElementFailling { value: 32 })
                .await
                .expect("Error executing transaction")
        });
    });

    ok_worker.join().unwrap();
    // The failing transaction panics, so joining its thread reports an error.
    assert!(failing_worker.join().is_err());

    let guard = shared.lock().await;
    let state = guard.query();
    assert_eq!(7, state.0);
    assert_eq!(4, state.1);
    Ok(())
}
/// Transactions executed by earlier prevayler instances on the same path
/// are reflected in the data of an instance built later.
#[async_std::test]
async fn test_should_save_state() -> PrevaylerResult<()> {
    let dir = TempDir::default();

    // First instance: change the first element, then drop the instance.
    {
        let mut store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
            .path(&dir.as_os_str())
            .max_log_size(10)
            .serializer(JsonSerializer::new())
            .data((3, 4))
            .build()
            .await?;
        let set_first = ChangeFirstElement { value: 7 };
        store.execute_transaction(set_first).await?;
    }

    // Second instance: change the second element, then drop the instance.
    {
        let mut store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
            .path(&dir.as_os_str())
            .max_log_size(10)
            .serializer(JsonSerializer::new())
            .data((3, 4))
            .build()
            .await?;
        let set_second = ChangeSecondElement { value: 32 };
        store.execute_transaction(set_second).await?;
    }

    // Third instance: both earlier changes must be present.
    {
        let store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
            .path(&dir.as_os_str())
            .max_log_size(10)
            .serializer(JsonSerializer::new())
            .data((3, 4))
            .build()
            .await?;
        assert_eq!(&(7, 32), store.query());
    }

    Ok(())
}
/// State is carried across instances built with `build_with_snapshots`,
/// both for a change made before a snapshot and for one made after it.
#[async_std::test]
async fn test_redo_log_with_snapshot() -> PrevaylerResult<()> {
    let dir = TempDir::default();

    // First instance: add 7 to the first element and take a snapshot.
    {
        let mut store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
            .path(&dir.as_os_str())
            .max_log_size(10)
            .serializer(JsonSerializer::new())
            .data((3, 4))
            .build_with_snapshots()
            .await?;
        let add_seven = AddToFirstElement { value: 7 };
        store.execute_transaction(add_seven).await?;
        store.snapshot().await?;
    }

    // Second instance: add 1 more on top of the earlier state.
    {
        let mut store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
            .path(&dir.as_os_str())
            .max_log_size(10)
            .serializer(JsonSerializer::new())
            .data((3, 4))
            .build_with_snapshots()
            .await?;
        let add_one = AddToFirstElement { value: 1 };
        store.execute_transaction(add_one).await?;
        assert_eq!(&(11, 4), store.query());
    }

    // Third instance: different initial data, same resulting state.
    {
        let store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
            .path(&dir.as_os_str())
            .max_log_size(10)
            .serializer(JsonSerializer::new())
            .data((0, 0))
            .build_with_snapshots()
            .await?;
        assert_eq!(&(11, 4), store.query());
    }

    Ok(())
}
/// `execute_transaction_with_query` returns the transaction's output and
/// applies its change to the data.
#[async_std::test]
async fn test_transaction_with_query() -> PrevaylerResult<()> {
    let dir = TempDir::default();
    let mut store: Prevayler<Transactions, _, _> = PrevaylerBuilder::new()
        .path(&dir.as_os_str())
        .max_log_size(10)
        .serializer(JsonSerializer::new())
        .data((3, 4))
        .build()
        .await?;

    let before_first_add = store
        .execute_transaction_with_query(AddToSecondElement { value: 7 })
        .await?;
    assert_eq!(3, before_first_add);

    let before_second_add = store
        .execute_transaction_with_query(AddToSecondElement { value: 5 })
        .await?;
    assert_eq!(10, before_second_add);

    assert_eq!(&(15, 4), store.query());
    Ok(())
}
}