use bitcoin::block::Header;
use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::secp256k1::PublicKey;
use crate::chain;
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
#[cfg(peer_storage)]
use crate::chain::channelmonitor::write_chanmon_internal;
use crate::chain::channelmonitor::{
Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, TransactionOutputs,
WithChannelMonitor,
};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::events::{self, Event, EventHandler, ReplayEvent};
use crate::ln::channel_state::ChannelDetails;
#[cfg(peer_storage)]
use crate::ln::msgs::PeerStorage;
use crate::ln::msgs::{BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
#[cfg(peer_storage)]
use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHolder};
use crate::ln::types::ChannelId;
use crate::prelude::*;
use crate::sign::ecdsa::EcdsaChannelSigner;
use crate::sign::{EntropySource, PeerStorageKey, SignerProvider};
use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
use crate::types::features::{InitFeatures, NodeFeatures};
use crate::util::async_poll::{MaybeSend, MaybeSync};
use crate::util::errors::APIError;
use crate::util::logger::{Logger, WithContext};
use crate::util::native_async::FutureSpawner;
use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync};
#[cfg(peer_storage)]
use crate::util::ser::{VecWriter, Writeable};
use crate::util::wakers::{Future, Notifier};
use alloc::sync::Arc;
#[cfg(peer_storage)]
use core::iter::Cycle;
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};
/// `Persist` defines how [`ChannelMonitor`]s are written to and removed from persistent
/// storage.
///
/// The [`ChainMonitor`] invokes these methods whenever a monitor is created or updated and
/// uses the returned [`ChannelMonitorUpdateStatus`] to decide whether channel operation may
/// proceed (`Completed`), must wait for an asynchronous completion (`InProgress`, later
/// resolved via [`ChainMonitor::channel_monitor_updated`]), or the node must shut down
/// (`UnrecoverableError`).
pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
	/// Persists a brand-new [`ChannelMonitor`], keyed by `monitor_name`.
	fn persist_new_channel(
		&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
	) -> ChannelMonitorUpdateStatus;
	/// Persists an update to an existing monitor.
	///
	/// `monitor_update` is `Some` when only the delta needs writing; `None` indicates the
	/// full `monitor` must be re-persisted (e.g. during periodic chain-sync persistence or
	/// after an update failed to apply).
	fn update_persisted_channel(
		&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
		monitor: &ChannelMonitor<ChannelSigner>,
	) -> ChannelMonitorUpdateStatus;
	/// Moves a fully-resolved monitor out of active storage into an archive.
	fn archive_persisted_channel(&self, monitor_name: MonitorName);
	#[doc(hidden)]
	/// Returns `(channel_id, update_id)` pairs for asynchronous persists which have completed
	/// since the last call. Synchronous persisters use the default, which returns nothing.
	fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
		Vec::new()
	}
}
/// Pairs a [`ChannelMonitor`] with the set of update IDs whose persistence is still pending.
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
	monitor: ChannelMonitor<ChannelSigner>,
	// Update IDs for which the persister returned `InProgress` and which have not yet been
	// marked complete via `channel_monitor_updated`.
	pending_monitor_updates: Mutex<Vec<u64>>,
}
impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
	/// Returns whether any monitor updates are still awaiting persistence completion.
	///
	/// Takes the already-held lock guard so the caller keeps the lock across the check.
	fn has_pending_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<u64>>) -> bool {
		let nothing_pending = pending_monitor_updates_lock.is_empty();
		!nothing_pending
	}
}
/// A read-only reference to a specific [`ChannelMonitor`] held inside a [`ChainMonitor`].
///
/// Holds the `ChainMonitor`'s monitor-map read lock for as long as it lives, so keep its
/// lifetime short. Constructed only by [`ChainMonitor::get_monitor`], which verifies that
/// `channel_id` is present in the map.
pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
	lock: RwLockReadGuard<'a, HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
	channel_id: ChannelId,
}
impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
	type Target = ChannelMonitor<ChannelSigner>;
	/// Dereferences to the monitor this guard was constructed for.
	fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
		let holder = self.lock.get(&self.channel_id).expect("Checked at construction");
		&holder.monitor
	}
}
/// A [`Persist`] implementation which kicks off monitor persistence asynchronously via a
/// [`MonitorUpdatingPersisterAsync`], always returning
/// [`ChannelMonitorUpdateStatus::InProgress`] and signaling completion through a shared
/// [`Notifier`].
pub struct AsyncPersister<
	K: Deref + MaybeSend + MaybeSync + 'static,
	S: FutureSpawner,
	L: Deref + MaybeSend + MaybeSync + 'static,
	ES: Deref + MaybeSend + MaybeSync + 'static,
	SP: Deref + MaybeSend + MaybeSync + 'static,
	BI: Deref + MaybeSend + MaybeSync + 'static,
	FE: Deref + MaybeSend + MaybeSync + 'static,
> where
	K::Target: KVStore + MaybeSync,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
	// Shared with the owning `ChainMonitor`; poked whenever a spawned persist completes so
	// waiters on `ChainMonitor::get_update_future` wake up.
	event_notifier: Arc<Notifier>,
}
// `ChainMonitor` takes its persister as a `P: Deref` whose target implements `Persist`.
// Deref-ing to `self` lets an owned `AsyncPersister` be used directly as that `P`.
impl<
		K: Deref + MaybeSend + MaybeSync + 'static,
		S: FutureSpawner,
		L: Deref + MaybeSend + MaybeSync + 'static,
		ES: Deref + MaybeSend + MaybeSync + 'static,
		SP: Deref + MaybeSend + MaybeSync + 'static,
		BI: Deref + MaybeSend + MaybeSync + 'static,
		FE: Deref + MaybeSend + MaybeSync + 'static,
	> Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
where
	K::Target: KVStore + MaybeSync,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	type Target = Self;
	fn deref(&self) -> &Self {
		self
	}
}
impl<
		K: Deref + MaybeSend + MaybeSync + 'static,
		S: FutureSpawner,
		L: Deref + MaybeSend + MaybeSync + 'static,
		ES: Deref + MaybeSend + MaybeSync + 'static,
		SP: Deref + MaybeSend + MaybeSync + 'static,
		BI: Deref + MaybeSend + MaybeSync + 'static,
		FE: Deref + MaybeSend + MaybeSync + 'static,
	> Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
