use std::mem::take;
use std::sync::Arc;
use as_any::AsAny;
use async_trait::async_trait;
use crate::table::Table;
use crate::transaction::Transaction;
use crate::{Result, TableRequirement, TableUpdate};
/// Shared, type-erased handle to a [`TransactionAction`].
///
/// Despite the "Boxed" name this is an [`Arc`], not a `Box`.
pub(crate) type BoxedTransactionAction = Arc<dyn TransactionAction>;
/// A single action that can be recorded on a transaction and later committed
/// against a [`Table`].
///
/// Implementors must be `Send + Sync`. The `AsAny` supertrait lets a
/// `dyn TransactionAction` be downcast to its concrete type (the tests in this
/// file do so through `as_any::Downcast`).
#[async_trait]
pub(crate) trait TransactionAction: AsAny + Sync + Send {
/// Produces the [`ActionCommit`] for this action, given the `table` it is
/// committed against.
///
/// Takes `self` as `Arc<Self>`, so it can be invoked on a shared handle such
/// as [`BoxedTransactionAction`].
///
/// Returns an error if the implementation cannot build its commit; the
/// specific failure conditions are implementation-defined.
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit>;
}
/// Public entry point for attaching an action to a [`Transaction`].
///
/// A blanket implementation in this file provides it for every `'static`
/// `TransactionAction`.
pub trait ApplyTransactionAction {
/// Consumes both the action and the transaction `tx`, returning the
/// transaction with the action attached, or an error if it cannot be applied.
fn apply(self, tx: Transaction) -> Result<Transaction>;
}
/// Blanket implementation: any `'static` [`TransactionAction`] can be applied
/// to a [`Transaction`].
impl<T> ApplyTransactionAction for T
where
    T: TransactionAction + 'static,
{
    /// Wraps the action in an [`Arc`] and appends it to `tx.actions`, then hands
    /// the transaction back. This implementation never returns an error.
    fn apply(self, mut tx: Transaction) -> Result<Transaction>
    where
        Self: Sized,
    {
        // The concrete `Arc<T>` is coerced to the trait-object handle on push.
        let shared_action = Arc::new(self);
        tx.actions.push(shared_action);
        Ok(tx)
    }
}
/// The result of committing a `TransactionAction`: a list of table updates
/// together with a list of table requirements.
///
/// Both lists are private; they are moved out through
/// `take_updates` / `take_requirements`.
pub struct ActionCommit {
/// Table updates carried by this commit.
updates: Vec<TableUpdate>,
/// Table requirements carried by this commit.
/// NOTE(review): presumably preconditions validated when the updates are
/// committed — confirm against the commit path.
requirements: Vec<TableRequirement>,
}
impl ActionCommit {
pub fn new(updates: Vec<TableUpdate>, requirements: Vec<TableRequirement>) -> Self {
Self {
updates,
requirements,
}
}
pub fn take_updates(&mut self) -> Vec<TableUpdate> {
take(&mut self.updates)
}
pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
take(&mut self.requirements)
}
}
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use as_any::Downcast;
    use async_trait::async_trait;
    use uuid::Uuid;

    use crate::table::Table;
    use crate::transaction::Transaction;
    use crate::transaction::action::{ActionCommit, ApplyTransactionAction, TransactionAction};
    use crate::transaction::tests::make_v2_table;
    use crate::{Result, TableRequirement, TableUpdate};

    /// Location that `TestAction` reports in its `SetLocation` update.
    const TEST_LOCATION: &str = "s3://bucket/prefix/table/";
    /// UUID that `TestAction` reports in its `UuidMatch` requirement.
    const TEST_UUID: &str = "9c12d441-03fe-4693-9a96-a0705ddf69c1";

    /// Minimal action that always yields one fixed update and one fixed requirement.
    struct TestAction;

    #[async_trait]
    impl TransactionAction for TestAction {
        async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
            let updates = vec![TableUpdate::SetLocation {
                location: String::from(TEST_LOCATION),
            }];
            let requirements = vec![TableRequirement::UuidMatch {
                uuid: Uuid::from_str(TEST_UUID)?,
            }];
            Ok(ActionCommit::new(updates, requirements))
        }
    }

    /// Committing `TestAction` yields exactly the update and requirement it declares.
    #[tokio::test]
    async fn test_commit_transaction_action() {
        let table = make_v2_table();
        let shared_action = Arc::new(TestAction);

        let mut commit = shared_action.commit(&table).await.unwrap();
        let got_updates = commit.take_updates();
        let got_requirements = commit.take_requirements();

        let want_update = TableUpdate::SetLocation {
            location: String::from(TEST_LOCATION),
        };
        let want_requirement = TableRequirement::UuidMatch {
            uuid: Uuid::from_str(TEST_UUID).unwrap(),
        };
        assert_eq!(got_updates[0], want_update);
        assert_eq!(got_requirements[0], want_requirement);
    }

    /// Applying an action stores it on the transaction as a downcastable trait object.
    #[test]
    fn test_apply_transaction_action() {
        let table = make_v2_table();
        let tx = TestAction.apply(Transaction::new(&table)).unwrap();

        assert_eq!(tx.actions.len(), 1);
        // Dereference the `Arc` explicitly so the downcast targets the trait
        // object rather than the `Arc` wrapper itself.
        let recovered = (*tx.actions[0]).downcast_ref::<TestAction>();
        recovered.expect("TestAction was not applied to Transaction!");
    }

    /// `take_*` returns the stored values once and leaves empty lists behind.
    #[test]
    fn test_action_commit() {
        let want_updates = vec![TableUpdate::SetLocation {
            location: String::from(TEST_LOCATION),
        }];
        let want_requirements = vec![TableRequirement::UuidMatch {
            uuid: Uuid::new_v4(),
        }];

        let mut commit = ActionCommit::new(want_updates.clone(), want_requirements.clone());

        assert_eq!(commit.take_updates(), want_updates);
        assert_eq!(commit.take_requirements(), want_requirements);

        // Everything has been moved out, so a second take yields nothing.
        assert!(commit.take_updates().is_empty());
        assert!(commit.take_requirements().is_empty());
    }
}