1use alloc::collections::BTreeSet as Set;
2
3use bitcoin::absolute::LockTime;
4use bitcoin::blockdata::block::Header as BlockHeader;
5use bitcoin::secp256k1::Secp256k1;
6use bitcoin::transaction::Version;
7use bitcoin::{BlockHash, OutPoint, Transaction, TxIn, TxOut, Txid};
8use log::*;
9use push_decoder::Listener as _;
10use serde_derive::{Deserialize, Serialize};
11use serde_with::serde_as;
12
13use crate::chain::tracker::ChainListener;
14use crate::channel::ChannelId;
15use crate::policy::validator::ChainState;
16use crate::prelude::*;
17use crate::util::transaction_utils::{decode_commitment_number, decode_commitment_tx};
18use crate::{Arc, CommitmentPointProvider};
19
/// Depth, in blocks, that a funding double-spend, a mutual close, or a fully
/// swept closing transaction must reach before `State::is_done` can report the
/// channel as done (a `forget_channel` notice is required as well).
const MIN_DEPTH: u32 = 100;

/// Depth, in blocks, that the sweep of our own closing output must reach before
/// `State::is_done` can report the channel as done, regardless of whether the
/// HTLC outputs of the closing transaction have been spent.
const MAX_CLOSING_DEPTH: u32 = 2016;
25
/// Outputs of a unilateral closing transaction that are tracked until they are spent.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct ClosingOutpoints {
    /// Txid of the closing transaction.
    txid: Txid,
    /// Index of our own output, if there is one, paired with its "spent" flag.
    our_output: Option<(u32, bool)>,
    /// Output indexes of the HTLC outputs.
    htlc_outputs: Vec<u32>,
    /// "Spent" flags, parallel to `htlc_outputs` (same length, same order).
    htlc_spents: Vec<bool>,
}
36
37impl ClosingOutpoints {
38 fn new(txid: Txid, our_output_index: Option<u32>, htlc_output_indexes: Vec<u32>) -> Self {
40 let v = vec![false; htlc_output_indexes.len()];
41 ClosingOutpoints {
42 txid,
43 our_output: our_output_index.map(|i| (i, false)),
44 htlc_outputs: htlc_output_indexes,
45 htlc_spents: v,
46 }
47 }
48
49 fn includes_our_output(&self, outpoint: &OutPoint) -> bool {
51 self.txid == outpoint.txid && self.our_output.map(|(i, _)| i) == Some(outpoint.vout)
52 }
53
54 fn includes_htlc_output(&self, outpoint: &OutPoint) -> bool {
56 self.txid == outpoint.txid && self.htlc_outputs.contains(&(outpoint.vout))
57 }
58
59 fn set_our_output_spent(&mut self, vout: u32, spent: bool) {
60 let p = self.our_output.as_mut().unwrap();
62 assert_eq!(p.0, vout);
63 p.1 = spent;
64 }
65
66 fn set_htlc_output_spent(&mut self, vout: u32, spent: bool) {
67 let i = self.htlc_outputs.iter().position(|&x| x == vout).unwrap();
69 self.htlc_spents[i] = spent;
70 }
71
72 fn is_all_spent(&self) -> bool {
74 self.our_output.as_ref().map(|(_, b)| *b).unwrap_or(true)
75 && self.htlc_spents.iter().all(|b| *b)
76 }
77}
78
#[serde_as]
/// Serializable chain-tracking state of a single channel.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct State {
    /// Chain height tracked by the monitor; incremented for every added block
    /// and decremented for every removed block.
    height: u32,
    /// Txids of the registered funding transactions; parallel to `funding_vouts`.
    funding_txids: Vec<Txid>,
    /// Funding output index for each entry of `funding_txids`.
    funding_vouts: Vec<u32>,
    /// Outpoints spent by the registered funding transaction(s).
    funding_inputs: Set<OutPoint>,
    /// Height at which a funding transaction was confirmed, if any.
    funding_height: Option<u32>,
    /// The confirmed funding outpoint, if any.
    funding_outpoint: Option<OutPoint>,
    /// Height at which a funding input was first seen spent; cleared again when
    /// a funding transaction confirms.
    funding_double_spent_height: Option<u32>,
    /// Height at which a mutual close was confirmed, if any.
    mutual_closing_height: Option<u32>,
    /// Height at which a unilateral close was confirmed, if any.
    unilateral_closing_height: Option<u32>,
    /// Tracked outputs of the unilateral closing transaction, if one confirmed.
    closing_outpoints: Option<ClosingOutpoints>,
    /// Height at which all tracked closing outputs became spent.
    closing_swept_height: Option<u32>,
    /// Height at which our own closing output became spent (or was found absent).
    our_output_swept_height: Option<u32>,
    /// Whether a block has been processed; defaults to `false` when missing from
    /// serialized data.
    #[serde(default)]
    saw_block: bool,
    /// Whether `forget_channel` was called; defaults to `false` when missing from
    /// serialized data.
    #[serde(default)]
    saw_forget_channel: bool,
    /// Channel this state belongs to.  Not serialized; it is re-attached by
    /// `ChainMonitorBase::new_from_persistence`.
    #[serde(skip)]
    channel_id: Option<ChannelId>,
}
118
/// Block-event listener that records channel-relevant changes into a
/// `BlockDecodeState`.
struct PushListener<'a> {
    /// Supplies the transaction parameters and per-commitment points used to
    /// decode a closing transaction.
    commitment_point_provider: &'a dyn CommitmentPointProvider,
    /// Per-block scratch state receiving the detected changes.
    decode_state: &'a mut BlockDecodeState,
    /// Whether a block start was seen; transaction events are ignored until then.
    saw_block: bool,
}
127
/// A channel-relevant event detected while decoding a block.
#[derive(Clone, Debug, Serialize, Deserialize)]
enum StateChange {
    /// A registered funding transaction confirmed, creating this funding outpoint.
    FundingConfirmed(OutPoint),
    /// An outpoint that is an input of the funding transaction was spent.
    FundingInputSpent(OutPoint),
    /// A unilateral close confirmed: closing txid, the funding outpoint it spends,
    /// the index of our output (if any) and the indexes of the HTLC outputs.
    UnilateralCloseConfirmed(Txid, OutPoint, Option<u32>, Vec<u32>),
    /// A mutual close confirmed: closing txid and the funding outpoint it spends.
    MutualCloseConfirmed(Txid, OutPoint),
    /// Our output of the closing transaction (at this index) was spent.
    OurOutputSpent(u32),
    /// An HTLC output of the closing transaction (at this index) was spent.
    HTLCOutputSpent(u32),
}
146
/// Scratch state accumulated while the transactions of one block are decoded.
#[derive(Clone, Debug)]
struct BlockDecodeState {
    /// Changes detected so far in this block, in detection order.
    changes: Vec<StateChange>,
    /// Version of the transaction currently being decoded.
    version: i32,
    /// Number of inputs seen so far in the current transaction.
    input_num: u32,
    /// Number of outputs seen so far in the current transaction.
    output_num: u32,
    /// Partially assembled closing transaction, set once an input spending the
    /// funding outpoint is seen.
    closing_tx: Option<Transaction>,
    /// Hash of the block being decoded; `None` until a block start was seen.
    block_hash: Option<BlockHash>,
    /// Private copy of the monitor state to which every detected change is applied
    /// immediately, so later transactions in the block observe earlier changes.
    state: State,
}
166
167impl BlockDecodeState {
168 fn new(state: &State) -> Self {
169 BlockDecodeState {
170 changes: Vec::new(),
171 version: 0,
172 input_num: 0,
173 output_num: 0,
174 closing_tx: None,
175 block_hash: None,
176 state: state.clone(),
177 }
178 }
179
180 fn new_with_block_hash(state: &State, block_hash: &BlockHash) -> Self {
181 BlockDecodeState {
182 changes: Vec::new(),
183 version: 0,
184 input_num: 0,
185 output_num: 0,
186 closing_tx: None,
187 block_hash: Some(*block_hash),
188 state: state.clone(),
189 }
190 }
191
192 fn add_change(&mut self, change: StateChange) {
197 self.changes.push(change.clone());
198 let mut adds = Vec::new();
199 let mut removes = Vec::new();
200 self.state.apply_forward_change(&mut adds, &mut removes, change);
201 }
202}
203
/// Upper bound on the number of outputs accepted while assembling a closing
/// transaction; exceeding it trips an assertion in `on_transaction_output`.
const MAX_COMMITMENT_OUTPUTS: u32 = 600;
205
206impl<'a> PushListener<'a> {
207 fn is_not_ready_for_push(&self) -> bool {
211 if self.saw_block {
212 assert!(self.decode_state.block_hash.is_some(), "saw block but no decode state");
215 false
216 } else {
217 assert!(
219 self.decode_state.block_hash.is_none(),
220 "never saw a block but decode state is present"
221 );
222 true
223 }
224 }
225}
226
227impl<'a> push_decoder::Listener for PushListener<'a> {
228 fn on_block_start(&mut self, header: &BlockHeader) {
229 assert!(self.decode_state.block_hash.is_none(), "saw more than one on_block_start");
232 self.decode_state.block_hash = Some(header.block_hash());
233 self.saw_block = true;
234 }
235
236 fn on_transaction_start(&mut self, version: i32) {
237 if self.is_not_ready_for_push() {
238 return;
239 }
240 let state = &mut self.decode_state;
241 state.version = version;
242 state.input_num = 0;
243 state.output_num = 0;
244 state.closing_tx = None;
245 }
246
247 fn on_transaction_input(&mut self, input: &TxIn) {
248 if self.is_not_ready_for_push() {
249 return;
250 }
251
252 let decode_state = &mut self.decode_state;
253
254 if decode_state.state.funding_inputs.contains(&input.previous_output) {
255 decode_state.add_change(StateChange::FundingInputSpent(input.previous_output));
257 }
258
259 if Some(input.previous_output) == decode_state.state.funding_outpoint {
260 let tx = Transaction {
264 version: Version(decode_state.version),
265 lock_time: LockTime::ZERO,
266 input: vec![input.clone()],
267 output: vec![],
268 };
269 decode_state.closing_tx = Some(tx);
270 }
271
272 let closing_change = if let Some(ref c) = decode_state.state.closing_outpoints {
275 if c.includes_our_output(&input.previous_output) {
276 Some(StateChange::OurOutputSpent(input.previous_output.vout))
278 } else if c.includes_htlc_output(&input.previous_output) {
279 Some(StateChange::HTLCOutputSpent(input.previous_output.vout))
281 } else {
282 None
283 }
284 } else {
285 None
286 };
287
288 closing_change.map(|c| decode_state.add_change(c));
289
290 if decode_state.closing_tx.is_some() {
291 assert_eq!(decode_state.input_num, 0, "closing tx must have only one input");
292 }
293 decode_state.input_num += 1;
294 }
295
296 fn on_transaction_output(&mut self, output: &TxOut) {
297 if self.is_not_ready_for_push() {
298 return;
299 }
300
301 let decode_state = &mut self.decode_state;
302 if let Some(closing_tx) = &mut decode_state.closing_tx {
303 closing_tx.output.push(output.clone());
304 assert!(
305 decode_state.output_num < MAX_COMMITMENT_OUTPUTS,
306 "more than {} commitment outputs",
307 MAX_COMMITMENT_OUTPUTS
308 );
309 }
310
311 decode_state.output_num += 1;
312 }
313
314 fn on_transaction_end(&mut self, lock_time: LockTime, txid: Txid) {
315 if self.is_not_ready_for_push() {
316 return;
317 }
318
319 let decode_state = &mut self.decode_state;
320
321 if let Some(ind) = decode_state.state.funding_txids.iter().position(|i| *i == txid) {
322 let vout = decode_state.state.funding_vouts[ind];
323 assert!(
325 vout < decode_state.output_num,
326 "tx {} doesn't have funding output index {}",
327 txid,
328 vout
329 );
330 let outpoint = OutPoint { txid, vout };
331 decode_state.add_change(StateChange::FundingConfirmed(outpoint));
332 }
333
334 if let Some(mut closing_tx) = decode_state.closing_tx.take() {
336 closing_tx.lock_time = lock_time;
337 assert_eq!(closing_tx.input.len(), 1);
339 let provider = self.commitment_point_provider;
340 let parameters = provider.get_transaction_parameters();
341
342 let commitment_number_opt = decode_commitment_number(&closing_tx, ¶meters);
344 if let Some(commitment_number) = commitment_number_opt {
345 let secp_ctx = Secp256k1::new();
346 info!("unilateral close {} at commitment {} confirmed", txid, commitment_number);
347 let holder_per_commitment = provider.get_holder_commitment_point(commitment_number);
348 let cp_per_commitment =
349 provider.get_counterparty_commitment_point(commitment_number);
350 let (our_output_index, htlc_indices) = decode_commitment_tx(
351 &closing_tx,
352 &holder_per_commitment,
353 &cp_per_commitment,
354 ¶meters,
355 &secp_ctx,
356 );
357 info!("our_output_index: {:?}, htlc_indices: {:?}", our_output_index, htlc_indices);
358 decode_state.add_change(StateChange::UnilateralCloseConfirmed(
359 txid,
360 closing_tx.input[0].previous_output,
361 our_output_index,
362 htlc_indices,
363 ));
364 } else {
365 decode_state.add_change(StateChange::MutualCloseConfirmed(
366 txid,
367 closing_tx.input[0].previous_output,
368 ));
369 info!("mutual close {} confirmed", txid);
370 }
371 }
372 }
373
    /// No-op: this listener does nothing at block end.  The accumulated changes
    /// are applied by the `*_block_end` methods of `State`.
    fn on_block_end(&mut self) {
    }
378}
379
380impl State {
381 fn channel_id(&self) -> &ChannelId {
382 self.channel_id.as_ref().expect("missing associated channel_id in monitor::State")
384 }
385
386 fn depth_of(&self, other_height: Option<u32>) -> u32 {
387 (self.height + 1).saturating_sub(other_height.unwrap_or(self.height + 1))
388 }
389
390 fn deep_enough_and_saw_node_forget(&self, other_height: Option<u32>, limit: u32) -> bool {
391 let depth = self.depth_of(other_height);
394 if depth < limit {
395 false
397 } else if self.saw_forget_channel {
398 true
400 } else {
401 warn!(
403 "expected forget_channel for {} overdue by {} blocks",
404 self.channel_id(),
405 depth - limit
406 );
407 false
408 }
409 }
410
411 fn diagnostic(&self, is_closed: bool) -> String {
412 if self.funding_height.is_none() {
413 format!("UNCOMFIRMED hold till funding doublespent + {}", MIN_DEPTH)
414 } else if let Some(height) = self.funding_double_spent_height {
415 format!("AGING_FUNDING_DOUBLESPENT at {} until {}", height, height + MIN_DEPTH)
416 } else if let Some(height) = self.mutual_closing_height {
417 format!("AGING_MUTUALLY_CLOSED at {} until {}", height, height + MIN_DEPTH)
418 } else if let Some(height) = self.closing_swept_height {
419 format!("AGING_CLOSING_SWEPT at {} until {}", height, height + MIN_DEPTH)
420 } else if let Some(height) = self.our_output_swept_height {
421 format!("AGING_OUR_OUTPUT_SWEPT at {} until {}", height, height + MAX_CLOSING_DEPTH)
422 } else if is_closed {
423 "CLOSING".into()
424 } else {
425 "ACTIVE".into()
426 }
427 }
428
429 fn is_done(&self) -> bool {
430 if self.deep_enough_and_saw_node_forget(self.funding_double_spent_height, MIN_DEPTH) {
440 debug!(
441 "{} is_done because funding double spent {} blocks ago",
442 self.channel_id(),
443 MIN_DEPTH
444 );
445 return true;
446 }
447
448 if self.deep_enough_and_saw_node_forget(self.mutual_closing_height, MIN_DEPTH) {
449 debug!("{} is_done because mutual closed {} blocks ago", self.channel_id(), MIN_DEPTH);
450 return true;
451 }
452
453 if self.deep_enough_and_saw_node_forget(self.closing_swept_height, MIN_DEPTH) {
454 debug!("{} is_done because closing swept {} blocks ago", self.channel_id(), MIN_DEPTH);
455 return true;
456 }
457
458 if self.deep_enough_and_saw_node_forget(self.our_output_swept_height, MAX_CLOSING_DEPTH) {
461 debug!(
462 "{} is_done because closing output swept {} blocks ago",
463 self.channel_id(),
464 MAX_CLOSING_DEPTH
465 );
466 return true;
467 }
468
469 return false;
470 }
471
472 fn on_add_block_end(
473 &mut self,
474 block_hash: &BlockHash,
475 decode_state: &mut BlockDecodeState,
476 ) -> (Vec<OutPoint>, Vec<OutPoint>) {
477 assert_eq!(decode_state.block_hash.as_ref(), Some(block_hash));
478
479 self.saw_block = true;
480 self.height += 1;
481
482 let closing_was_swept = self.is_closing_swept();
483 let our_output_was_swept = self.is_our_output_swept();
484
485 let mut adds = Vec::new();
486 let mut removes = Vec::new();
487
488 let changed = !decode_state.changes.is_empty();
489
490 if changed {
491 debug!(
492 "{} detected add-changes at height {}: {:?}",
493 self.channel_id(),
494 self.height,
495 decode_state.changes
496 );
497 }
498
499 for change in decode_state.changes.drain(..) {
501 self.apply_forward_change(&mut adds, &mut removes, change);
502 }
503
504 let closing_is_swept = self.is_closing_swept();
505 let our_output_is_swept = self.is_our_output_swept();
506
507 if !closing_was_swept && closing_is_swept {
508 info!("{} closing tx was swept at height {}", self.channel_id(), self.height);
509 self.closing_swept_height = Some(self.height);
510 }
511
512 if !our_output_was_swept && our_output_is_swept {
513 info!("{} our output was swept at height {}", self.channel_id(), self.height);
514 self.our_output_swept_height = Some(self.height);
515 }
516
517 if self.is_done() {
518 info!("{} done at height {}", self.channel_id(), self.height);
519 }
520
521 if changed {
522 #[cfg(not(feature = "log_pretty_print"))]
523 info!("on_add_block_end state changed: {:?}", self);
524 #[cfg(feature = "log_pretty_print")]
525 info!("on_add_block_end state changed: {:#?}", self);
526 }
527
528 (adds, removes)
529 }
530
531 fn on_remove_block_end(
532 &mut self,
533 block_hash: &BlockHash,
534 decode_state: &mut BlockDecodeState,
535 ) -> (Vec<OutPoint>, Vec<OutPoint>) {
536 assert_eq!(decode_state.block_hash.as_ref(), Some(block_hash));
537
538 let closing_was_swept = self.is_closing_swept();
539 let our_output_was_swept = self.is_our_output_swept();
540
541 let mut adds = Vec::new();
542 let mut removes = Vec::new();
543
544 let changed = !decode_state.changes.is_empty();
545
546 if changed {
547 debug!(
548 "{} detected remove-changes at height {}: {:?}",
549 self.channel_id(),
550 self.height,
551 decode_state.changes
552 );
553 }
554
555 for change in decode_state.changes.drain(..) {
556 self.apply_backward_change(&mut adds, &mut removes, change);
557 }
558
559 let closing_is_swept = self.is_closing_swept();
560 let our_output_is_swept = self.is_our_output_swept();
561
562 if closing_was_swept && !closing_is_swept {
563 info!("{} closing tx was un-swept at height {}", self.channel_id(), self.height);
564 self.closing_swept_height = None;
565 }
566
567 if our_output_was_swept && !our_output_is_swept {
568 info!("{} our output was un-swept at height {}", self.channel_id(), self.height);
569 self.our_output_swept_height = None;
570 }
571
572 self.height -= 1;
573
574 if changed {
575 #[cfg(not(feature = "log_pretty_print"))]
576 info!("on_remove_block_end state changed: {:?}", self);
577 #[cfg(feature = "log_pretty_print")]
578 info!("on_remove_block_end state changed: {:#?}", self);
579 }
580
581 (adds, removes)
583 }
584
585 fn is_closing_swept(&self) -> bool {
587 self.closing_outpoints.as_ref().map(|o| o.is_all_spent()).unwrap_or(false)
588 }
589
590 fn is_our_output_swept(&self) -> bool {
592 self.closing_outpoints
593 .as_ref()
594 .map(|o| o.our_output.map(|(_, s)| s).unwrap_or(true))
595 .unwrap_or(false)
596 }
597
598 fn apply_forward_change(
599 &mut self,
600 adds: &mut Vec<OutPoint>,
601 removes: &mut Vec<OutPoint>,
602 change: StateChange,
603 ) {
604 match change {
606 StateChange::FundingConfirmed(outpoint) => {
607 self.funding_height = Some(self.height);
608 self.funding_outpoint = Some(outpoint);
609 self.funding_double_spent_height = None;
611 adds.push(outpoint);
612 }
613 StateChange::FundingInputSpent(outpoint) => {
614 self.funding_double_spent_height.get_or_insert(self.height);
620 removes.push(outpoint);
622 }
623 StateChange::UnilateralCloseConfirmed(
624 txid,
625 funding_outpoint,
626 our_output_index,
627 htlcs_indices,
628 ) => {
629 self.unilateral_closing_height = Some(self.height);
630 removes.push(funding_outpoint);
631 our_output_index.map(|i| adds.push(OutPoint { txid, vout: i }));
632 for i in htlcs_indices.iter() {
633 adds.push(OutPoint { txid, vout: *i });
634 }
635 self.closing_outpoints =
636 Some(ClosingOutpoints::new(txid, our_output_index, htlcs_indices));
637 }
638 StateChange::OurOutputSpent(vout) => {
639 let outpoints = self.closing_outpoints.as_mut().unwrap();
640 outpoints.set_our_output_spent(vout, true);
641 let outpoint = OutPoint { txid: outpoints.txid, vout };
642 removes.push(outpoint);
643 }
644 StateChange::HTLCOutputSpent(vout) => {
645 let outpoints = self.closing_outpoints.as_mut().unwrap();
646 outpoints.set_htlc_output_spent(vout, true);
647 let outpoint = OutPoint { txid: outpoints.txid, vout };
648 removes.push(outpoint);
649 }
650 StateChange::MutualCloseConfirmed(_txid, funding_outpoint) => {
651 self.mutual_closing_height = Some(self.height);
652 removes.push(funding_outpoint);
653 }
654 }
655 }
656
657 fn apply_backward_change(
661 &mut self,
662 adds: &mut Vec<OutPoint>,
663 removes: &mut Vec<OutPoint>,
664 change: StateChange,
665 ) {
666 match change {
667 StateChange::FundingConfirmed(outpoint) => {
668 assert_eq!(self.funding_height, Some(self.height));
670 self.funding_height = None;
671 self.funding_outpoint = None;
672 adds.push(outpoint);
673 }
674 StateChange::FundingInputSpent(outpoint) => {
675 if self.funding_double_spent_height == Some(self.height) {
681 self.funding_double_spent_height = None
682 }
683 removes.push(outpoint);
685 }
686 StateChange::UnilateralCloseConfirmed(
687 txid,
688 funding_outpoint,
689 our_output_index,
690 htlcs_indices,
691 ) => {
692 assert_eq!(self.unilateral_closing_height, Some(self.height));
694 self.unilateral_closing_height = None;
695 self.closing_outpoints = None;
696 our_output_index.map(|i| adds.push(OutPoint { txid, vout: i }));
697 for i in htlcs_indices {
698 adds.push(OutPoint { txid, vout: i });
699 }
700 removes.push(funding_outpoint)
701 }
702 StateChange::OurOutputSpent(vout) => {
703 let outpoints = self.closing_outpoints.as_mut().unwrap();
704 outpoints.set_our_output_spent(vout, false);
705 let outpoint = OutPoint { txid: outpoints.txid, vout };
706 removes.push(outpoint);
707 }
708 StateChange::HTLCOutputSpent(vout) => {
709 let outpoints = self.closing_outpoints.as_mut().unwrap();
710 outpoints.set_htlc_output_spent(vout, false);
711 let outpoint = OutPoint { txid: outpoints.txid, vout };
712 removes.push(outpoint);
713 }
714 StateChange::MutualCloseConfirmed(_txid, funding_outpoint) => {
715 self.mutual_closing_height = None;
716 removes.push(funding_outpoint);
717 }
718 }
719 }
720}
721
/// Handle on a channel's monitor state that does not carry a commitment point
/// provider; it can be upgraded with `as_monitor`.  Clones share the same state.
#[derive(Clone)]
pub struct ChainMonitorBase {
    /// Funding outpoint this monitor was created for.
    pub(crate) funding_outpoint: OutPoint,
    /// Monitor state, shared with every clone and with derived `ChainMonitor`s.
    state: Arc<Mutex<State>>,
}
730
731impl ChainMonitorBase {
732 pub fn new(funding_outpoint: OutPoint, height: u32, chan_id: &ChannelId) -> Self {
735 let state = State {
736 height,
737 funding_txids: Vec::new(),
738 funding_vouts: Vec::new(),
739 funding_inputs: OrderedSet::new(),
740 funding_height: None,
741 funding_outpoint: None,
742 funding_double_spent_height: None,
743 mutual_closing_height: None,
744 unilateral_closing_height: None,
745 closing_outpoints: None,
746 closing_swept_height: None,
747 our_output_swept_height: None,
748 saw_block: false,
749 saw_forget_channel: false,
750 channel_id: Some(chan_id.clone()),
751 };
752
753 Self { funding_outpoint, state: Arc::new(Mutex::new(state)) }
754 }
755
756 pub fn new_from_persistence(
758 funding_outpoint: OutPoint,
759 state: State,
760 channel_id: &ChannelId,
761 ) -> Self {
762 let state = Arc::new(Mutex::new(state));
763 state.lock().unwrap().channel_id = Some(channel_id.clone());
764 Self { funding_outpoint, state }
765 }
766
767 pub fn as_monitor(
769 &self,
770 commitment_point_provider: Box<dyn CommitmentPointProvider>,
771 ) -> ChainMonitor {
772 ChainMonitor {
773 funding_outpoint: self.funding_outpoint,
774 state: self.state.clone(),
775 decode_state: Arc::new(Mutex::new(None)),
776 commitment_point_provider,
777 }
778 }
779
780 pub fn add_funding_outpoint(&self, outpoint: &OutPoint) {
783 let mut state = self.get_state();
784 assert!(state.funding_txids.is_empty(), "only a single funding tx currently supported");
785 assert_eq!(state.funding_txids.len(), state.funding_vouts.len());
786 state.funding_txids.push(outpoint.txid);
787 state.funding_vouts.push(outpoint.vout);
788 }
789
790 pub fn add_funding_inputs(&self, tx: &Transaction) {
793 let mut state = self.get_state();
794 state.funding_inputs.extend(tx.input.iter().map(|i| i.previous_output));
795 }
796
797 pub fn as_chain_state(&self) -> ChainState {
799 let state = self.get_state();
800 ChainState {
801 current_height: state.height,
802 funding_depth: state.funding_height.map(|h| state.height + 1 - h).unwrap_or(0),
803 funding_double_spent_depth: state
804 .funding_double_spent_height
805 .map(|h| state.height + 1 - h)
806 .unwrap_or(0),
807 closing_depth: state
808 .mutual_closing_height
809 .or(state.unilateral_closing_height)
810 .map(|h| state.height + 1 - h)
811 .unwrap_or(0),
812 }
813 }
814
815 pub fn is_done(&self) -> bool {
817 self.get_state().is_done()
818 }
819
820 pub fn forget_channel(&self) {
822 let mut state = self.get_state();
823 state.saw_forget_channel = true;
824 }
825
826 pub fn funding_outpoint(&self) -> Option<OutPoint> {
828 self.get_state().funding_outpoint
829 }
830
831 pub fn forget_seen(&self) -> bool {
833 self.get_state().saw_forget_channel
834 }
835
836 pub fn diagnostic(&self, is_closed: bool) -> String {
838 self.get_state().diagnostic(is_closed)
839 }
840
841 fn get_state(&self) -> MutexGuard<State> {
843 self.state.lock().expect("lock")
844 }
845}
846
/// Monitor for a single channel: tracks funding and closing events from blocks
/// and decodes closing transactions via its commitment point provider.
#[derive(Clone)]
pub struct ChainMonitor {
    /// Funding outpoint this monitor was created for; also its `ChainListener` key.
    pub funding_outpoint: OutPoint,
    /// Monitor state, shared with the `ChainMonitorBase` handles.
    pub state: Arc<Mutex<State>>,
    /// Decode state of the block currently being streamed via `on_push`, if any.
    decode_state: Arc<Mutex<Option<BlockDecodeState>>>,
    /// Supplies the transaction parameters and per-commitment points used to
    /// decode closing transactions.
    commitment_point_provider: Box<dyn CommitmentPointProvider>,
}
861
862impl ChainMonitor {
863 pub fn as_base(&self) -> ChainMonitorBase {
865 ChainMonitorBase { funding_outpoint: self.funding_outpoint, state: self.state.clone() }
866 }
867
868 pub fn get_state(&self) -> MutexGuard<State> {
870 self.state.lock().expect("lock")
871 }
872
873 pub fn add_funding(&self, tx: &Transaction, vout: u32) {
876 let mut state = self.get_state();
877 assert!(state.funding_txids.is_empty(), "only a single funding tx currently supported");
878 assert_eq!(state.funding_txids.len(), state.funding_vouts.len());
879 state.funding_txids.push(tx.compute_txid());
880 state.funding_vouts.push(vout);
881 state.funding_inputs.extend(tx.input.iter().map(|i| i.previous_output));
882 }
883
884 pub fn funding_depth(&self) -> u32 {
887 let state = self.get_state();
888 state.depth_of(state.funding_height)
889 }
890
891 pub fn funding_double_spent_depth(&self) -> u32 {
894 let state = self.get_state();
895 state.depth_of(state.funding_double_spent_height)
896 }
897
898 pub fn closing_depth(&self) -> u32 {
900 let state = self.get_state();
901 let closing_height = state.unilateral_closing_height.or(state.mutual_closing_height);
902 state.depth_of(closing_height)
903 }
904
905 pub fn is_done(&self) -> bool {
911 self.get_state().is_done()
912 }
913
914 fn push_transactions(&self, block_hash: &BlockHash, txs: &[Transaction]) -> BlockDecodeState {
916 let mut state = self.get_state();
917
918 state.saw_block = true;
920
921 let mut decode_state = BlockDecodeState::new_with_block_hash(&*state, block_hash);
922
923 let mut listener = PushListener {
924 commitment_point_provider: &*self.commitment_point_provider,
925 decode_state: &mut decode_state,
926 saw_block: true,
927 };
928
929 for tx in txs {
931 listener.on_transaction_start(tx.version.0);
932 for input in tx.input.iter() {
933 listener.on_transaction_input(input);
934 }
935
936 for output in tx.output.iter() {
937 listener.on_transaction_output(output);
938 }
939 listener.on_transaction_end(tx.lock_time, tx.compute_txid());
940 }
941
942 decode_state
943 }
944}
945
946impl ChainListener for ChainMonitor {
947 type Key = OutPoint;
948
949 fn key(&self) -> &Self::Key {
950 &self.funding_outpoint
951 }
952
953 fn on_add_block(
954 &self,
955 txs: &[Transaction],
956 block_hash: &BlockHash,
957 ) -> (Vec<OutPoint>, Vec<OutPoint>) {
958 debug!("on_add_block for {}", self.funding_outpoint);
959 let mut decode_state = self.push_transactions(block_hash, txs);
960
961 let mut state = self.get_state();
962 state.on_add_block_end(block_hash, &mut decode_state)
963 }
964
965 fn on_add_streamed_block_end(&self, block_hash: &BlockHash) -> (Vec<OutPoint>, Vec<OutPoint>) {
966 let mut state = self.get_state();
967 let mut decode_state = self.decode_state.lock().expect("lock").take();
968 if !state.saw_block {
969 return (Vec::new(), Vec::new());
971 }
972 state.on_add_block_end(block_hash, decode_state.as_mut().unwrap())
974 }
975
976 fn on_remove_block(
977 &self,
978 txs: &[Transaction],
979 block_hash: &BlockHash,
980 ) -> (Vec<OutPoint>, Vec<OutPoint>) {
981 debug!("on_remove_block for {}", self.funding_outpoint);
982 let mut decode_state = self.push_transactions(block_hash, txs);
983
984 let mut state = self.get_state();
985 state.on_remove_block_end(block_hash, &mut decode_state)
986 }
987
988 fn on_remove_streamed_block_end(
989 &self,
990 block_hash: &BlockHash,
991 ) -> (Vec<OutPoint>, Vec<OutPoint>) {
992 let mut state = self.get_state();
993 let mut decode_state = self.decode_state.lock().expect("lock").take();
994 if !state.saw_block {
995 return (Vec::new(), Vec::new());
997 }
998 state.on_remove_block_end(block_hash, decode_state.as_mut().unwrap())
1000 }
1001
1002 fn on_push<F>(&self, f: F)
1003 where
1004 F: FnOnce(&mut dyn push_decoder::Listener),
1005 {
1006 let mut state = self.get_state();
1007 let saw_block = state.saw_block;
1008
1009 let mut decode_state_lock = self.decode_state.lock().expect("lock");
1010
1011 let decode_state = decode_state_lock.get_or_insert_with(|| BlockDecodeState::new(&*state));
1012
1013 let mut listener = PushListener {
1014 commitment_point_provider: &*self.commitment_point_provider,
1015 decode_state,
1016 saw_block,
1017 };
1018 f(&mut listener);
1019
1020 state.saw_block = listener.saw_block;
1022 }
1023}
1024
// NOTE(review): `SendSync` is declared outside this file — presumably a marker
// trait for types usable across threads; confirm against its definition.
impl SendSync for ChainMonitor {}
1026
1027#[cfg(test)]
1028mod tests {
1029 use crate::channel::{
1030 ChannelBase, ChannelCommitmentPointProvider, ChannelId, ChannelSetup, CommitmentType,
1031 };
1032 use crate::node::Node;
1033 use crate::util::test_utils::key::{make_test_counterparty_points, make_test_pubkey};
1034 use crate::util::test_utils::*;
1035 use bitcoin::block::Version;
1036 use bitcoin::hash_types::TxMerkleNode;
1037 use bitcoin::hashes::Hash;
1038 use bitcoin::CompactTarget;
1039 use lightning::ln::chan_utils::HTLCOutputInCommitment;
1040 use lightning::types::payment::PaymentHash;
1041 use test_log::test;
1042
1043 use super::*;
1044
    /// Funding depth rises as blocks are added on top of the funding transaction
    /// and falls back to 0 as those blocks are removed again.
    #[test]
    fn test_funding() {
        let tx = make_tx(vec![make_txin(1), make_txin(2)]);
        let outpoint = OutPoint::new(tx.compute_txid(), 0);
        let cpp = Box::new(DummyCommitmentPointProvider {});
        let chan_id = ChannelId::new(&[33u8; 32]);
        let monitor = ChainMonitorBase::new(outpoint, 0, &chan_id).as_monitor(cpp);
        let block_hash = BlockHash::all_zeros();
        monitor.add_funding(&tx, 0);
        // An unrelated empty block, then the block that confirms the funding tx.
        monitor.on_add_block(&[], &block_hash);
        monitor.on_add_block(&[tx.clone()], &block_hash);
        assert_eq!(monitor.funding_depth(), 1);
        // The funding tx spending its own inputs must not count as a double-spend.
        assert_eq!(monitor.funding_double_spent_depth(), 0);
        monitor.on_add_block(&[], &block_hash);
        assert_eq!(monitor.funding_depth(), 2);
        // Unwind the three blocks in reverse order.
        monitor.on_remove_block(&[], &block_hash);
        assert_eq!(monitor.funding_depth(), 1);
        monitor.on_remove_block(&[tx], &block_hash);
        assert_eq!(monitor.funding_depth(), 0);
        monitor.on_remove_block(&[], &block_hash);
        assert_eq!(monitor.funding_depth(), 0);
    }
1067
    /// A different transaction spending one of the funding inputs is tracked as a
    /// double-spend, and its depth follows added and removed blocks.
    #[test]
    fn test_funding_double_spent() {
        let tx = make_tx(vec![make_txin(1), make_txin(2)]);
        // `tx2` is built from `make_txin(2)`, one of the inputs `tx` is built from.
        let tx2 = make_tx(vec![make_txin(2)]);
        let outpoint = OutPoint::new(tx.compute_txid(), 0);
        let cpp = Box::new(DummyCommitmentPointProvider {});
        let chan_id = ChannelId::new(&[33u8; 32]);
        let monitor = ChainMonitorBase::new(outpoint, 0, &chan_id).as_monitor(cpp);
        let block_hash = BlockHash::all_zeros();
        monitor.add_funding(&tx, 0);
        monitor.on_add_block(&[], &block_hash);
        // The double-spending transaction confirms instead of the funding tx.
        monitor.on_add_block(&[tx2.clone()], &block_hash);
        assert_eq!(monitor.funding_depth(), 0);
        assert_eq!(monitor.funding_double_spent_depth(), 1);
        monitor.on_add_block(&[], &block_hash);
        assert_eq!(monitor.funding_depth(), 0);
        assert_eq!(monitor.funding_double_spent_depth(), 2);
        // Unwind the three blocks in reverse order.
        monitor.on_remove_block(&[], &block_hash);
        assert_eq!(monitor.funding_double_spent_depth(), 1);
        monitor.on_remove_block(&[tx2], &block_hash);
        assert_eq!(monitor.funding_double_spent_depth(), 0);
        monitor.on_remove_block(&[], &block_hash);
        assert_eq!(monitor.funding_double_spent_depth(), 0);
    }
1092
    /// Streamed events are ignored until a block start is seen; afterwards the
    /// monitor records that it saw a block.
    #[test]
    fn test_stream() {
        let outpoint = OutPoint::new(Txid::from_slice(&[1; 32]).unwrap(), 0);
        let cpp = Box::new(DummyCommitmentPointProvider {});
        let chan_id = ChannelId::new(&[33u8; 32]);
        let monitor = ChainMonitorBase::new(outpoint, 0, &chan_id).as_monitor(cpp);
        let header = BlockHeader {
            version: Version::from_consensus(0),
            prev_blockhash: BlockHash::all_zeros(),
            merkle_root: TxMerkleNode::all_zeros(),
            time: 0,
            bits: CompactTarget::from_consensus(0),
            nonce: 0,
        };
        let tx = make_tx(vec![make_txin(1), make_txin(2)]);

        // Events pushed without a preceding block start must be ignored.
        monitor.on_push(|listener| {
            listener.on_transaction_input(&tx.input[1]);
            listener.on_transaction_output(&tx.output[0]);
            listener.on_transaction_end(tx.lock_time, tx.compute_txid());
            listener.on_block_end();
        });

        assert!(!monitor.state.lock().unwrap().saw_block);

        // A complete streamed block, starting with its header.
        monitor.on_push(|listener| {
            listener.on_block_start(&header);
            listener.on_transaction_start(2);
            listener.on_transaction_input(&tx.input[0]);
            listener.on_transaction_input(&tx.input[1]);
            listener.on_transaction_output(&tx.output[0]);
            listener.on_transaction_end(tx.lock_time, tx.compute_txid());
            listener.on_block_end();
        });
        monitor.on_add_streamed_block_end(&header.block_hash());

        assert!(monitor.state.lock().unwrap().saw_block);

        // A second streamed block goes through the same path.
        monitor.on_push(|listener| {
            listener.on_block_start(&header);
            listener.on_transaction_start(2);
            listener.on_transaction_input(&tx.input[0]);
            listener.on_transaction_input(&tx.input[1]);
            listener.on_transaction_output(&tx.output[0]);
            listener.on_transaction_end(tx.lock_time, tx.compute_txid());
            listener.on_block_end();
        });
        monitor.on_add_streamed_block_end(&header.block_hash());

        assert!(monitor.state.lock().unwrap().saw_block);
    }
1147
    /// Mutual close where `forget_channel` arrives just before the close reaches
    /// `MIN_DEPTH`: the monitor becomes done at `MIN_DEPTH`, and the test expects
    /// the channel and its listener to be gone after the next heartbeat.
    #[test]
    fn test_mutual_close() {
        let block_hash = BlockHash::all_zeros();
        let (node, channel_id, monitor, funding_txid) = setup_funded_channel();

        // Before the close, a heartbeat leaves the channel and its listener in place.
        node.get_heartbeat();
        assert!(node.get_channel(&channel_id).is_ok());
        assert_eq!(node.get_tracker().listeners.len(), 1);

        // A transaction spending the funding outpoint.
        let close_tx = make_tx(vec![TxIn {
            previous_output: OutPoint::new(funding_txid, 0),
            script_sig: Default::default(),
            sequence: Default::default(),
            witness: Default::default(),
        }]);
        monitor.on_add_block(&[close_tx.clone()], &block_hash);
        assert_eq!(monitor.closing_depth(), 1);
        assert!(!monitor.is_done());

        // Freshly closed: the channel is still kept.
        node.get_heartbeat();
        assert!(node.get_channel(&channel_id).is_ok());
        assert_eq!(node.get_tracker().listeners.len(), 1);

        // Advance the close to depth MIN_DEPTH - 1.
        for _ in 1..MIN_DEPTH - 1 {
            monitor.on_add_block(&[], &block_hash);
        }
        assert!(!monitor.is_done());
        node.forget_channel(&channel_id).unwrap();
        // One more block reaches MIN_DEPTH with forget_channel already seen.
        monitor.on_add_block(&[], &block_hash);
        assert!(monitor.is_done());

        // The channel is still present until the next heartbeat.
        assert!(node.get_channel(&channel_id).is_ok());

        node.get_heartbeat();
        assert!(node.get_channel(&channel_id).is_err());
        assert_eq!(node.get_tracker().listeners.len(), 0);
    }
1189
    /// Mutual close where `forget_channel` arrives only after the close reached
    /// `MIN_DEPTH`: the monitor is not done until then, and the test expects the
    /// channel and its listener to be gone after the following heartbeat.
    #[test]
    fn test_mutual_close_with_forget_channel() {
        let block_hash = BlockHash::all_zeros();
        let (node, channel_id, monitor, funding_txid) = setup_funded_channel();

        // Before the close, a heartbeat leaves the channel and its listener in place.
        node.get_heartbeat();
        assert!(node.get_channel(&channel_id).is_ok());
        assert_eq!(node.get_tracker().listeners.len(), 1);

        // A transaction spending the funding outpoint.
        let close_tx = make_tx(vec![TxIn {
            previous_output: OutPoint::new(funding_txid, 0),
            script_sig: Default::default(),
            sequence: Default::default(),
            witness: Default::default(),
        }]);
        monitor.on_add_block(&[close_tx.clone()], &block_hash);
        assert_eq!(monitor.closing_depth(), 1);
        assert!(!monitor.is_done());

        // Freshly closed: the channel is still kept.
        node.get_heartbeat();
        assert!(node.get_channel(&channel_id).is_ok());
        assert_eq!(node.get_tracker().listeners.len(), 1);

        // Advance the close to depth MIN_DEPTH - 1.
        for _ in 1..MIN_DEPTH - 1 {
            monitor.on_add_block(&[], &block_hash);
        }
        assert!(!monitor.is_done());
        // MIN_DEPTH is reached, but without forget_channel the monitor is not done.
        monitor.on_add_block(&[], &block_hash);
        assert!(!monitor.is_done());

        assert!(node.get_channel(&channel_id).is_ok());
        node.forget_channel(&channel_id).unwrap();

        // The channel is still present until the next heartbeat.
        assert!(node.get_channel(&channel_id).is_ok());
        node.get_heartbeat();
        assert!(node.get_channel(&channel_id).is_err());
        assert_eq!(node.get_tracker().listeners.len(), 0);
    }
1232
/// Mutual close where the node never calls `forget_channel`.
///
/// The test expects the monitor to stay not-done and the channel to stay
/// retrievable, both after `MIN_DEPTH` blocks and after a further
/// `MAX_CLOSING_DEPTH` blocks.
#[test]
fn test_mutual_close_with_missing_forget_channel() {
    let block_hash = BlockHash::all_zeros();
    let (node, channel_id, monitor, funding_txid) = setup_funded_channel();

    // Before the close, a heartbeat leaves the channel and its listener in place.
    node.get_heartbeat();
    assert!(node.get_channel(&channel_id).is_ok());
    assert_eq!(node.get_tracker().listeners.len(), 1);

    // Mine a transaction spending the funding output.
    let close_tx = make_tx(vec![make_txin2(funding_txid, 0)]);
    monitor.on_add_block(&[close_tx], &block_hash);
    assert_eq!(monitor.closing_depth(), 1);
    assert!(!monitor.is_done());

    // A heartbeat right after the close still keeps the channel.
    node.get_heartbeat();
    assert!(node.get_channel(&channel_id).is_ok());
    assert_eq!(node.get_tracker().listeners.len(), 1);

    // Together with the closing block this makes MIN_DEPTH - 1 blocks.
    for _ in 1..MIN_DEPTH - 1 {
        monitor.on_add_block(&[], &block_hash);
    }
    assert!(!monitor.is_done());

    // The MIN_DEPTH-th block: still not done without a forget from the node.
    monitor.on_add_block(&[], &block_hash);
    assert!(!monitor.is_done());
    assert!(node.get_channel(&channel_id).is_ok());

    // A heartbeat does not drop the channel either.
    node.get_heartbeat();
    assert!(node.get_channel(&channel_id).is_ok());

    // Add MAX_CLOSING_DEPTH - 1 further blocks; the named constant replaces
    // the previously hard-coded 2016 so the test tracks the production limit.
    for _ in 0..MAX_CLOSING_DEPTH - 1 {
        monitor.on_add_block(&[], &block_hash);
    }
    assert!(!monitor.is_done());

    // One more block completes MAX_CLOSING_DEPTH additional blocks; the
    // monitor is still expected to be not-done.
    monitor.on_add_block(&[], &block_hash);
    assert!(!monitor.is_done());

    assert!(node.get_channel(&channel_id).is_ok());

    // Even after a heartbeat the channel must remain available.
    node.get_heartbeat();
    assert!(node.get_channel(&channel_id).is_ok());
}
1289
/// Unilateral close using a holder commitment transaction without HTLCs.
///
/// The monitor is expected to stay not-done until both commitment outputs have
/// been spent, enough blocks have followed, and the node has forgotten the
/// channel.
#[test]
fn test_unilateral_holder_close() {
    let (node, channel_id, monitor, _funding_txid) = setup_funded_channel();
    let zero_hash = BlockHash::all_zeros();
    // Feeds `count` empty blocks to the monitor.
    let mine_empty = |count: u32| {
        for _ in 0..count {
            monitor.on_add_block(&[], &zero_hash);
        }
    };

    let commit_num = 23;
    let feerate_per_kw = 1000;
    let to_holder = 100000;
    let to_cp = 200000;
    let htlcs = Vec::new();

    // Build the holder commitment transaction that will close the channel.
    let commitment = node
        .with_channel(&channel_id, |chan| {
            chan.set_next_holder_commit_num_for_testing(commit_num);
            let point = chan.get_per_commitment_point(commit_num)?;
            let keys = chan.make_holder_tx_keys(&point);
            let tx = chan.make_holder_commitment_tx(
                commit_num,
                &keys,
                feerate_per_kw,
                to_holder,
                to_cp,
                htlcs.clone(),
            );
            Ok(tx)
        })
        .expect("make_holder_commitment_tx failed");
    let closing_tx = commitment.trust().built_transaction().transaction.clone();
    let closing_txid = closing_tx.compute_txid();
    // Locate the holder's output by its value.
    let holder_vout =
        closing_tx.output.iter().position(|out| out.value.to_sat() == to_holder).unwrap() as u32;

    // Mine the closing transaction.
    monitor.on_add_block(&[closing_tx], &zero_hash);
    assert_eq!(monitor.closing_depth(), 1);
    assert!(!monitor.is_done());

    // With nothing swept, MAX_CLOSING_DEPTH - 1 more blocks do not finish it.
    mine_empty(MAX_CLOSING_DEPTH - 1);
    assert!(!monitor.is_done());

    // Spend the other output; `1 - holder_vout` relies on the outputs
    // occupying indexes 0 and 1.
    let sweep_cp_tx = make_tx(vec![make_txin2(closing_txid, 1 - holder_vout)]);
    monitor.on_add_block(&[sweep_cp_tx], &zero_hash);
    mine_empty(MAX_CLOSING_DEPTH - 1);
    assert!(!monitor.is_done());

    // Spend the holder output, add MIN_DEPTH - 1 blocks, then forget the channel.
    let sweep_holder_tx = make_tx(vec![make_txin2(closing_txid, holder_vout)]);
    monitor.on_add_block(&[sweep_holder_tx], &zero_hash);
    mine_empty(MIN_DEPTH - 1);
    node.forget_channel(&channel_id).unwrap();
    assert!(monitor.is_done());
}
1345
/// Unilateral close using a counterparty commitment transaction that carries
/// one HTLC output.
///
/// After the holder output is swept the monitor is cloned and the two handles
/// are driven separately: `monitor` only receives empty blocks, while
/// `monitor1` additionally sees the HTLC output being spent. Both are expected
/// to report done only after the node forgets the channel.
#[test]
fn test_unilateral_cp_and_htlcs_close() {
    let block_hash = BlockHash::all_zeros();
    let (node, channel_id, monitor, _funding_txid) = setup_funded_channel();

    let commit_num = 23;
    let feerate_per_kw = 1000;
    let to_holder = 100000;
    let to_cp = 200000;
    let htlcs = vec![HTLCOutputInCommitment {
        offered: false,
        amount_msat: 10000,
        cltv_expiry: 0,
        payment_hash: PaymentHash([0; 32]),
        transaction_output_index: None,
    }];

    // Build the counterparty commitment transaction that will close the channel.
    let closing_commitment_tx = node
        .with_channel(&channel_id, |chan| {
            let per_commitment_point = make_test_pubkey(12);
            chan.set_next_counterparty_commit_num_for_testing(
                commit_num,
                per_commitment_point.clone(),
            );
            Ok(chan.make_counterparty_commitment_tx(
                &per_commitment_point,
                commit_num,
                feerate_per_kw,
                to_holder,
                to_cp,
                htlcs.clone(),
            ))
        })
        // The message names the call actually made in the closure above.
        .expect("make_counterparty_commitment_tx failed");
    let closing_tx = closing_commitment_tx.trust().built_transaction().transaction.clone();
    let closing_txid = closing_tx.compute_txid();

    // Locate each output of interest by its value in satoshis.
    let holder_output_index =
        closing_tx.output.iter().position(|out| out.value.to_sat() == to_holder).unwrap() as u32;
    let cp_output_index =
        closing_tx.output.iter().position(|out| out.value.to_sat() == to_cp).unwrap() as u32;
    let htlc_output_index = closing_tx
        .output
        .iter()
        .position(|out| out.value.to_sat() == htlcs[0].amount_msat / 1000)
        .unwrap() as u32;

    // Mine the closing transaction.
    monitor.on_add_block(&[closing_tx], &block_hash);
    assert_eq!(monitor.closing_depth(), 1);
    assert!(!monitor.is_done());

    // With nothing swept, MAX_CLOSING_DEPTH - 1 more blocks do not finish it.
    for _ in 1..MAX_CLOSING_DEPTH {
        monitor.on_add_block(&[], &block_hash);
    }
    assert!(!monitor.is_done());

    // Spend the counterparty output; still not done.
    let sweep_cp_tx = make_tx(vec![make_txin2(closing_txid, cp_output_index)]);
    monitor.on_add_block(&[sweep_cp_tx], &block_hash);
    for _ in 1..MAX_CLOSING_DEPTH {
        monitor.on_add_block(&[], &block_hash);
    }
    assert!(!monitor.is_done());

    // Spend the holder output.
    let sweep_holder_tx = make_tx(vec![make_txin2(closing_txid, holder_output_index)]);
    monitor.on_add_block(&[sweep_holder_tx], &block_hash);

    // Second handle taken at this point.
    // NOTE(review): whether the clone shares state with `monitor` is not
    // visible from this test - confirm against `ChainMonitor`'s `Clone` impl.
    let monitor1 = monitor.clone();

    // `monitor`: the HTLC output is never swept, only empty blocks follow.
    for _ in 1..MAX_CLOSING_DEPTH - 1 {
        monitor.on_add_block(&[], &block_hash);
    }
    assert!(!monitor.is_done());
    monitor.on_add_block(&[], &block_hash);
    assert!(!monitor.is_done());

    // `monitor1`: the HTLC output is swept and MIN_DEPTH - 1 blocks follow.
    let sweep_htlc_tx = make_tx(vec![make_txin2(closing_txid, htlc_output_index)]);
    monitor1.on_add_block(&[sweep_htlc_tx], &block_hash);

    for _ in 1..MIN_DEPTH {
        monitor1.on_add_block(&[], &block_hash);
    }
    // Not done yet: the node has not forgotten the channel.
    assert!(!monitor1.is_done());

    // Once the node forgets the channel, both handles report done.
    node.forget_channel(&channel_id).unwrap();
    assert!(monitor.is_done());
    assert!(monitor1.is_done());
}
1435
1436 fn setup_funded_channel() -> (Arc<Node>, ChannelId, ChainMonitor, Txid) {
1437 let funding_tx = make_tx(vec![make_txin(1), make_txin(2)]);
1438 let funding_outpoint = OutPoint::new(funding_tx.compute_txid(), 0);
1439 let setup = make_channel_setup(funding_outpoint);
1440
1441 let (node, channel_id) =
1442 init_node_and_channel(TEST_NODE_CONFIG, TEST_SEED[1], setup.clone());
1443 let channel = node.get_channel(&channel_id).unwrap();
1444 let cpp = Box::new(ChannelCommitmentPointProvider::new(channel.clone()));
1445 let monitor = node
1446 .with_channel(&channel_id, |chan| Ok(chan.monitor.clone().as_monitor(cpp.clone())))
1447 .unwrap();
1448 let block_hash = BlockHash::all_zeros();
1449 monitor.on_add_block(&[], &block_hash);
1450 monitor.on_add_block(&[funding_tx.clone()], &block_hash);
1451 assert_eq!(monitor.funding_depth(), 1);
1452 (node, channel_id, monitor, funding_tx.compute_txid())
1453 }
1454
1455 fn make_txin2(prev_txid: Txid, prevout: u32) -> TxIn {
1456 TxIn {
1457 previous_output: OutPoint::new(prev_txid, prevout),
1458 script_sig: Default::default(),
1459 sequence: Default::default(),
1460 witness: Default::default(),
1461 }
1462 }
1463
1464 fn make_channel_setup(funding_outpoint: OutPoint) -> ChannelSetup {
1465 ChannelSetup {
1466 is_outbound: true,
1467 channel_value_sat: 3_000_000,
1468 push_value_msat: 0,
1469 funding_outpoint,
1470 holder_selected_contest_delay: 6,
1471 holder_shutdown_script: None,
1472 counterparty_points: make_test_counterparty_points(),
1473 counterparty_selected_contest_delay: 7,
1474 counterparty_shutdown_script: None,
1475 commitment_type: CommitmentType::StaticRemoteKey,
1476 }
1477 }
1478}