where
	K::Target: KVStore + MaybeSync,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
	<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
{
	/// Spawns an async task persisting the new monitor and immediately reports
	/// [`ChannelMonitorUpdateStatus::InProgress`]; completion is signaled via the notifier.
	fn persist_new_channel(
		&self, monitor_name: MonitorName,
		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
	) -> ChannelMonitorUpdateStatus {
		self.persister.spawn_async_persist_new_channel(
			monitor_name,
			monitor,
			Arc::clone(&self.event_notifier),
		);
		ChannelMonitorUpdateStatus::InProgress
	}
	/// Spawns an async task persisting the update (or the whole monitor when
	/// `monitor_update` is `None`) and immediately reports `InProgress`.
	fn update_persisted_channel(
		&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
		monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
	) -> ChannelMonitorUpdateStatus {
		self.persister.spawn_async_update_channel(
			monitor_name,
			monitor_update,
			monitor,
			Arc::clone(&self.event_notifier),
		);
		ChannelMonitorUpdateStatus::InProgress
	}
	/// Spawns an async task moving the monitor into archive storage.
	fn archive_persisted_channel(&self, monitor_name: MonitorName) {
		self.persister.spawn_async_archive_persisted_channel(monitor_name);
	}
	/// Forwards the inner persister's list of asynchronously-completed persists.
	fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
		self.persister.get_and_clear_completed_updates()
	}
}
/// Connects chain data (blocks/transactions) to the set of [`ChannelMonitor`]s being
/// watched, persisting them via `P` and surfacing monitor events and message-send events
/// to the rest of the node.
pub struct ChainMonitor<
	ChannelSigner: EcdsaChannelSigner,
	C: Deref,
	T: Deref,
	F: Deref,
	L: Deref,
	P: Deref,
	ES: Deref,
