lightning/util/
sweep.rs

1// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
2// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
4// You may not use this file except in accordance with one or both of these
5// licenses.
6
//! This module contains an [`OutputSweeper`] utility that keeps track of
//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] (or [`KVStoreSync`]
//! when using [`OutputSweeperSync`]) and regularly retries sweeping them.
10
11use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12use crate::chain::channelmonitor::{ANTI_REORG_DELAY, ARCHIVAL_DELAY_BLOCKS};
13use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14use crate::io;
15use crate::ln::msgs::DecodeError;
16use crate::ln::types::ChannelId;
17use crate::prelude::*;
18use crate::sign::{
19	ChangeDestinationSource, ChangeDestinationSourceSync, ChangeDestinationSourceSyncWrapper,
20	OutputSpender, SpendableOutputDescriptor,
21};
22use crate::sync::Mutex;
23use crate::util::logger::Logger;
24use crate::util::persist::{
25	KVStore, KVStoreSync, KVStoreSyncWrapper, OUTPUT_SWEEPER_PERSISTENCE_KEY,
26	OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
27};
28use crate::util::ser::{Readable, ReadableArgs, Writeable};
29use crate::{impl_writeable_tlv_based, log_debug, log_error};
30
31use bitcoin::block::Header;
32use bitcoin::locktime::absolute::LockTime;
33use bitcoin::secp256k1::Secp256k1;
34use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid};
35
36use core::future::Future;
37use core::ops::Deref;
38use core::pin::Pin;
39use core::sync::atomic::{AtomicBool, Ordering};
40use core::task;
41
42use super::async_poll::dummy_waker;
43
/// The number of blocks we wait before we prune the tracked spendable outputs.
///
/// This is the sum of [`ARCHIVAL_DELAY_BLOCKS`] and [`ANTI_REORG_DELAY`].
pub const PRUNE_DELAY_BLOCKS: u32 = ARCHIVAL_DELAY_BLOCKS + ANTI_REORG_DELAY;
46
/// The state of a spendable output currently tracked by an [`OutputSweeper`].
///
/// Bundles the output descriptor with the channel it belongs to (if known) and the progress
/// of the transaction spending it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TrackedSpendableOutput {
	/// The tracked output descriptor.
	pub descriptor: SpendableOutputDescriptor,
	/// The channel this output belongs to.
	///
	/// Will be `None` if no `channel_id` was given to [`OutputSweeper::track_spendable_outputs`].
	pub channel_id: Option<ChannelId>,
	/// The current status of the output spend.
	pub status: OutputSpendStatus,
}
59
60impl TrackedSpendableOutput {
61	fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
62		let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
63		match &self.descriptor {
64			SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
65				WatchedOutput {
66					block_hash,
67					outpoint: *outpoint,
68					script_pubkey: output.script_pubkey.clone(),
69				}
70			},
71			SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
72				block_hash,
73				outpoint: output.outpoint,
74				script_pubkey: output.output.script_pubkey.clone(),
75			},
76			SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
77				block_hash,
78				outpoint: output.outpoint,
79				script_pubkey: output.output.script_pubkey.clone(),
80			},
81		}
82	}
83
84	/// Returns whether the output is spent in the given transaction.
85	pub fn is_spent_in(&self, tx: &Transaction) -> bool {
86		let prev_outpoint = self.descriptor.spendable_outpoint().into_bitcoin_outpoint();
87		tx.input.iter().any(|input| input.previous_output == prev_outpoint)
88	}
89}
90
// Serialization of `TrackedSpendableOutput`: `descriptor` and `status` are required fields,
// while `channel_id` is optional.
impl_writeable_tlv_based!(TrackedSpendableOutput, {
	(0, descriptor, required),
	(2, channel_id, option),
	(4, status, required),
});
96
/// The current status of the output spend.
///
/// An output starts out as [`Self::PendingInitialBroadcast`], moves to
/// [`Self::PendingFirstConfirmation`] once a spending transaction has been broadcast, and to
/// [`Self::PendingThresholdConfirmations`] once a spend has been seen confirmed. If that
/// confirmation is later undone, it falls back to [`Self::PendingFirstConfirmation`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputSpendStatus {
	/// The output is tracked but an initial spending transaction hasn't been generated and
	/// broadcast yet.
	PendingInitialBroadcast {
		/// The height at which we will first generate and broadcast a spending transaction.
		///
		/// If `None`, the spend is not delayed.
		delayed_until_height: Option<u32>,
	},
	/// A transaction spending the output has been broadcast but is pending its first
	/// confirmation on-chain.
	PendingFirstConfirmation {
		/// The hash of the chain tip when we first broadcast a transaction spending this output.
		first_broadcast_hash: BlockHash,
		/// The best height when we last broadcast a transaction spending this output.
		latest_broadcast_height: u32,
		/// The transaction spending this output we last broadcast.
		latest_spending_tx: Transaction,
	},
	/// A transaction spending the output has been confirmed on-chain but will be tracked until it
	/// reaches at least [`PRUNE_DELAY_BLOCKS`] confirmations to ensure [`Event::SpendableOutputs`]
	/// stemming from lingering [`ChannelMonitor`]s can safely be replayed.
	///
	/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
	/// [`ChannelMonitor`]: crate::chain::channelmonitor::ChannelMonitor
	PendingThresholdConfirmations {
		/// The hash of the chain tip when we first broadcast a transaction spending this output.
		first_broadcast_hash: BlockHash,
		/// The best height when we last broadcast a transaction spending this output.
		latest_broadcast_height: u32,
		/// The transaction spending this output we saw confirmed on-chain.
		latest_spending_tx: Transaction,
		/// The height at which the spending transaction was confirmed.
		confirmation_height: u32,
		/// The hash of the block in which the spending transaction was confirmed.
		confirmation_hash: BlockHash,
	},
}
134
135impl OutputSpendStatus {
136	fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
137		match self {
138			Self::PendingInitialBroadcast { delayed_until_height } => {
139				if let Some(delayed_until_height) = delayed_until_height {
140					debug_assert!(
141						cur_height >= *delayed_until_height,
142						"We should never broadcast before the required height is reached."
143					);
144				}
145				*self = Self::PendingFirstConfirmation {
146					first_broadcast_hash: cur_hash,
147					latest_broadcast_height: cur_height,
148					latest_spending_tx,
149				};
150			},
151			Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
152				*self = Self::PendingFirstConfirmation {
153					first_broadcast_hash: *first_broadcast_hash,
154					latest_broadcast_height: cur_height,
155					latest_spending_tx,
156				};
157			},
158			Self::PendingThresholdConfirmations { .. } => {
159				debug_assert!(false, "We should never rebroadcast confirmed transactions.");
160			},
161		}
162	}
163
164	fn confirmed(
165		&mut self, confirmation_hash: BlockHash, confirmation_height: u32,
166		latest_spending_tx: Transaction,
167	) {
168		match self {
169			Self::PendingInitialBroadcast { .. } => {
170				// Generally we can't see any of our transactions confirmed if they haven't been
171				// broadcasted yet, so this should never be reachable via `transactions_confirmed`.
172				debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
173				*self = Self::PendingThresholdConfirmations {
174					first_broadcast_hash: confirmation_hash,
175					latest_broadcast_height: confirmation_height,
176					latest_spending_tx,
177					confirmation_height,
178					confirmation_hash,
179				};
180			},
181			Self::PendingFirstConfirmation {
182				first_broadcast_hash,
183				latest_broadcast_height,
184				..
185			} => {
186				*self = Self::PendingThresholdConfirmations {
187					first_broadcast_hash: *first_broadcast_hash,
188					latest_broadcast_height: *latest_broadcast_height,
189					latest_spending_tx,
190					confirmation_height,
191					confirmation_hash,
192				};
193			},
194			Self::PendingThresholdConfirmations {
195				first_broadcast_hash,
196				latest_broadcast_height,
197				..
198			} => {
199				*self = Self::PendingThresholdConfirmations {
200					first_broadcast_hash: *first_broadcast_hash,
201					latest_broadcast_height: *latest_broadcast_height,
202					latest_spending_tx,
203					confirmation_height,
204					confirmation_hash,
205				};
206			},
207		}
208	}
209
210	fn unconfirmed(&mut self) {
211		match self {
212			Self::PendingInitialBroadcast { .. } => {
213				debug_assert!(
214					false,
215					"We should only mark a spend as unconfirmed if it used to be confirmed."
216				);
217			},
218			Self::PendingFirstConfirmation { .. } => {
219				debug_assert!(
220					false,
221					"We should only mark a spend as unconfirmed if it used to be confirmed."
222				);
223			},
224			Self::PendingThresholdConfirmations {
225				first_broadcast_hash,
226				latest_broadcast_height,
227				latest_spending_tx,
228				..
229			} => {
230				*self = Self::PendingFirstConfirmation {
231					first_broadcast_hash: *first_broadcast_hash,
232					latest_broadcast_height: *latest_broadcast_height,
233					latest_spending_tx: latest_spending_tx.clone(),
234				};
235			},
236		}
237	}
238
239	fn is_delayed(&self, cur_height: u32) -> bool {
240		match self {
241			Self::PendingInitialBroadcast { delayed_until_height } => {
242				delayed_until_height.map_or(false, |req_height| cur_height < req_height)
243			},
244			Self::PendingFirstConfirmation { .. } => false,
245			Self::PendingThresholdConfirmations { .. } => false,
246		}
247	}
248
249	fn first_broadcast_hash(&self) -> Option<BlockHash> {
250		match self {
251			Self::PendingInitialBroadcast { .. } => None,
252			Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
253				Some(*first_broadcast_hash)
254			},
255			Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
256				Some(*first_broadcast_hash)
257			},
258		}
259	}
260
261	fn latest_broadcast_height(&self) -> Option<u32> {
262		match self {
263			Self::PendingInitialBroadcast { .. } => None,
264			Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
265				Some(*latest_broadcast_height)
266			},
267			Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
268				Some(*latest_broadcast_height)
269			},
270		}
271	}
272
273	fn confirmation_height(&self) -> Option<u32> {
274		match self {
275			Self::PendingInitialBroadcast { .. } => None,
276			Self::PendingFirstConfirmation { .. } => None,
277			Self::PendingThresholdConfirmations { confirmation_height, .. } => {
278				Some(*confirmation_height)
279			},
280		}
281	}
282
283	fn latest_spending_tx(&self) -> Option<&Transaction> {
284		match self {
285			Self::PendingInitialBroadcast { .. } => None,
286			Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
287			Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
288				Some(latest_spending_tx)
289			},
290		}
291	}
292
293	fn is_confirmed(&self) -> bool {
294		match self {
295			Self::PendingInitialBroadcast { .. } => false,
296			Self::PendingFirstConfirmation { .. } => false,
297			Self::PendingThresholdConfirmations { .. } => true,
298		}
299	}
300}
301
// Serialization of `OutputSpendStatus`: every variant and every field is identified by the
// numeric type given here.
// NOTE(review): these numbers are presumably part of the persisted format and so should not
// be renumbered — confirm before changing.
impl_writeable_tlv_based_enum!(OutputSpendStatus,
	(0, PendingInitialBroadcast) => {
		(0, delayed_until_height, option),
	},
	(2, PendingFirstConfirmation) => {
		(0, first_broadcast_hash, required),
		(2, latest_broadcast_height, required),
		(4, latest_spending_tx, required),
	},
	(4, PendingThresholdConfirmations) => {
		(0, first_broadcast_hash, required),
		(2, latest_broadcast_height, required),
		(4, latest_spending_tx, required),
		(6, confirmation_height, required),
		(8, confirmation_hash, required),
	},
);
319
/// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given
/// [`KVStore`] and retries sweeping them whenever
/// [`Self::regenerate_and_broadcast_spend_if_necessary`] is called.
///
/// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s
/// received via [`Event::SpendableOutputs`].
///
/// This needs to be notified of chain state changes either via its [`Listen`] or [`Confirm`]
/// implementation and hence has to be connected with the utilized chain data sources.
///
/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are
/// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective
/// constructor.
///
/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
where
	B::Target: BroadcasterInterface,
	D::Target: ChangeDestinationSource,
	E::Target: FeeEstimator,
	F::Target: Filter,
	K::Target: KVStore,
	L::Target: Logger,
	O::Target: OutputSpender,
{
	// The tracked outputs and best block, plus a flag marking unpersisted changes.
	sweeper_state: Mutex<SweeperState>,
	// Set while a sweep is in progress so that concurrent sweep calls become no-ops.
	pending_sweep: AtomicBool,
	// Used to broadcast the generated sweeping transactions.
	broadcaster: B,
	// Queried for the feerate of the sweeping transactions.
	fee_estimator: E,
	// If set, every output we broadcast a spend for is registered with this filter.
	chain_data_source: Option<F>,
	// Builds the transaction spending the tracked outputs.
	output_spender: O,
	// Provides the script the swept funds are sent to.
	change_destination_source: D,
	// The store the sweeper state is persisted to.
	kv_store: K,
	logger: L,
}
354
355impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
356	OutputSweeper<B, D, E, F, K, L, O>
357where
358	B::Target: BroadcasterInterface,
359	D::Target: ChangeDestinationSource,
360	E::Target: FeeEstimator,
361	F::Target: Filter,
362	K::Target: KVStore,
363	L::Target: Logger,
364	O::Target: OutputSpender,
365{
366	/// Constructs a new [`OutputSweeper`].
367	///
368	/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also
369	/// need to register their [`Filter`] implementation via the given `chain_data_source`.
370	pub fn new(
371		best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
372		output_spender: O, change_destination_source: D, kv_store: K, logger: L,
373	) -> Self {
374		let outputs = Vec::new();
375		let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
376		Self {
377			sweeper_state,
378			pending_sweep: AtomicBool::new(false),
379			broadcaster,
380			fee_estimator,
381			chain_data_source,
382			output_spender,
383			change_destination_source,
384			kv_store,
385			logger,
386		}
387	}
388
389	/// Tells the sweeper to track the given outputs descriptors.
390	///
391	/// Usually, this should be called based on the values emitted by the
392	/// [`Event::SpendableOutputs`].
393	///
394	/// The given `exclude_static_outputs` flag controls whether the sweeper will filter out
395	/// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain
396	/// wallet implementation.
397	///
398	/// If `delay_until_height` is set, we will delay the spending until the respective block
399	/// height is reached. This can be used to batch spends, e.g., to reduce on-chain fees.
400	///
401	/// Returns `Err` on persistence failure, in which case the call may be safely retried.
402	///
403	/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
404	pub async fn track_spendable_outputs(
405		&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
406		exclude_static_outputs: bool, delay_until_height: Option<u32>,
407	) -> Result<(), ()> {
408		let mut relevant_descriptors = output_descriptors
409			.into_iter()
410			.filter(|desc| {
411				!(exclude_static_outputs
412					&& matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
413			})
414			.peekable();
415
416		if relevant_descriptors.peek().is_none() {
417			return Ok(());
418		}
419
420		self.update_state(|state_lock| -> Result<((), bool), ()> {
421			for descriptor in relevant_descriptors {
422				let output_info = TrackedSpendableOutput {
423					descriptor,
424					channel_id,
425					status: OutputSpendStatus::PendingInitialBroadcast {
426						delayed_until_height: delay_until_height,
427					},
428				};
429
430				if state_lock
431					.outputs
432					.iter()
433					.find(|o| o.descriptor == output_info.descriptor)
434					.is_some()
435				{
436					continue;
437				}
438
439				state_lock.outputs.push(output_info);
440				state_lock.dirty = true;
441			}
442
443			Ok(((), false))
444		})
445		.await
446	}
447
448	/// Returns a list of the currently tracked spendable outputs.
449	pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
450		self.sweeper_state.lock().unwrap().outputs.clone()
451	}
452
453	/// Gets the latest best block which was connected either via the [`Listen`] or
454	/// [`Confirm`] interfaces.
455	pub fn current_best_block(&self) -> BestBlock {
456		self.sweeper_state.lock().unwrap().best_block
457	}
458
459	/// Regenerates and broadcasts the spending transaction for any outputs that are pending. This method will be a
460	/// no-op if a sweep is already pending.
461	pub async fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
462		// Prevent concurrent sweeps.
463		if self
464			.pending_sweep
465			.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
466			.is_err()
467		{
468			return Ok(());
469		}
470
471		let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
472
473		// Release the pending sweep flag again, regardless of result.
474		self.pending_sweep.store(false, Ordering::Release);
475
476		result
477	}
478
	/// Regenerates and broadcasts the spending transaction for any outputs that are pending.
	///
	/// Proceeds in three steps: check under the lock whether anything needs (re-)spending, fetch
	/// a change script without holding the lock, then re-take the lock to build the transaction
	/// and update the statuses. The transaction is only broadcast after the updated state was
	/// persisted successfully.
	///
	/// Returns `Err` if persisting, fetching the change script, or building the transaction fails.
	async fn regenerate_and_broadcast_spend_if_necessary_internal(&self) -> Result<(), ()> {
		// Decides whether the given output should be part of a sweep at `cur_height`.
		let filter_fn = |o: &TrackedSpendableOutput, cur_height: u32| {
			if o.status.is_confirmed() {
				// Don't rebroadcast confirmed txs.
				return false;
			}

			if o.status.is_delayed(cur_height) {
				// Don't generate and broadcast if still delayed.
				return false;
			}

			// Note that `None` (i.e., never broadcast) compares less than any `Some`, so outputs
			// pending their initial broadcast always pass this check.
			if o.status.latest_broadcast_height() >= Some(cur_height) {
				// Only broadcast once per block height.
				return false;
			}

			true
		};

		// See if there is anything to sweep before requesting a change address.
		let has_respends = self
			.update_state(|sweeper_state| -> Result<(bool, bool), ()> {
				let cur_height = sweeper_state.best_block.height;
				let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));

				// If there are respends, we can postpone persisting a potentially dirty state until after the sweep.
				Ok((has_respends, has_respends))
			})
			.await?;

		if !has_respends {
			return Ok(());
		}

		// Request a new change address outside of the mutex to avoid the mutex crossing await.
		let change_destination_script =
			self.change_destination_source.get_change_destination_script().await?;

		// Sweep the outputs.
		let spending_tx = self
			.update_state(|sweeper_state| -> Result<(Option<Transaction>, bool), ()> {
				let cur_height = sweeper_state.best_block.height;
				let cur_hash = sweeper_state.best_block.block_hash;

				// The state may have changed while we weren't holding the lock, so the set of
				// outputs to spend is recomputed here rather than carried over from above.
				let respend_descriptors_set: HashSet<&SpendableOutputDescriptor> = sweeper_state
					.outputs
					.iter()
					.filter(|o| filter_fn(*o, cur_height))
					.map(|o| &o.descriptor)
					.collect();

				// We first collect into a set to avoid duplicates and to "randomize" the order
				// in which outputs are spent. Then we collect into a vec as that is what
				// `spend_outputs` requires.
				let respend_descriptors: Vec<&SpendableOutputDescriptor> =
					respend_descriptors_set.into_iter().collect();

				// Generate the spending transaction and broadcast it.
				if !respend_descriptors.is_empty() {
					let spending_tx = self
						.spend_outputs(
							&sweeper_state,
							&respend_descriptors,
							change_destination_script,
						)
						.map_err(|e| {
							log_error!(self.logger, "Error spending outputs: {:?}", e);
						})?;

					log_debug!(
						self.logger,
						"Generating and broadcasting sweeping transaction {}",
						spending_tx.compute_txid()
					);

					// As we didn't modify the state so far, the same filter_fn yields the same elements as
					// above.
					let respend_outputs =
						sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
					for output_info in respend_outputs {
						// Register the output before updating its status, so that the watched
						// output is built from the pre-broadcast status.
						if let Some(filter) = self.chain_data_source.as_ref() {
							let watched_output = output_info.to_watched_output(cur_hash);
							filter.register_output(watched_output);
						}

						output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
						sweeper_state.dirty = true;
					}

					Ok((Some(spending_tx), false))
				} else {
					Ok((None, false))
				}
			})
			.await?;

		// Persistence completed successfully. If we have a spending transaction, we broadcast it.
		if let Some(spending_tx) = spending_tx {
			self.broadcaster.broadcast_transactions(&[&spending_tx]);
		}

		Ok(())
	}
