1use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12use crate::chain::channelmonitor::{ANTI_REORG_DELAY, ARCHIVAL_DELAY_BLOCKS};
13use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14use crate::io;
15use crate::ln::msgs::DecodeError;
16use crate::ln::types::ChannelId;
17use crate::prelude::*;
18use crate::sign::{
19 ChangeDestinationSource, ChangeDestinationSourceSync, ChangeDestinationSourceSyncWrapper,
20 OutputSpender, SpendableOutputDescriptor,
21};
22use crate::sync::Mutex;
23use crate::util::logger::Logger;
24use crate::util::persist::{
25 KVStore, KVStoreSync, KVStoreSyncWrapper, OUTPUT_SWEEPER_PERSISTENCE_KEY,
26 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
27};
28use crate::util::ser::{Readable, ReadableArgs, Writeable};
29use crate::{impl_writeable_tlv_based, log_debug, log_error};
30
31use bitcoin::block::Header;
32use bitcoin::locktime::absolute::LockTime;
33use bitcoin::secp256k1::Secp256k1;
34use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid};
35
36use core::future::Future;
37use core::ops::Deref;
38use core::pin::Pin;
39use core::sync::atomic::{AtomicBool, Ordering};
40use core::task;
41
42use super::async_poll::dummy_waker;
43
44pub const PRUNE_DELAY_BLOCKS: u32 = ARCHIVAL_DELAY_BLOCKS + ANTI_REORG_DELAY;
46
47#[derive(Clone, Debug, PartialEq, Eq)]
49pub struct TrackedSpendableOutput {
50 pub descriptor: SpendableOutputDescriptor,
52 pub channel_id: Option<ChannelId>,
56 pub status: OutputSpendStatus,
58}
59
60impl TrackedSpendableOutput {
61 fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
62 let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
63 match &self.descriptor {
64 SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
65 WatchedOutput {
66 block_hash,
67 outpoint: *outpoint,
68 script_pubkey: output.script_pubkey.clone(),
69 }
70 },
71 SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
72 block_hash,
73 outpoint: output.outpoint,
74 script_pubkey: output.output.script_pubkey.clone(),
75 },
76 SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
77 block_hash,
78 outpoint: output.outpoint,
79 script_pubkey: output.output.script_pubkey.clone(),
80 },
81 }
82 }
83
84 pub fn is_spent_in(&self, tx: &Transaction) -> bool {
86 let prev_outpoint = self.descriptor.spendable_outpoint().into_bitcoin_outpoint();
87 tx.input.iter().any(|input| input.previous_output == prev_outpoint)
88 }
89}
90
91impl_writeable_tlv_based!(TrackedSpendableOutput, {
92 (0, descriptor, required),
93 (2, channel_id, option),
94 (4, status, required),
95});
96
97#[derive(Debug, Clone, PartialEq, Eq)]
99pub enum OutputSpendStatus {
100 PendingInitialBroadcast {
103 delayed_until_height: Option<u32>,
105 },
106 PendingFirstConfirmation {
108 first_broadcast_hash: BlockHash,
110 latest_broadcast_height: u32,
112 latest_spending_tx: Transaction,
114 },
115 PendingThresholdConfirmations {
122 first_broadcast_hash: BlockHash,
124 latest_broadcast_height: u32,
126 latest_spending_tx: Transaction,
128 confirmation_height: u32,
130 confirmation_hash: BlockHash,
132 },
133}
134
135impl OutputSpendStatus {
136 fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
137 match self {
138 Self::PendingInitialBroadcast { delayed_until_height } => {
139 if let Some(delayed_until_height) = delayed_until_height {
140 debug_assert!(
141 cur_height >= *delayed_until_height,
142 "We should never broadcast before the required height is reached."
143 );
144 }
145 *self = Self::PendingFirstConfirmation {
146 first_broadcast_hash: cur_hash,
147 latest_broadcast_height: cur_height,
148 latest_spending_tx,
149 };
150 },
151 Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
152 *self = Self::PendingFirstConfirmation {
153 first_broadcast_hash: *first_broadcast_hash,
154 latest_broadcast_height: cur_height,
155 latest_spending_tx,
156 };
157 },
158 Self::PendingThresholdConfirmations { .. } => {
159 debug_assert!(false, "We should never rebroadcast confirmed transactions.");
160 },
161 }
162 }
163
164 fn confirmed(
165 &mut self, confirmation_hash: BlockHash, confirmation_height: u32,
166 latest_spending_tx: Transaction,
167 ) {
168 match self {
169 Self::PendingInitialBroadcast { .. } => {
170 debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
173 *self = Self::PendingThresholdConfirmations {
174 first_broadcast_hash: confirmation_hash,
175 latest_broadcast_height: confirmation_height,
176 latest_spending_tx,
177 confirmation_height,
178 confirmation_hash,
179 };
180 },
181 Self::PendingFirstConfirmation {
182 first_broadcast_hash,
183 latest_broadcast_height,
184 ..
185 } => {
186 *self = Self::PendingThresholdConfirmations {
187 first_broadcast_hash: *first_broadcast_hash,
188 latest_broadcast_height: *latest_broadcast_height,
189 latest_spending_tx,
190 confirmation_height,
191 confirmation_hash,
192 };
193 },
194 Self::PendingThresholdConfirmations {
195 first_broadcast_hash,
196 latest_broadcast_height,
197 ..
198 } => {
199 *self = Self::PendingThresholdConfirmations {
200 first_broadcast_hash: *first_broadcast_hash,
201 latest_broadcast_height: *latest_broadcast_height,
202 latest_spending_tx,
203 confirmation_height,
204 confirmation_hash,
205 };
206 },
207 }
208 }
209
210 fn unconfirmed(&mut self) {
211 match self {
212 Self::PendingInitialBroadcast { .. } => {
213 debug_assert!(
214 false,
215 "We should only mark a spend as unconfirmed if it used to be confirmed."
216 );
217 },
218 Self::PendingFirstConfirmation { .. } => {
219 debug_assert!(
220 false,
221 "We should only mark a spend as unconfirmed if it used to be confirmed."
222 );
223 },
224 Self::PendingThresholdConfirmations {
225 first_broadcast_hash,
226 latest_broadcast_height,
227 latest_spending_tx,
228 ..
229 } => {
230 *self = Self::PendingFirstConfirmation {
231 first_broadcast_hash: *first_broadcast_hash,
232 latest_broadcast_height: *latest_broadcast_height,
233 latest_spending_tx: latest_spending_tx.clone(),
234 };
235 },
236 }
237 }
238
239 fn is_delayed(&self, cur_height: u32) -> bool {
240 match self {
241 Self::PendingInitialBroadcast { delayed_until_height } => {
242 delayed_until_height.map_or(false, |req_height| cur_height < req_height)
243 },
244 Self::PendingFirstConfirmation { .. } => false,
245 Self::PendingThresholdConfirmations { .. } => false,
246 }
247 }
248
249 fn first_broadcast_hash(&self) -> Option<BlockHash> {
250 match self {
251 Self::PendingInitialBroadcast { .. } => None,
252 Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
253 Some(*first_broadcast_hash)
254 },
255 Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
256 Some(*first_broadcast_hash)
257 },
258 }
259 }
260
261 fn latest_broadcast_height(&self) -> Option<u32> {
262 match self {
263 Self::PendingInitialBroadcast { .. } => None,
264 Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
265 Some(*latest_broadcast_height)
266 },
267 Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
268 Some(*latest_broadcast_height)
269 },
270 }
271 }
272
273 fn confirmation_height(&self) -> Option<u32> {
274 match self {
275 Self::PendingInitialBroadcast { .. } => None,
276 Self::PendingFirstConfirmation { .. } => None,
277 Self::PendingThresholdConfirmations { confirmation_height, .. } => {
278 Some(*confirmation_height)
279 },
280 }
281 }
282
283 fn latest_spending_tx(&self) -> Option<&Transaction> {
284 match self {
285 Self::PendingInitialBroadcast { .. } => None,
286 Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
287 Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
288 Some(latest_spending_tx)
289 },
290 }
291 }
292
293 fn is_confirmed(&self) -> bool {
294 match self {
295 Self::PendingInitialBroadcast { .. } => false,
296 Self::PendingFirstConfirmation { .. } => false,
297 Self::PendingThresholdConfirmations { .. } => true,
298 }
299 }
300}
301
302impl_writeable_tlv_based_enum!(OutputSpendStatus,
303 (0, PendingInitialBroadcast) => {
304 (0, delayed_until_height, option),
305 },
306 (2, PendingFirstConfirmation) => {
307 (0, first_broadcast_hash, required),
308 (2, latest_broadcast_height, required),
309 (4, latest_spending_tx, required),
310 },
311 (4, PendingThresholdConfirmations) => {
312 (0, first_broadcast_hash, required),
313 (2, latest_broadcast_height, required),
314 (4, latest_spending_tx, required),
315 (6, confirmation_height, required),
316 (8, confirmation_hash, required),
317 },
318);
319
320pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
335where
336 B::Target: BroadcasterInterface,
337 D::Target: ChangeDestinationSource,
338 E::Target: FeeEstimator,
339 F::Target: Filter,
340 K::Target: KVStore,
341 L::Target: Logger,
342 O::Target: OutputSpender,
343{
344 sweeper_state: Mutex<SweeperState>,
345 pending_sweep: AtomicBool,
346 broadcaster: B,
347 fee_estimator: E,
348 chain_data_source: Option<F>,
349 output_spender: O,
350 change_destination_source: D,
351 kv_store: K,
352 logger: L,
353}
354
355impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
356 OutputSweeper<B, D, E, F, K, L, O>
357where
358 B::Target: BroadcasterInterface,
359 D::Target: ChangeDestinationSource,
360 E::Target: FeeEstimator,
361 F::Target: Filter,
362 K::Target: KVStore,
363 L::Target: Logger,
364 O::Target: OutputSpender,
365{
366 pub fn new(
371 best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
372 output_spender: O, change_destination_source: D, kv_store: K, logger: L,
373 ) -> Self {
374 let outputs = Vec::new();
375 let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
376 Self {
377 sweeper_state,
378 pending_sweep: AtomicBool::new(false),
379 broadcaster,
380 fee_estimator,
381 chain_data_source,
382 output_spender,
383 change_destination_source,
384 kv_store,
385 logger,
386 }
387 }
388
389 pub async fn track_spendable_outputs(
405 &self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
406 exclude_static_outputs: bool, delay_until_height: Option<u32>,
407 ) -> Result<(), ()> {
408 let mut relevant_descriptors = output_descriptors
409 .into_iter()
410 .filter(|desc| {
411 !(exclude_static_outputs
412 && matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
413 })
414 .peekable();
415
416 if relevant_descriptors.peek().is_none() {
417 return Ok(());
418 }
419
420 self.update_state(|state_lock| -> Result<((), bool), ()> {
421 for descriptor in relevant_descriptors {
422 let output_info = TrackedSpendableOutput {
423 descriptor,
424 channel_id,
425 status: OutputSpendStatus::PendingInitialBroadcast {
426 delayed_until_height: delay_until_height,
427 },
428 };
429
430 if state_lock
431 .outputs
432 .iter()
433 .find(|o| o.descriptor == output_info.descriptor)
434 .is_some()
435 {
436 continue;
437 }
438
439 state_lock.outputs.push(output_info);
440 state_lock.dirty = true;
441 }
442
443 Ok(((), false))
444 })
445 .await
446 }
447
448 pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
450 self.sweeper_state.lock().unwrap().outputs.clone()
451 }
452
453 pub fn current_best_block(&self) -> BestBlock {
456 self.sweeper_state.lock().unwrap().best_block
457 }
458
459 pub async fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
462 if self
464 .pending_sweep
465 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
466 .is_err()
467 {
468 return Ok(());
469 }
470
471 let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
472
473 self.pending_sweep.store(false, Ordering::Release);
475
476 result
477 }
478
479 async fn regenerate_and_broadcast_spend_if_necessary_internal(&self) -> Result<(), ()> {
481 let filter_fn = |o: &TrackedSpendableOutput, cur_height: u32| {
482 if o.status.is_confirmed() {
483 return false;
485 }
486
487 if o.status.is_delayed(cur_height) {
488 return false;
490 }
491
492 if o.status.latest_broadcast_height() >= Some(cur_height) {
493 return false;
495 }
496
497 true
498 };
499
500 let has_respends = self
502 .update_state(|sweeper_state| -> Result<(bool, bool), ()> {
503 let cur_height = sweeper_state.best_block.height;
504 let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
505
506 Ok((has_respends, has_respends))
508 })
509 .await?;
510
511 if !has_respends {
512 return Ok(());
513 }
514
515 let change_destination_script =
517 self.change_destination_source.get_change_destination_script().await?;
518
519 let spending_tx = self
521 .update_state(|sweeper_state| -> Result<(Option<Transaction>, bool), ()> {
522 let cur_height = sweeper_state.best_block.height;
523 let cur_hash = sweeper_state.best_block.block_hash;
524
525 let respend_descriptors_set: HashSet<&SpendableOutputDescriptor> = sweeper_state
526 .outputs
527 .iter()
528 .filter(|o| filter_fn(*o, cur_height))
529 .map(|o| &o.descriptor)
530 .collect();
531
532 let respend_descriptors: Vec<&SpendableOutputDescriptor> =
536 respend_descriptors_set.into_iter().collect();
537
538 if !respend_descriptors.is_empty() {
540 let spending_tx = self
541 .spend_outputs(
542 &sweeper_state,
543 &respend_descriptors,
544 change_destination_script,
545 )
546 .map_err(|e| {
547 log_error!(self.logger, "Error spending outputs: {:?}", e);
548 })?;
549
550 log_debug!(
551 self.logger,
552 "Generating and broadcasting sweeping transaction {}",
553 spending_tx.compute_txid()
554 );
555
556 let respend_outputs =
559 sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
560 for output_info in respend_outputs {
561 if let Some(filter) = self.chain_data_source.as_ref() {
562 let watched_output = output_info.to_watched_output(cur_hash);
563 filter.register_output(watched_output);
564 }
565
566 output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
567 sweeper_state.dirty = true;
568 }
569
570 Ok((Some(spending_tx), false))
571 } else {
572 Ok((None, false))
573 }
574 })
575 .await?;
576
577 if let Some(spending_tx) = spending_tx {
579 self.broadcaster.broadcast_transactions(&[&spending_tx]);
580 }
581
582 Ok(())
583 }
584
585 fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
586 let cur_height = sweeper_state.best_block.height;
587
588 sweeper_state.outputs.retain(|o| {
590 if let Some(confirmation_height) = o.status.confirmation_height() {
591 if cur_height >= confirmation_height + PRUNE_DELAY_BLOCKS - 1 {
594 log_debug!(self.logger,
595 "Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
596 o.status.latest_spending_tx().map(|t| t.compute_txid()), o.descriptor
597 );
598 return false;
599 }
600 }
601 true
602 });
603
604 sweeper_state.dirty = true;
605 }
606
607 fn persist_state<'a>(
608 &self, sweeper_state: &SweeperState,
609 ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> {
610 let encoded = sweeper_state.encode();
611
612 self.kv_store.write(
613 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
614 OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
615 OUTPUT_SWEEPER_PERSISTENCE_KEY,
616 encoded,
617 )
618 }
619
620 async fn update_state<X>(
624 &self, callback: impl FnOnce(&mut SweeperState) -> Result<(X, bool), ()>,
625 ) -> Result<X, ()> {
626 let (fut, res) = {
627 let mut state_lock = self.sweeper_state.lock().unwrap();
628
629 let (res, skip_persist) = callback(&mut state_lock)?;
630 if !state_lock.dirty || skip_persist {
631 return Ok(res);
632 }
633
634 state_lock.dirty = false;
635
636 (self.persist_state(&state_lock), res)
637 };
638
639 fut.await.map_err(|e| {
640 self.sweeper_state.lock().unwrap().dirty = true;
641
642 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
643 })?;
644
645 Ok(res)
646 }
647
648 fn spend_outputs(
649 &self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor],
650 change_destination_script: ScriptBuf,
651 ) -> Result<Transaction, ()> {
652 let tx_feerate =
653 self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
654 let cur_height = sweeper_state.best_block.height;
655 let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
656 self.output_spender.spend_spendable_outputs(
657 descriptors,
658 Vec::new(),
659 change_destination_script,
660 tx_feerate,
661 locktime,
662 &Secp256k1::new(),
663 )
664 }
665
666 fn transactions_confirmed_internal(
667 &self, sweeper_state: &mut SweeperState, header: &Header,
668 txdata: &chain::transaction::TransactionData, height: u32,
669 ) {
670 let confirmation_hash = header.block_hash();
671 for (_, tx) in txdata {
672 for output_info in sweeper_state.outputs.iter_mut() {
673 if output_info.is_spent_in(*tx) {
674 output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
675 }
676 }
677 }
678
679 sweeper_state.dirty = true;
680 }
681
682 fn best_block_updated_internal(
683 &self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
684 ) {
685 sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
686 self.prune_confirmed_outputs(sweeper_state);
687
688 sweeper_state.dirty = true;
689 }
690}
691
692impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
693 for OutputSweeper<B, D, E, F, K, L, O>
694where
695 B::Target: BroadcasterInterface,
696 D::Target: ChangeDestinationSource,
697 E::Target: FeeEstimator,
698 F::Target: Filter + Sync + Send,
699 K::Target: KVStore,
700 L::Target: Logger,
701 O::Target: OutputSpender,
702{
703 fn filtered_block_connected(
704 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
705 ) {
706 let mut state_lock = self.sweeper_state.lock().unwrap();
707 assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
708 "Blocks must be connected in chain-order - the connected header must build on the last connected header");
709 assert_eq!(state_lock.best_block.height, height - 1,
710 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
711
712 self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
713 self.best_block_updated_internal(&mut state_lock, header, height);
714 }
715
716 fn blocks_disconnected(&self, fork_point: BestBlock) {
717 let mut state_lock = self.sweeper_state.lock().unwrap();
718
719 assert!(state_lock.best_block.height > fork_point.height,
720 "Blocks disconnected must indicate disconnection from the current best height, i.e. the new chain tip must be lower than the previous best height");
721 state_lock.best_block = fork_point;
722
723 for output_info in state_lock.outputs.iter_mut() {
724 if output_info.status.confirmation_height() > Some(fork_point.height) {
725 output_info.status.unconfirmed();
726 }
727 }
728
729 state_lock.dirty = true;
730 }
731}
732
733impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
734 for OutputSweeper<B, D, E, F, K, L, O>
735where
736 B::Target: BroadcasterInterface,
737 D::Target: ChangeDestinationSource,
738 E::Target: FeeEstimator,
739 F::Target: Filter + Sync + Send,
740 K::Target: KVStore,
741 L::Target: Logger,
742 O::Target: OutputSpender,
743{
744 fn transactions_confirmed(
745 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
746 ) {
747 let mut state_lock = self.sweeper_state.lock().unwrap();
748 self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
749 }
750
751 fn transaction_unconfirmed(&self, txid: &Txid) {
752 let mut state_lock = self.sweeper_state.lock().unwrap();
753
754 let unconf_height = state_lock
756 .outputs
757 .iter()
758 .find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
759 .and_then(|o| o.status.confirmation_height());
760
761 if let Some(unconf_height) = unconf_height {
762 state_lock
764 .outputs
765 .iter_mut()
766 .filter(|o| o.status.confirmation_height() >= Some(unconf_height))
767 .for_each(|o| o.status.unconfirmed());
768
769 state_lock.dirty = true;
770 }
771 }
772
773 fn best_block_updated(&self, header: &Header, height: u32) {
774 let mut state_lock = self.sweeper_state.lock().unwrap();
775 self.best_block_updated_internal(&mut state_lock, header, height);
776 }
777
778 fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
779 let state_lock = self.sweeper_state.lock().unwrap();
780 state_lock
781 .outputs
782 .iter()
783 .filter_map(|o| match o.status {
784 OutputSpendStatus::PendingThresholdConfirmations {
785 ref latest_spending_tx,
786 confirmation_height,
787 confirmation_hash,
788 ..
789 } => Some((
790 latest_spending_tx.compute_txid(),
791 confirmation_height,
792 Some(confirmation_hash),
793 )),
794 _ => None,
795 })
796 .collect::<Vec<_>>()
797 }
798}
799
800#[derive(Debug, Clone)]
801struct SweeperState {
802 outputs: Vec<TrackedSpendableOutput>,
803 best_block: BestBlock,
804 dirty: bool,
805}
806
807impl_writeable_tlv_based!(SweeperState, {
808 (0, outputs, required_vec),
809 (2, best_block, required),
810 (_unused, dirty, (static_value, false)),
811});
812
813#[derive(Debug, Clone)]
816pub enum SpendingDelay {
817 Relative {
820 num_blocks: u32,
822 },
823 Absolute {
825 height: u32,
827 },
828}
829
830impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
831 ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
832where
833 B::Target: BroadcasterInterface,
834 D::Target: ChangeDestinationSource,
835 E::Target: FeeEstimator,
836 F::Target: Filter + Sync + Send,
837 K::Target: KVStore,
838 L::Target: Logger,
839 O::Target: OutputSpender,
840{
841 #[inline]
842 fn read<R: io::Read>(
843 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
844 ) -> Result<Self, DecodeError> {
845 let (
846 broadcaster,
847 fee_estimator,
848 chain_data_source,
849 output_spender,
850 change_destination_source,
851 kv_store,
852 logger,
853 ) = args;
854 let state = SweeperState::read(reader)?;
855 let best_block = state.best_block;
856
857 if let Some(filter) = chain_data_source.as_ref() {
858 for output_info in &state.outputs {
859 let watched_output = output_info.to_watched_output(best_block.block_hash);
860 filter.register_output(watched_output);
861 }
862 }
863
864 let sweeper_state = Mutex::new(state);
865 Ok((
866 best_block,
867 OutputSweeper {
868 sweeper_state,
869 pending_sweep: AtomicBool::new(false),
870 broadcaster,
871 fee_estimator,
872 chain_data_source,
873 output_spender,
874 change_destination_source,
875 kv_store,
876 logger,
877 },
878 ))
879 }
880}
881
882pub struct OutputSweeperSync<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
884where
885 B::Target: BroadcasterInterface,
886 D::Target: ChangeDestinationSourceSync,
887 E::Target: FeeEstimator,
888 F::Target: Filter,
889 K::Target: KVStoreSync,
890 L::Target: Logger,
891 O::Target: OutputSpender,
892{
893 sweeper:
894 OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, KVStoreSyncWrapper<K>, L, O>,
895}
896
897impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
898 OutputSweeperSync<B, D, E, F, K, L, O>
899where
900 B::Target: BroadcasterInterface,
901 D::Target: ChangeDestinationSourceSync,
902 E::Target: FeeEstimator,
903 F::Target: Filter,
904 K::Target: KVStoreSync,
905 L::Target: Logger,
906 O::Target: OutputSpender,
907{
908 pub fn new(
910 best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
911 output_spender: O, change_destination_source: D, kv_store: K, logger: L,
912 ) -> Self {
913 let change_destination_source =
914 ChangeDestinationSourceSyncWrapper::new(change_destination_source);
915
916 let kv_store = KVStoreSyncWrapper(kv_store);
917
918 let sweeper = OutputSweeper::new(
919 best_block,
920 broadcaster,
921 fee_estimator,
922 chain_data_source,
923 output_spender,
924 change_destination_source,
925 kv_store,
926 logger,
927 );
928 Self { sweeper }
929 }
930
931 pub fn track_spendable_outputs(
933 &self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
934 exclude_static_outputs: bool, delay_until_height: Option<u32>,
935 ) -> Result<(), ()> {
936 let mut fut = Box::pin(self.sweeper.track_spendable_outputs(
937 output_descriptors,
938 channel_id,
939 exclude_static_outputs,
940 delay_until_height,
941 ));
942 let mut waker = dummy_waker();
943 let mut ctx = task::Context::from_waker(&mut waker);
944 match fut.as_mut().poll(&mut ctx) {
945 task::Poll::Ready(result) => result,
946 task::Poll::Pending => {
947 unreachable!("OutputSweeper::track_spendable_outputs should not be pending in a sync context");
949 },
950 }
951 }
952
953 pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
955 self.sweeper.tracked_spendable_outputs()
956 }
957
958 pub fn current_best_block(&self) -> BestBlock {
961 self.sweeper.current_best_block()
962 }
963
964 pub fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
967 let mut fut = Box::pin(self.sweeper.regenerate_and_broadcast_spend_if_necessary());
968 let mut waker = dummy_waker();
969 let mut ctx = task::Context::from_waker(&mut waker);
970 match fut.as_mut().poll(&mut ctx) {
971 task::Poll::Ready(result) => result,
972 task::Poll::Pending => {
973 unreachable!("OutputSweeper::regenerate_and_broadcast_spend_if_necessary should not be pending in a sync context");
975 },
976 }
977 }
978
979 #[doc(hidden)]
986 pub fn sweeper_async(
987 &self,
988 ) -> &OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, KVStoreSyncWrapper<K>, L, O>
989 {
990 &self.sweeper
991 }
992}
993
994impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
995 for OutputSweeperSync<B, D, E, F, K, L, O>
996where
997 B::Target: BroadcasterInterface,
998 D::Target: ChangeDestinationSourceSync,
999 E::Target: FeeEstimator,
1000 F::Target: Filter + Sync + Send,
1001 K::Target: KVStoreSync,
1002 L::Target: Logger,
1003 O::Target: OutputSpender,
1004{
1005 fn filtered_block_connected(
1006 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
1007 ) {
1008 self.sweeper.filtered_block_connected(header, txdata, height);
1009 }
1010
1011 fn blocks_disconnected(&self, fork_point: BestBlock) {
1012 self.sweeper.blocks_disconnected(fork_point);
1013 }
1014}
1015
1016impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
1017 for OutputSweeperSync<B, D, E, F, K, L, O>
1018where
1019 B::Target: BroadcasterInterface,
1020 D::Target: ChangeDestinationSourceSync,
1021 E::Target: FeeEstimator,
1022 F::Target: Filter + Sync + Send,
1023 K::Target: KVStoreSync,
1024 L::Target: Logger,
1025 O::Target: OutputSpender,
1026{
1027 fn transactions_confirmed(
1028 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
1029 ) {
1030 self.sweeper.transactions_confirmed(header, txdata, height)
1031 }
1032
1033 fn transaction_unconfirmed(&self, txid: &Txid) {
1034 self.sweeper.transaction_unconfirmed(txid)
1035 }
1036
1037 fn best_block_updated(&self, header: &Header, height: u32) {
1038 self.sweeper.best_block_updated(header, height);
1039 }
1040
1041 fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
1042 self.sweeper.get_relevant_txids()
1043 }
1044}
1045
1046impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
1047 ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeperSync<B, D, E, F, K, L, O>)
1048where
1049 B::Target: BroadcasterInterface,
1050 D::Target: ChangeDestinationSourceSync,
1051 E::Target: FeeEstimator,
1052 F::Target: Filter + Sync + Send,
1053 K::Target: KVStoreSync,
1054 L::Target: Logger,
1055 O::Target: OutputSpender,
1056{
1057 #[inline]
1058 fn read<R: io::Read>(
1059 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
1060 ) -> Result<Self, DecodeError> {
1061 let (a, b, c, d, change_destination_source, kv_store, e) = args;
1062 let change_destination_source =
1063 ChangeDestinationSourceSyncWrapper::new(change_destination_source);
1064 let kv_store = KVStoreSyncWrapper(kv_store);
1065 let args = (a, b, c, d, change_destination_source, kv_store, e);
1066 let (best_block, sweeper) =
1067 <(BestBlock, OutputSweeper<_, _, _, _, _, _, _>)>::read(reader, args)?;
1068 Ok((best_block, OutputSweeperSync { sweeper }))
1069 }
1070}