> where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
	ES::Target: EntropySource,
{
	// All watched monitors, keyed by channel ID, plus their pending-persist bookkeeping.
	monitors: RwLock<HashMap<ChannelId, MonitorHolder<ChannelSigner>>>,
	// Optional transaction/output filter to register watched data with (None = full blocks).
	chain_source: Option<C>,
	broadcaster: T,
	logger: L,
	fee_estimator: F,
	persister: P,
	// Only read when the `peer_storage` cfg is enabled (hence the underscore).
	_entropy_source: ES,
	// Events queued for `release_pending_monitor_events`, e.g. `MonitorEvent::Completed`.
	pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>>,
	// Highest best-chain height seen so far; only ever ratcheted upwards.
	highest_chain_height: AtomicUsize,
	// Woken whenever there may be new events or a monitor update completed.
	event_notifier: Arc<Notifier>,
	// Send-only messages (e.g. peer storage) drained via `get_and_clear_pending_msg_events`.
	pending_send_only_events: Mutex<Vec<MessageSendEvent>>,
	#[cfg(peer_storage)]
	our_peerstorage_encryption_key: PeerStorageKey,
}
impl<
K: Deref + MaybeSend + MaybeSync + 'static,
S: FutureSpawner,
SP: Deref + MaybeSend + MaybeSync + 'static,
C: Deref,
T: Deref + MaybeSend + MaybeSync + 'static,
F: Deref + MaybeSend + MaybeSync + 'static,
L: Deref + MaybeSend + MaybeSync + 'static,
ES: Deref + MaybeSend + MaybeSync + 'static,
>
ChainMonitor<
<SP::Target as SignerProvider>::EcdsaSigner,
C,
T,
F,
L,
AsyncPersister<K, S, L, ES, SP, T, F>,
ES,
> where
K::Target: KVStore + MaybeSync,
SP::Target: SignerProvider + Sized,
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
ES::Target: EntropySource + Sized,
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
{
pub fn new_async_beta(
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
_our_peerstorage_encryption_key: PeerStorageKey,
) -> Self {
let event_notifier = Arc::new(Notifier::new());
Self {
monitors: RwLock::new(new_hash_map()),
chain_source,
broadcaster,
logger,
fee_estimator: feeest,
_entropy_source,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
event_notifier: Arc::clone(&event_notifier),
persister: AsyncPersister { persister, event_notifier },
pending_send_only_events: Mutex::new(Vec::new()),
#[cfg(peer_storage)]
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
}
}
}
impl<
ChannelSigner: EcdsaChannelSigner,
C: Deref,
T: Deref,
F: Deref,
L: Deref,
P: Deref,
ES: Deref,
> ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
ES::Target: EntropySource,
{
	/// Applies `process` (the per-monitor chain handler) to every monitor, persisting a
	/// rotating subset of monitors per block via `update_monitor_with_chain_data`.
	///
	/// `best_height` is `Some` for best-block updates and `None` for
	/// `transactions_confirmed`, where the reported height may be below the current tip.
	fn process_chain_data<FN>(
		&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN,
	) where
		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
	{
		let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
		// Snapshot the current channel set so each monitor can be processed while holding
		// only a short-lived read lock (re-acquired per channel).
		let channel_ids = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
		let channel_count = channel_ids.len();
		for channel_id in channel_ids.iter() {
			let monitor_lock = self.monitors.read().unwrap();
			if let Some(monitor_state) = monitor_lock.get(channel_id) {
				let update_res = self.update_monitor_with_chain_data(
					header,
					best_height,
					txdata,
					&process,
					channel_id,
					&monitor_state,
					channel_count,
				);
				if update_res.is_err() {
					// Grab the write lock before panicking so concurrent readers are
					// blocked (and, with std locks, the lock is poisoned by the panic).
					core::mem::drop(monitor_lock);
					let _poison = self.monitors.write().unwrap();
					log_error!(self.logger, "{}", err_str);
					panic!("{}", err_str);
				}
			}
		}
		// Monitors added after the snapshot above would otherwise miss this chain data;
		// catch them here under the write lock so none can be added concurrently.
		let monitor_states = self.monitors.write().unwrap();
		for (channel_id, monitor_state) in monitor_states.iter() {
			if !channel_ids.contains(channel_id) {
				let update_res = self.update_monitor_with_chain_data(
					header,
					best_height,
					txdata,
					&process,
					channel_id,
					&monitor_state,
					channel_count,
				);
				if update_res.is_err() {
					log_error!(self.logger, "{}", err_str);
					panic!("{}", err_str);
				}
			}
		}
		if let Some(height) = best_height {
			// Only ratchet the recorded best height upwards; `transactions_confirmed` and
			// stale notifications may report heights below the current tip.
			let old_height = self.highest_chain_height.load(Ordering::Acquire);
			let new_height = height as usize;
			if new_height > old_height {
				self.highest_chain_height.store(new_height, Ordering::Release);
			}
		}
	}
	/// Runs `process` on one monitor, persists it when due, and registers any newly-watched
	/// outputs with the chain filter. Returns `Err(())` if the persister reported
	/// [`ChannelMonitorUpdateStatus::UnrecoverableError`].
	fn update_monitor_with_chain_data<FN>(
		&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN,
		channel_id: &ChannelId, monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
	) -> Result<(), ()>
	where
		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
	{
		let monitor = &monitor_state.monitor;
		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
		let mut txn_outputs = process(monitor, txdata);
		// Bucket each monitor by its channel ID's first four bytes (big-endian) plus the
		// current height, so which monitors get a full persist rotates every block.
		let get_partition_key = |channel_id: &ChannelId| {
			let channel_id_bytes = channel_id.0;
			let channel_id_u32 = u32::from_be_bytes([
				channel_id_bytes[0],
				channel_id_bytes[1],
				channel_id_bytes[2],
				channel_id_bytes[3],
			]);
			channel_id_u32.wrapping_add(best_height.unwrap_or_default())
		};
		// With few channels persist ~1/5th of them per block, otherwise ~1/50th, spreading
		// the cost of full-monitor persistence across blocks.
		let partition_factor = if channel_count < 15 {
			5
		} else {
			50
		};
		let has_pending_claims = monitor_state.monitor.has_pending_claims();
		// Monitors with pending claims are always persisted so claim progress survives a
		// restart; others only when their partition bucket comes up.
		if has_pending_claims || get_partition_key(channel_id) % partition_factor == 0 {
			log_trace!(
				logger,
				"Syncing Channel Monitor for channel {}",
				log_funding_info!(monitor)
			);
			// Hold the pending-updates lock across the persist call; NOTE(review): this
			// appears to serialize against concurrent `update_persisted_channel` calls for
			// the same monitor — confirm intended ordering.
			let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
			match self.persister.update_persisted_channel(monitor.persistence_key(), None, monitor)
			{
				ChannelMonitorUpdateStatus::Completed => log_trace!(
					logger,
					"Finished syncing Channel Monitor for channel {} for block-data",
					log_funding_info!(monitor)
				),
				ChannelMonitorUpdateStatus::InProgress => {
					log_trace!(
						logger,
						"Channel Monitor sync for channel {} in progress.",
						log_funding_info!(monitor)
					);
				},
				ChannelMonitorUpdateStatus::UnrecoverableError => {
					return Err(());
				},
			}
		}
		// Register any outputs the monitor newly cares about so the filter reports spends.
		if let Some(ref chain_source) = self.chain_source {
			let block_hash = header.block_hash();
			for (txid, mut outputs) in txn_outputs.drain(..) {
				for (idx, output) in outputs.drain(..) {
					let output = WatchedOutput {
						block_hash: Some(block_hash),
						outpoint: OutPoint { txid, index: idx as u16 },
						script_pubkey: output.script_pubkey,
					};
					log_trace!(
						logger,
						"Adding monitoring for spends of outpoint {} to the filter",
						output.outpoint
					);
					chain_source.register_output(output);
				}
			}
		}
		Ok(())
	}
pub fn new(
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
_entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey,
) -> Self {
Self {
monitors: RwLock::new(new_hash_map()),
chain_source,
broadcaster,
logger,
fee_estimator: feeest,
persister,
_entropy_source,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
event_notifier: Arc::new(Notifier::new()),
pending_send_only_events: Mutex::new(Vec::new()),
#[cfg(peer_storage)]
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
}
}
pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
let mut ret = Vec::new();
let monitor_states = self.monitors.read().unwrap();
for (_, monitor_state) in monitor_states.iter().filter(|(channel_id, _)| {
for chan in ignored_channels {
if chan.channel_id == **channel_id {
return false;
}
}
true
}) {
ret.append(&mut monitor_state.monitor.get_claimable_balances());
}
ret
}
pub fn get_monitor(
&self, channel_id: ChannelId,
) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
let lock = self.monitors.read().unwrap();
if lock.get(&channel_id).is_some() {
Ok(LockedChannelMonitor { lock, channel_id })
} else {
Err(())
}
}
pub fn list_monitors(&self) -> Vec<ChannelId> {
self.monitors.read().unwrap().keys().copied().collect()
}
#[cfg(not(c_bindings))]
pub fn list_pending_monitor_updates(&self) -> HashMap<ChannelId, Vec<u64>> {
hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(channel_id, holder)| {
(*channel_id, holder.pending_monitor_updates.lock().unwrap().clone())
}))
}
#[cfg(c_bindings)]
pub fn list_pending_monitor_updates(&self) -> Vec<(ChannelId, Vec<u64>)> {
let monitors = self.monitors.read().unwrap();
monitors
.iter()
.map(|(channel_id, holder)| {
(*channel_id, holder.pending_monitor_updates.lock().unwrap().clone())
})
.collect()
}
#[cfg(any(test, feature = "_test_utils"))]
pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
}
	/// Marks `completed_update_id` as finished persisting for `channel_id`.
	///
	/// Called when a [`Persist`] operation that previously returned
	/// [`ChannelMonitorUpdateStatus::InProgress`] completes. Once the channel has no more
	/// pending updates, a [`MonitorEvent::Completed`] is queued so the channel manager can
	/// resume operation, and the event notifier is woken.
	///
	/// Returns an [`APIError::APIMisuseError`] if no monitor matches `channel_id`.
	pub fn channel_monitor_updated(
		&self, channel_id: ChannelId, completed_update_id: u64,
	) -> Result<(), APIError> {
		let monitors = self.monitors.read().unwrap();
		let monitor_data = if let Some(mon) = monitors.get(&channel_id) {
			mon
		} else {
			return Err(APIError::APIMisuseError {
				err: format!("No ChannelMonitor matching channel ID {} found", channel_id),
			});
		};
		let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
		pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
		let monitor_is_pending_updates = monitor_data.has_pending_updates(&pending_monitor_updates);
		log_debug!(
			self.logger,
			"Completed off-chain monitor update {} for channel with channel ID {}, {}",
			completed_update_id,
			channel_id,
			if monitor_is_pending_updates {
				"still have pending off-chain updates"
			} else {
				"all off-chain updates complete, returning a MonitorEvent"
			}
		);
		if monitor_is_pending_updates {
			// Earlier updates are still in flight; we cannot yet report completion.
			return Ok(());
		}
		let funding_txo = monitor_data.monitor.get_funding_txo();
		self.pending_monitor_events.lock().unwrap().push((
			funding_txo,
			channel_id,
			vec![MonitorEvent::Completed {
				funding_txo,
				channel_id,
				// All pending updates are done, so report the monitor's latest update ID.
				monitor_update_id: monitor_data.monitor.get_latest_update_id(),
			}],
			monitor_data.monitor.get_counterparty_node_id(),
		));
		self.event_notifier.notify();
		Ok(())
	}
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, channel_id: ChannelId, monitor_update_id: u64) {
let monitors = self.monitors.read().unwrap();
let monitor = &monitors.get(&channel_id).unwrap().monitor;
let counterparty_node_id = monitor.get_counterparty_node_id();
let funding_txo = monitor.get_funding_txo();
self.pending_monitor_events.lock().unwrap().push((
funding_txo,
channel_id,
vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id }],
counterparty_node_id,
));
self.event_notifier.notify();
}
#[cfg(any(test, feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use crate::events::EventsProvider;
let events = core::cell::RefCell::new(Vec::new());
let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
self.process_pending_events(&event_handler);
events.into_inner()
}
	/// Asynchronous variant of [`events::EventsProvider::process_pending_events`]: awaits
	/// `handler` for each pending monitor event. If the handler requests a replay, the
	/// notifier is woken so the event is redelivered on a later processing pass.
	pub async fn process_pending_events_async<
		Future: core::future::Future<Output = Result<(), ReplayEvent>>,
		H: Fn(Event) -> Future,
	>(
		&self, handler: H,
	) {
		// Snapshot the channel IDs up front so the monitors read lock is not held across
		// `.await` points below (the macro re-acquires it per monitor).
		let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
		for channel_id in mons_to_process {
			// `ev` is bound inside the macro to each event before the handler expression runs.
			let mut ev;
			match super::channelmonitor::process_events_body!(
				self.monitors.read().unwrap().get(&channel_id).map(|m| &m.monitor),
				self.logger,
				ev,
				handler(ev).await
			) {
				Ok(()) => {},
				Err(ReplayEvent()) => {
					self.event_notifier.notify();
				},
			}
		}
	}
	/// Returns a [`Future`] which completes when the `ChainMonitor` is next notified —
	/// e.g. when a monitor update completes or new events become available for processing.
	pub fn get_update_future(&self) -> Future {
		self.event_notifier.get_future()
	}
