lightning/util/
sweep.rs

1// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
2// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
4// You may not use this file except in accordance with one or both of these
5// licenses.
6
7//! This module contains an [`OutputSweeper`] utility that keeps track of
8//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] and regularly retries
9//! sweeping them.
10
11use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12use crate::chain::channelmonitor::{ANTI_REORG_DELAY, ARCHIVAL_DELAY_BLOCKS};
13use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14use crate::io;
15use crate::ln::msgs::DecodeError;
16use crate::ln::types::ChannelId;
17use crate::prelude::*;
18use crate::sign::{ChangeDestinationSource, OutputSpender, SpendableOutputDescriptor};
19use crate::sync::Mutex;
20use crate::util::logger::Logger;
21use crate::util::persist::{
22	KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
23	OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
24};
25use crate::util::ser::{Readable, ReadableArgs, Writeable};
26use crate::{impl_writeable_tlv_based, log_debug, log_error};
27
28use bitcoin::block::Header;
29use bitcoin::locktime::absolute::LockTime;
30use bitcoin::secp256k1::Secp256k1;
31use bitcoin::{BlockHash, Transaction, Txid};
32
33use core::ops::Deref;
34
35/// The number of blocks we wait before we prune the tracked spendable outputs.
36pub const PRUNE_DELAY_BLOCKS: u32 = ARCHIVAL_DELAY_BLOCKS + ANTI_REORG_DELAY;
37
38/// The state of a spendable output currently tracked by an [`OutputSweeper`].
39#[derive(Clone, Debug, PartialEq, Eq)]
40pub struct TrackedSpendableOutput {
41	/// The tracked output descriptor.
42	pub descriptor: SpendableOutputDescriptor,
43	/// The channel this output belongs to.
44	///
45	/// Will be `None` if no `channel_id` was given to [`OutputSweeper::track_spendable_outputs`]
46	pub channel_id: Option<ChannelId>,
47	/// The current status of the output spend.
48	pub status: OutputSpendStatus,
49}
50
51impl TrackedSpendableOutput {
52	fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
53		let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
54		match &self.descriptor {
55			SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
56				WatchedOutput {
57					block_hash,
58					outpoint: *outpoint,
59					script_pubkey: output.script_pubkey.clone(),
60				}
61			},
62			SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
63				block_hash,
64				outpoint: output.outpoint,
65				script_pubkey: output.output.script_pubkey.clone(),
66			},
67			SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
68				block_hash,
69				outpoint: output.outpoint,
70				script_pubkey: output.output.script_pubkey.clone(),
71			},
72		}
73	}
74
75	/// Returns whether the output is spent in the given transaction.
76	pub fn is_spent_in(&self, tx: &Transaction) -> bool {
77		let prev_outpoint = self.descriptor.outpoint().into_bitcoin_outpoint();
78		tx.input.iter().any(|input| input.previous_output == prev_outpoint)
79	}
80}
81
82impl_writeable_tlv_based!(TrackedSpendableOutput, {
83	(0, descriptor, required),
84	(2, channel_id, option),
85	(4, status, required),
86});
87
88/// The current status of the output spend.
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub enum OutputSpendStatus {
91	/// The output is tracked but an initial spending transaction hasn't been generated and
92	/// broadcasted yet.
93	PendingInitialBroadcast {
94		/// The height at which we will first generate and broadcast a spending transaction.
95		delayed_until_height: Option<u32>,
96	},
97	/// A transaction spending the output has been broadcasted but is pending its first confirmation on-chain.
98	PendingFirstConfirmation {
99		/// The hash of the chain tip when we first broadcast a transaction spending this output.
100		first_broadcast_hash: BlockHash,
101		/// The best height when we last broadcast a transaction spending this output.
102		latest_broadcast_height: u32,
103		/// The transaction spending this output we last broadcasted.
104		latest_spending_tx: Transaction,
105	},
106	/// A transaction spending the output has been confirmed on-chain but will be tracked until it
107	/// reaches at least [`PRUNE_DELAY_BLOCKS`] confirmations to ensure [`Event::SpendableOutputs`]
108	/// stemming from lingering [`ChannelMonitor`]s can safely be replayed.
109	///
110	/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
111	/// [`ChannelMonitor`]: crate::chain::channelmonitor::ChannelMonitor
112	PendingThresholdConfirmations {
113		/// The hash of the chain tip when we first broadcast a transaction spending this output.
114		first_broadcast_hash: BlockHash,
115		/// The best height when we last broadcast a transaction spending this output.
116		latest_broadcast_height: u32,
117		/// The transaction spending this output we saw confirmed on-chain.
118		latest_spending_tx: Transaction,
119		/// The height at which the spending transaction was confirmed.
120		confirmation_height: u32,
121		/// The hash of the block in which the spending transaction was confirmed.
122		confirmation_hash: BlockHash,
123	},
124}
125
126impl OutputSpendStatus {
127	fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
128		match self {
129			Self::PendingInitialBroadcast { delayed_until_height } => {
130				if let Some(delayed_until_height) = delayed_until_height {
131					debug_assert!(
132						cur_height >= *delayed_until_height,
133						"We should never broadcast before the required height is reached."
134					);
135				}
136				*self = Self::PendingFirstConfirmation {
137					first_broadcast_hash: cur_hash,
138					latest_broadcast_height: cur_height,
139					latest_spending_tx,
140				};
141			},
142			Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
143				*self = Self::PendingFirstConfirmation {
144					first_broadcast_hash: *first_broadcast_hash,
145					latest_broadcast_height: cur_height,
146					latest_spending_tx,
147				};
148			},
149			Self::PendingThresholdConfirmations { .. } => {
150				debug_assert!(false, "We should never rebroadcast confirmed transactions.");
151			},
152		}
153	}
154
155	fn confirmed(
156		&mut self, confirmation_hash: BlockHash, confirmation_height: u32,
157		latest_spending_tx: Transaction,
158	) {
159		match self {
160			Self::PendingInitialBroadcast { .. } => {
161				// Generally we can't see any of our transactions confirmed if they haven't been
162				// broadcasted yet, so this should never be reachable via `transactions_confirmed`.
163				debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
164				*self = Self::PendingThresholdConfirmations {
165					first_broadcast_hash: confirmation_hash,
166					latest_broadcast_height: confirmation_height,
167					latest_spending_tx,
168					confirmation_height,
169					confirmation_hash,
170				};
171			},
172			Self::PendingFirstConfirmation {
173				first_broadcast_hash,
174				latest_broadcast_height,
175				..
176			} => {
177				*self = Self::PendingThresholdConfirmations {
178					first_broadcast_hash: *first_broadcast_hash,
179					latest_broadcast_height: *latest_broadcast_height,
180					latest_spending_tx,
181					confirmation_height,
182					confirmation_hash,
183				};
184			},
185			Self::PendingThresholdConfirmations {
186				first_broadcast_hash,
187				latest_broadcast_height,
188				..
189			} => {
190				*self = Self::PendingThresholdConfirmations {
191					first_broadcast_hash: *first_broadcast_hash,
192					latest_broadcast_height: *latest_broadcast_height,
193					latest_spending_tx,
194					confirmation_height,
195					confirmation_hash,
196				};
197			},
198		}
199	}
200
201	fn unconfirmed(&mut self) {
202		match self {
203			Self::PendingInitialBroadcast { .. } => {
204				debug_assert!(
205					false,
206					"We should only mark a spend as unconfirmed if it used to be confirmed."
207				);
208			},
209			Self::PendingFirstConfirmation { .. } => {
210				debug_assert!(
211					false,
212					"We should only mark a spend as unconfirmed if it used to be confirmed."
213				);
214			},
215			Self::PendingThresholdConfirmations {
216				first_broadcast_hash,
217				latest_broadcast_height,
218				latest_spending_tx,
219				..
220			} => {
221				*self = Self::PendingFirstConfirmation {
222					first_broadcast_hash: *first_broadcast_hash,
223					latest_broadcast_height: *latest_broadcast_height,
224					latest_spending_tx: latest_spending_tx.clone(),
225				};
226			},
227		}
228	}
229
230	fn is_delayed(&self, cur_height: u32) -> bool {
231		match self {
232			Self::PendingInitialBroadcast { delayed_until_height } => {
233				delayed_until_height.map_or(false, |req_height| cur_height < req_height)
234			},
235			Self::PendingFirstConfirmation { .. } => false,
236			Self::PendingThresholdConfirmations { .. } => false,
237		}
238	}
239
240	fn first_broadcast_hash(&self) -> Option<BlockHash> {
241		match self {
242			Self::PendingInitialBroadcast { .. } => None,
243			Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
244				Some(*first_broadcast_hash)
245			},
246			Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
247				Some(*first_broadcast_hash)
248			},
249		}
250	}
251
252	fn latest_broadcast_height(&self) -> Option<u32> {
253		match self {
254			Self::PendingInitialBroadcast { .. } => None,
255			Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
256				Some(*latest_broadcast_height)
257			},
258			Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
259				Some(*latest_broadcast_height)
260			},
261		}
262	}
263
264	fn confirmation_height(&self) -> Option<u32> {
265		match self {
266			Self::PendingInitialBroadcast { .. } => None,
267			Self::PendingFirstConfirmation { .. } => None,
268			Self::PendingThresholdConfirmations { confirmation_height, .. } => {
269				Some(*confirmation_height)
270			},
271		}
272	}
273
274	fn confirmation_hash(&self) -> Option<BlockHash> {
275		match self {
276			Self::PendingInitialBroadcast { .. } => None,
277			Self::PendingFirstConfirmation { .. } => None,
278			Self::PendingThresholdConfirmations { confirmation_hash, .. } => {
279				Some(*confirmation_hash)
280			},
281		}
282	}
283
284	fn latest_spending_tx(&self) -> Option<&Transaction> {
285		match self {
286			Self::PendingInitialBroadcast { .. } => None,
287			Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
288			Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
289				Some(latest_spending_tx)
290			},
291		}
292	}
293
294	fn is_confirmed(&self) -> bool {
295		match self {
296			Self::PendingInitialBroadcast { .. } => false,
297			Self::PendingFirstConfirmation { .. } => false,
298			Self::PendingThresholdConfirmations { .. } => true,
299		}
300	}
301}
302
303impl_writeable_tlv_based_enum!(OutputSpendStatus,
304	(0, PendingInitialBroadcast) => {
305		(0, delayed_until_height, option),
306	},
307	(2, PendingFirstConfirmation) => {
308		(0, first_broadcast_hash, required),
309		(2, latest_broadcast_height, required),
310		(4, latest_spending_tx, required),
311	},
312	(4, PendingThresholdConfirmations) => {
313		(0, first_broadcast_hash, required),
314		(2, latest_broadcast_height, required),
315		(4, latest_spending_tx, required),
316		(6, confirmation_height, required),
317		(8, confirmation_hash, required),
318	},
319);
320
321/// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given
322/// [`KVStore`] and regularly retries sweeping them based on a callback given to the constructor
323/// methods.
324///
325/// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s received via [`Event::SpendableOutputs`].
326///
327/// This needs to be notified of chain state changes either via its [`Listen`] or [`Confirm`]
328/// implementation and hence has to be connected with the utilized chain data sources.
329///
330/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are
331/// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective
332/// constructor.
333///
334/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
335pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
336where
337	B::Target: BroadcasterInterface,
338	D::Target: ChangeDestinationSource,
339	E::Target: FeeEstimator,
340	F::Target: Filter + Sync + Send,
341	K::Target: KVStore,
342	L::Target: Logger,
343	O::Target: OutputSpender,
344{
345	sweeper_state: Mutex<SweeperState>,
346	broadcaster: B,
347	fee_estimator: E,
348	chain_data_source: Option<F>,
349	output_spender: O,
350	change_destination_source: D,
351	kv_store: K,
352	logger: L,
353}
354
355impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
356	OutputSweeper<B, D, E, F, K, L, O>
357where
358	B::Target: BroadcasterInterface,
359	D::Target: ChangeDestinationSource,
360	E::Target: FeeEstimator,
361	F::Target: Filter + Sync + Send,
362	K::Target: KVStore,
363	L::Target: Logger,
364	O::Target: OutputSpender,
365{
366	/// Constructs a new [`OutputSweeper`].
367	///
368	/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also
369	/// need to register their [`Filter`] implementation via the given `chain_data_source`.
370	pub fn new(
371		best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
372		output_spender: O, change_destination_source: D, kv_store: K, logger: L,
373	) -> Self {
374		let outputs = Vec::new();
375		let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
376		Self {
377			sweeper_state,
378			broadcaster,
379			fee_estimator,
380			chain_data_source,
381			output_spender,
382			change_destination_source,
383			kv_store,
384			logger,
385		}
386	}
387
388	/// Tells the sweeper to track the given outputs descriptors.
389	///
390	/// Usually, this should be called based on the values emitted by the
391	/// [`Event::SpendableOutputs`].
392	///
393	/// The given `exclude_static_outputs` flag controls whether the sweeper will filter out
394	/// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain
395	/// wallet implementation.
396	///
397	/// If `delay_until_height` is set, we will delay the spending until the respective block
398	/// height is reached. This can be used to batch spends, e.g., to reduce on-chain fees.
399	///
400	/// Returns `Err` on persistence failure, in which case the call may be safely retried.
401	///
402	/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
403	pub fn track_spendable_outputs(
404		&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
405		exclude_static_outputs: bool, delay_until_height: Option<u32>,
406	) -> Result<(), ()> {
407		let mut relevant_descriptors = output_descriptors
408			.into_iter()
409			.filter(|desc| {
410				!(exclude_static_outputs
411					&& matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
412			})
413			.peekable();
414
415		if relevant_descriptors.peek().is_none() {
416			return Ok(());
417		}
418
419		let spending_tx_opt;
420		{
421			let mut state_lock = self.sweeper_state.lock().unwrap();
422			for descriptor in relevant_descriptors {
423				let output_info = TrackedSpendableOutput {
424					descriptor,
425					channel_id,
426					status: OutputSpendStatus::PendingInitialBroadcast {
427						delayed_until_height: delay_until_height,
428					},
429				};
430
431				if state_lock
432					.outputs
433					.iter()
434					.find(|o| o.descriptor == output_info.descriptor)
435					.is_some()
436				{
437					continue;
438				}
439
440				state_lock.outputs.push(output_info);
441			}
442			spending_tx_opt = self.regenerate_spend_if_necessary(&mut *state_lock);
443			self.persist_state(&*state_lock).map_err(|e| {
444				log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
445			})?;
446		}
447
448		if let Some(spending_tx) = spending_tx_opt {
449			self.broadcaster.broadcast_transactions(&[&spending_tx]);
450		}
451
452		Ok(())
453	}
454
455	/// Returns a list of the currently tracked spendable outputs.
456	pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
457		self.sweeper_state.lock().unwrap().outputs.clone()
458	}
459
460	/// Gets the latest best block which was connected either via the [`Listen`] or
461	/// [`Confirm`] interfaces.
462	pub fn current_best_block(&self) -> BestBlock {
463		self.sweeper_state.lock().unwrap().best_block
464	}
465
466	fn regenerate_spend_if_necessary(
467		&self, sweeper_state: &mut SweeperState,
468	) -> Option<Transaction> {
469		let cur_height = sweeper_state.best_block.height;
470		let cur_hash = sweeper_state.best_block.block_hash;
471		let filter_fn = |o: &TrackedSpendableOutput| {
472			if o.status.is_confirmed() {
473				// Don't rebroadcast confirmed txs.
474				return false;
475			}
476
477			if o.status.is_delayed(cur_height) {
478				// Don't generate and broadcast if still delayed
479				return false;
480			}
481
482			if o.status.latest_broadcast_height() >= Some(cur_height) {
483				// Only broadcast once per block height.
484				return false;
485			}
486
487			true
488		};
489
490		let respend_descriptors: Vec<&SpendableOutputDescriptor> =
491			sweeper_state.outputs.iter().filter(|o| filter_fn(*o)).map(|o| &o.descriptor).collect();
492
493		if respend_descriptors.is_empty() {
494			// Nothing to do.
495			return None;
496		}
497
498		let spending_tx = match self.spend_outputs(&*sweeper_state, respend_descriptors) {
499			Ok(spending_tx) => {
500				log_debug!(
501					self.logger,
502					"Generating and broadcasting sweeping transaction {}",
503					spending_tx.compute_txid()
504				);
505				spending_tx
506			},
507			Err(e) => {
508				log_error!(self.logger, "Error spending outputs: {:?}", e);
509				return None;
510			},
511		};
512
513		// As we didn't modify the state so far, the same filter_fn yields the same elements as
514		// above.
515		let respend_outputs = sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o));
516		for output_info in respend_outputs {
517			if let Some(filter) = self.chain_data_source.as_ref() {
518				let watched_output = output_info.to_watched_output(cur_hash);
519				filter.register_output(watched_output);
520			}
521
522			output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
523		}
524
525		Some(spending_tx)
526	}
527
528	fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
529		let cur_height = sweeper_state.best_block.height;
530
531		// Prune all outputs that have sufficient depth by now.
532		sweeper_state.outputs.retain(|o| {
533			if let Some(confirmation_height) = o.status.confirmation_height() {
534				// We wait at least `PRUNE_DELAY_BLOCKS` as before that
535				// `Event::SpendableOutputs` from lingering monitors might get replayed.
536				if cur_height >= confirmation_height + PRUNE_DELAY_BLOCKS - 1 {
537					log_debug!(self.logger,
538						"Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
539						o.status.latest_spending_tx().map(|t| t.compute_txid()), o.descriptor
540					);
541					return false;
542				}
543			}
544			true
545		});
546	}
547
548	fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
549		self.kv_store
550			.write(
551				OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
552				OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
553				OUTPUT_SWEEPER_PERSISTENCE_KEY,
554				&sweeper_state.encode(),
555			)
556			.map_err(|e| {
557				log_error!(
558					self.logger,
559					"Write for key {}/{}/{} failed due to: {}",
560					OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
561					OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
562					OUTPUT_SWEEPER_PERSISTENCE_KEY,
563					e
564				);
565				e
566			})
567	}
568
569	fn spend_outputs(
570		&self, sweeper_state: &SweeperState, descriptors: Vec<&SpendableOutputDescriptor>,
571	) -> Result<Transaction, ()> {
572		let tx_feerate =
573			self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
574		let change_destination_script =
575			self.change_destination_source.get_change_destination_script()?;
576		let cur_height = sweeper_state.best_block.height;
577		let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
578		self.output_spender.spend_spendable_outputs(
579			&descriptors,
580			Vec::new(),
581			change_destination_script,
582			tx_feerate,
583			locktime,
584			&Secp256k1::new(),
585		)
586	}
587
588	fn transactions_confirmed_internal(
589		&self, sweeper_state: &mut SweeperState, header: &Header,
590		txdata: &chain::transaction::TransactionData, height: u32,
591	) {
592		let confirmation_hash = header.block_hash();
593		for (_, tx) in txdata {
594			for output_info in sweeper_state.outputs.iter_mut() {
595				if output_info.is_spent_in(*tx) {
596					output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
597				}
598			}
599		}
600	}
601
602	fn best_block_updated_internal(
603		&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
604	) -> Option<Transaction> {
605		sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
606		self.prune_confirmed_outputs(sweeper_state);
607		let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state);
608		spending_tx_opt
609	}
610}
611
612impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
613	for OutputSweeper<B, D, E, F, K, L, O>
614where
615	B::Target: BroadcasterInterface,
616	D::Target: ChangeDestinationSource,
617	E::Target: FeeEstimator,
618	F::Target: Filter + Sync + Send,
619	K::Target: KVStore,
620	L::Target: Logger,
621	O::Target: OutputSpender,
622{
623	fn filtered_block_connected(
624		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
625	) {
626		let mut spending_tx_opt;
627		{
628			let mut state_lock = self.sweeper_state.lock().unwrap();
629			assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
630				"Blocks must be connected in chain-order - the connected header must build on the last connected header");
631			assert_eq!(state_lock.best_block.height, height - 1,
632				"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
633
634			self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
635			spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
636
637			self.persist_state(&*state_lock).unwrap_or_else(|e| {
638				log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
639				// Skip broadcasting if the persist failed.
640				spending_tx_opt = None;
641			});
642		}
643
644		if let Some(spending_tx) = spending_tx_opt {
645			self.broadcaster.broadcast_transactions(&[&spending_tx]);
646		}
647	}
648
649	fn block_disconnected(&self, header: &Header, height: u32) {
650		let mut state_lock = self.sweeper_state.lock().unwrap();
651
652		let new_height = height - 1;
653		let block_hash = header.block_hash();
654
655		assert_eq!(state_lock.best_block.block_hash, block_hash,
656		"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
657		assert_eq!(state_lock.best_block.height, height,
658			"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
659		state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height);
660
661		for output_info in state_lock.outputs.iter_mut() {
662			if output_info.status.confirmation_hash() == Some(block_hash) {
663				debug_assert_eq!(output_info.status.confirmation_height(), Some(height));
664				output_info.status.unconfirmed();
665			}
666		}
667
668		self.persist_state(&*state_lock).unwrap_or_else(|e| {
669			log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
670		});
671	}
672}
673
674impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
675	for OutputSweeper<B, D, E, F, K, L, O>
676where
677	B::Target: BroadcasterInterface,
678	D::Target: ChangeDestinationSource,
679	E::Target: FeeEstimator,
680	F::Target: Filter + Sync + Send,
681	K::Target: KVStore,
682	L::Target: Logger,
683	O::Target: OutputSpender,
684{
685	fn transactions_confirmed(
686		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
687	) {
688		let mut state_lock = self.sweeper_state.lock().unwrap();
689		self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
690		self.persist_state(&*state_lock).unwrap_or_else(|e| {
691			log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
692		});
693	}
694
695	fn transaction_unconfirmed(&self, txid: &Txid) {
696		let mut state_lock = self.sweeper_state.lock().unwrap();
697
698		// Get what height was unconfirmed.
699		let unconf_height = state_lock
700			.outputs
701			.iter()
702			.find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
703			.and_then(|o| o.status.confirmation_height());
704
705		if let Some(unconf_height) = unconf_height {
706			// Unconfirm all >= this height.
707			state_lock
708				.outputs
709				.iter_mut()
710				.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
711				.for_each(|o| o.status.unconfirmed());
712
713			self.persist_state(&*state_lock).unwrap_or_else(|e| {
714				log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
715			});
716		}
717	}
718
719	fn best_block_updated(&self, header: &Header, height: u32) {
720		let mut spending_tx_opt;
721		{
722			let mut state_lock = self.sweeper_state.lock().unwrap();
723			spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
724			self.persist_state(&*state_lock).unwrap_or_else(|e| {
725				log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
726				// Skip broadcasting if the persist failed.
727				spending_tx_opt = None;
728			});
729		}
730
731		if let Some(spending_tx) = spending_tx_opt {
732			self.broadcaster.broadcast_transactions(&[&spending_tx]);
733		}
734	}
735
736	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
737		let state_lock = self.sweeper_state.lock().unwrap();
738		state_lock
739			.outputs
740			.iter()
741			.filter_map(|o| match o.status {
742				OutputSpendStatus::PendingThresholdConfirmations {
743					ref latest_spending_tx,
744					confirmation_height,
745					confirmation_hash,
746					..
747				} => Some((
748					latest_spending_tx.compute_txid(),
749					confirmation_height,
750					Some(confirmation_hash),
751				)),
752				_ => None,
753			})
754			.collect::<Vec<_>>()
755	}
756}
757
758#[derive(Debug, Clone)]
759struct SweeperState {
760	outputs: Vec<TrackedSpendableOutput>,
761	best_block: BestBlock,
762}
763
764impl_writeable_tlv_based!(SweeperState, {
765	(0, outputs, required_vec),
766	(2, best_block, required),
767});
768
/// An `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a
/// future block height is reached.
///
/// Both variants only carry a `u32`, so the type is cheap to copy and can be compared for
/// equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpendingDelay {
	/// A relative delay indicating we shouldn't spend the output before `cur_height + num_blocks`
	/// is reached.
	Relative {
		/// The number of blocks until we'll generate and broadcast the spending transaction.
		num_blocks: u32,
	},
	/// An absolute delay indicating we shouldn't spend the output before `height` is reached.
	Absolute {
		/// The height at which we'll generate and broadcast the spending transaction.
		height: u32,
	},
}
785
786impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
787	ReadableArgs<(B, E, Option<F>, O, D, K, L)> for OutputSweeper<B, D, E, F, K, L, O>
788where
789	B::Target: BroadcasterInterface,
790	D::Target: ChangeDestinationSource,
791	E::Target: FeeEstimator,
792	F::Target: Filter + Sync + Send,
793	K::Target: KVStore,
794	L::Target: Logger,
795	O::Target: OutputSpender,
796{
797	#[inline]
798	fn read<R: io::Read>(
799		reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
800	) -> Result<Self, DecodeError> {
801		let (
802			broadcaster,
803			fee_estimator,
804			chain_data_source,
805			output_spender,
806			change_destination_source,
807			kv_store,
808			logger,
809		) = args;
810		let state = SweeperState::read(reader)?;
811		let best_block = state.best_block;
812
813		if let Some(filter) = chain_data_source.as_ref() {
814			for output_info in &state.outputs {
815				let watched_output = output_info.to_watched_output(best_block.block_hash);
816				filter.register_output(watched_output);
817			}
818		}
819
820		let sweeper_state = Mutex::new(state);
821		Ok(Self {
822			sweeper_state,
823			broadcaster,
824			fee_estimator,
825			chain_data_source,
826			output_spender,
827			change_destination_source,
828			kv_store,
829			logger,
830		})
831	}
832}
833
834impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
835	ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
836where
837	B::Target: BroadcasterInterface,
838	D::Target: ChangeDestinationSource,
839	E::Target: FeeEstimator,
840	F::Target: Filter + Sync + Send,
841	K::Target: KVStore,
842	L::Target: Logger,
843	O::Target: OutputSpender,
844{
845	#[inline]
846	fn read<R: io::Read>(
847		reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
848	) -> Result<Self, DecodeError> {
849		let (
850			broadcaster,
851			fee_estimator,
852			chain_data_source,
853			output_spender,
854			change_destination_source,
855			kv_store,
856			logger,
857		) = args;
858		let state = SweeperState::read(reader)?;
859		let best_block = state.best_block;
860
861		if let Some(filter) = chain_data_source.as_ref() {
862			for output_info in &state.outputs {
863				let watched_output = output_info.to_watched_output(best_block.block_hash);
864				filter.register_output(watched_output);
865			}
866		}
867
868		let sweeper_state = Mutex::new(state);
869		Ok((
870			best_block,
871			OutputSweeper {
872				sweeper_state,
873				broadcaster,
874				fee_estimator,
875				chain_data_source,
876				output_spender,
877				change_destination_source,
878				kv_store,
879				logger,
880			},
881		))
882	}
883}