// Empty placeholder module: it declares no items.
// NOTE(review): presumably kept as an anchor for module-level documentation — confirm.
mod docs {}
use std::{
cmp,
collections::{BTreeMap, BTreeSet},
io,
ops::Bound,
str::FromStr,
sync::RwLock,
};
use anyhow::Context;
#[cfg(doc)]
use lexe_api::types::payments::VecDbPaymentV2;
use lexe_api::{
def::AppNodeRunApi,
error::NodeApiError,
models::command,
types::payments::{
BasicPaymentV2, PaymentCreatedIndex, PaymentStatus,
PaymentUpdatedIndex, VecBasicPaymentV2,
},
};
use lexe_node_client::client::NodeClient;
use tracing::warn;
use crate::{
types::{
command::PaymentSyncSummary,
payment::{Order, PaymentFilter},
},
unstable::ffs::Ffs,
};
/// A payments database that pairs an in-memory [`PaymentsDbState`] with a
/// backing store `F` to which payments are written and from which they are
/// read back.
///
/// The state sits behind an [`RwLock`], so every operation takes `&self`.
pub struct PaymentsDb<F> {
/// Backing store. Each payment is written as one entry whose name is the
/// string form of its created index (see `write_payment`).
ffs: F,
/// In-memory payments plus the indexes derived from them.
state: RwLock<PaymentsDbState>,
}
/// In-memory payments state: every known payment plus indexes derived from
/// the payments themselves.
#[derive(Debug, PartialEq)]
struct PaymentsDbState {
/// All known payments, keyed (and therefore ordered) by created index.
payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
/// Created indexes of the payments in `payments` for which `is_pending()`
/// holds.
pending: BTreeSet<PaymentCreatedIndex>,
/// The largest `updated_index()` among `payments`; `None` when there are
/// no payments.
latest_updated_index: Option<PaymentUpdatedIndex>,
}
/// Pulls updated payments from `node_client` in batches of `batch_size` and
/// upserts each batch into `db`.
///
/// Fetching starts after the db's current latest updated index and continues
/// until the node returns fewer payments than a full batch. The returned
/// summary accumulates how many payments were newly inserted and how many
/// existing payments were changed across all batches.
///
/// # Errors
///
/// Fails if a fetch from the node fails or if a batch cannot be upserted.
///
/// # Panics
///
/// Panics if `batch_size` is zero.
#[allow(private_bounds)]
pub(crate) async fn sync_payments<F: Ffs>(
    db: &PaymentsDb<F>,
    node_client: &impl AppNodeRunSyncApi,
    batch_size: u16,
) -> anyhow::Result<PaymentSyncSummary> {
    assert!(batch_size > 0);
    let full_batch = usize::from(batch_size);

    let mut summary = PaymentSyncSummary {
        num_new: 0,
        num_updated: 0,
    };
    // The read guard is a temporary and is released at the end of this
    // statement, so no lock is held while awaiting the node.
    let mut cursor = db.state.read().unwrap().latest_updated_index;

    loop {
        let fetched = node_client
            .get_updated_payments(command::GetUpdatedPayments {
                start_index: cursor,
                limit: Some(batch_size),
            })
            .await
            .context("Failed to fetch updated payments")?
            .payments;
        let fetched_len = fetched.len();

        let (added, changed, latest) = db
            .upsert_payments(fetched)
            .context("Failed to upsert payments")?;
        summary.num_new += added;
        summary.num_updated += changed;

        // The next request resumes from the newest index the db now knows.
        cursor = latest;

        // A short batch means the node has nothing further to send.
        if fetched_len < full_batch {
            return Ok(summary);
        }
    }
}
/// The single node API call that [`sync_payments`] depends on, split out as
/// its own trait so an alternative implementation can be supplied (the test
/// utilities implement it for a mock node).
trait AppNodeRunSyncApi {
/// Fetches a batch of payments as described by `req`.
// NOTE(review): the mock implementation returns payments whose updated index
// is strictly greater than `req.start_index`, in ascending order, capped at
// `req.limit` — presumably the real node behaves the same; confirm.
async fn get_updated_payments(
&self,
req: command::GetUpdatedPayments,
) -> Result<VecBasicPaymentV2, NodeApiError>;
}
impl AppNodeRunSyncApi for NodeClient {
async fn get_updated_payments(
&self,
req: command::GetUpdatedPayments,
) -> Result<VecBasicPaymentV2, NodeApiError> {
AppNodeRunApi::get_updated_payments(self, req).await
}
}
impl<F: Ffs> PaymentsDb<F> {
pub(crate) fn read(ffs: F) -> anyhow::Result<Self> {
let state = PaymentsDbState::read(&ffs)
.map(RwLock::new)
.context("Failed to read on-disk PaymentsDb state")?;
Ok(Self { ffs, state })
}
pub(crate) fn empty(ffs: F) -> Self {
let state = RwLock::new(PaymentsDbState::empty());
Self { ffs, state }
}
pub fn clear(&self) -> io::Result<()> {
*self.state.write().unwrap() = PaymentsDbState::empty();
self.ffs.delete_all()
}
pub fn num_payments(&self) -> usize {
self.state.read().unwrap().num_payments()
}
pub fn num_pending(&self) -> usize {
self.state.read().unwrap().num_pending()
}
#[cfg_attr(not(feature = "unstable"), allow(dead_code))]
pub fn num_finalized(&self) -> usize {
self.state.read().unwrap().num_finalized()
}
#[cfg_attr(not(feature = "unstable"), allow(dead_code))]
pub fn num_pending_not_junk(&self) -> usize {
self.state.read().unwrap().num_pending_not_junk()
}
#[cfg_attr(not(feature = "unstable"), allow(dead_code))]
pub fn num_finalized_not_junk(&self) -> usize {
self.state.read().unwrap().num_finalized_not_junk()
}
pub fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
self.state.read().unwrap().latest_updated_index()
}
pub fn get_payment_by_created_index(
&self,
created_index: &PaymentCreatedIndex,
) -> Option<BasicPaymentV2> {
self.state
.read()
.unwrap()
.get_payment_by_created_index(created_index)
.cloned()
}
#[cfg_attr(not(feature = "unstable"), allow(dead_code))]
pub fn get_payment_by_scroll_idx(
&self,
scroll_idx: usize,
) -> Option<BasicPaymentV2> {
self.state
.read()
.unwrap()
.get_payment_by_scroll_idx(scroll_idx)
.cloned()
}
#[cfg_attr(not(feature = "unstable"), allow(dead_code))]
pub fn get_pending_payment_by_scroll_idx(
&self,
scroll_idx: usize,
) -> Option<BasicPaymentV2> {
self.state
.read()
.unwrap()
.get_pending_payment_by_scroll_idx(scroll_idx)
.cloned()
}
#[cfg_attr(not(feature = "unstable"), allow(dead_code))]
pub fn get_pending_not_junk_payment_by_scroll_idx(
&self,
scroll_idx: usize,
) -> Option<BasicPaymentV2> {
self.state
.read()
.unwrap()
.get_pending_not_junk_payment_by_scroll_idx(scroll_idx)
.cloned()
}
#[cfg_attr(not(feature = "unstable"), allow(dead_code))]
pub fn get_finalized_payment_by_scroll_idx(
&self,
scroll_idx: usize,
) -> Option<BasicPaymentV2> {
self.state
.read()
.unwrap()
.get_finalized_payment_by_scroll_idx(scroll_idx)
.cloned()
}
#[cfg_attr(not(feature = "unstable"), allow(dead_code))]
pub fn get_finalized_not_junk_payment_by_scroll_idx(
&self,
scroll_idx: usize,
) -> Option<BasicPaymentV2> {
self.state
.read()
.unwrap()
.get_finalized_not_junk_payment_by_scroll_idx(scroll_idx)
.cloned()
}
pub fn list_payments(
&self,
filter: &PaymentFilter,
order: Order,
limit: usize,
after: Option<&PaymentCreatedIndex>,
) -> (Vec<BasicPaymentV2>, Option<PaymentCreatedIndex>) {
self.state
.read()
.unwrap()
.list_payments(filter, order, limit, after)
}
fn upsert_payments(
&self,
payments: impl IntoIterator<Item = BasicPaymentV2>,
) -> io::Result<(usize, usize, Option<PaymentUpdatedIndex>)> {
let mut state = self.state.write().unwrap();
let mut num_new = 0;
let mut num_updated = 0;
for payment in payments {
let (new, updated) =
Self::upsert_payment(&self.ffs, &mut state, payment)?;
num_new += new;
num_updated += updated;
}
Ok((num_new, num_updated, state.latest_updated_index))
}
fn upsert_payment(
ffs: &F,
state: &mut PaymentsDbState,
payment: BasicPaymentV2,
) -> io::Result<(usize, usize)> {
let created_index = payment.created_index();
let maybe_existing = state.payments.get(&created_index);
let already_existed = maybe_existing.is_some();
if let Some(existing) = maybe_existing
&& payment == *existing
{
return Ok((0, 0));
}
Self::write_payment(ffs, &payment)?;
if payment.is_pending() {
state.pending.insert(created_index);
} else {
state.pending.remove(&created_index);
}
state.latest_updated_index =
cmp::max(state.latest_updated_index, Some(payment.updated_index()));
state.payments.insert(created_index, payment);
if already_existed {
Ok((0, 1))
} else {
Ok((1, 0))
}
}
fn write_payment(ffs: &F, payment: &BasicPaymentV2) -> io::Result<()> {
let filename = payment.created_index().to_string();
let data =
serde_json::to_vec(&payment).expect("Failed to serialize payment");
ffs.write(&filename, &data)
}
pub(crate) fn update_payment_note(
&self,
req: command::UpdatePaymentNote,
) -> anyhow::Result<()> {
let mut state = self.state.write().unwrap();
let payment = state
.get_mut_payment_by_created_index(&req.index)
.context("Updating non-existent payment")?;
payment.note = req.note.map(|n| n.into_inner());
Self::write_payment(&self.ffs, payment)
.context("Failed to write payment to local db")?;
Ok(())
}
#[cfg(test)]
fn debug_assert_invariants(&self) {
if cfg!(not(debug_assertions)) {
return;
}
let state = self.state.read().unwrap();
state.debug_assert_invariants();
let on_disk_state = PaymentsDbState::read(&self.ffs)
.expect("Failed to re-read on-disk state");
assert_eq!(on_disk_state, *state);
}
}
impl PaymentsDbState {
    /// A state with no payments.
    fn empty() -> Self {
        Self {
            payments: BTreeMap::new(),
            pending: BTreeSet::new(),
            latest_updated_index: None,
        }
    }

