use std::collections::BTreeMap;
use kuatia_types::*;
use crate::error::StoreError;
use crate::events::{LedgerEvent, LedgerEventKind};
use crate::store::*;
/// Builds a version-1 `Account` fixture with the given numeric id and policy.
///
/// All remaining fields are fixed: empty flags, book `0`, default user data
/// and an empty metadata map.
fn make_account(id: i64, policy: AccountPolicy) -> Account {
    let account_id = AccountId::new(id);
    let flags = AccountFlags::empty();
    let user_data = UserData::default();
    let metadata = BTreeMap::new();
    Account {
        id: account_id,
        version: 1,
        policy,
        flags,
        book: BookId(0),
        user_data,
        metadata,
    }
}
/// Builds a `Posting` fixture from raw primitives.
///
/// The posting id is `(EnvelopeId(transfer_hash), index)`; `owner`, `asset`
/// and `value` are wrapped into their respective id/amount types.
fn make_posting(
    transfer_hash: [u8; 32],
    index: u16,
    owner: i64,
    asset: u32,
    value: i64,
) -> Posting {
    let id = PostingId {
        transfer: EnvelopeId(transfer_hash),
        index,
    };
    let owner_id = AccountId::new(owner);
    let asset_id = AssetId::new(asset);
    let amount = Cent::from(value);
    Posting::new(id, owner_id, asset_id, amount)
}
/// Builds a two-posting envelope assigned to `book`, paired with an id
/// derived from that book.
///
/// The postings are +100 for account 1 and -100 for account 99, both in
/// asset 1. The returned id has byte 0 set to the low byte of the book
/// number, byte 1 set to 42, and all remaining bytes zero.
fn make_envelope_with_book(book: BookId) -> (Envelope, EnvelopeId) {
    let positive_leg = NewPosting {
        owner: AccountId::new(1),
        asset: AssetId::new(1),
        value: Cent::from(100),
        payer: None,
    };
    let negative_leg = NewPosting {
        owner: AccountId::new(99),
        asset: AssetId::new(1),
        value: Cent::from(-100),
        payer: None,
    };
    let envelope = EnvelopeBuilder::new()
        .creates(vec![positive_leg, negative_leg])
        .book(book)
        .build();

    let mut raw_id = [0u8; 32];
    raw_id[0] = book.0 as u8;
    raw_id[1] = 42;
    (envelope, EnvelopeId(raw_id))
}
/// Builds the default two-posting envelope together with the fixed id
/// `[42; 32]`.
///
/// The postings are +100 for account 1 and -100 for account 99, both in
/// asset 1. No book is set on the builder.
fn make_envelope() -> (Envelope, EnvelopeId) {
    let positive_leg = NewPosting {
        owner: AccountId::new(1),
        asset: AssetId::new(1),
        value: Cent::from(100),
        payer: None,
    };
    let negative_leg = NewPosting {
        owner: AccountId::new(99),
        asset: AssetId::new(1),
        value: Cent::from(-100),
        payer: None,
    };
    let envelope = EnvelopeBuilder::new()
        .creates(vec![positive_leg, negative_leg])
        .build();
    (envelope, EnvelopeId([42; 32]))
}
/// Inserts `create` into the store, panicking if the insert fails.
///
/// `_tag` is accepted but ignored.
async fn seed_active(store: &(impl Store + 'static), _tag: u8, create: &[Posting]) {
    let inserted = store.insert_postings(create).await;
    inserted.unwrap();
}
/// Materialises an envelope in the store the way the tests expect a commit to
/// look: one posting per entry in `envelope.creates()`, then the transfer
/// record itself.
///
/// Posting ids are `(tid, position)`. The transfer is stored with the sorted,
/// de-duplicated set of posting owners as its involved accounts. Panics if
/// either store call fails.
async fn commit_envelope(
    store: &(impl Store + 'static),
    envelope: Envelope,
    tid: EnvelopeId,
    created_at: i64,
) {
    let mut create: Vec<Posting> = Vec::new();
    for (position, new_posting) in envelope.creates().iter().enumerate() {
        let id = PostingId {
            transfer: tid,
            index: position as u16,
        };
        create.push(Posting::new(
            id,
            new_posting.owner,
            new_posting.asset,
            new_posting.value,
        ));
    }

    let mut involved: Vec<AccountId> = Vec::with_capacity(create.len());
    for posting in create.iter() {
        involved.push(posting.owner);
    }
    involved.sort();
    involved.dedup();

    store.insert_postings(&create).await.unwrap();

    let record = EnvelopeRecord {
        envelope,
        receipt: Receipt { transfer_id: tid },
        created_at,
    };
    store.store_transfer(record, &involved).await.unwrap();
}
/// A created account can be read back with the same id at version 1.
pub async fn create_and_get_account(store: &(impl Store + 'static)) {
    let original = make_account(1, AccountPolicy::NoOverdraft);
    store.create_account(original.clone()).await.unwrap();

    let key = AccountId::new(1);
    let fetched = store.get_account(&key).await.unwrap();
    assert_eq!(fetched.id, original.id);
    assert_eq!(fetched.version, 1);
}
/// Creating the same account twice fails with `StoreError::AlreadyExists`.
pub async fn create_duplicate_account_fails(store: &(impl Store + 'static)) {
    let first = make_account(1, AccountPolicy::NoOverdraft);
    let duplicate = first.clone();
    store.create_account(first).await.unwrap();

    let outcome = store.create_account(duplicate).await;
    let err = outcome.unwrap_err();
    assert!(matches!(err, StoreError::AlreadyExists(_)));
}
/// Looking up an account that was never created fails with
/// `StoreError::NotFound`.
pub async fn get_missing_account_fails(store: &(impl Store + 'static)) {
    let absent = AccountId::new(999);
    let outcome = store.get_account(&absent).await;
    let err = outcome.unwrap_err();
    assert!(matches!(err, StoreError::NotFound(_)));
}
/// A batch lookup of two existing accounts returns two results.
pub async fn get_accounts_batch(store: &(impl Store + 'static)) {
    for id in 1..=2 {
        let account = make_account(id, AccountPolicy::NoOverdraft);
        store.create_account(account).await.unwrap();
    }

    let wanted = [AccountId::new(1), AccountId::new(2)];
    let accs = store.get_accounts(&wanted).await.unwrap();
    assert_eq!(accs.len(), 2);
}
pub async fn append_account_version(store: &(impl Store + 'static)) {
let acc = make_account(1, AccountPolicy::NoOverdraft);
store.create_account(acc.clone()).await.unwrap();
let mut v2 = acc.clone();
v2.version = 2;
v2.flags = AccountFlags::FROZEN;
store.append_account_version(v2).await.unwrap();
let got = store.get_account(&AccountId::new(1)).await.unwrap();
assert_eq!(got.version, 2);
assert!(got.is_frozen());
}
pub async fn append_version_conflict(store: &(impl Store + 'static)) {
let acc = make_account(1, AccountPolicy::NoOverdraft);
store.create_account(acc.clone()).await.unwrap();
let mut bad = acc.clone();
bad.version = 5;
let err = store.append_account_version(bad).await.unwrap_err();
assert!(matches!(err, StoreError::VersionConflict { .. }));
}
pub async fn get_account_history(store: &(impl Store + 'static)) {
let acc = make_account(1, AccountPolicy::NoOverdraft);
store.create_account(acc.clone()).await.unwrap();
let mut v2 = acc.clone();
v2.version = 2;
store.append_account_version(v2).await.unwrap();
let history = store.get_account_history(&AccountId::new(1)).await.unwrap();
assert_eq!(history.len(), 2);
assert_eq!(history[0].version, 1);
assert_eq!(history[1].version, 2);
}
/// Listing accounts after creating two (with different policies) yields two
/// entries.
pub async fn list_accounts(store: &(impl Store + 'static)) {
    let fixtures = [
        make_account(1, AccountPolicy::NoOverdraft),
        make_account(2, AccountPolicy::ExternalAccount),
    ];
    for account in fixtures {
        store.create_account(account).await.unwrap();
    }

    let listed = store.list_accounts().await.unwrap();
    assert_eq!(listed.len(), 2);
}
/// An inserted posting can be fetched by id and keeps its value.
pub async fn commit_creates_postings(store: &(impl Store + 'static)) {
    let posting = make_posting([1; 32], 0, 1, 1, 100);
    seed_active(store, 200, std::slice::from_ref(&posting)).await;

    let ids = [posting.id];
    let fetched = store.get_postings(&ids).await.unwrap();
    assert_eq!(fetched.len(), 1);
    assert_eq!(fetched[0].value, Cent::from(100));
}
/// Fetching a posting id that was never inserted fails with
/// `StoreError::NotFound`.
pub async fn get_postings_missing_fails(store: &(impl Store + 'static)) {
    let absent = PostingId {
        transfer: EnvelopeId([0; 32]),
        index: 0,
    };
    let outcome = store.get_postings(&[absent]).await;
    let err = outcome.unwrap_err();
    assert!(matches!(err, StoreError::NotFound(_)));
}
/// `get_postings_by_account` honours the owner, the optional asset filter and
/// the optional status filter.
///
/// Seeds two postings for account 1 (assets 1 and 2) and one for account 2.
pub async fn get_postings_by_account_filters(store: &(impl Store + 'static)) {
    let transfer = [1; 32];
    let seeded = [
        make_posting(transfer, 0, 1, 1, 100),
        make_posting(transfer, 1, 1, 2, 200),
        make_posting(transfer, 2, 2, 1, 300),
    ];
    seed_active(store, 200, &seeded).await;

    let owner = AccountId::new(1);

    let unfiltered = store
        .get_postings_by_account(&owner, None, None)
        .await
        .unwrap();
    assert_eq!(unfiltered.len(), 2);

    let asset_one = AssetId::new(1);
    let by_asset = store
        .get_postings_by_account(&owner, Some(&asset_one), None)
        .await
        .unwrap();
    assert_eq!(by_asset.len(), 1);
    assert_eq!(by_asset[0].value, Cent::from(100));

    let only_active = store
        .get_postings_by_account(&owner, None, Some(PostingStatus::Active))
        .await
        .unwrap();
    assert_eq!(only_active.len(), 2);
}
/// `query_postings` pages through five postings two at a time and always
/// reports the full total; an asset filter with a larger limit returns all
/// five.
pub async fn query_postings_pagination(store: &(impl Store + 'static)) {
    let mut postings: Vec<Posting> = Vec::with_capacity(5);
    for index in 0..5u16 {
        let value = (i64::from(index) + 1) * 100;
        postings.push(make_posting([1; 32], index, 1, 1, value));
    }
    seed_active(store, 200, &postings).await;

    let owner = AccountId::new(1);

    // (offset, number of items expected on that page) with a page size of 2.
    for (offset, expected_len) in [(0, 2), (2, 2), (4, 1)] {
        let query = PostingQuery {
            account: owner,
            asset: None,
            status: None,
            limit: Some(2),
            offset: Some(offset),
        };
        let page = store.query_postings(&query).await.unwrap();
        assert_eq!(page.items.len(), expected_len);
        assert_eq!(page.total, 5);
    }

    let by_asset = PostingQuery {
        account: owner,
        asset: Some(AssetId::new(1)),
        status: None,
        limit: Some(10),
        offset: None,
    };
    let filtered = store.query_postings(&by_asset).await.unwrap();
    assert_eq!(filtered.total, 5);
    assert_eq!(filtered.items.len(), 5);
}
/// Reserving two freshly inserted postings moves both to `PendingInactive`.
pub async fn reserve_postings_batch(store: &(impl Store + 'static)) {
    let first = make_posting([1; 32], 0, 1, 1, 100);
    let second = make_posting([1; 32], 1, 1, 1, 200);
    let ids = [first.id, second.id];
    seed_active(store, 200, &[first, second]).await;

    store
        .reserve_postings(&ids, ReservationId::new(1))
        .await
        .unwrap();

    let fetched = store.get_postings(&ids).await.unwrap();
    let all_pending = fetched
        .iter()
        .all(|posting| posting.status == PostingStatus::PendingInactive);
    assert!(all_pending);
}
/// A reserve call only counts postings it actually reserved: re-submitting an
/// already reserved posting alongside a fresh one reports 1, and the fresh
/// one ends up `PendingInactive`.
pub async fn reserve_skips_non_active(store: &(impl Store + 'static)) {
    let first = make_posting([1; 32], 0, 1, 1, 100);
    let second = make_posting([1; 32], 1, 1, 1, 200);
    let (first_id, second_id) = (first.id, second.id);
    seed_active(store, 200, &[first, second]).await;

    let reserved_alone = store
        .reserve_postings(&[first_id], ReservationId::new(1))
        .await
        .unwrap();
    assert_eq!(reserved_alone, 1);

    let reserved_together = store
        .reserve_postings(&[first_id, second_id], ReservationId::new(1))
        .await
        .unwrap();
    assert_eq!(reserved_together, 1);

    let second_now = store.get_postings(&[second_id]).await.unwrap();
    assert_eq!(second_now[0].status, PostingStatus::PendingInactive);
}
/// Releasing a reserved posting with the same reservation id returns it to
/// `Active`.
pub async fn release_postings_batch(store: &(impl Store + 'static)) {
    let posting = make_posting([1; 32], 0, 1, 1, 100);
    seed_active(store, 200, std::slice::from_ref(&posting)).await;
    let ids = [posting.id];

    store
        .reserve_postings(&ids, ReservationId::new(1))
        .await
        .unwrap();
    store
        .release_postings(&ids, ReservationId::new(1))
        .await
        .unwrap();

    let fetched = store.get_postings(&ids).await.unwrap();
    assert_eq!(fetched[0].status, PostingStatus::Active);
}
/// Releasing a posting that was never reserved succeeds and leaves it
/// `Active`.
pub async fn release_active_is_noop(store: &(impl Store + 'static)) {
    let posting = make_posting([1; 32], 0, 1, 1, 100);
    seed_active(store, 200, std::slice::from_ref(&posting)).await;
    let ids = [posting.id];

    store
        .release_postings(&ids, ReservationId::new(1))
        .await
        .unwrap();

    let fetched = store.get_postings(&ids).await.unwrap();
    assert_eq!(fetched[0].status, PostingStatus::Active);
}
/// Releasing an already deactivated posting reports 0 and leaves it
/// `Inactive`.
pub async fn release_inactive_zero(store: &(impl Store + 'static)) {
    let posting = make_posting([1; 32], 0, 1, 1, 100);
    seed_active(store, 200, std::slice::from_ref(&posting)).await;
    let ids = [posting.id];

    let deactivated = store.deactivate_postings(&ids, None).await.unwrap();
    assert_eq!(deactivated, 1);

    let released = store
        .release_postings(&ids, ReservationId::new(1))
        .await
        .unwrap();
    assert_eq!(released, 0);

    let fetched = store.get_postings(&ids).await.unwrap();
    assert_eq!(fetched[0].status, PostingStatus::Inactive);
}
/// Deactivating a reserved posting under its reservation id and inserting a
/// new posting leaves the old one `Inactive` and the new one `Active`.
pub async fn commit_deactivates_postings(store: &(impl Store + 'static)) {
    let consumed = make_posting([1; 32], 0, 1, 1, 100);
    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
    store
        .reserve_postings(&[consumed.id], ReservationId::new(1))
        .await
        .unwrap();

    let replacement = make_posting([2; 32], 0, 1, 1, 100);

    let deactivated = store
        .deactivate_postings(&[consumed.id], Some(ReservationId::new(1)))
        .await
        .unwrap();
    assert_eq!(deactivated, 1);

    store
        .insert_postings(std::slice::from_ref(&replacement))
        .await
        .unwrap();

    let consumed_now = store.get_postings(&[consumed.id]).await.unwrap();
    assert_eq!(consumed_now[0].status, PostingStatus::Inactive);

    let replacement_now = store.get_postings(&[replacement.id]).await.unwrap();
    assert_eq!(replacement_now[0].status, PostingStatus::Active);
}
/// `insert_postings` returns the number of postings that were newly stored:
/// 1 for a fresh posting, 1 when only one of two is new, 0 when both already
/// exist.
pub async fn insert_postings_counts(store: &(impl Store + 'static)) {
    let first = make_posting([3; 32], 0, 1, 1, 100);
    let second = make_posting([3; 32], 1, 1, 1, 200);

    let inserted = store
        .insert_postings(std::slice::from_ref(&first))
        .await
        .unwrap();
    assert_eq!(inserted, 1);

    let both = [first, second];

    let inserted = store.insert_postings(&both).await.unwrap();
    assert_eq!(inserted, 1);

    let inserted = store.insert_postings(&both).await.unwrap();
    assert_eq!(inserted, 0);
}
/// `deactivate_postings` without a reservation id reports how many postings
/// it deactivated: 2 on the first call, 0 when repeated.
pub async fn deactivate_postings_counts(store: &(impl Store + 'static)) {
    let first = make_posting([4; 32], 0, 1, 1, 100);
    let second = make_posting([4; 32], 1, 1, 1, 200);
    let ids = [first.id, second.id];
    store.insert_postings(&[first, second]).await.unwrap();

    let deactivated = store.deactivate_postings(&ids, None).await.unwrap();
    assert_eq!(deactivated, 2);

    let deactivated = store.deactivate_postings(&ids, None).await.unwrap();
    assert_eq!(deactivated, 0);

    let first_now = store.get_postings(&ids[..1]).await.unwrap();
    assert_eq!(first_now[0].status, PostingStatus::Inactive);
}
/// With a reservation id supplied, `deactivate_postings` only affects
/// postings held by that reservation: id 8 deactivates nothing on a posting
/// reserved under id 7, while id 7 deactivates it.
pub async fn deactivate_postings_saga_path(store: &(impl Store + 'static)) {
    let posting = make_posting([5; 32], 0, 1, 1, 100);
    store
        .insert_postings(std::slice::from_ref(&posting))
        .await
        .unwrap();
    let ids = [posting.id];

    store
        .reserve_postings(&ids, ReservationId::new(7))
        .await
        .unwrap();

    let with_wrong_reservation = store
        .deactivate_postings(&ids, Some(ReservationId::new(8)))
        .await
        .unwrap();
    assert_eq!(with_wrong_reservation, 0);

    let with_matching_reservation = store
        .deactivate_postings(&ids, Some(ReservationId::new(7)))
        .await
        .unwrap();
    assert_eq!(with_matching_reservation, 1);
}
/// `store_transfer` reports 1 for a new record and 0 when the same record is
/// stored again; the record is then retrievable by id and listed once for an
/// involved account.
pub async fn store_transfer_counts(store: &(impl Store + 'static)) {
    let (envelope, tid) = make_envelope();
    let record = EnvelopeRecord {
        envelope,
        receipt: Receipt { transfer_id: tid },
        created_at: 1000,
    };
    let involved = [AccountId::new(1), AccountId::new(99)];

    let first_store = store
        .store_transfer(record.clone(), &involved)
        .await
        .unwrap();
    assert_eq!(first_store, 1);

    let second_store = store.store_transfer(record, &involved).await.unwrap();
    assert_eq!(second_store, 0);

    let stored = store.get_transfer(&tid).await.unwrap();
    assert!(stored.is_some());

    let for_account = store
        .get_transfers_for_account(&AccountId::new(1))
        .await
        .unwrap();
    assert_eq!(for_account.len(), 1);
}
/// A second reservation attempt on an already reserved posting, under a
/// different reservation id, reports 0.
pub async fn reserve_twice_second_zero(store: &(impl Store + 'static)) {
    let posting = make_posting([1; 32], 0, 1, 1, 100);
    seed_active(store, 200, std::slice::from_ref(&posting)).await;
    let ids = [posting.id];

    let first_attempt = store
        .reserve_postings(&ids, ReservationId::new(1))
        .await
        .unwrap();
    assert_eq!(first_attempt, 1);

    let second_attempt = store
        .reserve_postings(&ids, ReservationId::new(2))
        .await
        .unwrap();
    assert_eq!(second_attempt, 0);
}
/// Deactivating the same posting twice reports 1 and then 0.
pub async fn deactivate_twice_second_zero(store: &(impl Store + 'static)) {
    let posting = make_posting([7; 32], 0, 1, 1, 100);
    seed_active(store, 200, std::slice::from_ref(&posting)).await;
    let ids = [posting.id];

    let first_attempt = store.deactivate_postings(&ids, None).await.unwrap();
    assert_eq!(first_attempt, 1);

    let second_attempt = store.deactivate_postings(&ids, None).await.unwrap();
    assert_eq!(second_attempt, 0);
}
/// Appending the same event twice returns the same sequence number and
/// leaves a single event in the log.
pub async fn append_event_idempotent(store: &(impl Store + 'static)) {
    let kind = LedgerEventKind::TransferCommitted {
        transfer_id: EnvelopeId([8; 32]),
    };
    let event = LedgerEvent {
        seq: 0,
        timestamp: 1000,
        kind,
    };

    let first_seq = store.append_event(&event).await.unwrap();
    let second_seq = store.append_event(&event).await.unwrap();
    assert_eq!(first_seq, second_seq);

    let logged = store.get_events_since(0, 10).await.unwrap();
    assert_eq!(logged.len(), 1);
}
/// A committed envelope can be fetched by id, and the stored receipt carries
/// that same id.
///
/// Panics if the store call fails, if no record is returned, or if the
/// receipt id differs from the id the envelope was committed under.
pub async fn commit_and_get_transfer(store: &(impl Store + 'static)) {
    let (envelope, tid) = make_envelope();
    commit_envelope(store, envelope, tid, 1000).await;

    // A single `expect` replaces the separate `is_some()` assertion followed
    // by `unwrap()`, so a missing record fails once with a descriptive message.
    let record = store
        .get_transfer(&tid)
        .await
        .unwrap()
        .expect("committed transfer should be retrievable by id");
    assert_eq!(record.receipt.transfer_id, tid);
}
/// Looking up a transfer id that was never stored succeeds with `None`.
pub async fn get_missing_transfer(store: &(impl Store + 'static)) {
    let unknown = EnvelopeId([0; 32]);
    let lookup = store.get_transfer(&unknown).await.unwrap();
    assert!(lookup.is_none());
}
/// A committed transfer is listed for an involved account and not for an
/// unrelated one.
pub async fn get_transfers_for_account(store: &(impl Store + 'static)) {
    let (envelope, tid) = make_envelope();
    commit_envelope(store, envelope, tid, 1000).await;

    let involved = AccountId::new(1);
    let records = store.get_transfers_for_account(&involved).await.unwrap();
    assert_eq!(records.len(), 1);

    let unrelated = AccountId::new(999);
    let empty = store.get_transfers_for_account(&unrelated).await.unwrap();
    assert!(empty.is_empty());
}
/// The `created_at` value supplied when storing a transfer is returned
/// unchanged when the transfer is read back.
pub async fn commit_preserves_created_at(store: &(impl Store + 'static)) {
    let stamp: i64 = 1718000000000;
    let (envelope, tid) = make_envelope();
    commit_envelope(store, envelope, tid, stamp).await;

    let lookup = store.get_transfer(&tid).await.unwrap();
    let record = lookup.unwrap();
    assert_eq!(record.created_at, stamp);
}
/// With transfers created at 1000 and 2000, a query with `from_ts = 1500`
/// returns only the later one.
pub async fn query_transfers_by_date_range(store: &(impl Store + 'static)) {
    let (early_envelope, early_id) = make_envelope();
    commit_envelope(store, early_envelope, early_id, 1000).await;

    let (late_envelope, late_id) = make_envelope_with_book(BookId(1));
    commit_envelope(store, late_envelope, late_id, 2000).await;

    let query = TransferQuery {
        account: Some(AccountId::new(1)),
        from_ts: Some(1500),
        ..Default::default()
    };
    let page = store.query_transfers(&query).await.unwrap();
    assert_eq!(page.total, 1);
    assert_eq!(page.items[0].created_at, 2000);
}
/// `query_transfers` pages through three transfers two at a time and reports
/// a total of 3 on every page.
pub async fn query_transfers_pagination(store: &(impl Store + 'static)) {
    for step in 0..3u8 {
        let (envelope, _) = make_envelope();
        // Distinct ids per transfer: first byte 10, 11, 12; the rest zero.
        let mut raw_id = [0u8; 32];
        raw_id[0] = step + 10;
        let created_at = (i64::from(step) + 1) * 1000;
        commit_envelope(store, envelope, EnvelopeId(raw_id), created_at).await;
    }

    // (offset, number of items expected on that page) with a page size of 2.
    for (offset, expected_len) in [(0, 2), (2, 1)] {
        let query = TransferQuery {
            account: Some(AccountId::new(1)),
            limit: Some(2),
            offset: Some(offset),
            ..Default::default()
        };
        let page = store.query_transfers(&query).await.unwrap();
        assert_eq!(page.items.len(), expected_len);
        assert_eq!(page.total, 3);
    }
}
/// Filtering by book returns only the transfer whose envelope was built for
/// that book.
pub async fn query_transfers_by_book(store: &(impl Store + 'static)) {
    let (plain_envelope, plain_id) = make_envelope();
    commit_envelope(store, plain_envelope, plain_id, 1000).await;

    let (booked_envelope, booked_id) = make_envelope_with_book(BookId(5));
    commit_envelope(store, booked_envelope, booked_id, 2000).await;

    let query = TransferQuery {
        account: Some(AccountId::new(1)),
        book: Some(BookId(5)),
        ..Default::default()
    };
    let page = store.query_transfers(&query).await.unwrap();
    assert_eq!(page.total, 1);
    assert_eq!(page.items[0].envelope.book(), BookId(5));
}
/// A saved saga shows up in the pending list with its id and payload intact.
pub async fn save_and_list_sagas(store: &(impl Store + 'static)) {
    let saga_id: i64 = 42;
    let payload = vec![1, 2, 3];
    store.save_saga(&saga_id, payload.clone()).await.unwrap();

    let pending = store.list_pending_sagas().await.unwrap();
    assert_eq!(pending.len(), 1);
    let entry = &pending[0];
    assert_eq!(entry.0, saga_id);
    assert_eq!(entry.1, payload);
}
/// Deleting a saved saga removes it from the pending list.
pub async fn delete_saga(store: &(impl Store + 'static)) {
    let saga_id: i64 = 42;
    let payload = vec![1, 2, 3];
    store.save_saga(&saga_id, payload).await.unwrap();
    store.delete_saga(&saga_id).await.unwrap();

    let remaining = store.list_pending_sagas().await.unwrap();
    assert!(remaining.is_empty());
}
/// Two distinct events receive increasing sequence numbers and are returned
/// by `get_events_since(0, ..)` in the order they were appended.
pub async fn append_and_query_events(store: &(impl Store + 'static)) {
    let account_created = LedgerEvent {
        seq: 0,
        timestamp: 1000,
        kind: LedgerEventKind::AccountCreated {
            account_id: AccountId::new(1),
        },
    };
    let transfer_committed = LedgerEvent {
        seq: 0,
        timestamp: 2000,
        kind: LedgerEventKind::TransferCommitted {
            transfer_id: EnvelopeId([42; 32]),
        },
    };

    let first_seq = store.append_event(&account_created).await.unwrap();
    let second_seq = store.append_event(&transfer_committed).await.unwrap();
    assert!(second_seq > first_seq);

    let logged = store.get_events_since(0, 100).await.unwrap();
    assert_eq!(logged.len(), 2);
    assert_eq!(logged[0].seq, first_seq);
    assert_eq!(logged[1].seq, second_seq);
}
/// Events are returned in strictly increasing `seq` order, and
/// `get_events_since` can be used as a cursor to page through the log.
///
/// Appends five distinct `AccountCreated` events, reads the first three, then
/// reads the rest starting after the last sequence number seen. Beyond the
/// page sizes, this checks that sequence numbers increase within each page
/// and across the page boundary.
pub async fn events_sequence_ordering(store: &(impl Store + 'static)) {
    for n in 1..=5i64 {
        let event = LedgerEvent {
            seq: 0,
            timestamp: n * 1000,
            kind: LedgerEventKind::AccountCreated {
                account_id: AccountId::new(n),
            },
        };
        store.append_event(&event).await.unwrap();
    }

    let page1 = store.get_events_since(0, 3).await.unwrap();
    assert_eq!(page1.len(), 3);
    for i in 1..page1.len() {
        assert!(
            page1[i].seq > page1[i - 1].seq,
            "first page must be in strictly increasing seq order"
        );
    }

    let page2 = store.get_events_since(page1[2].seq, 10).await.unwrap();
    assert_eq!(page2.len(), 2);
    // The cursor is exclusive: the second page must continue after the first.
    assert!(
        page2[0].seq > page1[2].seq,
        "second page must start after the cursor"
    );
    for i in 1..page2.len() {
        assert!(
            page2[i].seq > page2[i - 1].seq,
            "second page must be in strictly increasing seq order"
        );
    }
}
/// Builds a `Book` fixture with the given id and name that allows asset 1.
fn make_book(id: i64, name: &str) -> Book {
    let book_id = BookId::new(id);
    let permitted_asset = AssetId::new(1);
    BookBuilder::new(name)
        .id(book_id)
        .allow_asset(permitted_asset)
        .build()
}
/// A created book is returned unchanged by `get_book`.
pub async fn create_and_get_book(store: &(impl Store + 'static)) {
    let expected = make_book(1, "sales");
    store.create_book(expected.clone()).await.unwrap();

    let key = BookId::new(1);
    let fetched = store.get_book(&key).await.unwrap();
    assert_eq!(fetched, expected);
}
/// Creating the same book twice fails with `StoreError::AlreadyExists`.
pub async fn create_duplicate_book_fails(store: &(impl Store + 'static)) {
    let first = make_book(1, "sales");
    let duplicate = first.clone();
    store.create_book(first).await.unwrap();

    let outcome = store.create_book(duplicate).await;
    let err = outcome.unwrap_err();
    assert!(matches!(err, StoreError::AlreadyExists(_)));
}
/// Looking up a book that was never created fails with
/// `StoreError::NotFound`.
pub async fn get_missing_book_fails(store: &(impl Store + 'static)) {
    let absent = BookId::new(999);
    let outcome = store.get_book(&absent).await;
    let err = outcome.unwrap_err();
    assert!(matches!(err, StoreError::NotFound(_)));
}
/// Listing books after creating two returns both; the result is sorted by id
/// here before the names are compared.
pub async fn list_books(store: &(impl Store + 'static)) {
    for (id, name) in [(1, "sales"), (2, "inventory")] {
        store.create_book(make_book(id, name)).await.unwrap();
    }

    let mut listed = store.list_books().await.unwrap();
    listed.sort_by_key(|book| book.id.0);
    assert_eq!(listed.len(), 2);
    assert_eq!(listed[0].name, "sales");
    assert_eq!(listed[1].name, "inventory");
}
/// Generates one `#[tokio::test]` per shared store test listed below.
///
/// `$factory` is a path to something callable with no arguments whose result
/// is awaited to obtain a store; each generated test passes a reference to
/// that store to the function of the same name under `$crate::store_tests`.
///
/// The expansion refers to `::paste::paste!` and `#[tokio::test]`, so both
/// must be resolvable where the macro is invoked.
#[macro_export]
macro_rules! store_tests {
// Public entry point: forwards the factory together with the full list of
// test names to the internal `@tests` rule.
($factory:path) => {
$crate::store_tests!(@tests $factory,
create_and_get_account,
create_duplicate_account_fails,
get_missing_account_fails,
get_accounts_batch,
append_account_version,
append_version_conflict,
get_account_history,
list_accounts,
commit_creates_postings,
get_postings_missing_fails,
get_postings_by_account_filters,
query_postings_pagination,
reserve_postings_batch,
reserve_skips_non_active,
release_postings_batch,
release_active_is_noop,
release_inactive_zero,
commit_deactivates_postings,
insert_postings_counts,
deactivate_postings_counts,
deactivate_postings_saga_path,
store_transfer_counts,
reserve_twice_second_zero,
deactivate_twice_second_zero,
append_event_idempotent,
commit_and_get_transfer,
get_missing_transfer,
get_transfers_for_account,
commit_preserves_created_at,
query_transfers_by_date_range,
query_transfers_pagination,
query_transfers_by_book,
save_and_list_sagas,
delete_saga,
append_and_query_events,
events_sequence_ordering,
create_and_get_book,
create_duplicate_book_fails,
get_missing_book_fails,
list_books,
);
};
// Internal rule: emits one async test function per identifier. Each test
// calls the factory, awaits it, and hands the store to the shared test body.
(@tests $factory:path, $($test:ident),+ $(,)?) => {
::paste::paste! {
$(
#[tokio::test]
async fn [< $test >]() {
$crate::store_tests::$test(&$factory().await).await;
}
)+
}
};
}