use bitcoin::block::Header;
use bitcoin::hash_types::{Txid, BlockHash};

use crate::chain;
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::ln::types::ChannelId;
use crate::sign::ecdsa::EcdsaChannelSigner;
use crate::events::{self, Event, EventHandler, ReplayEvent};
use crate::util::logger::{Logger, WithContext};
use crate::util::errors::APIError;
use crate::util::wakers::{Future, Notifier};
use crate::ln::channel_state::ChannelDetails;

use crate::prelude::*;
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};
use bitcoin::hashes::Hash;
use bitcoin::secp256k1::PublicKey;
/// `Persist` defines behavior for persisting channel monitors: this could mean
/// writing once to disk, and/or uploading to one or more backup services.
///
/// Each method can return one of three statuses:
///  * [`ChannelMonitorUpdateStatus::Completed`] if persistence succeeded synchronously,
///  * [`ChannelMonitorUpdateStatus::InProgress`] if persistence happens asynchronously, in which
///    case [`ChainMonitor::channel_monitor_updated`] must be called once it completes,
///  * [`ChannelMonitorUpdateStatus::UnrecoverableError`] if persistence failed and the node
///    cannot continue normal operation.
pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
	/// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call.
	fn persist_new_channel(&self, channel_funding_outpoint: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus;

	/// Update one channel's data. If `monitor_update` is `None`, the full monitor should be
	/// re-persisted; otherwise the given update may either be applied to the stored monitor or
	/// persisted alongside it.
	fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus;

	/// Archive a channel's data, e.g. by moving it to an archive location, once the channel is
	/// fully resolved on chain and the monitor is no longer needed.
	fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
}
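
// Illustrative sketch (not part of the module above): a minimal `Persist`
// implementation that reports success synchronously. A real implementation must
// write the monitor to durable storage *before* returning `Completed`; returning
// `InProgress` instead defers completion until
// `ChainMonitor::channel_monitor_updated` is called with the pending update id.
#[cfg(test)]
#[allow(unused)]
mod persist_sketch {
	use super::*;

	struct InstantPersister;

	impl<ChannelSigner: EcdsaChannelSigner> Persist<ChannelSigner> for InstantPersister {
		fn persist_new_channel(&self, _funding_outpoint: OutPoint, _monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus {
			// Pretend the full monitor was durably written here.
			ChannelMonitorUpdateStatus::Completed
		}
		fn update_persisted_channel(&self, _funding_outpoint: OutPoint, _update: Option<&ChannelMonitorUpdate>, _monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus {
			// `None` means "re-persist the whole monitor"; `Some` carries a delta.
			ChannelMonitorUpdateStatus::Completed
		}
		fn archive_persisted_channel(&self, _funding_outpoint: OutPoint) {
			// Move the stored monitor to an archive location; a no-op in this sketch.
		}
	}
}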

struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
	monitor: ChannelMonitor<ChannelSigner>,
	/// The set of update ids for this channel whose persistence is still in progress (i.e. for
	/// which [`Persist`] returned [`ChannelMonitorUpdateStatus::InProgress`]).
	///
	/// This lock is held while an update is handed to the persister, serializing persistence
	/// with completion notifications via [`ChainMonitor::channel_monitor_updated`].
	pending_monitor_updates: Mutex<Vec<u64>>,
}

impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
	fn has_pending_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<u64>>) -> bool {
		!pending_monitor_updates_lock.is_empty()
	}
}

/// A read-only reference to a current ChannelMonitor.
///
/// Note that this holds a read lock on the map of monitors inside [`ChainMonitor`], so no
/// operation requiring write access can proceed while it is alive. Keep its scope short.
pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
	lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
	funding_txo: OutPoint,
}

impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
	type Target = ChannelMonitor<ChannelSigner>;
	fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
		&self.lock.get(&self.funding_txo).expect("Checked at construction").monitor
	}
}

/// An implementation of [`chain::Watch`] for monitoring channels.
///
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
/// [`chain::Watch`]. May be used in conjunction with [`ChannelManager`] to monitor channels
/// locally, or standalone to watch channels remotely.
pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
	where C::Target: chain::Filter,
	      T::Target: BroadcasterInterface,
	      F::Target: FeeEstimator,
	      L::Target: Logger,
	      P::Target: Persist<ChannelSigner>,
{
	monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
	chain_source: Option<C>,
	broadcaster: T,
	logger: L,
	fee_estimator: F,
	persister: P,
	/// "User-provided" (i.e. persistence-completion) [`MonitorEvent`]s. These come from calls to
	/// [`ChainMonitor::channel_monitor_updated`] rather than from the monitors themselves.
	pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)>>,
	/// The highest chain height we have seen, updated as new blocks are connected.
	highest_chain_height: AtomicUsize,

	/// A [`Notifier`] used to wake up the background processor when there are events to handle.
	event_notifier: Notifier,
}

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
      T::Target: BroadcasterInterface,
      F::Target: FeeEstimator,
      L::Target: Logger,
      P::Target: Persist<ChannelSigner>,
{
	/// Dispatches to per-channel monitor processing, updating the relevant monitors with the
	/// given chain data and persisting each monitor that was updated.
	fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
	where
		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
	{
		let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
		let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
		let channel_count = funding_outpoints.len();
		for funding_outpoint in funding_outpoints.iter() {
			let monitor_lock = self.monitors.read().unwrap();
			if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
				if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
					core::mem::drop(monitor_lock);
					// Take the monitors lock for writing so that we poison it and any future
					// operations going forward fail immediately.
					let _poison = self.monitors.write().unwrap();
					log_error!(self.logger, "{}", err_str);
					panic!("{}", err_str);
				}
			}
		}

		// Do some followup cleanup if any funding outpoints were added between iterations.
		let monitor_states = self.monitors.write().unwrap();
		for (funding_outpoint, monitor_state) in monitor_states.iter() {
			if !funding_outpoints.contains(funding_outpoint) {
				if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
					log_error!(self.logger, "{}", err_str);
					panic!("{}", err_str);
				}
			}
		}

		if let Some(height) = best_height {
			// If the best block height is being updated, update highest_chain_height under the
			// monitors write lock.
			let old_height = self.highest_chain_height.load(Ordering::Acquire);
			let new_height = height as usize;
			if new_height > old_height {
				self.highest_chain_height.store(new_height, Ordering::Release);
			}
		}
	}

	fn update_monitor_with_chain_data<FN>(
		&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
		monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
	) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
		let monitor = &monitor_state.monitor;
		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);

		let mut txn_outputs = process(monitor, txdata);

		// Derive a pseudo-random key from the funding txid and the current height. It is used
		// below to spread periodic full re-persists of the monitors across blocks rather than
		// re-persisting every monitor on every connected block.
		let get_partition_key = |funding_outpoint: &OutPoint| {
			let funding_txid_hash = funding_outpoint.txid.to_raw_hash();
			let funding_txid_hash_bytes = funding_txid_hash.as_byte_array();
			let funding_txid_u32 = u32::from_be_bytes([funding_txid_hash_bytes[0], funding_txid_hash_bytes[1], funding_txid_hash_bytes[2], funding_txid_hash_bytes[3]]);
			funding_txid_u32.wrapping_add(best_height.unwrap_or_default())
		};

		// With few channels, re-persist each monitor every 5 blocks; with many, every 50.
		let partition_factor = if channel_count < 15 {
			5
		} else {
			50 // ~8 hours at one block every ten minutes
		};

		// A monitor with pending claims must be persisted on every block; otherwise it is only
		// persisted when its partition key lands on the current block.
		let has_pending_claims = monitor_state.monitor.has_pending_claims();
		if has_pending_claims || get_partition_key(funding_outpoint) % partition_factor == 0 {
			log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
			// Hold the pending-updates lock while persisting so that completion notifications
			// for this monitor are serialized with this write.
			let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
			match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) {
				ChannelMonitorUpdateStatus::Completed =>
					log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data",
						log_funding_info!(monitor)
					),
				ChannelMonitorUpdateStatus::InProgress => {
					log_trace!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
				}
				ChannelMonitorUpdateStatus::UnrecoverableError => {
					return Err(());
				}
			}
		}

		// Register any new outputs with the chain source for filtering, so that spends of them
		// are included in future filtered blocks.
		if let Some(ref chain_source) = self.chain_source {
			let block_hash = header.block_hash();
			for (txid, mut outputs) in txn_outputs.drain(..) {
				for (idx, output) in outputs.drain(..) {
					let output = WatchedOutput {
						block_hash: Some(block_hash),
						outpoint: OutPoint { txid, index: idx as u16 },
						script_pubkey: output.script_pubkey,
					};
					log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint);
					chain_source.register_output(output);
				}
			}
		}
		Ok(())
	}

	/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
	///
	/// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
	/// will call back to it indicating transactions and outputs of interest. This allows clients to
	/// pre-filter blocks or only fetch blocks matching a compact filter.
	pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
		Self {
			monitors: RwLock::new(new_hash_map()),
			chain_source,
			broadcaster,
			logger,
			fee_estimator: feeest,
			persister,
			pending_monitor_events: Mutex::new(Vec::new()),
			highest_chain_height: AtomicUsize::new(0),
			event_notifier: Notifier::new(),
		}
	}

	/// Gets the balances in the contained [`ChannelMonitor`]s which are claimable on-chain or
	/// claims which are awaiting confirmation.
	///
	/// Any channels in `ignored_channels` are skipped; generally these are channels which are
	/// still open and whose balances are tracked by the [`ChannelManager`], avoiding
	/// double-counting.
	pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
		let mut ret = Vec::new();
		let monitor_states = self.monitors.read().unwrap();
		for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| {
			for chan in ignored_channels {
				if chan.funding_txo.as_ref() == Some(funding_outpoint) {
					return false;
				}
			}
			true
		}) {
			ret.append(&mut monitor_state.monitor.get_claimable_balances());
		}
		ret
	}

	/// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no
	/// such monitor is being watched.
	pub fn get_monitor(&self, funding_txo: OutPoint) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
		let lock = self.monitors.read().unwrap();
		if lock.get(&funding_txo).is_some() {
			Ok(LockedChannelMonitor { lock, funding_txo })
		} else {
			Err(())
		}
	}

	/// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored.
	pub fn list_monitors(&self) -> Vec<(OutPoint, ChannelId)> {
		self.monitors.read().unwrap().iter().map(|(outpoint, monitor_holder)| {
			let channel_id = monitor_holder.monitor.channel_id();
			(*outpoint, channel_id)
		}).collect()
	}

	#[cfg(not(c_bindings))]
	/// Lists the pending update ids for each [`ChannelMonitor`] (by funding [`OutPoint`]).
	pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<u64>> {
		hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
			(*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
		}))
	}

	#[cfg(c_bindings)]
	/// Lists the pending update ids for each [`ChannelMonitor`] (by funding [`OutPoint`]).
	pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec<u64>)> {
		self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
			(*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
		}).collect()
	}

	#[cfg(test)]
	pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
		self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
	}

	/// Indicates the persistence of a [`ChannelMonitor`] has completed after
	/// [`ChannelMonitorUpdateStatus::InProgress`] was returned from an update operation.
	///
	/// The anticipated use is, at a high level:
	///  1) this [`ChainMonitor`] calls [`Persist::update_persisted_channel`], which begins an
	///     asynchronous write and returns [`ChannelMonitorUpdateStatus::InProgress`],
	///  2) once the write completes, the persister calls this method with the completed update
	///     id; when no updates remain pending, a [`MonitorEvent::Completed`] is generated for the
	///     [`ChannelManager`] to re-enable the channel.
	pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: u64) -> Result<(), APIError> {
		let monitors = self.monitors.read().unwrap();
		let monitor_data = if let Some(mon) = monitors.get(&funding_txo) { mon } else {
			return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching funding outpoint {:?} found", funding_txo) });
		};
		let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
		pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);

		let monitor_is_pending_updates = monitor_data.has_pending_updates(&pending_monitor_updates);
		log_debug!(self.logger, "Completed off-chain monitor update {} for channel with funding outpoint {:?}, {}",
			completed_update_id,
			funding_txo,
			if monitor_is_pending_updates {
				"still have pending off-chain updates"
			} else {
				"all off-chain updates complete, returning a MonitorEvent"
			});
		if monitor_is_pending_updates {
			// If there are still monitor updates pending, we cannot yet construct a Completed
			// event.
			return Ok(());
		}
		let channel_id = monitor_data.monitor.channel_id();
		self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
			funding_txo, channel_id,
			monitor_update_id: monitor_data.monitor.get_latest_update_id(),
		}], monitor_data.monitor.get_counterparty_node_id()));

		self.event_notifier.notify();
		Ok(())
	}

	#[cfg(any(test, fuzzing))]
	/// Test-only wrapper which marks a monitor fully-updated by pushing a single
	/// [`MonitorEvent::Completed`] with the given update id, bypassing the pending-update
	/// tracking done in [`Self::channel_monitor_updated`].
	pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
		let monitors = self.monitors.read().unwrap();
		let (counterparty_node_id, channel_id) = if let Some(m) = monitors.get(&funding_txo) {
			(m.monitor.get_counterparty_node_id(), m.monitor.channel_id())
		} else {
			(None, ChannelId::v1_from_funding_outpoint(funding_txo))
		};
		self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
			funding_txo,
			channel_id,
			monitor_update_id,
		}], counterparty_node_id));
		self.event_notifier.notify();
	}

	#[cfg(any(test, feature = "_test_utils"))]
	pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
		use crate::events::EventsProvider;
		let events = core::cell::RefCell::new(Vec::new());
		let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
		self.process_pending_events(&event_handler);
		events.into_inner()
	}

	/// Processes any events asynchronously in the order they were generated since the last call
	/// using the given event handler. If the handler returns `Err(ReplayEvent())` the event is
	/// queued for replay and the notifier is woken.
	pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
		&self, handler: H
	) {
		// We can't hold the monitors read lock across an await point, so collect the keys first
		// and re-acquire the lock for each monitor while its events are processed.
		let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
		for funding_txo in mons_to_process {
			let mut ev;
			match super::channelmonitor::process_events_body!(
				self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), self.logger, ev, handler(ev).await) {
				Ok(()) => {},
				Err(ReplayEvent ()) => {
					self.event_notifier.notify();
				}
			}
		}
	}

	/// Gets a [`Future`] that completes when an event is available via
	/// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`].
	pub fn get_update_future(&self) -> Future {
		self.event_notifier.get_future()
	}

	/// Triggers rebroadcasts/fee-bumps of pending claims from force-closed channels. This is
	/// crucial in preventing certain classes of pinning attacks and in ensuring timely
	/// confirmation of our claim transactions.
	pub fn rebroadcast_pending_claims(&self) {
		let monitors = self.monitors.read().unwrap();
		for (_, monitor_holder) in &*monitors {
			monitor_holder.monitor.rebroadcast_pending_claims(
				&*self.broadcaster, &*self.fee_estimator, &self.logger
			)
		}
	}

	/// Retries pending signing operations after a signer error was previously returned, either
	/// for all monitors or, if `monitor_opt` is set, only for the monitor it identifies.
	pub fn signer_unblocked(&self, monitor_opt: Option<OutPoint>) {
		let monitors = self.monitors.read().unwrap();
		if let Some(funding_txo) = monitor_opt {
			if let Some(monitor_holder) = monitors.get(&funding_txo) {
				monitor_holder.monitor.signer_unblocked(
					&*self.broadcaster, &*self.fee_estimator, &self.logger
				)
			}
		} else {
			for (_, monitor_holder) in &*monitors {
				monitor_holder.monitor.signer_unblocked(
					&*self.broadcaster, &*self.fee_estimator, &self.logger
				)
			}
		}
	}

	/// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`].
	///
	/// This is useful for pruning fully resolved monitors from the monitor set and primary
	/// storage so they are not kept in memory and reloaded on restart. Should be called
	/// occasionally (once every handful of blocks or on startup). Depending on the
	/// [`Persist::archive_persisted_channel`] implementation, the data may be moved to an archive
	/// location or removed entirely.
	pub fn archive_fully_resolved_channel_monitors(&self) {
		let mut have_monitors_to_prune = false;
		for (funding_txo, monitor_holder) in self.monitors.read().unwrap().iter() {
			let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor, None);
			let (is_fully_resolved, needs_persistence) = monitor_holder.monitor.check_and_update_full_resolution_status(&logger);
			if is_fully_resolved {
				have_monitors_to_prune = true;
			}
			if needs_persistence {
				self.persister.update_persisted_channel(*funding_txo, None, &monitor_holder.monitor);
			}
		}
		if have_monitors_to_prune {
			let mut monitors = self.monitors.write().unwrap();
			monitors.retain(|funding_txo, monitor_holder| {
				let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor, None);
				let (is_fully_resolved, _) = monitor_holder.monitor.check_and_update_full_resolution_status(&logger);
				if is_fully_resolved {
					log_info!(logger,
						"Archiving fully resolved ChannelMonitor for funding txo {}",
						funding_txo
					);
					self.persister.archive_persisted_channel(*funding_txo);
					false
				} else {
					true
				}
			});
		}
	}
}
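
// Illustrative sketch (not part of the API above): reading a channel's claimable
// balances through the `LockedChannelMonitor` returned by `get_monitor`. The guard
// holds the monitor-map read lock for its entire lifetime, so keep its scope short.
#[cfg(test)]
#[allow(unused)]
fn example_read_balances<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>(
	chain_monitor: &ChainMonitor<ChannelSigner, C, T, F, L, P>, funding_txo: OutPoint,
) -> Vec<Balance>
where C::Target: chain::Filter,
      T::Target: BroadcasterInterface,
      F::Target: FeeEstimator,
      L::Target: Logger,
      P::Target: Persist<ChannelSigner>,
{
	match chain_monitor.get_monitor(funding_txo) {
		// `LockedChannelMonitor` derefs to `ChannelMonitor`, so monitor methods can
		// be called directly on the guard.
		Ok(locked_monitor) => locked_monitor.get_claimable_balances(),
		Err(()) => Vec::new(),
	}
}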

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
{
	fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
		log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
		self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
			monitor.block_connected(
				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
		});
		// Assume we may have some new events and wake the event processor.
		self.event_notifier.notify();
	}

	fn block_disconnected(&self, header: &Header, height: u32) {
		let monitor_states = self.monitors.read().unwrap();
		log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
		for monitor_state in monitor_states.values() {
			monitor_state.monitor.block_disconnected(
				header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger);
		}
	}
}
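
// Illustrative sketch (not part of the impl above): feeding a full block to the
// `ChainMonitor` through `chain::Listen`. The trait's provided `block_connected`
// method splits the block into its header and indexed transaction data and
// forwards them to `filtered_block_connected` above.
#[cfg(test)]
#[allow(unused)]
fn example_apply_block<LN: chain::Listen>(listener: &LN, block: &bitcoin::Block, height: u32) {
	listener.block_connected(block, height);
}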

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
where
	C::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
{
	fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
		log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
		self.process_chain_data(header, None, txdata, |monitor, txdata| {
			monitor.transactions_confirmed(
				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
		});
		// Assume we may have some new events and wake the event processor.
		self.event_notifier.notify();
	}

	fn transaction_unconfirmed(&self, txid: &Txid) {
		log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
		let monitor_states = self.monitors.read().unwrap();
		for monitor_state in monitor_states.values() {
			monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &self.logger);
		}
	}

	fn best_block_updated(&self, header: &Header, height: u32) {
		log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
		self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
			// `best_block_updated` is only ever called with empty `txdata`, so the closure
			// should never see transactions here.
			debug_assert!(txdata.is_empty());
			monitor.best_block_updated(
				header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
			)
		});
		// Assume we may have some new events and wake the event processor.
		self.event_notifier.notify();
	}

	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
		let mut txids = Vec::new();
		let monitor_states = self.monitors.read().unwrap();
		for monitor_state in monitor_states.values() {
			txids.append(&mut monitor_state.monitor.get_relevant_txids());
		}

		// Sort by txid, then by descending confirmation height, so the dedup below keeps the
		// most recent confirmation entry for each txid.
		txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
		txids.dedup_by_key(|(txid, _, _)| *txid);
		txids
	}
}
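
// Illustrative sketch (not part of the impl above): using `Confirm::get_relevant_txids`
// during a chain re-sync. `still_confirmed` is a hypothetical application-provided check
// against the current chain; anything it no longer confirms is reported back via
// `transaction_unconfirmed`.
#[cfg(test)]
#[allow(unused)]
fn example_resync<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>(
	chain_monitor: &ChainMonitor<ChannelSigner, C, T, F, L, P>,
	still_confirmed: impl Fn(&Txid, Option<&BlockHash>) -> bool,
)
where C::Target: chain::Filter,
      T::Target: BroadcasterInterface,
      F::Target: FeeEstimator,
      L::Target: Logger,
      P::Target: Persist<ChannelSigner>,
{
	use crate::chain::Confirm;
	for (txid, _conf_height, block_hash) in chain_monitor.get_relevant_txids() {
		if !still_confirmed(&txid, block_hash.as_ref()) {
			chain_monitor.transaction_unconfirmed(&txid);
		}
	}
}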

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
      T::Target: BroadcasterInterface,
      F::Target: FeeEstimator,
      L::Target: Logger,
      P::Target: Persist<ChannelSigner>,
{
	fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
		let mut monitors = self.monitors.write().unwrap();
		let entry = match monitors.entry(funding_outpoint) {
			hash_map::Entry::Occupied(_) => {
				log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
				return Err(());
			},
			hash_map::Entry::Vacant(e) => e,
		};
		log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
		let update_id = monitor.get_latest_update_id();
		let mut pending_monitor_updates = Vec::new();
		let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor);
		match persist_res {
			ChannelMonitorUpdateStatus::InProgress => {
				log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
				pending_monitor_updates.push(update_id);
			},
			ChannelMonitorUpdateStatus::Completed => {
				log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
			},
			ChannelMonitorUpdateStatus::UnrecoverableError => {
				let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
				log_error!(logger, "{}", err_str);
				panic!("{}", err_str);
			},
		}
		if let Some(ref chain_source) = self.chain_source {
			monitor.load_outputs_to_watch(chain_source, &self.logger);
		}
		entry.insert(MonitorHolder {
			monitor,
			pending_monitor_updates: Mutex::new(pending_monitor_updates),
		});
		Ok(persist_res)
	}

	fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
		// `channel_id` may be `None` on updates created by older LDK versions; fall back to the
		// v1 channel id derived from the funding outpoint.
		let channel_id = update.channel_id.unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo));
		// Update the monitor that watches the channel referred to by the given outpoint.
		let monitors = self.monitors.read().unwrap();
		match monitors.get(&funding_txo) {
			None => {
				let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(channel_id), None);
				log_error!(logger, "Failed to update channel monitor: no such monitor registered");

				// We should never trigger this from within ChannelManager. Technically a user
				// could use this object with some proxying in between which makes this possible,
				// but in tests and fuzzing this should be a panic.
				#[cfg(debug_assertions)]
				panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
				#[cfg(not(debug_assertions))]
				ChannelMonitorUpdateStatus::InProgress
			},
			Some(monitor_state) => {
				let monitor = &monitor_state.monitor;
				let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
				log_trace!(logger, "Updating ChannelMonitor to id {} for channel {}", update.update_id, log_funding_info!(monitor));

				// Hold the pending-updates lock through `update_monitor` and persistence so we
				// don't race with completion notifications for this monitor.
				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
				let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger);

				let update_id = update.update_id;
				let persist_res = if update_res.is_err() {
					// Even if applying the update failed, the monitor's state may still have
					// changed, so persist the full updated monitor rather than an update which
					// could fail to apply when read back.
					log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
					self.persister.update_persisted_channel(funding_txo, None, monitor)
				} else {
					self.persister.update_persisted_channel(funding_txo, Some(update), monitor)
				};
				match persist_res {
					ChannelMonitorUpdateStatus::InProgress => {
						pending_monitor_updates.push(update_id);
						log_debug!(logger,
							"Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress",
							update_id,
							log_funding_info!(monitor)
						);
					},
					ChannelMonitorUpdateStatus::Completed => {
						log_debug!(logger,
							"Persistence of ChannelMonitorUpdate id {:?} for channel {} completed",
							update_id,
							log_funding_info!(monitor)
						);
					},
					ChannelMonitorUpdateStatus::UnrecoverableError => {
						// Take the monitors lock for writing so that we poison it and any future
						// operations going forward fail immediately.
						core::mem::drop(pending_monitor_updates);
						core::mem::drop(monitors);
						let _poison = self.monitors.write().unwrap();
						let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
						log_error!(logger, "{}", err_str);
						panic!("{}", err_str);
					},
				}
				if update_res.is_err() {
					ChannelMonitorUpdateStatus::InProgress
				} else {
					persist_res
				}
			}
		}
	}

	fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
		for monitor_state in self.monitors.read().unwrap().values() {
			let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
			if monitor_events.len() > 0 {
				let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
				let monitor_channel_id = monitor_state.monitor.channel_id();
				let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
				pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id));
			}
		}
		pending_monitor_events
	}
}

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
	where C::Target: chain::Filter,
	      T::Target: BroadcasterInterface,
	      F::Target: FeeEstimator,
	      L::Target: Logger,
	      P::Target: Persist<ChannelSigner>,
{
	/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
	///
	/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed
	/// in order to handle these events. If the handler returns `Err(ReplayEvent())` the event is
	/// queued for replay and the notifier is woken.
	///
	/// [`SpendableOutputs`]: events::Event::SpendableOutputs
	fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
		for monitor_state in self.monitors.read().unwrap().values() {
			match monitor_state.monitor.process_pending_events(&handler, &self.logger) {
				Ok(()) => {},
				Err(ReplayEvent ()) => {
					self.event_notifier.notify();
				}
			}
		}
	}
}

#[cfg(test)]
mod tests {
	use crate::{check_added_monitors, check_closed_event};
	use crate::{expect_payment_path_successful, get_event_msg};
	use crate::{get_htlc_update_msgs, get_revoke_commit_msgs};
	use crate::chain::{ChannelMonitorUpdateStatus, Watch};
	use crate::chain::channelmonitor::ANTI_REORG_DELAY;
	use crate::events::{ClosureReason, Event, MessageSendEvent, MessageSendEventsProvider};
	use crate::ln::functional_test_utils::*;
	use crate::ln::msgs::ChannelMessageHandler;

	const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5;

	#[test]
	fn test_async_ooo_offchain_updates() {
		// Test that if we have multiple off-chain updates being persisted and they complete
		// out-of-order, the ChainMonitor waits until all have completed before informing the
		// ChannelManager.
		let chanmon_cfgs = create_chanmon_cfgs(2);
		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
		create_announced_chan_between_nodes(&nodes, 0, 1);

		// Route two payments to be claimed at the same time.
		let (payment_preimage_1, payment_hash_1, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
		let (payment_preimage_2, payment_hash_2, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);

		chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
		chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
		chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);

		nodes[1].node.claim_funds(payment_preimage_1);
		check_added_monitors!(nodes[1], 1);
		nodes[1].node.claim_funds(payment_preimage_2);
		check_added_monitors!(nodes[1], 1);

		let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
		assert_eq!(persistences.len(), 1);
		let (funding_txo, updates) = persistences.iter().next().unwrap();
		assert_eq!(updates.len(), 2);

		// Note that the pending updates come from a set, so their iteration order is arbitrary.
		// Complete them one at a time, checking that no MonitorEvent is released until both are
		// done.
		let mut update_iter = updates.iter();
		let next_update = update_iter.next().unwrap().clone();
		#[cfg(not(c_bindings))]
		assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
			.unwrap().contains(&next_update));
		#[cfg(c_bindings)]
		assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
			.find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
		nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap();
		#[cfg(not(c_bindings))]
		assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
			.unwrap().contains(&next_update));
		#[cfg(c_bindings)]
		assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
			.find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
		assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
		assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
		assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
		nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();

		let claim_events = nodes[1].node.get_and_clear_pending_events();
		assert_eq!(claim_events.len(), 2);
		match claim_events[0] {
			Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
				assert_eq!(payment_hash_1, *payment_hash);
			},
			_ => panic!("Unexpected event"),
		}
		match claim_events[1] {
			Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
				assert_eq!(payment_hash_2, *payment_hash);
			},
			_ => panic!("Unexpected event"),
		}

		// Now manually walk the commitment_signed dance - because we claimed two payments
		// back-to-back it doesn't fit into the neat walk commitment_signed_dance does.
		let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
		nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
		expect_payment_sent(&nodes[0], payment_preimage_1, None, false, false);
		nodes[0].node.handle_commitment_signed(nodes[1].node.get_our_node_id(), &updates.commitment_signed);
		check_added_monitors!(nodes[0], 1);
		let (as_first_raa, as_first_update) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());

		nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_first_raa);
		check_added_monitors!(nodes[1], 1);
		let bs_second_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
		nodes[1].node.handle_commitment_signed(nodes[0].node.get_our_node_id(), &as_first_update);
		check_added_monitors!(nodes[1], 1);
		let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());

		nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), &bs_second_updates.update_fulfill_htlcs[0]);
		expect_payment_sent(&nodes[0], payment_preimage_2, None, false, false);
		nodes[0].node.handle_commitment_signed(nodes[1].node.get_our_node_id(), &bs_second_updates.commitment_signed);
		check_added_monitors!(nodes[0], 1);
		nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_first_raa);
		expect_payment_path_successful!(nodes[0]);
		check_added_monitors!(nodes[0], 1);
		let (as_second_raa, as_second_update) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());

		nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_second_raa);
		check_added_monitors!(nodes[1], 1);
		nodes[1].node.handle_commitment_signed(nodes[0].node.get_our_node_id(), &as_second_update);
		check_added_monitors!(nodes[1], 1);
		let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());

		nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_second_raa);
		expect_payment_path_successful!(nodes[0]);
		check_added_monitors!(nodes[0], 1);
	}

	#[test]
	fn test_chainsync_triggers_distributed_monitor_persistence() {
		let chanmon_cfgs = create_chanmon_cfgs(3);
		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);

		// Use FullBlockViaListen so that every connected block flows through
		// `process_chain_data` and exercises the partitioned persistence logic.
		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
		*nodes[1].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
		*nodes[2].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

		let _channel_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
		let channel_2 = create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 1_000_000, 0).2;

		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
		chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
		connect_blocks(&nodes[1], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);

		// Over 2 * CHAINSYNC_MONITOR_PARTITION_FACTOR blocks each monitor is persisted exactly
		// twice. Node 0 has two channels; nodes 1 and 2 have one each.
		assert_eq!(2 * 2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
		assert_eq!(2, chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().len());
		assert_eq!(2, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());

		// Force-close channel_2 so that node 0's monitor for it has pending claims and is
		// therefore persisted on every connected block rather than only on partition hits.
		nodes[0].node.force_close_broadcasting_latest_txn(&channel_2, &nodes[2].node.get_our_node_id(), "Channel force-closed".to_string()).unwrap();
		check_closed_event!(&nodes[0], 1, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, false,
			[nodes[2].node.get_our_node_id()], 1000000);
		check_closed_broadcast(&nodes[0], 1, true);
		let close_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
		assert_eq!(close_tx.len(), 1);

		mine_transaction(&nodes[2], &close_tx[0]);
		check_added_monitors(&nodes[2], 1);
		check_closed_broadcast(&nodes[2], 1, true);
		check_closed_event!(&nodes[2], 1, ClosureReason::CommitmentTxConfirmed, false,
			[nodes[0].node.get_our_node_id()], 1000000);

		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

		// Node 0's monitor for channel_2 has pending claims, so it is persisted on each of the
		// CHAINSYNC_MONITOR_PARTITION_FACTOR blocks, plus one partition-based persist for
		// channel_1. Node 2 has no pending claims, so it only sees its single partition hit.
		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR);

		assert_eq!((CHAINSYNC_MONITOR_PARTITION_FACTOR + 1) as usize, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
		assert_eq!(1, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());

		// Confirm the close transaction on node 0 and let its claims fully resolve.
		mine_transaction(&nodes[0], &close_tx[0]);
		connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
		check_added_monitors(&nodes[0], 1);
		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

		// With the claims resolved, persistence is once again only partition-based: one write
		// per channel over CHAINSYNC_MONITOR_PARTITION_FACTOR blocks.
		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
		assert_eq!(2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
	}

	#[test]
	#[cfg(feature = "std")]
	fn update_during_chainsync_poisons_channel() {
		let chanmon_cfgs = create_chanmon_cfgs(2);
		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
		create_announced_chan_between_nodes(&nodes, 0, 1);
		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

		chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);

		assert!(std::panic::catch_unwind(|| {
			// Connecting enough blocks to hit the partition-based persist returns an
			// UnrecoverableError, which must panic.
			connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
		}).is_err());
		assert!(std::panic::catch_unwind(|| {
			// The monitors RwLock was poisoned by the first panic, so tearing down the nodes
			// (which accesses the monitors) panics again.
			core::mem::drop(nodes);
		}).is_err());
	}
}