    /// Loads all payments persisted in `ffs` and rebuilds the indexes.
    ///
    /// Entries whose name does not parse as a [`PaymentCreatedIndex`] are
    /// skipped with a warning.
    ///
    /// # Errors
    ///
    /// Fails if an entry cannot be read, cannot be deserialized, or contains
    /// a payment whose created index differs from the entry's name.
    fn read(ffs: &impl Ffs) -> anyhow::Result<Self> {
        // One buffer, reused for every file.
        let mut buf = Vec::<u8>::new();
        let mut payments = Vec::<BasicPaymentV2>::new();

        ffs.read_dir_visitor(|filename| {
            let created_index = match PaymentCreatedIndex::from_str(filename) {
                Ok(idx) => idx,
                Err(e) => {
                    warn!(
                        %filename,
                        "Error: unrecognized filename in payments dir: {e:#}"
                    );
                    return Ok(());
                }
            };

            buf.clear();
            ffs.read_into(filename, &mut buf)?;

            let payment = serde_json::from_slice::<BasicPaymentV2>(&buf)
                .with_context(|| filename.to_owned())
                .context("Failed to deserialize payment file")
                .map_err(io_error_invalid_data)?;

            // The name and the contents must agree on the created index.
            let payment_created_index = payment.created_index();
            if created_index != payment_created_index {
                return Err(io_error_invalid_data(format!(
                    "Payment DB corruption: filename index '{filename}' \
                     different from index in contents '{payment_created_index}'"
                )));
            }

            payments.push(payment);
            Ok(())
        })
        .context("Failed to read payments db, possibly corrupted?")?;

        Ok(Self::from_vec(payments))
    }