pub fn rebroadcast_pending_claims(&self) {
let monitors = self.monitors.read().unwrap();
for (_, monitor_holder) in &*monitors {
monitor_holder.monitor.rebroadcast_pending_claims(
&*self.broadcaster,
&*self.fee_estimator,
&self.logger,
)
}
}
pub fn signer_unblocked(&self, monitor_opt: Option<ChannelId>) {
let monitors = self.monitors.read().unwrap();
if let Some(channel_id) = monitor_opt {
if let Some(monitor_holder) = monitors.get(&channel_id) {
monitor_holder.monitor.signer_unblocked(
&*self.broadcaster,
&*self.fee_estimator,
&self.logger,
)
}
} else {
for (_, monitor_holder) in &*monitors {
monitor_holder.monitor.signer_unblocked(
&*self.broadcaster,
&*self.fee_estimator,
&self.logger,
)
}
}
}
	/// Archives fully-resolved channel monitors by having the persister move them to
	/// archive storage and dropping them from the in-memory monitor set.
	///
	/// Resolution status is checked twice: once under the read lock (also persisting any
	/// state the check itself updated), then again under the write lock immediately before
	/// removal, in case status changed in between.
	pub fn archive_fully_resolved_channel_monitors(&self) {
		let mut have_monitors_to_prune = false;
		for monitor_holder in self.monitors.read().unwrap().values() {
			let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor, None);
			let (is_fully_resolved, needs_persistence) =
				monitor_holder.monitor.check_and_update_full_resolution_status(&logger);
			if is_fully_resolved {
				have_monitors_to_prune = true;
			}
			if needs_persistence {
				// The resolution check may itself have updated monitor state; persist the
				// full monitor so that state survives a restart.
				self.persister.update_persisted_channel(
					monitor_holder.monitor.persistence_key(),
					None,
					&monitor_holder.monitor,
				);
			}
		}
		if have_monitors_to_prune {
			let mut monitors = self.monitors.write().unwrap();
			monitors.retain(|channel_id, monitor_holder| {
				let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor, None);
				// Re-check under the write lock before actually removing.
				let (is_fully_resolved, _) =
					monitor_holder.monitor.check_and_update_full_resolution_status(&logger);
				if is_fully_resolved {
					log_info!(
						logger,
						"Archiving fully resolved ChannelMonitor for channel ID {}",
						channel_id
					);
					self.persister
						.archive_persisted_channel(monitor_holder.monitor.persistence_key());
					false
				} else {
					true
				}
			});
		}
	}