584
585	fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
586		let cur_height = sweeper_state.best_block.height;
587
588		// Prune all outputs that have sufficient depth by now.
589		sweeper_state.outputs.retain(|o| {
590			if let Some(confirmation_height) = o.status.confirmation_height() {
591				// We wait at least `PRUNE_DELAY_BLOCKS` as before that
592				// `Event::SpendableOutputs` from lingering monitors might get replayed.
593				if cur_height >= confirmation_height + PRUNE_DELAY_BLOCKS - 1 {
594					log_debug!(self.logger,
595						"Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
596						o.status.latest_spending_tx().map(|t| t.compute_txid()), o.descriptor
597					);
598					return false;
599				}
600			}
601			true
602		});
603
604		sweeper_state.dirty = true;
605	}
606
607	fn persist_state<'a>(
608		&self, sweeper_state: &SweeperState,
609	) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> {
610		let encoded = sweeper_state.encode();
611
612		self.kv_store.write(
613			OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
614			OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
615			OUTPUT_SWEEPER_PERSISTENCE_KEY,
616			encoded,
617		)
618	}
619
	/// Updates the sweeper state by executing the given callback. Persists the state afterwards if it is marked dirty,
	/// unless skip_persist is true. Returning true for skip_persist allows the callback to postpone persisting a
	/// potentially dirty state.
	///
	/// The callback runs with the state mutex held and returns the value to hand back to the
	/// caller, together with the `skip_persist` flag. The mutex is released before the
	/// persistence future is awaited.
	///
	/// Returns `Err` if the callback fails or if persisting fails. In the latter case the state
	/// is marked dirty again so that a later call retries persisting.
	async fn update_state<X>(
		&self, callback: impl FnOnce(&mut SweeperState) -> Result<(X, bool), ()>,
	) -> Result<X, ()> {
		// Scope the lock so that the guard is dropped before we await below.
		let (fut, res) = {
			let mut state_lock = self.sweeper_state.lock().unwrap();

			let (res, skip_persist) = callback(&mut state_lock)?;
			if !state_lock.dirty || skip_persist {
				return Ok(res);
			}

			// Clear the flag before creating the write future, while still holding the lock.
			state_lock.dirty = false;

			(self.persist_state(&state_lock), res)
		};

		fut.await.map_err(|e| {
			// The write failed, so what we attempted to persist is still pending.
			self.sweeper_state.lock().unwrap().dirty = true;

			log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
		})?;

		Ok(res)
	}
