use crate::blinded_path::message::{AsyncPaymentsContext, BlindedMessagePath};
use crate::io;
use crate::io::Read;
use crate::ln::msgs::DecodeError;
use crate::offers::nonce::Nonce;
use crate::offers::offer::Offer;
use crate::onion_message::messenger::Responder;
use crate::prelude::*;
use crate::util::ser::{Readable, Writeable, Writer};
use core::time::Duration;
#[derive(Clone, PartialEq)]
enum OfferStatus {
Used {
invoice_created_at: Duration,
},
Ready {
invoice_created_at: Duration,
},
Pending,
}
#[derive(Clone)]
struct AsyncReceiveOffer {
offer: Offer,
created_at: Duration,
status: OfferStatus,
offer_nonce: Nonce,
update_static_invoice_path: Responder,
}
impl AsyncReceiveOffer {
fn needs_refresh(&self, duration_since_epoch: Duration) -> bool {
let awhile_ago = duration_since_epoch.saturating_sub(OFFER_REFRESH_THRESHOLD);
match self.status {
OfferStatus::Ready { .. } => self.created_at < awhile_ago,
_ => false,
}
}
}
impl_writeable_tlv_based_enum!(OfferStatus,
(0, Used) => {
(0, invoice_created_at, required),
},
(1, Ready) => {
(0, invoice_created_at, required),
},
(2, Pending) => {},
);
impl_writeable_tlv_based!(AsyncReceiveOffer, {
(0, offer, required),
(2, offer_nonce, required),
(4, status, required),
(6, update_static_invoice_path, required),
(8, created_at, required),
});
pub struct AsyncReceiveOfferCache {
offers: Vec<Option<AsyncReceiveOffer>>,
#[allow(unused)] offer_paths_request_attempts: u8,
#[allow(unused)] paths_to_static_invoice_server: Vec<BlindedMessagePath>,
}
impl AsyncReceiveOfferCache {
pub fn new() -> Self {
Self {
offers: Vec::new(),
offer_paths_request_attempts: 0,
paths_to_static_invoice_server: Vec::new(),
}
}
pub(super) fn paths_to_static_invoice_server(&self) -> &[BlindedMessagePath] {
&self.paths_to_static_invoice_server[..]
}
pub(crate) fn set_paths_to_static_invoice_server(
&mut self, paths_to_static_invoice_server: Vec<BlindedMessagePath>,
) -> Result<(), ()> {
if paths_to_static_invoice_server.is_empty() {
return Err(());
}
self.paths_to_static_invoice_server = paths_to_static_invoice_server;
if self.offers.is_empty() {
self.offers = vec![None; MAX_CACHED_OFFERS_TARGET];
}
Ok(())
}
}
const MAX_CACHED_OFFERS_TARGET: usize = 10;
const MAX_UPDATE_ATTEMPTS: u8 = 3;
const OFFER_REFRESH_THRESHOLD: Duration = Duration::from_secs(2 * 60 * 60);
const INVOICE_REFRESH_THRESHOLD: Duration = Duration::from_secs(2 * 60 * 60);
const MIN_OFFER_PATHS_RELATIVE_EXPIRY_SECS: u64 = 3 * 30 * 24 * 60 * 60;
#[cfg(test)]
pub(crate) const TEST_MAX_CACHED_OFFERS_TARGET: usize = MAX_CACHED_OFFERS_TARGET;
#[cfg(test)]
pub(crate) const TEST_MAX_UPDATE_ATTEMPTS: u8 = MAX_UPDATE_ATTEMPTS;
#[cfg(test)]
pub(crate) const TEST_OFFER_REFRESH_THRESHOLD: Duration = OFFER_REFRESH_THRESHOLD;
#[cfg(test)]
pub(crate) const TEST_INVOICE_REFRESH_THRESHOLD: Duration = INVOICE_REFRESH_THRESHOLD;
#[cfg(test)]
pub(crate) const TEST_MIN_OFFER_PATHS_RELATIVE_EXPIRY_SECS: u64 =
MIN_OFFER_PATHS_RELATIVE_EXPIRY_SECS;
impl AsyncReceiveOfferCache {
pub(crate) fn get_async_receive_offer(
&mut self, duration_since_epoch: Duration,
) -> Result<(Offer, bool), ()> {
self.prune_expired_offers(duration_since_epoch, false);
let newest_unused_offer_opt = self
.unused_ready_offers()
.max_by(|(_, offer_a, _), (_, offer_b, _)| offer_a.created_at.cmp(&offer_b.created_at))
.map(|(idx, offer, invoice_created_at)| (idx, offer.offer.clone(), invoice_created_at));
if let Some((idx, newest_ready_offer, invoice_created_at)) = newest_unused_offer_opt {
self.offers[idx]
.as_mut()
.map(|offer| offer.status = OfferStatus::Used { invoice_created_at });
return Ok((newest_ready_offer, true));
}
self.offers_with_idx()
.filter(|(_, offer)| matches!(offer.status, OfferStatus::Used { .. }))
.max_by(|a, b| {
let abs_expiry_a = a.1.offer.absolute_expiry().unwrap_or(Duration::MAX);
let abs_expiry_b = b.1.offer.absolute_expiry().unwrap_or(Duration::MAX);
abs_expiry_a.cmp(&abs_expiry_b)
})
.map(|(_, cache_offer)| (cache_offer.offer.clone(), false))
.ok_or(())
}
pub(super) fn prune_expired_offers(
&mut self, duration_since_epoch: Duration, force_reset_request_attempts: bool,
) -> Option<u16> {
let mut offer_was_removed = false;
for offer_opt in self.offers.iter_mut() {
let offer_is_expired = offer_opt
.as_ref()
.map_or(false, |offer| offer.offer.is_expired_no_std(duration_since_epoch));
if offer_is_expired {
offer_opt.take();
offer_was_removed = true;
}
}
if force_reset_request_attempts || offer_was_removed {
self.reset_offer_paths_request_attempts()
}
if self.offer_paths_request_attempts >= MAX_UPDATE_ATTEMPTS {
return None;
}
self.needs_new_offer_idx(duration_since_epoch).and_then(|idx| {
debug_assert!(idx < MAX_CACHED_OFFERS_TARGET);
idx.try_into().ok()
})
}
pub(super) fn should_build_offer_with_paths(
&self, offer_paths: &[BlindedMessagePath], offer_paths_absolute_expiry_secs: Option<u64>,
slot: u16, duration_since_epoch: Duration,
) -> bool {
if !self.slot_needs_offer(slot, duration_since_epoch) {
return false;
}
let min_offer_paths_absolute_expiry =
duration_since_epoch.as_secs().saturating_add(MIN_OFFER_PATHS_RELATIVE_EXPIRY_SECS);
let offer_paths_absolute_expiry = offer_paths_absolute_expiry_secs.unwrap_or(u64::MAX);
if offer_paths_absolute_expiry < min_offer_paths_absolute_expiry {
return false;
}
self.offers_with_idx().all(|(_, offer)| offer.offer.paths() != offer_paths)
}
pub(super) fn cache_pending_offer(
&mut self, offer: Offer, offer_paths_absolute_expiry_secs: Option<u64>, offer_nonce: Nonce,
update_static_invoice_path: Responder, duration_since_epoch: Duration, slot: u16,
) -> Result<(), ()> {
self.prune_expired_offers(duration_since_epoch, false);
if !self.should_build_offer_with_paths(
offer.paths(),
offer_paths_absolute_expiry_secs,
slot,
duration_since_epoch,
) {
return Err(());
}
match self.offers.get_mut(slot as usize) {
Some(slot) => {
*slot = Some(AsyncReceiveOffer {
offer,
created_at: duration_since_epoch,
offer_nonce,
status: OfferStatus::Pending,
update_static_invoice_path,
})
},
None => {
debug_assert!(false, "Slot in cache was invalid but should'be been checked above");
return Err(());
},
}
Ok(())
}
fn slot_needs_offer(&self, slot: u16, duration_since_epoch: Duration) -> bool {
match self.offers.get(slot as usize) {
Some(Some(offer)) => offer.needs_refresh(duration_since_epoch),
Some(None) => true,
None => {
debug_assert!(false, "Got offer paths for a non-existent slot in the cache");
false
},
}
}
fn needs_new_offer_idx(&self, duration_since_epoch: Duration) -> Option<usize> {
let empty_slot_idx_opt = self.offers.iter().position(|offer_opt| offer_opt.is_none());
if empty_slot_idx_opt.is_some() {
return empty_slot_idx_opt;
}
let no_replaceable_offers = self.offers_with_idx().all(|(_, offer)| {
matches!(offer.status, OfferStatus::Used { .. } | OfferStatus::Pending)
});
if no_replaceable_offers {
return None;
}
let num_payable_offers = self
.offers_with_idx()
.filter(|(_, offer)| {
matches!(offer.status, OfferStatus::Used { .. } | OfferStatus::Ready { .. })
})
.count();
if num_payable_offers <= 1 {
return None;
}
self.offers_with_idx()
.filter(|(_, offer)| offer.needs_refresh(duration_since_epoch))
.min_by(|(_, offer_a), (_, offer_b)| offer_a.created_at.cmp(&offer_b.created_at))
.map(|(idx, _)| idx)
}
fn offers_with_idx(&self) -> impl Iterator<Item = (usize, &AsyncReceiveOffer)> {
self.offers.iter().enumerate().filter_map(|(idx, offer_opt)| {
if let Some(offer) = offer_opt {
Some((idx, offer))
} else {
None
}
})
}
fn unused_ready_offers(&self) -> impl Iterator<Item = (usize, &AsyncReceiveOffer, Duration)> {
self.offers_with_idx().filter_map(|(idx, offer)| match offer.status {
OfferStatus::Ready { invoice_created_at } => Some((idx, offer, invoice_created_at)),
_ => None,
})
}
pub(super) fn new_offers_requested(&mut self) {
self.offer_paths_request_attempts += 1;
}
fn reset_offer_paths_request_attempts(&mut self) {
self.offer_paths_request_attempts = 0;
}
pub(super) fn offers_needing_invoice_refresh(
&self, duration_since_epoch: Duration,
) -> impl Iterator<Item = (&Offer, Nonce, &Responder)> {
self.offers_with_idx().filter_map(move |(_, offer)| {
let needs_invoice_update = match offer.status {
OfferStatus::Used { invoice_created_at } => {
invoice_created_at.saturating_add(INVOICE_REFRESH_THRESHOLD)
< duration_since_epoch
},
OfferStatus::Pending => true,
OfferStatus::Ready { .. } => false,
};
if needs_invoice_update {
Some((&offer.offer, offer.offer_nonce, &offer.update_static_invoice_path))
} else {
None
}
})
}
pub(super) fn static_invoice_persisted(&mut self, context: AsyncPaymentsContext) -> bool {
let (invoice_created_at, offer_id) = match context {
AsyncPaymentsContext::StaticInvoicePersisted { invoice_created_at, offer_id } => {
(invoice_created_at, offer_id)
},
_ => return false,
};
let mut offers = self.offers.iter_mut();
let offer_entry = offers.find(|o| o.as_ref().map_or(false, |o| o.offer.id() == offer_id));
if let Some(Some(ref mut offer)) = offer_entry {
match offer.status {
OfferStatus::Used { invoice_created_at: ref mut inv_created_at }
| OfferStatus::Ready { invoice_created_at: ref mut inv_created_at } => {
*inv_created_at = core::cmp::min(invoice_created_at, *inv_created_at);
},
OfferStatus::Pending => offer.status = OfferStatus::Ready { invoice_created_at },
}
return true;
}
false
}
#[cfg(test)]
pub(super) fn test_get_payable_offers(&self) -> Vec<Offer> {
self.offers_with_idx()
.filter_map(|(_, offer)| {
if matches!(offer.status, OfferStatus::Ready { .. })
|| matches!(offer.status, OfferStatus::Used { .. })
{
Some(offer.offer.clone())
} else {
None
}
})
.collect()
}
}
impl Writeable for AsyncReceiveOfferCache {
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
write_tlv_fields!(w, {
(0, self.offers, required_vec),
(2, self.paths_to_static_invoice_server, required_vec),
});
Ok(())
}
}
impl Readable for AsyncReceiveOfferCache {
fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
_init_and_read_len_prefixed_tlv_fields!(r, {
(0, offers, required_vec),
(2, paths_to_static_invoice_server, required_vec),
});
let offers: Vec<Option<AsyncReceiveOffer>> = offers;
Ok(Self { offers, offer_paths_request_attempts: 0, paths_to_static_invoice_server })
}
}