    /// Builds a state (payments map plus derived indexes) from a list of
    /// payments.
    fn from_vec(payments: Vec<BasicPaymentV2>) -> Self {
        let payments = payments
            .into_iter()
            .map(|p| (p.created_index(), p))
            .collect();
        let pending = build_index::pending(&payments);
        let latest_updated_index = build_index::latest_updated_index(&payments);
        Self {
            payments,
            pending,
            latest_updated_index,
        }
    }

    /// Total number of payments.
    fn num_payments(&self) -> usize {
        self.payments.len()
    }

    /// Number of entries in the pending index.
    fn num_pending(&self) -> usize {
        self.pending.len()
    }

    /// Number of payments that are not in the pending index.
    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
    fn num_finalized(&self) -> usize {
        self.payments.len() - self.pending.len()
    }

    /// Number of pending payments for which `is_pending_not_junk()` holds.
    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
    fn num_pending_not_junk(&self) -> usize {
        self.pending
            .iter()
            .filter_map(|created_idx| self.payments.get(created_idx))
            .filter(|p| p.is_pending_not_junk())
            .count()
    }

    /// Number of payments for which `is_finalized_not_junk()` holds.
    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
    fn num_finalized_not_junk(&self) -> usize {
        self.payments
            .values()
            .filter(|p| p.is_finalized_not_junk())
            .count()
    }

