1use crate::{AsyncBlockSourceResult, BlockData, BlockSource, BlockSourceError};
6
7use bitcoin::block::Block;
8use bitcoin::constants::ChainHash;
9use bitcoin::hash_types::BlockHash;
10use bitcoin::transaction::{OutPoint, TxOut};
11
12use lightning::ln::peer_handler::APeerManager;
13use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
14use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
15use lightning::util::logger::Logger;
16use lightning::util::native_async::FutureSpawner;
17
18use std::collections::VecDeque;
19use std::future::Future;
20use std::ops::Deref;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex};
23use std::task::Poll;
24
25pub trait UtxoSource: BlockSource + 'static {
32 fn get_block_hash_by_height<'a>(
37 &'a self, block_height: u32,
38 ) -> AsyncBlockSourceResult<'a, BlockHash>;
39
40 fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool>;
43}
44
45#[cfg(feature = "tokio")]
46pub struct TokioSpawner;
48#[cfg(feature = "tokio")]
49impl FutureSpawner for TokioSpawner {
50 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
51 tokio::spawn(future);
52 }
53}
54
55pub(crate) struct Joiner<
58 A: Future<Output = Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
59 B: Future<Output = Result<BlockHash, BlockSourceError>> + Unpin,
60> {
61 pub a: A,
62 pub b: B,
63 a_res: Option<(BlockHash, Option<u32>)>,
64 b_res: Option<BlockHash>,
65}
66
67impl<
68 A: Future<Output = Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
69 B: Future<Output = Result<BlockHash, BlockSourceError>> + Unpin,
70 > Joiner<A, B>
71{
72 fn new(a: A, b: B) -> Self {
73 Self { a, b, a_res: None, b_res: None }
74 }
75}
76
77impl<
78 A: Future<Output = Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
79 B: Future<Output = Result<BlockHash, BlockSourceError>> + Unpin,
80 > Future for Joiner<A, B>
81{
82 type Output = Result<((BlockHash, Option<u32>), BlockHash), BlockSourceError>;
83 fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
84 if self.a_res.is_none() {
85 match Pin::new(&mut self.a).poll(ctx) {
86 Poll::Ready(res) => {
87 if let Ok(ok) = res {
88 self.a_res = Some(ok);
89 } else {
90 return Poll::Ready(Err(res.unwrap_err()));
91 }
92 },
93 Poll::Pending => {},
94 }
95 }
96 if self.b_res.is_none() {
97 match Pin::new(&mut self.b).poll(ctx) {
98 Poll::Ready(res) => {
99 if let Ok(ok) = res {
100 self.b_res = Some(ok);
101 } else {
102 return Poll::Ready(Err(res.unwrap_err()));
103 }
104 },
105 Poll::Pending => {},
106 }
107 }
108 if let Some(b_res) = self.b_res {
109 if let Some(a_res) = self.a_res {
110 return Poll::Ready(Ok((a_res, b_res)));
111 }
112 }
113 Poll::Pending
114 }
115}
116
117pub struct GossipVerifier<
126 S: FutureSpawner,
127 Blocks: Deref + Send + Sync + 'static + Clone,
128 L: Deref + Send + Sync + 'static,
129> where
130 Blocks::Target: UtxoSource,
131 L::Target: Logger,
132{
133 source: Blocks,
134 peer_manager_wake: Arc<dyn Fn() + Send + Sync>,
135 gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
136 spawn: S,
137 block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>,
138}
139
140const BLOCK_CACHE_SIZE: usize = 5;
141
142impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync>
143 GossipVerifier<S, Blocks, L>
144where
145 Blocks::Target: UtxoSource,
146 L::Target: Logger,
147{
148 pub fn new<APM: Deref + Send + Sync + Clone + 'static>(
153 source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
154 peer_manager: APM,
155 ) -> Self
156 where
157 APM::Target: APeerManager,
158 {
159 let peer_manager_wake = Arc::new(move || peer_manager.as_ref().process_events());
160 Self {
161 source,
162 spawn,
163 gossiper,
164 peer_manager_wake,
165 block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))),
166 }
167 }
168
169 async fn retrieve_utxo(
170 source: Blocks, block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>, short_channel_id: u64,
171 ) -> Result<TxOut, UtxoLookupError> {
172 let block_height = (short_channel_id >> 5 * 8) as u32; let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
174 let output_index = (short_channel_id & 0xffff) as u16;
175
176 let (outpoint, output);
177
178 'tx_found: loop {
179 macro_rules! process_block {
180 ($block: expr) => {{
181 if transaction_index as usize >= $block.txdata.len() {
182 return Err(UtxoLookupError::UnknownTx);
183 }
184 let transaction = &$block.txdata[transaction_index as usize];
185 if output_index as usize >= transaction.output.len() {
186 return Err(UtxoLookupError::UnknownTx);
187 }
188
189 outpoint = OutPoint::new(transaction.compute_txid(), output_index.into());
190 output = transaction.output[output_index as usize].clone();
191 }};
192 }
193 {
194 let recent_blocks = block_cache.lock().unwrap();
195 for (height, block) in recent_blocks.iter() {
196 if *height == block_height {
197 process_block!(block);
198 break 'tx_found;
199 }
200 }
201 }
202
203 let ((_, tip_height_opt), block_hash) =
204 Joiner::new(source.get_best_block(), source.get_block_hash_by_height(block_height))
205 .await
206 .map_err(|_| UtxoLookupError::UnknownTx)?;
207 if let Some(tip_height) = tip_height_opt {
208 if block_height + 5 > tip_height {
214 return Err(UtxoLookupError::UnknownTx);
215 }
216 }
217 let block_data =
218 source.get_block(&block_hash).await.map_err(|_| UtxoLookupError::UnknownTx)?;
219 let block = match block_data {
220 BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx),
221 BlockData::FullBlock(block) => block,
222 };
223 process_block!(block);
224 {
225 let mut recent_blocks = block_cache.lock().unwrap();
226 let mut insert = true;
227 for (height, _) in recent_blocks.iter() {
228 if *height == block_height {
229 insert = false;
230 }
231 }
232 if insert {
233 if recent_blocks.len() >= BLOCK_CACHE_SIZE {
234 recent_blocks.pop_front();
235 }
236 recent_blocks.push_back((block_height, block));
237 }
238 }
239 break 'tx_found;
240 }
241 let outpoint_unspent =
242 source.is_output_unspent(outpoint).await.map_err(|_| UtxoLookupError::UnknownTx)?;
243 if outpoint_unspent {
244 Ok(output)
245 } else {
246 Err(UtxoLookupError::UnknownTx)
247 }
248 }
249}
250
251impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> Deref
252 for GossipVerifier<S, Blocks, L>
253where
254 Blocks::Target: UtxoSource,
255 L::Target: Logger,
256{
257 type Target = Self;
258 fn deref(&self) -> &Self {
259 self
260 }
261}
262
263impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> UtxoLookup
264 for GossipVerifier<S, Blocks, L>
265where
266 Blocks::Target: UtxoSource,
267 L::Target: Logger,
268{
269 fn get_utxo(&self, _chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult {
270 let res = UtxoFuture::new();
271 let fut = res.clone();
272 let source = self.source.clone();
273 let gossiper = Arc::clone(&self.gossiper);
274 let block_cache = Arc::clone(&self.block_cache);
275 let pmw = Arc::clone(&self.peer_manager_wake);
276 self.spawn.spawn(async move {
277 let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
278 fut.resolve(gossiper.network_graph(), &*gossiper, res);
279 (pmw)();
280 });
281 UtxoResult::Async(res)
282 }
283}