1use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12use crate::chain::channelmonitor::{ANTI_REORG_DELAY, ARCHIVAL_DELAY_BLOCKS};
13use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14use crate::io;
15use crate::ln::msgs::DecodeError;
16use crate::ln::types::ChannelId;
17use crate::prelude::*;
18use crate::sign::{ChangeDestinationSource, OutputSpender, SpendableOutputDescriptor};
19use crate::sync::Mutex;
20use crate::util::logger::Logger;
21use crate::util::persist::{
22 KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
23 OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
24};
25use crate::util::ser::{Readable, ReadableArgs, Writeable};
26use crate::{impl_writeable_tlv_based, log_debug, log_error};
27
28use bitcoin::block::Header;
29use bitcoin::locktime::absolute::LockTime;
30use bitcoin::secp256k1::Secp256k1;
31use bitcoin::{BlockHash, Transaction, Txid};
32
33use core::ops::Deref;
34
/// The number of blocks used as the pruning delay for swept outputs, defined as the sum of
/// [`ARCHIVAL_DELAY_BLOCKS`] and [`ANTI_REORG_DELAY`].
///
/// A tracked output whose spend has confirmed is dropped once the best block height reaches
/// `confirmation_height + PRUNE_DELAY_BLOCKS - 1`.
pub const PRUNE_DELAY_BLOCKS: u32 = ARCHIVAL_DELAY_BLOCKS + ANTI_REORG_DELAY;
37
/// A [`SpendableOutputDescriptor`] tracked by the sweeper, together with the state of its spend.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TrackedSpendableOutput {
    /// The descriptor of the output being tracked.
    pub descriptor: SpendableOutputDescriptor,
    /// The channel this output is associated with, if one was given when tracking started.
    pub channel_id: Option<ChannelId>,
    /// The current status of the spend of this output.
    pub status: OutputSpendStatus,
}
50
51impl TrackedSpendableOutput {
52 fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
53 let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
54 match &self.descriptor {
55 SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
56 WatchedOutput {
57 block_hash,
58 outpoint: *outpoint,
59 script_pubkey: output.script_pubkey.clone(),
60 }
61 },
62 SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
63 block_hash,
64 outpoint: output.outpoint,
65 script_pubkey: output.output.script_pubkey.clone(),
66 },
67 SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
68 block_hash,
69 outpoint: output.outpoint,
70 script_pubkey: output.output.script_pubkey.clone(),
71 },
72 }
73 }
74
75 pub fn is_spent_in(&self, tx: &Transaction) -> bool {
77 let prev_outpoint = self.descriptor.outpoint().into_bitcoin_outpoint();
78 tx.input.iter().any(|input| input.previous_output == prev_outpoint)
79 }
80}
81
// TLV (de)serialization for `TrackedSpendableOutput`: `descriptor` (type 0) and `status` (type 4)
// are required, `channel_id` (type 2) is optional.
impl_writeable_tlv_based!(TrackedSpendableOutput, {
    (0, descriptor, required),
    (2, channel_id, option),
    (4, status, required),
});
87
/// The current status of the spend of a tracked output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputSpendStatus {
    /// The output is tracked, but no spending transaction has been recorded as broadcast yet.
    PendingInitialBroadcast {
        /// If set, the output is left out of generated spends while the best block height is
        /// below this value.
        delayed_until_height: Option<u32>,
    },
    /// A spending transaction has been recorded as broadcast and is not currently recorded as
    /// confirmed.
    PendingFirstConfirmation {
        /// The best block hash at the time a spend of this output was first broadcast.
        first_broadcast_hash: BlockHash,
        /// The best block height at the time a spend of this output was most recently broadcast.
        latest_broadcast_height: u32,
        /// The most recently recorded transaction spending this output.
        latest_spending_tx: Transaction,
    },
    /// A transaction spending the output has been recorded as confirmed. The output stays in this
    /// state until it is pruned or the confirmation is undone.
    PendingThresholdConfirmations {
        /// The best block hash at the time a spend of this output was first broadcast.
        first_broadcast_hash: BlockHash,
        /// The best block height at the time a spend of this output was most recently broadcast.
        latest_broadcast_height: u32,
        /// The spending transaction that was recorded as confirmed.
        latest_spending_tx: Transaction,
        /// The height at which the spending transaction was confirmed.
        confirmation_height: u32,
        /// The hash of the block in which the spending transaction was confirmed.
        confirmation_hash: BlockHash,
    },
}
125
126impl OutputSpendStatus {
127 fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
128 match self {
129 Self::PendingInitialBroadcast { delayed_until_height } => {
130 if let Some(delayed_until_height) = delayed_until_height {
131 debug_assert!(
132 cur_height >= *delayed_until_height,
133 "We should never broadcast before the required height is reached."
134 );
135 }
136 *self = Self::PendingFirstConfirmation {
137 first_broadcast_hash: cur_hash,
138 latest_broadcast_height: cur_height,
139 latest_spending_tx,
140 };
141 },
142 Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
143 *self = Self::PendingFirstConfirmation {
144 first_broadcast_hash: *first_broadcast_hash,
145 latest_broadcast_height: cur_height,
146 latest_spending_tx,
147 };
148 },
149 Self::PendingThresholdConfirmations { .. } => {
150 debug_assert!(false, "We should never rebroadcast confirmed transactions.");
151 },
152 }
153 }
154
155 fn confirmed(
156 &mut self, confirmation_hash: BlockHash, confirmation_height: u32,
157 latest_spending_tx: Transaction,
158 ) {
159 match self {
160 Self::PendingInitialBroadcast { .. } => {
161 debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
164 *self = Self::PendingThresholdConfirmations {
165 first_broadcast_hash: confirmation_hash,
166 latest_broadcast_height: confirmation_height,
167 latest_spending_tx,
168 confirmation_height,
169 confirmation_hash,
170 };
171 },
172 Self::PendingFirstConfirmation {
173 first_broadcast_hash,
174 latest_broadcast_height,
175 ..
176 } => {
177 *self = Self::PendingThresholdConfirmations {
178 first_broadcast_hash: *first_broadcast_hash,
179 latest_broadcast_height: *latest_broadcast_height,
180 latest_spending_tx,
181 confirmation_height,
182 confirmation_hash,
183 };
184 },
185 Self::PendingThresholdConfirmations {
186 first_broadcast_hash,
187 latest_broadcast_height,
188 ..
189 } => {
190 *self = Self::PendingThresholdConfirmations {
191 first_broadcast_hash: *first_broadcast_hash,
192 latest_broadcast_height: *latest_broadcast_height,
193 latest_spending_tx,
194 confirmation_height,
195 confirmation_hash,
196 };
197 },
198 }
199 }
200
201 fn unconfirmed(&mut self) {
202 match self {
203 Self::PendingInitialBroadcast { .. } => {
204 debug_assert!(
205 false,
206 "We should only mark a spend as unconfirmed if it used to be confirmed."
207 );
208 },
209 Self::PendingFirstConfirmation { .. } => {
210 debug_assert!(
211 false,
212 "We should only mark a spend as unconfirmed if it used to be confirmed."
213 );
214 },
215 Self::PendingThresholdConfirmations {
216 first_broadcast_hash,
217 latest_broadcast_height,
218 latest_spending_tx,
219 ..
220 } => {
221 *self = Self::PendingFirstConfirmation {
222 first_broadcast_hash: *first_broadcast_hash,
223 latest_broadcast_height: *latest_broadcast_height,
224 latest_spending_tx: latest_spending_tx.clone(),
225 };
226 },
227 }
228 }
229
230 fn is_delayed(&self, cur_height: u32) -> bool {
231 match self {
232 Self::PendingInitialBroadcast { delayed_until_height } => {
233 delayed_until_height.map_or(false, |req_height| cur_height < req_height)
234 },
235 Self::PendingFirstConfirmation { .. } => false,
236 Self::PendingThresholdConfirmations { .. } => false,
237 }
238 }
239
240 fn first_broadcast_hash(&self) -> Option<BlockHash> {
241 match self {
242 Self::PendingInitialBroadcast { .. } => None,
243 Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
244 Some(*first_broadcast_hash)
245 },
246 Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
247 Some(*first_broadcast_hash)
248 },
249 }
250 }
251
252 fn latest_broadcast_height(&self) -> Option<u32> {
253 match self {
254 Self::PendingInitialBroadcast { .. } => None,
255 Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
256 Some(*latest_broadcast_height)
257 },
258 Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
259 Some(*latest_broadcast_height)
260 },
261 }
262 }
263
264 fn confirmation_height(&self) -> Option<u32> {
265 match self {
266 Self::PendingInitialBroadcast { .. } => None,
267 Self::PendingFirstConfirmation { .. } => None,
268 Self::PendingThresholdConfirmations { confirmation_height, .. } => {
269 Some(*confirmation_height)
270 },
271 }
272 }
273
274 fn confirmation_hash(&self) -> Option<BlockHash> {
275 match self {
276 Self::PendingInitialBroadcast { .. } => None,
277 Self::PendingFirstConfirmation { .. } => None,
278 Self::PendingThresholdConfirmations { confirmation_hash, .. } => {
279 Some(*confirmation_hash)
280 },
281 }
282 }
283
284 fn latest_spending_tx(&self) -> Option<&Transaction> {
285 match self {
286 Self::PendingInitialBroadcast { .. } => None,
287 Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
288 Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
289 Some(latest_spending_tx)
290 },
291 }
292 }
293
294 fn is_confirmed(&self) -> bool {
295 match self {
296 Self::PendingInitialBroadcast { .. } => false,
297 Self::PendingFirstConfirmation { .. } => false,
298 Self::PendingThresholdConfirmations { .. } => true,
299 }
300 }
301}
302
// TLV (de)serialization for `OutputSpendStatus`. Variants are tagged 0, 2 and 4; all fields are
// required except `delayed_until_height`, which is optional.
impl_writeable_tlv_based_enum!(OutputSpendStatus,
    (0, PendingInitialBroadcast) => {
        (0, delayed_until_height, option),
    },
    (2, PendingFirstConfirmation) => {
        (0, first_broadcast_hash, required),
        (2, latest_broadcast_height, required),
        (4, latest_spending_tx, required),
    },
    (4, PendingThresholdConfirmations) => {
        (0, first_broadcast_hash, required),
        (2, latest_broadcast_height, required),
        (4, latest_spending_tx, required),
        (6, confirmation_height, required),
        (8, confirmation_hash, required),
    },
);
320
/// A utility that tracks [`SpendableOutputDescriptor`]s, generates and broadcasts transactions
/// spending them, and keeps doing so as new blocks arrive until the spend has confirmed and the
/// output is pruned.
///
/// The tracked state is written to the [`KVStore`] whenever it changes.
pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
where
    B::Target: BroadcasterInterface,
    D::Target: ChangeDestinationSource,
    E::Target: FeeEstimator,
    F::Target: Filter + Sync + Send,
    K::Target: KVStore,
    L::Target: Logger,
    O::Target: OutputSpender,
{
    // The tracked outputs and the current best block, guarded by a mutex.
    sweeper_state: Mutex<SweeperState>,
    // Used to broadcast generated sweeping transactions.
    broadcaster: B,
    // Queried for the feerate of generated sweeping transactions.
    fee_estimator: E,
    // Optional chain filter with which tracked outputs are registered.
    chain_data_source: Option<F>,
    // Builds the transaction that spends the tracked outputs.
    output_spender: O,
    // Provides the script that swept funds are sent to.
    change_destination_source: D,
    // Store the sweeper state is persisted to.
    kv_store: K,
    logger: L,
}
354
355impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
356 OutputSweeper<B, D, E, F, K, L, O>
357where
358 B::Target: BroadcasterInterface,
359 D::Target: ChangeDestinationSource,
360 E::Target: FeeEstimator,
361 F::Target: Filter + Sync + Send,
362 K::Target: KVStore,
363 L::Target: Logger,
364 O::Target: OutputSpender,
365{
366 pub fn new(
371 best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
372 output_spender: O, change_destination_source: D, kv_store: K, logger: L,
373 ) -> Self {
374 let outputs = Vec::new();
375 let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
376 Self {
377 sweeper_state,
378 broadcaster,
379 fee_estimator,
380 chain_data_source,
381 output_spender,
382 change_destination_source,
383 kv_store,
384 logger,
385 }
386 }
387
388 pub fn track_spendable_outputs(
404 &self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
405 exclude_static_outputs: bool, delay_until_height: Option<u32>,
406 ) -> Result<(), ()> {
407 let mut relevant_descriptors = output_descriptors
408 .into_iter()
409 .filter(|desc| {
410 !(exclude_static_outputs
411 && matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
412 })
413 .peekable();
414
415 if relevant_descriptors.peek().is_none() {
416 return Ok(());
417 }
418
419 let spending_tx_opt;
420 {
421 let mut state_lock = self.sweeper_state.lock().unwrap();
422 for descriptor in relevant_descriptors {
423 let output_info = TrackedSpendableOutput {
424 descriptor,
425 channel_id,
426 status: OutputSpendStatus::PendingInitialBroadcast {
427 delayed_until_height: delay_until_height,
428 },
429 };
430
431 if state_lock
432 .outputs
433 .iter()
434 .find(|o| o.descriptor == output_info.descriptor)
435 .is_some()
436 {
437 continue;
438 }
439
440 state_lock.outputs.push(output_info);
441 }
442 spending_tx_opt = self.regenerate_spend_if_necessary(&mut *state_lock);
443 self.persist_state(&*state_lock).map_err(|e| {
444 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
445 })?;
446 }
447
448 if let Some(spending_tx) = spending_tx_opt {
449 self.broadcaster.broadcast_transactions(&[&spending_tx]);
450 }
451
452 Ok(())
453 }
454
455 pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
457 self.sweeper_state.lock().unwrap().outputs.clone()
458 }
459
460 pub fn current_best_block(&self) -> BestBlock {
463 self.sweeper_state.lock().unwrap().best_block
464 }
465
466 fn regenerate_spend_if_necessary(
467 &self, sweeper_state: &mut SweeperState,
468 ) -> Option<Transaction> {
469 let cur_height = sweeper_state.best_block.height;
470 let cur_hash = sweeper_state.best_block.block_hash;
471 let filter_fn = |o: &TrackedSpendableOutput| {
472 if o.status.is_confirmed() {
473 return false;
475 }
476
477 if o.status.is_delayed(cur_height) {
478 return false;
480 }
481
482 if o.status.latest_broadcast_height() >= Some(cur_height) {
483 return false;
485 }
486
487 true
488 };
489
490 let respend_descriptors: Vec<&SpendableOutputDescriptor> =
491 sweeper_state.outputs.iter().filter(|o| filter_fn(*o)).map(|o| &o.descriptor).collect();
492
493 if respend_descriptors.is_empty() {
494 return None;
496 }
497
498 let spending_tx = match self.spend_outputs(&*sweeper_state, respend_descriptors) {
499 Ok(spending_tx) => {
500 log_debug!(
501 self.logger,
502 "Generating and broadcasting sweeping transaction {}",
503 spending_tx.compute_txid()
504 );
505 spending_tx
506 },
507 Err(e) => {
508 log_error!(self.logger, "Error spending outputs: {:?}", e);
509 return None;
510 },
511 };
512
513 let respend_outputs = sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o));
516 for output_info in respend_outputs {
517 if let Some(filter) = self.chain_data_source.as_ref() {
518 let watched_output = output_info.to_watched_output(cur_hash);
519 filter.register_output(watched_output);
520 }
521
522 output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
523 }
524
525 Some(spending_tx)
526 }
527
528 fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
529 let cur_height = sweeper_state.best_block.height;
530
531 sweeper_state.outputs.retain(|o| {
533 if let Some(confirmation_height) = o.status.confirmation_height() {
534 if cur_height >= confirmation_height + PRUNE_DELAY_BLOCKS - 1 {
537 log_debug!(self.logger,
538 "Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
539 o.status.latest_spending_tx().map(|t| t.compute_txid()), o.descriptor
540 );
541 return false;
542 }
543 }
544 true
545 });
546 }
547
548 fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
549 self.kv_store
550 .write(
551 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
552 OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
553 OUTPUT_SWEEPER_PERSISTENCE_KEY,
554 &sweeper_state.encode(),
555 )
556 .map_err(|e| {
557 log_error!(
558 self.logger,
559 "Write for key {}/{}/{} failed due to: {}",
560 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
561 OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
562 OUTPUT_SWEEPER_PERSISTENCE_KEY,
563 e
564 );
565 e
566 })
567 }
568
569 fn spend_outputs(
570 &self, sweeper_state: &SweeperState, descriptors: Vec<&SpendableOutputDescriptor>,
571 ) -> Result<Transaction, ()> {
572 let tx_feerate =
573 self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
574 let change_destination_script =
575 self.change_destination_source.get_change_destination_script()?;
576 let cur_height = sweeper_state.best_block.height;
577 let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
578 self.output_spender.spend_spendable_outputs(
579 &descriptors,
580 Vec::new(),
581 change_destination_script,
582 tx_feerate,
583 locktime,
584 &Secp256k1::new(),
585 )
586 }
587
588 fn transactions_confirmed_internal(
589 &self, sweeper_state: &mut SweeperState, header: &Header,
590 txdata: &chain::transaction::TransactionData, height: u32,
591 ) {
592 let confirmation_hash = header.block_hash();
593 for (_, tx) in txdata {
594 for output_info in sweeper_state.outputs.iter_mut() {
595 if output_info.is_spent_in(*tx) {
596 output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
597 }
598 }
599 }
600 }
601
602 fn best_block_updated_internal(
603 &self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
604 ) -> Option<Transaction> {
605 sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
606 self.prune_confirmed_outputs(sweeper_state);
607 let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state);
608 spending_tx_opt
609 }
610}
611
612impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
613 for OutputSweeper<B, D, E, F, K, L, O>
614where
615 B::Target: BroadcasterInterface,
616 D::Target: ChangeDestinationSource,
617 E::Target: FeeEstimator,
618 F::Target: Filter + Sync + Send,
619 K::Target: KVStore,
620 L::Target: Logger,
621 O::Target: OutputSpender,
622{
623 fn filtered_block_connected(
624 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
625 ) {
626 let mut spending_tx_opt;
627 {
628 let mut state_lock = self.sweeper_state.lock().unwrap();
629 assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
630 "Blocks must be connected in chain-order - the connected header must build on the last connected header");
631 assert_eq!(state_lock.best_block.height, height - 1,
632 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
633
634 self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
635 spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
636
637 self.persist_state(&*state_lock).unwrap_or_else(|e| {
638 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
639 spending_tx_opt = None;
641 });
642 }
643
644 if let Some(spending_tx) = spending_tx_opt {
645 self.broadcaster.broadcast_transactions(&[&spending_tx]);
646 }
647 }
648
649 fn block_disconnected(&self, header: &Header, height: u32) {
650 let mut state_lock = self.sweeper_state.lock().unwrap();
651
652 let new_height = height - 1;
653 let block_hash = header.block_hash();
654
655 assert_eq!(state_lock.best_block.block_hash, block_hash,
656 "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
657 assert_eq!(state_lock.best_block.height, height,
658 "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
659 state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height);
660
661 for output_info in state_lock.outputs.iter_mut() {
662 if output_info.status.confirmation_hash() == Some(block_hash) {
663 debug_assert_eq!(output_info.status.confirmation_height(), Some(height));
664 output_info.status.unconfirmed();
665 }
666 }
667
668 self.persist_state(&*state_lock).unwrap_or_else(|e| {
669 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
670 });
671 }
672}
673
674impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
675 for OutputSweeper<B, D, E, F, K, L, O>
676where
677 B::Target: BroadcasterInterface,
678 D::Target: ChangeDestinationSource,
679 E::Target: FeeEstimator,
680 F::Target: Filter + Sync + Send,
681 K::Target: KVStore,
682 L::Target: Logger,
683 O::Target: OutputSpender,
684{
685 fn transactions_confirmed(
686 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
687 ) {
688 let mut state_lock = self.sweeper_state.lock().unwrap();
689 self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
690 self.persist_state(&*state_lock).unwrap_or_else(|e| {
691 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
692 });
693 }
694
695 fn transaction_unconfirmed(&self, txid: &Txid) {
696 let mut state_lock = self.sweeper_state.lock().unwrap();
697
698 let unconf_height = state_lock
700 .outputs
701 .iter()
702 .find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
703 .and_then(|o| o.status.confirmation_height());
704
705 if let Some(unconf_height) = unconf_height {
706 state_lock
708 .outputs
709 .iter_mut()
710 .filter(|o| o.status.confirmation_height() >= Some(unconf_height))
711 .for_each(|o| o.status.unconfirmed());
712
713 self.persist_state(&*state_lock).unwrap_or_else(|e| {
714 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
715 });
716 }
717 }
718
719 fn best_block_updated(&self, header: &Header, height: u32) {
720 let mut spending_tx_opt;
721 {
722 let mut state_lock = self.sweeper_state.lock().unwrap();
723 spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
724 self.persist_state(&*state_lock).unwrap_or_else(|e| {
725 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
726 spending_tx_opt = None;
728 });
729 }
730
731 if let Some(spending_tx) = spending_tx_opt {
732 self.broadcaster.broadcast_transactions(&[&spending_tx]);
733 }
734 }
735
736 fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
737 let state_lock = self.sweeper_state.lock().unwrap();
738 state_lock
739 .outputs
740 .iter()
741 .filter_map(|o| match o.status {
742 OutputSpendStatus::PendingThresholdConfirmations {
743 ref latest_spending_tx,
744 confirmation_height,
745 confirmation_hash,
746 ..
747 } => Some((
748 latest_spending_tx.compute_txid(),
749 confirmation_height,
750 Some(confirmation_hash),
751 )),
752 _ => None,
753 })
754 .collect::<Vec<_>>()
755 }
756}
757
/// The mutable state of an `OutputSweeper`; this is what gets serialized and persisted.
#[derive(Debug, Clone)]
struct SweeperState {
    // All outputs currently being tracked, in insertion order.
    outputs: Vec<TrackedSpendableOutput>,
    // The chain tip the sweeper has most recently been told about.
    best_block: BestBlock,
}
763
// TLV (de)serialization for `SweeperState`: the tracked outputs (type 0) and the best block
// (type 2) are both required.
impl_writeable_tlv_based!(SweeperState, {
    (0, outputs, required_vec),
    (2, best_block, required),
});
768
/// A delay expressed either as a number of blocks or as an absolute block height.
///
/// NOTE(review): not referenced anywhere in the visible portion of this file; presumably it
/// describes how long spending of an output should be deferred — confirm against its users.
#[derive(Debug, Clone)]
pub enum SpendingDelay {
    /// A delay given as a block count.
    Relative {
        /// The number of blocks of the delay.
        num_blocks: u32,
    },
    /// A delay given as a fixed block height.
    Absolute {
        /// The block height of the delay.
        height: u32,
    },
}
785
786impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
787 ReadableArgs<(B, E, Option<F>, O, D, K, L)> for OutputSweeper<B, D, E, F, K, L, O>
788where
789 B::Target: BroadcasterInterface,
790 D::Target: ChangeDestinationSource,
791 E::Target: FeeEstimator,
792 F::Target: Filter + Sync + Send,
793 K::Target: KVStore,
794 L::Target: Logger,
795 O::Target: OutputSpender,
796{
797 #[inline]
798 fn read<R: io::Read>(
799 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
800 ) -> Result<Self, DecodeError> {
801 let (
802 broadcaster,
803 fee_estimator,
804 chain_data_source,
805 output_spender,
806 change_destination_source,
807 kv_store,
808 logger,
809 ) = args;
810 let state = SweeperState::read(reader)?;
811 let best_block = state.best_block;
812
813 if let Some(filter) = chain_data_source.as_ref() {
814 for output_info in &state.outputs {
815 let watched_output = output_info.to_watched_output(best_block.block_hash);
816 filter.register_output(watched_output);
817 }
818 }
819
820 let sweeper_state = Mutex::new(state);
821 Ok(Self {
822 sweeper_state,
823 broadcaster,
824 fee_estimator,
825 chain_data_source,
826 output_spender,
827 change_destination_source,
828 kv_store,
829 logger,
830 })
831 }
832}
833
834impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
835 ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
836where
837 B::Target: BroadcasterInterface,
838 D::Target: ChangeDestinationSource,
839 E::Target: FeeEstimator,
840 F::Target: Filter + Sync + Send,
841 K::Target: KVStore,
842 L::Target: Logger,
843 O::Target: OutputSpender,
844{
845 #[inline]
846 fn read<R: io::Read>(
847 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
848 ) -> Result<Self, DecodeError> {
849 let (
850 broadcaster,
851 fee_estimator,
852 chain_data_source,
853 output_spender,
854 change_destination_source,
855 kv_store,
856 logger,
857 ) = args;
858 let state = SweeperState::read(reader)?;
859 let best_block = state.best_block;
860
861 if let Some(filter) = chain_data_source.as_ref() {
862 for output_info in &state.outputs {
863 let watched_output = output_info.to_watched_output(best_block.block_hash);
864 filter.register_output(watched_output);
865 }
866 }
867
868 let sweeper_state = Mutex::new(state);
869 Ok((
870 best_block,
871 OutputSweeper {
872 sweeper_state,
873 broadcaster,
874 fee_estimator,
875 chain_data_source,
876 output_spender,
877 change_destination_source,
878 kv_store,
879 logger,
880 },
881 ))
882 }
883}