    /// The cached latest updated index.
    fn latest_updated_index(&self) -> Option<PaymentUpdatedIndex> {
        self.latest_updated_index
    }

    /// Looks up a payment by created index.
    fn get_payment_by_created_index(
        &self,
        created_index: &PaymentCreatedIndex,
    ) -> Option<&BasicPaymentV2> {
        self.payments.get(created_index)
    }

    /// Looks up a payment by created index for in-place modification.
    fn get_mut_payment_by_created_index(
        &mut self,
        created_index: &PaymentCreatedIndex,
    ) -> Option<&mut BasicPaymentV2> {
        self.payments.get_mut(created_index)
    }

    /// Returns the payment at `scroll_idx`, counting from the payment with
    /// the largest created index (index 0).
    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
    fn get_payment_by_scroll_idx(
        &self,
        scroll_idx: usize,
    ) -> Option<&BasicPaymentV2> {
        self.payments.values().nth_back(scroll_idx)
    }

    /// Returns the pending payment at `scroll_idx`, counting from the
    /// pending payment with the largest created index (index 0).
    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
    fn get_pending_payment_by_scroll_idx(
        &self,
        scroll_idx: usize,
    ) -> Option<&BasicPaymentV2> {
        self.pending
            .iter()
            .nth_back(scroll_idx)
            .and_then(|created_idx| self.payments.get(created_idx))
    }

    /// Returns the pending-and-not-junk payment at `scroll_idx`, counting
    /// from the matching payment with the largest created index (index 0).
    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
    fn get_pending_not_junk_payment_by_scroll_idx(
        &self,
        scroll_idx: usize,
    ) -> Option<&BasicPaymentV2> {
        // The iterator is already reversed (largest created index first), so
        // count from its front. Counting from the back here would undo the
        // reversal and index from the oldest payment instead.
        self.pending
            .iter()
            .rev()
            .filter_map(|created_idx| self.payments.get(created_idx))
            .filter(|p| p.is_pending_not_junk())
            .nth(scroll_idx)
    }

