use bitcoin::blockdata::block::Header;
use bitcoin::hash_types::{Txid, BlockHash};
use crate::chain;
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
use crate::events;
use crate::events::{Event, EventHandler};
use crate::util::atomic_counter::AtomicCounter;
use crate::util::logger::{Logger, WithContext};
use crate::util::errors::APIError;
use crate::util::wakers::{Future, Notifier};
use crate::ln::channelmanager::ChannelDetails;
use crate::prelude::*;
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::iter::FromIterator;
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};
use bitcoin::secp256k1::PublicKey;
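/// `UpdateOrigin` lives in a private module so that the contents of [`MonitorUpdateId`] remain
/// entirely opaque outside this file, while still being reachable from tests via the re-export
/// below.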
mod update_origin {
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub(crate) enum UpdateOrigin {
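/// An update that was generated by the `ChannelManager` (via our [`chain::Watch`]
/// implementation). This corresponds to a [`ChannelMonitorUpdate::update_id`] and to
/// [`ChannelMonitor::get_latest_update_id`].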
OffChain(u64),
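/// An update that was generated during blockchain processing. The ID here is drawn from the
/// generating [`ChainMonitor`]'s `sync_persistence_id` counter and does *not* correspond to
/// any on-disk IDs.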
ChainSync(u64),
}
}
#[cfg(any(feature = "_test_utils", test))]
pub(crate) use update_origin::UpdateOrigin;
#[cfg(not(any(feature = "_test_utils", test)))]
use update_origin::UpdateOrigin;
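/// An opaque identifier describing a specific [`Persist`] method call.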
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct MonitorUpdateId {
pub(crate) contents: UpdateOrigin,
}
impl MonitorUpdateId {
pub(crate) fn from_monitor_update(update: &ChannelMonitorUpdate) -> Self {
Self { contents: UpdateOrigin::OffChain(update.update_id) }
}
pub(crate) fn from_new_monitor<ChannelSigner: WriteableEcdsaChannelSigner>(monitor: &ChannelMonitor<ChannelSigner>) -> Self {
Self { contents: UpdateOrigin::OffChain(monitor.get_latest_update_id()) }
}
}
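/// `Persist` defines behavior for persisting channel monitors: this could mean writing once to
/// disk, and/or uploading to one or more backup services.
///
/// Each method may return one of three values:
///  * [`ChannelMonitorUpdateStatus::Completed`] once persistence (including any relevant
///    `fsync()` calls) has finished, allowing normal channel operation to continue,
///  * [`ChannelMonitorUpdateStatus::InProgress`] if persistence continues asynchronously, in
///    which case [`ChainMonitor::channel_monitor_updated`] must later be called with the
///    corresponding [`MonitorUpdateId`], or
///  * [`ChannelMonitorUpdateStatus::UnrecoverableError`] if persistence failed irrecoverably,
///    in which case the [`ChainMonitor`] will panic and the node must shut down.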
pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
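/// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. The
/// `channel_id` provided is the channel's funding outpoint.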
fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
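/// Update one channel's data, where the given [`ChannelMonitor`] has already applied the
/// given update (if any). `update` is `None` both during chain sync and when an update could
/// not be applied cleanly, in which case the full monitor state should be persisted.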
fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
}
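/// A [`ChannelMonitor`] plus the bookkeeping needed to track its in-flight persistence.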
struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
monitor: ChannelMonitor<ChannelSigner>,
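/// The full set of pending monitor updates for this channel.
///
/// Note that this lock must be held while calling the persister to prevent a race where we
/// call `update_persisted_channel`, the user returns
/// [`ChannelMonitorUpdateStatus::InProgress`] and then calls
/// [`ChainMonitor::channel_monitor_updated`] immediately, racing our insertion of the pending
/// update into the contained `Vec`.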
pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
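/// The last chain height at which no [`UpdateOrigin::ChainSync`] updates were pending, used
/// to detect chain-sync persistence which has been outstanding for longer than
/// [`LATENCY_GRACE_PERIOD_BLOCKS`].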
last_chain_persist_height: AtomicUsize,
}
impl<ChannelSigner: WriteableEcdsaChannelSigner> MonitorHolder<ChannelSigner> {
fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
pending_monitor_updates_lock.iter().any(|update_id|
matches!(update_id.contents, UpdateOrigin::OffChain(_)))
}
fn has_pending_chainsync_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
pending_monitor_updates_lock.iter().any(|update_id|
matches!(update_id.contents, UpdateOrigin::ChainSync(_)))
}
}
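/// A read-only reference to a current [`ChannelMonitor`].
///
/// Note that this holds a read lock on the [`ChainMonitor`]'s monitor set and may block other
/// operations (e.g. adding new channels) until it is released.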
pub struct LockedChannelMonitor<'a, ChannelSigner: WriteableEcdsaChannelSigner> {
lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
funding_txo: OutPoint,
}
impl<ChannelSigner: WriteableEcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
type Target = ChannelMonitor<ChannelSigner>;
fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
&self.lock.get(&self.funding_txo).expect("Checked at construction").monitor
}
}
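/// An implementation of [`chain::Watch`] for monitoring channels.
///
/// Connected and disconnected blocks must be provided to the `ChainMonitor` as documented by
/// [`chain::Watch`]. It may be used in conjunction with a `ChannelManager` to monitor channels
/// locally, or used independently (e.g. as a watchtower) to monitor channels remotely, with
/// each [`ChannelMonitor`] persisted via the given [`Persist`] implementation.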
pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
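/// A counter used to generate a unique ID per `ChainSync` monitor persistence. These IDs are
/// never persisted, so it is fine that they reset on restart.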
sync_persistence_id: AtomicCounter,
chain_source: Option<C>,
broadcaster: T,
logger: L,
fee_estimator: F,
persister: P,
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
highest_chain_height: AtomicUsize,
event_notifier: Notifier,
}
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
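/// Applies `process` to each registered [`ChannelMonitor`] with the given chain data, kicking
/// off a `ChainSync` persistence for each and registering any newly watched outputs with the
/// `chain_source`. Panics, poisoning the monitors lock, if any persistence returns
/// [`ChannelMonitorUpdateStatus::UnrecoverableError`].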
fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
where
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
for funding_outpoint in funding_outpoints.iter() {
let monitor_lock = self.monitors.read().unwrap();
if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
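// Take the monitors lock for writing so that we poison it and any future
// operations going forward fail immediately.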
core::mem::drop(monitor_lock);
let _poison = self.monitors.write().unwrap();
log_error!(self.logger, "{}", err_str);
panic!("{}", err_str);
}
}
}
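// Do a second pass under the write lock to process any monitors which were added to the map
// while we iterated over our initial snapshot of funding outpoints above.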
let monitor_states = self.monitors.write().unwrap();
for (funding_outpoint, monitor_state) in monitor_states.iter() {
if !funding_outpoints.contains(funding_outpoint) {
if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
log_error!(self.logger, "{}", err_str);
panic!("{}", err_str);
}
}
}
if let Some(height) = best_height {
let old_height = self.highest_chain_height.load(Ordering::Acquire);
let new_height = height as usize;
if new_height > old_height {
self.highest_chain_height.store(new_height, Ordering::Release);
}
}
}
fn update_monitor_with_chain_data<FN>(
&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData,
process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
let logger = WithChannelMonitor::from(&self.logger, &monitor);
let mut txn_outputs;
{
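// Hold the `pending_monitor_updates` lock only for the duration of the persist call; it is
// released at the end of this block, before we register new outputs with the chain source.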
txn_outputs = process(monitor, txdata);
let update_id = MonitorUpdateId {
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
};
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
if let Some(height) = best_height {
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
}
}
log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::InProgress => {
log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
return Err(());
},
}
}
if let Some(ref chain_source) = self.chain_source {
let block_hash = header.block_hash();
for (txid, mut outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.drain(..) {
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint);
chain_source.register_output(output);
}
}
}
Ok(())
}
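/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
///
/// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
/// will call back to it indicating transactions and outputs of interest, allowing the source
/// to pre-filter blocks.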
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
Self {
monitors: RwLock::new(HashMap::new()),
sync_persistence_id: AtomicCounter::new(),
chain_source,
broadcaster,
logger,
fee_estimator: feeest,
persister,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
event_notifier: Notifier::new(),
}
}
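/// Gets the balances in the contained [`ChannelMonitor`]s which are claimable on-chain or
/// claims which are awaiting confirmation.
///
/// Includes the balances from each [`ChannelMonitor`] *except* those whose funding outpoint
/// appears in `ignored_channels`, typically the still-open channels for which balances are
/// already reported elsewhere.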
pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
let mut ret = Vec::new();
let monitor_states = self.monitors.read().unwrap();
for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| {
for chan in ignored_channels {
if chan.funding_txo.as_ref() == Some(funding_outpoint) {
return false;
}
}
true
}) {
ret.append(&mut monitor_state.monitor.get_claimable_balances());
}
ret
}
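/// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning `Err(())` if no
/// such monitor is registered. Note that the returned guard holds a read lock on the monitor
/// set; see [`LockedChannelMonitor`].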
pub fn get_monitor(&self, funding_txo: OutPoint) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
let lock = self.monitors.read().unwrap();
if lock.get(&funding_txo).is_some() {
Ok(LockedChannelMonitor { lock, funding_txo })
} else {
Err(())
}
}
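/// Lists the funding outpoint of each [`ChannelMonitor`] being monitored.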
pub fn list_monitors(&self) -> Vec<OutPoint> {
self.monitors.read().unwrap().keys().copied().collect()
}
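/// Lists the pending [`MonitorUpdateId`]s for each [`ChannelMonitor`], keyed by funding
/// outpoint (the `c_bindings` variant below returns a `Vec` of pairs instead of a map).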
#[cfg(not(c_bindings))]
pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<MonitorUpdateId>> {
self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
(*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
}).collect()
}
#[cfg(c_bindings)]
pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec<MonitorUpdateId>)> {
self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
(*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
}).collect()
}
#[cfg(test)]
pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
}
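/// Indicates that the persistence of a [`ChannelMonitor`] has completed after a previous call
/// returned [`ChannelMonitorUpdateStatus::InProgress`].
///
/// Once all pending off-chain updates for a channel have completed, a
/// [`MonitorEvent::Completed`] is queued so normal channel operation can resume; chain-sync
/// completions instead refresh `last_chain_persist_height`.
///
/// Returns an [`APIError::APIMisuseError`] if `funding_txo` does not match any currently
/// monitored channel.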
pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: MonitorUpdateId) -> Result<(), APIError> {
let monitors = self.monitors.read().unwrap();
let monitor_data = if let Some(mon) = monitors.get(&funding_txo) { mon } else {
return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching funding outpoint {:?} found", funding_txo) });
};
let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
match completed_update_id {
MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => {
let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates);
if monitor_is_pending_updates {
return Ok(());
}
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
}], monitor_data.monitor.get_counterparty_node_id()));
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
}
},
}
self.event_notifier.notify();
Ok(())
}
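/// Test-only utility which queues a [`MonitorEvent::Completed`] with the given update ID
/// directly, bypassing the pending-update tracking above.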
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
let monitors = self.monitors.read().unwrap();
let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
funding_txo,
monitor_update_id,
}], counterparty_node_id));
self.event_notifier.notify();
}
#[cfg(any(test, feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use crate::events::EventsProvider;
let events = core::cell::RefCell::new(Vec::new());
let event_handler = |event: events::Event| events.borrow_mut().push(event);
self.process_pending_events(&event_handler);
events.into_inner()
}
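/// Processes any events asynchronously in the order they were generated since the last call,
/// using the given event handler.
///
/// See the trait-level documentation of [`events::EventsProvider`] for requirements.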
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
&self, handler: H
) {
let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
for funding_txo in mons_to_process {
let mut ev;
super::channelmonitor::process_events_body!(
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
}
}
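/// Gets a [`Future`] that completes when an event is available via
/// [`events::EventsProvider::process_pending_events`] or
/// [`Self::process_pending_events_async`].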
pub fn get_update_future(&self) -> Future {
self.event_notifier.get_future()
}
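/// Triggers rebroadcasts/fee-bumps of pending claims from force-closed channels. This is
/// crucial in preventing certain classes of pinning attacks and in detecting substantial
/// mempool feerate changes between blocks, so it should be called regularly (e.g. every 30
/// seconds).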
pub fn rebroadcast_pending_claims(&self) {
let monitors = self.monitors.read().unwrap();
for (_, monitor_holder) in &*monitors {
monitor_holder.monitor.rebroadcast_pending_claims(
&*self.broadcaster, &*self.fee_estimator, &self.logger
)
}
}
}
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
where
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
monitor.block_connected(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
});
}
fn block_disconnected(&self, header: &Header, height: u32) {
let monitor_states = self.monitors.read().unwrap();
log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
for monitor_state in monitor_states.values() {
monitor_state.monitor.block_disconnected(
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger);
}
}
}
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
where
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
self.process_chain_data(header, None, txdata, |monitor, txdata| {
monitor.transactions_confirmed(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
});
}
fn transaction_unconfirmed(&self, txid: &Txid) {
log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &self.logger);
}
}
fn best_block_updated(&self, header: &Header, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
debug_assert!(txdata.is_empty());
monitor.best_block_updated(
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
)
});
}
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
let mut txids = Vec::new();
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
txids.append(&mut monitor_state.monitor.get_relevant_txids());
}
txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
txids.dedup_by_key(|(txid, _, _)| *txid);
txids
}
}
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(funding_outpoint) {
hash_map::Entry::Occupied(_) => {
log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
return Err(());
},
hash_map::Entry::Vacant(e) => e,
};
log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_id = MonitorUpdateId::from_new_monitor(&monitor);
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
ChannelMonitorUpdateStatus::Completed => {
log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
}
if let Some(ref chain_source) = self.chain_source {
monitor.load_outputs_to_watch(chain_source, &self.logger);
}
entry.insert(MonitorHolder {
monitor,
pending_monitor_updates: Mutex::new(pending_monitor_updates),
last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
});
Ok(persist_res)
}
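// Note that we persist the given `ChannelMonitorUpdate` while holding the `monitors` read
// lock, ensuring the monitor map cannot change out from under an in-flight update.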
fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
let monitors = self.monitors.read().unwrap();
match monitors.get(&funding_txo) {
None => {
let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(funding_txo.to_channel_id()));
log_error!(logger, "Failed to update channel monitor: no such monitor registered");
#[cfg(debug_assertions)]
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
#[cfg(not(debug_assertions))]
ChannelMonitorUpdateStatus::InProgress
},
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
let logger = WithChannelMonitor::from(&self.logger, &monitor);
log_trace!(logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger);
let update_id = MonitorUpdateId::from_monitor_update(update);
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
let persist_res = if update_res.is_err() {
log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
self.persister.update_persisted_channel(funding_txo, None, monitor, update_id)
} else {
self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id)
};
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);
log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::Completed => {
log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
core::mem::drop(pending_monitor_updates);
core::mem::drop(monitors);
let _poison = self.monitors.write().unwrap();
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
}
if update_res.is_err() {
ChannelMonitorUpdateStatus::InProgress
} else {
persist_res
}
}
}
}
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
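// Only release events for a channel once any pending chain-sync persistence has either
// completed or been outstanding for more than LATENCY_GRACE_PERIOD_BLOCKS, at which point we
// release anyway (risking duplicate payment events) rather than risk losing funds.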
if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) {
if is_pending_monitor_update {
log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
log_error!(logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(logger, " This may cause duplicate payment events to be generated.");
}
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if !monitor_events.is_empty() {
let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id));
}
}
}
pending_monitor_events
}
}
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
for monitor_state in self.monitors.read().unwrap().values() {
monitor_state.monitor.process_pending_events(&handler);
}
}
}
#[cfg(test)]
mod tests {
use crate::check_added_monitors;
use crate::{expect_payment_claimed, expect_payment_path_successful, get_event_msg};
use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use crate::ln::channelmanager::{PaymentSendFailure, PaymentId, RecipientOnionFields};
use crate::ln::functional_test_utils::*;
use crate::ln::msgs::ChannelMessageHandler;
use crate::util::errors::APIError;
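// Tests that events resulting from asynchronous off-chain monitor updates are held back until
// every pending update for the channel has been marked completed.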
#[test]
fn test_async_ooo_offchain_updates() {
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1);
let (payment_preimage_1, payment_hash_1, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
let (payment_preimage_2, payment_hash_2, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
nodes[1].node.claim_funds(payment_preimage_2);
check_added_monitors!(nodes[1], 1);
let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
assert_eq!(persistences.len(), 1);
let (funding_txo, updates) = persistences.iter().next().unwrap();
assert_eq!(updates.len(), 2);
let mut update_iter = updates.iter();
let next_update = update_iter.next().unwrap().clone();
#[cfg(not(c_bindings))]
assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
.unwrap().contains(&next_update));
#[cfg(c_bindings)]
assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
.find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap();
#[cfg(not(c_bindings))]
assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
.unwrap().contains(&next_update));
#[cfg(c_bindings)]
assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
.find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
let claim_events = nodes[1].node.get_and_clear_pending_events();
assert_eq!(claim_events.len(), 2);
match claim_events[0] {
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
assert_eq!(payment_hash_1, *payment_hash);
},
_ => panic!("Unexpected event"),
}
match claim_events[1] {
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
assert_eq!(payment_hash_2, *payment_hash);
},
_ => panic!("Unexpected event"),
}
let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
expect_payment_sent(&nodes[0], payment_preimage_1, None, false, false);
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &updates.commitment_signed);
check_added_monitors!(nodes[0], 1);
let (as_first_raa, as_first_update) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_first_raa);
check_added_monitors!(nodes[1], 1);
let bs_second_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_first_update);
check_added_monitors!(nodes[1], 1);
let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_second_updates.update_fulfill_htlcs[0]);
expect_payment_sent(&nodes[0], payment_preimage_2, None, false, false);
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_second_updates.commitment_signed);
check_added_monitors!(nodes[0], 1);
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_first_raa);
expect_payment_path_successful!(nodes[0]);
check_added_monitors!(nodes[0], 1);
let (as_second_raa, as_second_update) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_second_raa);
check_added_monitors!(nodes[1], 1);
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_second_update);
check_added_monitors!(nodes[1], 1);
let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_second_raa);
expect_payment_path_successful!(nodes[0]);
check_added_monitors!(nodes[0], 1);
}
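// Tests that in-progress chain-sync persistence pauses the release of monitor events, and that
// events resume once persistence completes (or, if `block_timeout` is set, after
// LATENCY_GRACE_PERIOD_BLOCKS have passed).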
fn do_chainsync_pauses_events(block_timeout: bool) {
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let channel = create_announced_chan_between_nodes(&nodes, 0, 1);
send_payment(&nodes[0], &[&nodes[1]], 10_000_000);
let (route, second_payment_hash, _, second_payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
nodes[1].node.claim_funds(payment_preimage);
expect_payment_claimed!(nodes[1], payment_hash, 1_000_000);
nodes[1].node.get_and_clear_pending_msg_events();
check_added_monitors!(nodes[1], 1);
let remote_txn = get_local_commitment_txn!(nodes[1], channel.2);
assert_eq!(remote_txn.len(), 2);
chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
let new_header = create_dummy_header(nodes[0].best_block_info().0, 0);
nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header,
&[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1);
assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
nodes[0].chain_monitor.chain_monitor.best_block_updated(&new_header, nodes[0].best_block_info().1 + 1);
assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
unwrap_send_err!(nodes[0].node.send_payment_with_route(&route, second_payment_hash,
RecipientOnionFields::secret_only(second_payment_secret), PaymentId(second_payment_hash.0)
), false, APIError::MonitorUpdateInProgress, {});
check_added_monitors!(nodes[0], 1);
assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
if block_timeout {
let latest_header = create_dummy_header(nodes[0].best_block_info().0, 0);
nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS);
} else {
let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone();
for (funding_outpoint, update_ids) in persistences {
for update_id in update_ids {
nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_outpoint, update_id).unwrap();
}
}
}
expect_payment_sent(&nodes[0], payment_preimage, None, true, false);
}
#[test]
fn chainsync_pauses_events() {
do_chainsync_pauses_events(false);
do_chainsync_pauses_events(true);
}
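// Tests that an UnrecoverableError returned during chain-sync persistence panics and poisons
// the monitors lock, so that subsequent operations (including dropping the nodes) also panic.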
#[test]
#[cfg(feature = "std")]
fn update_during_chainsync_poisons_channel() {
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1);
chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);
assert!(std::panic::catch_unwind(|| {
connect_blocks(&nodes[0], 1);
}).is_err());
assert!(std::panic::catch_unwind(|| {
core::mem::drop(nodes);
}).is_err());
}
}