647
648	fn spend_outputs(
649		&self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor],
650		change_destination_script: ScriptBuf,
651	) -> Result<Transaction, ()> {
652		let tx_feerate =
653			self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
654		let cur_height = sweeper_state.best_block.height;
655		let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
656		self.output_spender.spend_spendable_outputs(
657			descriptors,
658			Vec::new(),
659			change_destination_script,
660			tx_feerate,
661			locktime,
662			&Secp256k1::new(),
663		)
664	}
665
666	fn transactions_confirmed_internal(
667		&self, sweeper_state: &mut SweeperState, header: &Header,
668		txdata: &chain::transaction::TransactionData, height: u32,
669	) {
670		let confirmation_hash = header.block_hash();
671		for (_, tx) in txdata {
672			for output_info in sweeper_state.outputs.iter_mut() {
673				if output_info.is_spent_in(*tx) {
674					output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
675				}
676			}
677		}
678
679		sweeper_state.dirty = true;
680	}
681
682	fn best_block_updated_internal(
683		&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
684	) {
685		sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
686		self.prune_confirmed_outputs(sweeper_state);
687
688		sweeper_state.dirty = true;
689	}
690}
691
692impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
693	for OutputSweeper<B, D, E, F, K, L, O>
694where
695	B::Target: BroadcasterInterface,
696	D::Target: ChangeDestinationSource,
697	E::Target: FeeEstimator,
698	F::Target: Filter + Sync + Send,
699	K::Target: KVStore,
700	L::Target: Logger,
701	O::Target: OutputSpender,
702{
703	fn filtered_block_connected(
704		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
705	) {
706		let mut state_lock = self.sweeper_state.lock().unwrap();
707		assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
708			"Blocks must be connected in chain-order - the connected header must build on the last connected header");
709		assert_eq!(state_lock.best_block.height, height - 1,
710			"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
711
712		self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
713		self.best_block_updated_internal(&mut state_lock, header, height);
714	}
715
716	fn blocks_disconnected(&self, fork_point: BestBlock) {
717		let mut state_lock = self.sweeper_state.lock().unwrap();
718
719		assert!(state_lock.best_block.height > fork_point.height,
720			"Blocks disconnected must indicate disconnection from the current best height, i.e. the new chain tip must be lower than the previous best height");
721		state_lock.best_block = fork_point;
722
723		for output_info in state_lock.outputs.iter_mut() {
724			if output_info.status.confirmation_height() > Some(fork_point.height) {
725				output_info.status.unconfirmed();
726			}
727		}
728
729		state_lock.dirty = true;
730	}
731}
732
733impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
734	for OutputSweeper<B, D, E, F, K, L, O>
735where
736	B::Target: BroadcasterInterface,
737	D::Target: ChangeDestinationSource,
738	E::Target: FeeEstimator,
739	F::Target: Filter + Sync + Send,
740	K::Target: KVStore,
741	L::Target: Logger,
742	O::Target: OutputSpender,
743{
744	fn transactions_confirmed(
745		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
746	) {
747		let mut state_lock = self.sweeper_state.lock().unwrap();
748		self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
749	}
750
751	fn transaction_unconfirmed(&self, txid: &Txid) {
752		let mut state_lock = self.sweeper_state.lock().unwrap();
753
754		// Get what height was unconfirmed.
755		let unconf_height = state_lock
756			.outputs
757			.iter()
758			.find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
759			.and_then(|o| o.status.confirmation_height());
760
761		if let Some(unconf_height) = unconf_height {
762			// Unconfirm all >= this height.
763			state_lock
764				.outputs
765				.iter_mut()
766				.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
767				.for_each(|o| o.status.unconfirmed());
768
769			state_lock.dirty = true;
770		}
771	}
772
773	fn best_block_updated(&self, header: &Header, height: u32) {
774		let mut state_lock = self.sweeper_state.lock().unwrap();
775		self.best_block_updated_internal(&mut state_lock, header, height);
776	}
777
778	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
779		let state_lock = self.sweeper_state.lock().unwrap();
780		state_lock
781			.outputs
782			.iter()
783			.filter_map(|o| match o.status {
784				OutputSpendStatus::PendingThresholdConfirmations {
785					ref latest_spending_tx,
786					confirmation_height,
787					confirmation_hash,
788					..
789				} => Some((
790					latest_spending_tx.compute_txid(),
791					confirmation_height,
792					Some(confirmation_hash),
793				)),
794				_ => None,
795			})
796			.collect::<Vec<_>>()
797	}
798}
799
/// The mutable state of an [`OutputSweeper`], kept behind its mutex.
#[derive(Debug, Clone)]
struct SweeperState {
	/// All outputs currently tracked by the sweeper.
	outputs: Vec<TrackedSpendableOutput>,
	/// The latest best block the sweeper was notified of.
	best_block: BestBlock,
	/// Whether the state was changed since it was last handed to the store for persistence.
	dirty: bool,
}
806
// Serialization of `SweeperState`.
// NOTE(review): `dirty` is declared as a `static_value`, so presumably it is never written and
// always reads back as `false` — confirm against the macro's documentation.
impl_writeable_tlv_based!(SweeperState, {
	(0, outputs, required_vec),
	(2, best_block, required),
	(_unused, dirty, (static_value, false)),
});
812
/// An `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a
/// future block height is reached.
#[derive(Debug, Clone)]
pub enum SpendingDelay {
	/// A relative delay indicating we shouldn't spend the output before `cur_height + num_blocks`
	/// is reached.
	Relative {
		/// The number of blocks until we'll generate and broadcast the spending transaction.
		num_blocks: u32,
	},
	/// An absolute delay indicating we shouldn't spend the output before `height` is reached.
	Absolute {
		/// The height at which we'll generate and broadcast the spending transaction.
		height: u32,
	},
}
829
830impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
831	ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
832where
833	B::Target: BroadcasterInterface,
834	D::Target: ChangeDestinationSource,
835	E::Target: FeeEstimator,
836	F::Target: Filter + Sync + Send,
837	K::Target: KVStore,
838	L::Target: Logger,
839	O::Target: OutputSpender,
840{
841	#[inline]
842	fn read<R: io::Read>(
843		reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
844	) -> Result<Self, DecodeError> {
845		let (
846			broadcaster,
847			fee_estimator,
848			chain_data_source,
849			output_spender,
850			change_destination_source,
851			kv_store,
852			logger,
853		) = args;
854		let state = SweeperState::read(reader)?;
855		let best_block = state.best_block;
856
857		if let Some(filter) = chain_data_source.as_ref() {
858			for output_info in &state.outputs {
859				let watched_output = output_info.to_watched_output(best_block.block_hash);
860				filter.register_output(watched_output);
861			}
862		}
863
864		let sweeper_state = Mutex::new(state);
865		Ok((
866			best_block,
867			OutputSweeper {
868				sweeper_state,
869				pending_sweep: AtomicBool::new(false),
870				broadcaster,
871				fee_estimator,
872				chain_data_source,
873				output_spender,
874				change_destination_source,
875				kv_store,
876				logger,
877			},
878		))
879	}
880}
881
/// A synchronous wrapper around [`OutputSweeper`] to be used in contexts where async is not available.
///
/// It takes a [`ChangeDestinationSourceSync`] and a [`KVStoreSync`] and wraps them so that they
/// can be used by the inner async [`OutputSweeper`].
pub struct OutputSweeperSync<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
where
	B::Target: BroadcasterInterface,
	D::Target: ChangeDestinationSourceSync,
	E::Target: FeeEstimator,
	F::Target: Filter,
	K::Target: KVStoreSync,
	L::Target: Logger,
	O::Target: OutputSpender,
{
	// The wrapped async sweeper, operating on the wrapped sync dependencies.
	sweeper:
		OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, KVStoreSyncWrapper<K>, L, O>,
}
896
897impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
898	OutputSweeperSync<B, D, E, F, K, L, O>
899where
900	B::Target: BroadcasterInterface,
901	D::Target: ChangeDestinationSourceSync,
902	E::Target: FeeEstimator,
903	F::Target: Filter,
904	K::Target: KVStoreSync,
905	L::Target: Logger,
906	O::Target: OutputSpender,
907{
908	/// Constructs a new [`OutputSweeperSync`] instance.
909	pub fn new(
910		best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
911		output_spender: O, change_destination_source: D, kv_store: K, logger: L,
912	) -> Self {
913		let change_destination_source =
914			ChangeDestinationSourceSyncWrapper::new(change_destination_source);
915
916		let kv_store = KVStoreSyncWrapper(kv_store);
917
918		let sweeper = OutputSweeper::new(
919			best_block,
920			broadcaster,
921			fee_estimator,
922			chain_data_source,
923			output_spender,
924			change_destination_source,
925			kv_store,
926			logger,
927		);
928		Self { sweeper }
929	}
930
931	/// Wrapper around [`OutputSweeper::track_spendable_outputs`].
932	pub fn track_spendable_outputs(
933		&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
934		exclude_static_outputs: bool, delay_until_height: Option<u32>,
935	) -> Result<(), ()> {
936		let mut fut = Box::pin(self.sweeper.track_spendable_outputs(
937			output_descriptors,
938			channel_id,
939			exclude_static_outputs,
940			delay_until_height,
941		));
942		let mut waker = dummy_waker();
943		let mut ctx = task::Context::from_waker(&mut waker);
944		match fut.as_mut().poll(&mut ctx) {
945			task::Poll::Ready(result) => result,
946			task::Poll::Pending => {
947				// In a sync context, we can't wait for the future to complete.
948				unreachable!("OutputSweeper::track_spendable_outputs should not be pending in a sync context");
949			},
950		}
951	}
952
953	/// Returns a list of the currently tracked spendable outputs. Wraps [`OutputSweeper::tracked_spendable_outputs`].
954	pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
955		self.sweeper.tracked_spendable_outputs()
956	}
957
958	/// Gets the latest best block which was connected either via [`Listen`] or [`Confirm`]
959	/// interfaces.
960	pub fn current_best_block(&self) -> BestBlock {
961		self.sweeper.current_best_block()
962	}
963
964	/// Regenerates and broadcasts the spending transaction for any outputs that are pending. Wraps
965	/// [`OutputSweeper::regenerate_and_broadcast_spend_if_necessary`].
966	pub fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
967		let mut fut = Box::pin(self.sweeper.regenerate_and_broadcast_spend_if_necessary());
968		let mut waker = dummy_waker();
969		let mut ctx = task::Context::from_waker(&mut waker);
970		match fut.as_mut().poll(&mut ctx) {
971			task::Poll::Ready(result) => result,
972			task::Poll::Pending => {
973				// In a sync context, we can't wait for the future to complete.
974				unreachable!("OutputSweeper::regenerate_and_broadcast_spend_if_necessary should not be pending in a sync context");
975			},
976		}
977	}
978
979	/// Fetch the inner async sweeper.
980	///
981	/// In general you shouldn't have much reason to use this - you have a sync [`KVStore`] backing
982	/// this [`OutputSweeperSync`], fetching an async [`OutputSweeper`] won't accomplish much, all
983	/// the async methods will hang waiting on your sync [`KVStore`] and likely confuse your async
984	/// runtime. This exists primarily for LDK-internal use, including outside of this crate.
985	#[doc(hidden)]
986	pub fn sweeper_async(
987		&self,
988	) -> &OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, KVStoreSyncWrapper<K>, L, O>
989	{
990		&self.sweeper
991	}
992}
993
994impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
995	for OutputSweeperSync<B, D, E, F, K, L, O>
996where
997	B::Target: BroadcasterInterface,
998	D::Target: ChangeDestinationSourceSync,
999	E::Target: FeeEstimator,
1000	F::Target: Filter + Sync + Send,
1001	K::Target: KVStoreSync,
1002	L::Target: Logger,
1003	O::Target: OutputSpender,
1004{
1005	fn filtered_block_connected(
1006		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
1007	) {
1008		self.sweeper.filtered_block_connected(header, txdata, height);
1009	}
1010
1011	fn blocks_disconnected(&self, fork_point: BestBlock) {
1012		self.sweeper.blocks_disconnected(fork_point);
1013	}
1014}
1015
1016impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
1017	for OutputSweeperSync<B, D, E, F, K, L, O>
1018where
1019	B::Target: BroadcasterInterface,
1020	D::Target: ChangeDestinationSourceSync,
1021	E::Target: FeeEstimator,
1022	F::Target: Filter + Sync + Send,
1023	K::Target: KVStoreSync,
1024	L::Target: Logger,
1025	O::Target: OutputSpender,
1026{
1027	fn transactions_confirmed(
1028		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
1029	) {
1030		self.sweeper.transactions_confirmed(header, txdata, height)
1031	}
1032
1033	fn transaction_unconfirmed(&self, txid: &Txid) {
1034		self.sweeper.transaction_unconfirmed(txid)
1035	}
1036
1037	fn best_block_updated(&self, header: &Header, height: u32) {
1038		self.sweeper.best_block_updated(header, height);
1039	}
1040
1041	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
1042		self.sweeper.get_relevant_txids()
1043	}
1044}
1045
1046impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
1047	ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeperSync<B, D, E, F, K, L, O>)
1048where
1049	B::Target: BroadcasterInterface,
1050	D::Target: ChangeDestinationSourceSync,
1051	E::Target: FeeEstimator,
1052	F::Target: Filter + Sync + Send,
1053	K::Target: KVStoreSync,
1054	L::Target: Logger,
1055	O::Target: OutputSpender,
1056{
1057	#[inline]
1058	fn read<R: io::Read>(
1059		reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
1060	) -> Result<Self, DecodeError> {
1061		let (a, b, c, d, change_destination_source, kv_store, e) = args;
1062		let change_destination_source =
1063			ChangeDestinationSourceSyncWrapper::new(change_destination_source);
1064		let kv_store = KVStoreSyncWrapper(kv_store);
1065		let args = (a, b, c, d, change_destination_source, kv_store, e);
1066		let (best_block, sweeper) =
1067			<(BestBlock, OutputSweeper<_, _, _, _, _, _, _>)>::read(reader, args)?;
1068		Ok((best_block, OutputSweeperSync { sweeper }))
1069	}
1070}