    /// Returns the finalized payment at `scroll_idx`, counting from the
    /// finalized payment with the largest created index (index 0).
    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
    fn get_finalized_payment_by_scroll_idx(
        &self,
        scroll_idx: usize,
    ) -> Option<&BasicPaymentV2> {
        self.payments
            .values()
            .filter(|p| p.is_finalized())
            .nth_back(scroll_idx)
    }

    /// Returns the finalized-and-not-junk payment at `scroll_idx`, counting
    /// from the matching payment with the largest created index (index 0).
    #[cfg_attr(not(feature = "unstable"), allow(dead_code))]
    fn get_finalized_not_junk_payment_by_scroll_idx(
        &self,
        scroll_idx: usize,
    ) -> Option<&BasicPaymentV2> {
        self.payments
            .values()
            .filter(|p| p.is_finalized_not_junk())
            .nth_back(scroll_idx)
    }

    /// Lists up to `limit` payments matching `filter`, ordered by created
    /// index.
    ///
    /// With `after = Some(c)`, ascending order yields payments with a
    /// created index greater than `c`, and descending order yields payments
    /// with a created index less than `c`. The second tuple element is the
    /// created index of the last returned payment when at least one more
    /// matching payment exists, and `None` otherwise. A `limit` of zero
    /// returns an empty list and `None`.
    fn list_payments(
        &self,
        filter: &PaymentFilter,
        order: Order,
        limit: usize,
        after: Option<&PaymentCreatedIndex>,
    ) -> (Vec<BasicPaymentV2>, Option<PaymentCreatedIndex>) {
        if limit == 0 {
            return (Vec::new(), None);
        }

        let matches_filter = |p: &&BasicPaymentV2| match filter {
            PaymentFilter::All => true,
            PaymentFilter::Pending => p.status == PaymentStatus::Pending,
            PaymentFilter::Completed => p.status == PaymentStatus::Completed,
            PaymentFilter::Failed => p.status == PaymentStatus::Failed,
            PaymentFilter::Finalized => p.status != PaymentStatus::Pending,
        };

        // Fetch one extra item so we can tell whether more results remain.
        let take = limit.saturating_add(1);
        let mut payments: Vec<BasicPaymentV2> = match (order, after) {
            (Order::Asc, Some(c)) => self
                .payments
                .range((Bound::Excluded(c), Bound::Unbounded))
                .map(|(_, p)| p)
                .filter(matches_filter)
                .take(take)
                .cloned()
                .collect(),
            (Order::Asc, None) => self
                .payments
                .values()
                .filter(matches_filter)
                .take(take)
                .cloned()
                .collect(),
            (Order::Desc, Some(c)) => self
                .payments
                .range(..c)
                .rev()
                .map(|(_, p)| p)
                .filter(matches_filter)
                .take(take)
                .cloned()
                .collect(),
            (Order::Desc, None) => self
                .payments
                .values()
                .rev()
                .filter(matches_filter)
                .take(take)
                .cloned()
                .collect(),
        };

        let has_more = payments.len() > limit;
        if has_more {
            payments.truncate(limit);
        }
        let next_index = has_more.then(|| {
            payments
                .last()
                .expect("has_more implies non-empty")
                .created_index()
        });
        (payments, next_index)
    }

    /// Test helper: whether there are no payments.
    #[cfg(test)]
    fn is_empty(&self) -> bool {
        self.payments.is_empty()
    }