#[cfg(peer_storage)]
fn all_counterparty_node_ids(&self) -> HashSet<PublicKey> {
let mon = self.monitors.read().unwrap();
mon.values().map(|monitor| monitor.monitor.get_counterparty_node_id()).collect()
}
#[cfg(peer_storage)]
fn send_peer_storage(&self, their_node_id: PublicKey) {
let mut monitors_list: Vec<PeerStorageMonitorHolder> = Vec::new();
let random_bytes = self._entropy_source.get_secure_random_bytes();
const MAX_PEER_STORAGE_SIZE: usize = 65531;
const USIZE_LEN: usize = core::mem::size_of::<usize>();
let mut random_bytes_cycle_iter = random_bytes.iter().cycle();
let mut current_size = 0;
let monitors_lock = self.monitors.read().unwrap();
let mut channel_ids = monitors_lock.keys().copied().collect();
fn next_random_id(
channel_ids: &mut Vec<ChannelId>,
random_bytes_cycle_iter: &mut Cycle<core::slice::Iter<u8>>,
) -> Option<ChannelId> {
if channel_ids.is_empty() {
return None;
}
let random_idx = {
let mut usize_bytes = [0u8; USIZE_LEN];
usize_bytes.iter_mut().for_each(|b| {
*b = *random_bytes_cycle_iter.next().expect("A cycle never ends")
});
random_bytes_cycle_iter.next().expect("A cycle never ends");
usize::from_le_bytes(usize_bytes) % channel_ids.len()
};
Some(channel_ids.swap_remove(random_idx))
}
while let Some(channel_id) = next_random_id(&mut channel_ids, &mut random_bytes_cycle_iter)
{
let monitor_holder = if let Some(monitor_holder) = monitors_lock.get(&channel_id) {
monitor_holder
} else {
debug_assert!(
false,
"Tried to access non-existing monitor, this should never happen"
);
break;
};
let mut serialized_channel = VecWriter(Vec::new());
let min_seen_secret = monitor_holder.monitor.get_min_seen_secret();
let counterparty_node_id = monitor_holder.monitor.get_counterparty_node_id();
{
let inner_lock = monitor_holder.monitor.inner.lock().unwrap();
write_chanmon_internal(&inner_lock, true, &mut serialized_channel)
.expect("can not write Channel Monitor for peer storage message");
}
let peer_storage_monitor = PeerStorageMonitorHolder {
channel_id,
min_seen_secret,
counterparty_node_id,
monitor_bytes: serialized_channel.0,
};
let serialized_length = peer_storage_monitor.serialized_length();
if current_size + serialized_length > MAX_PEER_STORAGE_SIZE {
continue;
} else {
current_size += serialized_length;
monitors_list.push(peer_storage_monitor);
}
}
let serialised_channels = monitors_list.encode();
let our_peer_storage = DecryptedOurPeerStorage::new(serialised_channels);
let cipher = our_peer_storage.encrypt(&self.our_peerstorage_encryption_key, &random_bytes);
log_debug!(self.logger, "Sending Peer Storage to {}", log_pubkey!(their_node_id));
let send_peer_storage_event = MessageSendEvent::SendPeerStorage {
node_id: their_node_id,
msg: PeerStorage { data: cipher.into_vec() },
};
self.pending_send_only_events.lock().unwrap().push(send_peer_storage_event)
}
pub fn load_existing_monitor(
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
) -> Result<ChannelMonitorUpdateStatus, ()> {
if !monitor.written_by_0_1_or_later() {
return chain::Watch::watch_channel(self, channel_id, monitor);
}
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(channel_id) {
hash_map::Entry::Occupied(_) => {
log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
return Err(());
},
hash_map::Entry::Vacant(e) => e,
};
log_trace!(
logger,
"Loaded existing ChannelMonitor for channel {}",
log_funding_info!(monitor)
);
if let Some(ref chain_source) = self.chain_source {
monitor.load_outputs_to_watch(chain_source, &self.logger);
}
entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(Vec::new()) });
Ok(ChannelMonitorUpdateStatus::Completed)
}
}
impl<
		ChannelSigner: EcdsaChannelSigner,
		C: Deref,
		T: Deref,
		F: Deref,
		L: Deref,
		P: Deref,
		ES: Deref,
	> BaseMessageHandler for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
	ES::Target: EntropySource,
{
	/// Drains and returns any queued send-only message events (e.g. peer storage).
	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
		core::mem::take(&mut *self.pending_send_only_events.lock().unwrap())
	}
	/// No per-peer state is kept here, so disconnection is a no-op.
	fn peer_disconnected(&self, _their_node_id: PublicKey) {}
	/// The `ChainMonitor` advertises no node features of its own.
	fn provided_node_features(&self) -> NodeFeatures {
		NodeFeatures::empty()
	}
	/// The `ChainMonitor` requires no init features from peers.
	fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
		InitFeatures::empty()
	}
	/// Connections are always accepted; no per-peer setup is needed.
	fn peer_connected(
		&self, _their_node_id: PublicKey, _msg: &Init, _inbound: bool,
	) -> Result<(), ()> {
		Ok(())
	}
}
// Marker implementation: everything the `ChainMonitor` sends is queued in
// `pending_send_only_events` and drained via
// `BaseMessageHandler::get_and_clear_pending_msg_events`; no extra methods are required.
impl<
		ChannelSigner: EcdsaChannelSigner,
		C: Deref,
		T: Deref,
		F: Deref,
		L: Deref,
		P: Deref,
		ES: Deref,
	> SendOnlyMessageHandler for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
	ES::Target: EntropySource,
{
}
impl<
		ChannelSigner: EcdsaChannelSigner,
		C: Deref,
		T: Deref,
		F: Deref,
		L: Deref,
		P: Deref,
		ES: Deref,
	> chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
	ES::Target: EntropySource,
{
	/// Feeds a connected block's (possibly pre-filtered) transactions to every monitor,
	/// then refreshes peer-storage backups (when enabled) and wakes event waiters.
	fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
		log_debug!(
			self.logger,
			"New best block {} at height {} provided via block_connected",
			header.block_hash(),
			height
		);
		self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
			monitor.block_connected(
				header,
				txdata,
				height,
				&*self.broadcaster,
				&*self.fee_estimator,
				&self.logger,
			)
		});
		// Send our latest monitor state to each counterparty for backup after every block.
		#[cfg(peer_storage)]
		for node_id in self.all_counterparty_node_ids() {
			self.send_peer_storage(node_id);
		}
		self.event_notifier.notify();
	}
	/// Informs every monitor that the chain was reorganized back to `fork_point`.
	fn blocks_disconnected(&self, fork_point: BestBlock) {
		let monitor_states = self.monitors.read().unwrap();
		log_debug!(
			self.logger,
			"Block(s) removed to height {} via blocks_disconnected. New best block is {}",
			fork_point.height,
			fork_point.block_hash,
		);
		for monitor_state in monitor_states.values() {
			monitor_state.monitor.blocks_disconnected(
				fork_point,
				&*self.broadcaster,
				&*self.fee_estimator,
				&self.logger,
			);
		}
	}
}
impl<
		ChannelSigner: EcdsaChannelSigner,
		C: Deref,
		T: Deref,
		F: Deref,
		L: Deref,
		P: Deref,
		ES: Deref,
	> chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
	ES::Target: EntropySource,
{
	/// Feeds individually-confirmed transactions to every monitor. Passes `None` for the
	/// best height since these confirmations may be below the current chain tip.
	fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
		log_debug!(
			self.logger,
			"{} provided transactions confirmed at height {} in block {}",
			txdata.len(),
			height,
			header.block_hash()
		);
		self.process_chain_data(header, None, txdata, |monitor, txdata| {
			monitor.transactions_confirmed(
				header,
				txdata,
				height,
				&*self.broadcaster,
				&*self.fee_estimator,
				&self.logger,
			)
		});
		self.event_notifier.notify();
	}
	/// Informs every monitor that the given transaction was reorganized out of the chain.
	fn transaction_unconfirmed(&self, txid: &Txid) {
		log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
		let monitor_states = self.monitors.read().unwrap();
		for monitor_state in monitor_states.values() {
			monitor_state.monitor.transaction_unconfirmed(
				txid,
				&*self.broadcaster,
				&*self.fee_estimator,
				&self.logger,
			);
		}
	}
	/// Updates every monitor with the new best block (no transaction data), then refreshes
	/// peer-storage backups (when enabled) and wakes event waiters.
	fn best_block_updated(&self, header: &Header, height: u32) {
		log_debug!(
			self.logger,
			"New best block {} at height {} provided via best_block_updated",
			header.block_hash(),
			height
		);
		self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
			debug_assert!(txdata.is_empty());
			monitor.best_block_updated(
				header,
				height,
				&*self.broadcaster,
				&*self.fee_estimator,
				&self.logger,
			)
		});
		// Send our latest monitor state to each counterparty for backup on each new block.
		#[cfg(peer_storage)]
		for node_id in self.all_counterparty_node_ids() {
			self.send_peer_storage(node_id);
		}
		self.event_notifier.notify();
	}
	/// Returns the transactions (and confirmation blocks) the monitors must be re-told
	/// about if unconfirmed, sorted by txid and deduplicated keeping, for each txid, the
	/// entry with the greatest confirmation height (descending-height tiebreak + dedup).
	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
		let mut txids = Vec::new();
		let monitor_states = self.monitors.read().unwrap();
		for monitor_state in monitor_states.values() {
			txids.append(&mut monitor_state.monitor.get_relevant_txids());
		}
		txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
		txids.dedup_by_key(|(txid, _, _)| *txid);
		txids
	}
}
impl<
ChannelSigner: EcdsaChannelSigner,
C: Deref,
T: Deref,
F: Deref,
L: Deref,
P: Deref,
ES: Deref,
> chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
ES::Target: EntropySource,
{
	/// Begins watching `monitor` for `channel_id`: persists it via
	/// [`Persist::persist_new_channel`], registers its outputs with the chain filter (if
	/// any), and inserts it into the monitor set.
	///
	/// Returns the persister's status. An `InProgress` result records the monitor's latest
	/// update ID as pending until [`ChainMonitor::channel_monitor_updated`] is called;
	/// `UnrecoverableError` panics. `Err(())` means a monitor already exists for the channel.
	fn watch_channel(
		&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
	) -> Result<ChannelMonitorUpdateStatus, ()> {
		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
		let mut monitors = self.monitors.write().unwrap();
		let entry = match monitors.entry(channel_id) {
			hash_map::Entry::Occupied(_) => {
				log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
				return Err(());
			},
			hash_map::Entry::Vacant(e) => e,
		};
		log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
		let update_id = monitor.get_latest_update_id();
		let mut pending_monitor_updates = Vec::new();
		let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
		match persist_res {
			ChannelMonitorUpdateStatus::InProgress => {
				log_info!(
					logger,
					"Persistence of new ChannelMonitor for channel {} in progress",
					log_funding_info!(monitor)
				);
				// Track the initial persist as pending so a later
				// `channel_monitor_updated(channel_id, update_id)` call completes it.
				pending_monitor_updates.push(update_id);
			},
			ChannelMonitorUpdateStatus::Completed => {
				log_info!(
					logger,
					"Persistence of new ChannelMonitor for channel {} completed",
					log_funding_info!(monitor)
				);
			},
			ChannelMonitorUpdateStatus::UnrecoverableError => {
				let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
				log_error!(logger, "{}", err_str);
				panic!("{}", err_str);
			},
		}
		// Ask the filter (if any) to watch the monitor's outputs for spends.
		if let Some(ref chain_source) = self.chain_source {
			monitor.load_outputs_to_watch(chain_source, &self.logger);
		}
		entry.insert(MonitorHolder {
			monitor,
			pending_monitor_updates: Mutex::new(pending_monitor_updates),
		});
		Ok(persist_res)
	}
	/// Applies `update` to the in-memory monitor for `channel_id` and persists the result.
	///
	/// If applying the update fails, the full monitor is persisted instead and `InProgress`
	/// is returned regardless of the persister's answer (NOTE(review): presumably to keep
	/// the channel paused until the failure is dealt with — confirm). Panics on
	/// `UnrecoverableError`, and in debug builds when `channel_id` is unknown.
	fn update_channel(
		&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
	) -> ChannelMonitorUpdateStatus {
		// `update` must have been generated for this channel.
		debug_assert_eq!(update.channel_id.unwrap(), channel_id);
		let monitors = self.monitors.read().unwrap();
		match monitors.get(&channel_id) {
			None => {
				let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
				log_error!(logger, "Failed to update channel monitor: no such monitor registered");
				// In release builds we report InProgress, which pauses the channel rather
				// than crashing the node.
				#[cfg(debug_assertions)]
				panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
				#[cfg(not(debug_assertions))]
				ChannelMonitorUpdateStatus::InProgress
			},
			Some(monitor_state) => {
				let monitor = &monitor_state.monitor;
				let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
				log_trace!(
					logger,
					"Updating ChannelMonitor to id {} for channel {}",
					update.update_id,
					log_funding_info!(monitor)
				);
				// Hold the pending-updates lock across apply + persist so the pending list
				// stays consistent with what the persister has been handed.
				let mut pending_monitor_updates =
					monitor_state.pending_monitor_updates.lock().unwrap();
				let update_res = monitor.update_monitor(
					update,
					&self.broadcaster,
					&self.fee_estimator,
					&self.logger,
				);
				let update_id = update.update_id;
				let persist_res = if update_res.is_err() {
					// The update failed to apply; persist the whole monitor instead of the
					// (unapplied) delta so on-disk state still reflects reality.
					log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
					self.persister.update_persisted_channel(
						monitor.persistence_key(),
						None,
						monitor,
					)
				} else {
					self.persister.update_persisted_channel(
						monitor.persistence_key(),
						Some(update),
						monitor,
					)
				};
				match persist_res {
					ChannelMonitorUpdateStatus::InProgress => {
						pending_monitor_updates.push(update_id);
						log_debug!(logger,
							"Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress",
							update_id,
							log_funding_info!(monitor)
						);
					},
					ChannelMonitorUpdateStatus::Completed => {
						log_debug!(
							logger,
							"Persistence of ChannelMonitorUpdate id {:?} for channel {} completed",
							update_id,
							log_funding_info!(monitor)
						);
					},
					ChannelMonitorUpdateStatus::UnrecoverableError => {
						// Release our locks, then take the write lock before panicking so
						// concurrent readers are blocked (and the lock poisoned under std).
						core::mem::drop(pending_monitor_updates);
						core::mem::drop(monitors);
						let _poison = self.monitors.write().unwrap();
						let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
						log_error!(logger, "{}", err_str);
						panic!("{}", err_str);
					},
				}
				// A renegotiated funding (e.g. splice) introduces a new funding output the
				// filter must watch for confirmations and spends.
				if let Some(ref chain_source) = self.chain_source {
					for (funding_outpoint, funding_script) in
						update.internal_renegotiated_funding_data()
					{
						log_trace!(
							logger,
							"Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
							funding_outpoint
						);
						chain_source.register_tx(&funding_outpoint.txid, &funding_script);
						chain_source.register_output(WatchedOutput {
							block_hash: None,
							outpoint: funding_outpoint,
							script_pubkey: funding_script,
						});
					}
				}
				if update_res.is_err() {
					ChannelMonitorUpdateStatus::InProgress
				} else {
					persist_res
				}
			},
		}
	}
fn release_pending_monitor_events(
&self,
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
let _ = self.channel_monitor_updated(channel_id, update_id);
}
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {
let monitor_funding_txo = monitor_state.monitor.get_funding_txo();
let monitor_channel_id = monitor_state.monitor.channel_id();
let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
pending_monitor_events.push((
monitor_funding_txo,
monitor_channel_id,
monitor_events,
counterparty_node_id,
));
}
}
pending_monitor_events
}
}
impl<
		ChannelSigner: EcdsaChannelSigner,
		C: Deref,
		T: Deref,
		F: Deref,
		L: Deref,
		P: Deref,
		ES: Deref,
	> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
	ES::Target: EntropySource,
{
	/// Processes the pending events of every tracked [`ChannelMonitor`] via `handler`.
	///
	/// If a monitor reports that an event must be replayed (the handler failed on it),
	/// we wake the event notifier so processing is retried later.
	fn process_pending_events<H: Deref>(&self, handler: H)
	where
		H::Target: EventHandler,
	{
		let monitors = self.monitors.read().unwrap();
		for state in monitors.values() {
			let res = state.monitor.process_pending_events(&handler, &self.logger);
			if let Err(ReplayEvent()) = res {
				self.event_notifier.notify();
			}
		}
	}
}
#[cfg(test)]
mod tests {
	use crate::chain::channelmonitor::ANTI_REORG_DELAY;
	use crate::chain::{ChannelMonitorUpdateStatus, Watch};
	use crate::events::{ClosureReason, Event};
	use crate::ln::functional_test_utils::*;
	use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent};
	use crate::{check_added_monitors, check_closed_event};
	use crate::{expect_payment_path_successful, get_event_msg};
	use crate::{get_htlc_update_msgs, get_revoke_commit_msgs};

	// NOTE(review): presumably mirrors the ChainMonitor's chain-sync persistence
	// partition factor (how many blocks a full round-robin over all monitors
	// takes) -- confirm it matches the production constant.
	const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5;

	// Tests that when two monitor updates complete asynchronously, the resulting
	// `PaymentClaimed` events and outbound messages are held back until BOTH
	// updates have been marked completed, and that completions may be signalled
	// in whatever order the persister finishes them.
	#[test]
	fn test_async_ooo_offchain_updates() {
		let chanmon_cfgs = create_chanmon_cfgs(2);
		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
		let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;

		let node_a_id = nodes[0].node.get_our_node_id();
		let node_b_id = nodes[1].node.get_our_node_id();

		// Route two payments so node B has two claims to make back-to-back.
		let (payment_preimage_1, payment_hash_1, ..) =
			route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
		let (payment_preimage_2, payment_hash_2, ..) =
			route_payment(&nodes[0], &[&nodes[1]], 1_000_000);

		// Make the next two monitor updates (one per claim) complete asynchronously.
		chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
		chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
		chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);

		nodes[1].node.claim_funds(payment_preimage_1);
		check_added_monitors!(nodes[1], 1);
		nodes[1].node.claim_funds(payment_preimage_2);
		check_added_monitors!(nodes[1], 1);

		// Both claims hit the same channel, so exactly one channel entry exists
		// with two in-flight updates recorded against it.
		let persistences =
			chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
		assert_eq!(persistences.len(), 1);
		let (_, updates) = persistences.iter().next().unwrap();
		assert_eq!(updates.len(), 2);

		let mut update_iter = updates.iter();
		let next_update = update_iter.next().unwrap().clone();

		// The first update should show as pending on the ChainMonitor...
		let node_b_mon = &nodes[1].chain_monitor.chain_monitor;
		let pending_updates = node_b_mon.list_pending_monitor_updates();
		#[cfg(not(c_bindings))]
		let pending_chan_updates = pending_updates.get(&channel_id).unwrap();
		#[cfg(c_bindings)]
		let pending_chan_updates =
			&pending_updates.iter().find(|(chan_id, _)| *chan_id == channel_id).unwrap().1;
		assert!(pending_chan_updates.contains(&next_update));

		// ...and disappear from the pending set once we mark it completed.
		node_b_mon.channel_monitor_updated(channel_id, next_update.clone()).unwrap();

		let pending_updates = node_b_mon.list_pending_monitor_updates();
		#[cfg(not(c_bindings))]
		let pending_chan_updates = pending_updates.get(&channel_id).unwrap();
		#[cfg(c_bindings)]
		let pending_chan_updates =
			&pending_updates.iter().find(|(chan_id, _)| *chan_id == channel_id).unwrap().1;
		assert!(!pending_chan_updates.contains(&next_update));

		// With one update still in flight, nothing may be released yet: no
		// monitor events, no messages, no user events.
		assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
		assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
		assert!(nodes[1].node.get_and_clear_pending_events().is_empty());

		// Completing the second (final) update releases both PaymentClaimed events.
		let next_update = update_iter.next().unwrap().clone();
		node_b_mon.channel_monitor_updated(channel_id, next_update).unwrap();

		let claim_events = nodes[1].node.get_and_clear_pending_events();
		assert_eq!(claim_events.len(), 2);
		match claim_events[0] {
			Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
				assert_eq!(payment_hash_1, *payment_hash);
			},
			_ => panic!("Unexpected event"),
		}
		match claim_events[1] {
			Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
				assert_eq!(payment_hash_2, *payment_hash);
			},
			_ => panic!("Unexpected event"),
		}

		// Drive the released fulfill + commitment-signed dance to completion for
		// the first payment; the sender (node A) should see it as sent.
		let mut updates = get_htlc_update_msgs!(nodes[1], node_a_id);

		nodes[0].node.handle_update_fulfill_htlc(node_b_id, updates.update_fulfill_htlcs.remove(0));
		expect_payment_sent(&nodes[0], payment_preimage_1, None, false, false);
		nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &updates.commitment_signed);
		check_added_monitors!(nodes[0], 1);
		let (as_first_raa, as_first_update) = get_revoke_commit_msgs!(nodes[0], node_b_id);

		nodes[1].node.handle_revoke_and_ack(node_a_id, &as_first_raa);
		check_added_monitors!(nodes[1], 1);
		let mut bs_2nd_updates = get_htlc_update_msgs!(nodes[1], node_a_id);
		nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_first_update);
		check_added_monitors!(nodes[1], 1);
		let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);

		// Second payment's fulfill flows through the same handshake.
		nodes[0]
			.node
			.handle_update_fulfill_htlc(node_b_id, bs_2nd_updates.update_fulfill_htlcs.remove(0));
		expect_payment_sent(&nodes[0], payment_preimage_2, None, false, false);
		nodes[0]
			.node
			.handle_commitment_signed_batch_test(node_b_id, &bs_2nd_updates.commitment_signed);
		check_added_monitors!(nodes[0], 1);
		nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_first_raa);
		expect_payment_path_successful!(nodes[0]);
		check_added_monitors!(nodes[0], 1);
		let (as_second_raa, as_second_update) = get_revoke_commit_msgs!(nodes[0], node_b_id);

		nodes[1].node.handle_revoke_and_ack(node_a_id, &as_second_raa);
		check_added_monitors!(nodes[1], 1);
		nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_second_update);
		check_added_monitors!(nodes[1], 1);
		let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);

		// Final RAA completes the second payment's path on the sender side.
		nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_second_raa);
		expect_payment_path_successful!(nodes[0]);
		check_added_monitors!(nodes[0], 1);
	}

	// Tests that chain-sync persists each monitor only once per partition cycle
	// (round-robin over CHAINSYNC_MONITOR_PARTITION_FACTOR blocks), except that a
	// monitor with a closed channel / pending claims is persisted on every block
	// until its balances resolve.
	#[test]
	fn test_chainsync_triggers_distributed_monitor_persistence() {
		let chanmon_cfgs = create_chanmon_cfgs(3);
		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);

		let node_a_id = nodes[0].node.get_our_node_id();
		let node_c_id = nodes[2].node.get_our_node_id();

		// Use full block connection so every block goes through the listen path.
		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
		*nodes[1].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
		*nodes[2].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

		// Node 0 has two channels (to nodes 1 and 2); nodes 1 and 2 have one each.
		let _channel_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
		let channel_2 =
			create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 1_000_000, 0).2;

		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
		chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

		// Connect two full partition cycles: each monitor should be persisted
		// exactly once per cycle.
		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
		connect_blocks(&nodes[1], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);

		// Node 0: 2 monitors x 2 cycles; nodes 1 and 2: 1 monitor x 2 cycles.
		assert_eq!(
			2 * 2,
			chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
		);
		assert_eq!(
			2,
			chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().len()
		);
		assert_eq!(
			2,
			chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len()
		);

		// Force-close channel_2 from node 0, broadcasting its latest commitment.
		let message = "Channel force-closed".to_owned();
		nodes[0]
			.node
			.force_close_broadcasting_latest_txn(&channel_2, &node_c_id, message.clone())
			.unwrap();
		let closure_reason =
			ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
		check_closed_event!(&nodes[0], 1, closure_reason, false, [node_c_id], 1000000);
		check_closed_broadcast(&nodes[0], 1, true);
		let close_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
		assert_eq!(close_tx.len(), 1);

		// Confirm the commitment tx on node 2 so it sees the channel closed.
		mine_transaction(&nodes[2], &close_tx[0]);
		check_closed_broadcast(&nodes[2], 1, true);
		check_added_monitors(&nodes[2], 1);
		let closure_reason = ClosureReason::CommitmentTxConfirmed;
		check_closed_event!(&nodes[2], 1, closure_reason, false, [node_a_id], 1000000);

		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

		// One more partition cycle. Node 0's closed-channel monitor is presumably
		// persisted on every block (it has unresolved balances) plus once for the
		// still-open channel_1 monitor; node 2's monitor only once. TODO confirm
		// the exact per-block persistence rule against the production code.
		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR);
		assert_eq!(
			(CHAINSYNC_MONITOR_PARTITION_FACTOR + 1) as usize,
			chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
		);
		assert_eq!(
			1,
			chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len()
		);

		// Confirm the close on node 0 too and let it reach ANTI_REORG_DELAY so
		// its pending claims resolve.
		mine_transaction(&nodes[0], &close_tx[0]);
		connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
		check_added_monitors(&nodes[0], 1);
		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

		// With the balances resolved, both of node 0's monitors are back to the
		// once-per-cycle cadence.
		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
		assert_eq!(
			2,
			chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len()
		);
	}

	// Tests that an UnrecoverableError during a chain-sync persist panics and
	// poisons the ChainMonitor's monitors lock, so any later use (even dropping
	// the node) panics rather than continuing with unpersisted state.
	#[test]
	#[cfg(feature = "std")]
	fn update_during_chainsync_poisons_channel() {
		let chanmon_cfgs = create_chanmon_cfgs(2);
		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
		create_announced_chan_between_nodes(&nodes, 0, 1);
		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

		chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);

		// Connecting blocks triggers a persist which returns the unrecoverable
		// error, which must panic.
		assert!(std::panic::catch_unwind(|| {
			// Returning an UnrecoverableError should always panic immediately
			connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
		})
		.is_err());
		// The panic above poisoned the monitors lock, so tearing the node down
		// also panics.
		assert!(std::panic::catch_unwind(|| {
			// ...and also poison our locks causing later use to panic as well
			core::mem::drop(nodes);
		})
		.is_err());
	}
}