    /// Test helper: asserts that the map keys and derived indexes agree with
    /// the payments. Does nothing when debug assertions are disabled.
    #[cfg(test)]
    fn debug_assert_invariants(&self) {
        if cfg!(not(debug_assertions)) {
            return;
        }

        // Every payment is stored under its own created index.
        for (idx, payment) in &self.payments {
            assert_eq!(*idx, payment.created_index());
        }

        // The pending index equals the one rebuilt from scratch ...
        let rebuilt_pending_index = build_index::pending(&self.payments);
        assert_eq!(rebuilt_pending_index, self.pending);

        // ... and contains every pending payment.
        assert!(
            self.payments
                .values()
                .filter(|p| p.is_pending())
                .all(|p| self.pending.contains(&p.created_index()))
        );

        // The cached latest updated index matches a full recomputation.
        let recomputed_latest_updated_index =
            build_index::latest_updated_index(&self.payments);
        assert_eq!(recomputed_latest_updated_index, self.latest_updated_index);
    }
}
mod build_index {
use super::*;
pub(super) fn pending(
payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
) -> BTreeSet<PaymentCreatedIndex> {
payments
.iter()
.filter(|(_, p)| p.is_pending())
.map(|(idx, _)| *idx)
.collect()
}
pub(super) fn latest_updated_index(
payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
) -> Option<PaymentUpdatedIndex> {
payments.values().map(BasicPaymentV2::updated_index).max()
}
}
/// Wraps `error` in an [`io::Error`] of kind [`io::ErrorKind::InvalidData`].
fn io_error_invalid_data(
    error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    let source: Box<dyn std::error::Error + Send + Sync> = error.into();
    io::Error::new(kind, source)
}
/// Shared helpers for this file's tests: a mock node and a proptest strategy
/// for generating payments.
#[cfg(test)]
mod test_utils {
    use proptest::{
        prelude::{Strategy, any},
        sample::SizeRange,
    };

    use super::*;

    /// An in-memory stand-in for the node that serves payments ordered by
    /// updated index.
    pub(super) struct MockNode {
        pub payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
    }

    impl MockNode {
        /// Wraps payments that are already keyed by updated index.
        pub(super) fn new(
            payments: BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
        ) -> Self {
            MockNode { payments }
        }

        /// Re-keys payments from created index to updated index.
        pub(super) fn from_payments(
            payments: BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
        ) -> Self {
            let mut by_updated = BTreeMap::new();
            for payment in payments.into_values() {
                by_updated.insert(payment.updated_index(), payment);
            }
            MockNode {
                payments: by_updated,
            }
        }
    }

    impl AppNodeRunSyncApi for MockNode {
        /// Returns, in ascending updated-index order, the payments whose
        /// updated index is strictly greater than `req.start_index` (all
        /// payments when it is `None`), capped at `req.limit` (`u16::MAX`
        /// when it is `None`).
        async fn get_updated_payments(
            &self,
            req: command::GetUpdatedPayments,
        ) -> Result<VecBasicPaymentV2, NodeApiError> {
            let max_results = usize::from(req.limit.unwrap_or(u16::MAX));
            let payments = self
                .payments
                .iter()
                .filter(|(idx, _)| match req.start_index.as_ref() {
                    Some(start) => start < *idx,
                    None => true,
                })
                .take(max_results)
                .map(|(_, payment)| payment.clone())
                .collect();
            Ok(VecBasicPaymentV2 { payments })
        }
    }

    /// Strategy producing a map of arbitrary payments keyed by created
    /// index. The map can be smaller than the requested size because
    /// payments sharing a created index collapse into one entry.
    pub(super) fn any_payments(
        approx_size: impl Into<SizeRange>,
    ) -> impl Strategy<Value = BTreeMap<PaymentCreatedIndex, BasicPaymentV2>>
    {
        let raw_payments =
            proptest::collection::vec(any::<BasicPaymentV2>(), approx_size);
        raw_payments.prop_map(|payments| {
            let mut by_created = BTreeMap::new();
            for payment in payments {
                by_created.insert(payment.created_index(), payment);
            }
            by_created
        })
    }
}
// Tests for `PaymentsDb`: reading an empty store, upserting in batches, and
// syncing against a mock node.
#[cfg(test)]
mod test {
use std::{collections::HashSet, time::Duration};
use lexe_api::types::payments::PaymentStatus;
use lexe_crypto::rng::{FastRng, RngExt};
use proptest::{
collection::vec, prelude::any, proptest, sample::Index,
test_runner::Config,
};
use tempfile::tempdir;
use super::{test_utils::MockNode, *};
use crate::unstable::ffs::{DiskFs, test_utils::InMemoryFfs};
// Reading from an empty in-memory store and from an empty temp directory
// must both yield an empty db, and the two states must be equal.
#[test]
fn read_from_empty() {
let mock_ffs = InMemoryFfs::new();
let mock_ffs_db = PaymentsDb::read(mock_ffs).unwrap();
assert!(mock_ffs_db.state.read().unwrap().is_empty());
mock_ffs_db.debug_assert_invariants();
let tempdir = tempfile::tempdir().unwrap();
let temp_fs =
DiskFs::create_dir_all(tempdir.path().to_path_buf()).unwrap();
let temp_fs_db = PaymentsDb::read(temp_fs).unwrap();
assert!(temp_fs_db.state.read().unwrap().is_empty());
temp_fs_db.debug_assert_invariants();
assert_eq!(
*mock_ffs_db.state.read().unwrap(),
*temp_fs_db.state.read().unwrap()
);
}
// Upserts the same generated payments, in the same batches, into a db backed
// by an in-memory store and a db backed by a temp directory, checking the
// invariants after every batch and that both end in the same state.
#[test]
fn test_upsert() {
// Feeds `iter` to `f` in batches of the given sizes. Stops early once the
// iterator runs dry (an empty or short batch); anything left after the last
// configured batch size is passed to `f` as one final batch.
fn visit_batches<T>(
iter: &mut impl Iterator<Item = T>,
batch_sizes: Vec<usize>,
mut f: impl FnMut(Vec<T>),
) {
let batch_sizes = batch_sizes.into_iter();
for batch_size in batch_sizes {
let batch = take_n(iter, batch_size);
let batch_len = batch.len();
if batch_len == 0 {
return;
}
f(batch);
if batch_len < batch_size {
return;
}
}
let batch = iter.collect::<Vec<_>>();
if !batch.is_empty() {
f(batch);
}
}
// Pulls up to `n` items from `iter` into a vec; fewer if the iterator ends.
fn take_n<T>(iter: &mut impl Iterator<Item = T>, n: usize) -> Vec<T> {
let mut out = Vec::with_capacity(n);
while out.len() < n {
match iter.next() {
Some(value) => out.push(value),
None => break,
}
}
out
}
proptest!(
Config::with_cases(10),
|(
rng: FastRng,
payments in test_utils::any_payments(0..20),
batch_sizes in vec(1_usize..20, 0..5),
)| {
// Two dbs over different backing stores, both starting empty.
let tempdir = tempdir().unwrap();
let temp_fs = DiskFs::create_dir_all(tempdir.path().to_path_buf()).unwrap();
let temp_fs_db = PaymentsDb::empty(temp_fs);
let mock_ffs = InMemoryFfs::from_rng(rng);
let mock_ffs_db = PaymentsDb::empty(mock_ffs);
let mut payments_iter = payments.clone().into_values();
visit_batches(&mut payments_iter, batch_sizes, |new_payment_batch| {
let _ = mock_ffs_db.upsert_payments(
new_payment_batch.clone()
).unwrap();
let _ = temp_fs_db.upsert_payments(new_payment_batch).unwrap();
// Invariants (including the on-disk round trip) must hold after each batch.
mock_ffs_db.debug_assert_invariants();
temp_fs_db.debug_assert_invariants();
});
assert_eq!(
*mock_ffs_db.state.read().unwrap(),
*temp_fs_db.state.read().unwrap()
);
}
);
}
// Syncing an empty db against a node with no payments leaves the db empty.
#[tokio::test]
async fn test_sync_empty() {
let mock_node_client = MockNode::new(BTreeMap::new());
let mock_ffs = InMemoryFfs::new();
let db = PaymentsDb::empty(mock_ffs);
sync_payments(&db, &mock_node_client, 5).await.unwrap();
assert!(db.state.read().unwrap().is_empty());
db.debug_assert_invariants();
}
// Syncs a db against a mock node three times: from empty, after re-reading
// the db from its backing store, and after some of the node's pending
// payments were finalized. After each sync the db must match the node.
#[test]
fn test_sync() {
// Asserts that the db and the node hold exactly the same payments, looking
// each payment up in the other map by the appropriate index.
fn assert_db_payments_eq(
db_payments: &BTreeMap<PaymentCreatedIndex, BasicPaymentV2>,
node_payments: &BTreeMap<PaymentUpdatedIndex, BasicPaymentV2>,
) {
assert_eq!(db_payments.len(), node_payments.len());
db_payments.iter().for_each(|(_created_idx, payment)| {
let node_payment =
node_payments.get(&payment.updated_index()).unwrap();
assert_eq!(payment, node_payment);
});
node_payments.iter().for_each(|(_updated_idx, payment)| {
let db_payment =
db_payments.get(&payment.created_index()).unwrap();
assert_eq!(payment, db_payment);
});
}
// proptest bodies are synchronous, so drive the async sync with a local
// current-thread runtime.
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
proptest!(
Config::with_cases(4),
|(
mut rng: FastRng,
payments in test_utils::any_payments(1..20),
req_batch_size in 1_u16..5,
finalize_indexes in
proptest::collection::vec(any::<Index>(), 1..5),
)| {
let mut mock_node = MockNode::from_payments(payments);
// Derive a second rng before `rng` is moved into the in-memory store.
let mut rng2 = FastRng::from_u64(rng.gen_u64());
let mock_ffs = InMemoryFfs::from_rng(rng);
let db = PaymentsDb::empty(mock_ffs);
// First sync: starting from an empty db.
rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
.unwrap();
assert_db_payments_eq(
&db.state.read().unwrap().payments,
&mock_node.payments,
);
db.debug_assert_invariants();
// Second sync: after rebuilding the db from what was persisted.
let mock_ffs = db.ffs;
let db = PaymentsDb::read(mock_ffs).unwrap();
rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
.unwrap();
assert_db_payments_eq(
&db.state.read().unwrap().payments,
&mock_node.payments,
);
db.debug_assert_invariants();
// Finalizes a random subset of the node's pending payments, giving each a
// new status and a strictly later `updated_at` than the db's latest
// updated index.
let finalize_some_payments = || {
let pending_payments = mock_node
.payments
.values()
.filter(|p| p.is_pending())
.cloned()
.collect::<Vec<_>>();
if pending_payments.is_empty() {
return;
}
let mut current_time = db
.state
.read()
.unwrap()
.latest_updated_index
.expect("DB should have payments")
.updated_at;
// Map the generated indexes onto pending payments, de-duplicated so that no
// payment is finalized twice.
let finalize_idxs = finalize_indexes
.into_iter()
.map(|index| index.index(pending_payments.len()))
.collect::<HashSet<_>>();
for finalize_idx in finalize_idxs {
let final_updated_idx =
pending_payments[finalize_idx].updated_index();
// Take the payment out under its old updated index ...
let mut payment = mock_node
.payments
.remove(&final_updated_idx)
.unwrap();
let new_status = if rng2.gen_boolean() {
PaymentStatus::Completed
} else {
PaymentStatus::Failed
};
// Advance time by 1..=10 ms so each update gets a later timestamp.
let bump_u64 = u64::from(rng2.gen_range_u32(1..11));
let bump_dur = Duration::from_millis(bump_u64);
current_time = current_time.saturating_add(bump_dur);
payment.status = new_status;
payment.updated_at = current_time;
// ... and put it back under its new updated index.
let new_updated_index = payment.updated_index();
mock_node
.payments
.insert(new_updated_index, payment);
}
};
finalize_some_payments();
// Third sync: must pick up the finalized payments as updates.
rt.block_on(sync_payments(&db, &mock_node, req_batch_size))
.unwrap();
assert_db_payments_eq(
&db.state.read().unwrap().payments,
&mock_node.payments,
);
db.debug_assert_invariants();
}
);
}
}