1use bdk_core::spk_client::FullScanResponse;
4pub use electrum_streaming;
6use electrum_streaming::client::RequestError;
7use electrum_streaming::notification::Notification;
8use electrum_streaming::pending_request::{ErroredRequest, PendingRequestTuple, SatisfiedRequest};
9use electrum_streaming::{request, Client, ElectrumScriptHash, Event, ResponseError};
10use futures::channel::mpsc::{self, UnboundedReceiver};
11use futures::channel::oneshot;
12use futures::{select, AsyncRead, AsyncWrite, FutureExt, StreamExt};
13use futures_timer::Delay;
14
15use std::collections::{btree_map, hash_map, BTreeMap, BTreeSet, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18
19use bdk_core::bitcoin::block::Header;
20use bdk_core::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, Txid};
21use bdk_core::{collections::HashMap, CheckPoint};
22use bdk_core::{BlockId, ConfirmationBlockTime, TxUpdate};
23use miniscript::{Descriptor, DescriptorPublicKey};
24
25pub type Update<K> = FullScanResponse<K, ConfirmationBlockTime>;
26pub type HeaderCache = HashMap<u32, (BlockHash, Header)>;
27
28#[derive(Debug, Clone)]
37pub struct DerivedSpkTracker<K: Clone + Ord + Send + Sync + 'static> {
38 lookahead: u32,
39 descriptors: BTreeMap<K, Descriptor<DescriptorPublicKey>>,
40 derived_spks: BTreeMap<(K, u32), ElectrumScriptHash>,
41 derived_spks_rev: HashMap<ElectrumScriptHash, (K, u32)>,
42}
43
44impl<K: Clone + Ord + Send + Sync + 'static> DerivedSpkTracker<K> {
45 pub fn new(lookahead: u32) -> Self {
46 Self {
47 lookahead,
48 descriptors: BTreeMap::new(),
49 derived_spks: BTreeMap::new(),
50 derived_spks_rev: HashMap::new(),
51 }
52 }
53
54 pub fn all_spk_hashes(&self) -> impl Iterator<Item = ElectrumScriptHash> + '_ {
55 self.derived_spks.values().copied()
56 }
57
58 fn _add_derived_spk(&mut self, keychain: K, index: u32) -> Option<ElectrumScriptHash> {
59 if let btree_map::Entry::Vacant(spk_hash_entry) =
60 self.derived_spks.entry((keychain.clone(), index))
61 {
62 let descriptor = self
63 .descriptors
64 .get(&keychain)
65 .expect("keychain must have associated descriptor");
66 let spk = descriptor
67 .at_derivation_index(index)
68 .expect("descriptor must derive")
69 .script_pubkey();
70 let script_hash = ElectrumScriptHash::new(&spk);
71 spk_hash_entry.insert(script_hash);
72 assert!(self
73 .derived_spks_rev
74 .insert(script_hash, (keychain, index))
75 .is_none());
76 return Some(script_hash);
77 }
78 None
79 }
80
81 fn _clear_tracked_spks_of_keychain(&mut self, keychain: K) {
82 let split = {
83 let mut split = self.derived_spks.split_off(&(keychain.clone(), 0));
84 let to_add_back = split.split_off(&(keychain, u32::MAX)); self.derived_spks.extend(to_add_back);
86 split
87 };
88 for script_hash in split.into_values() {
89 self.derived_spks_rev.remove(&script_hash);
90 }
91 }
92
93 pub fn insert_descriptor(
94 &mut self,
95 keychain: K,
96 descriptor: Descriptor<DescriptorPublicKey>,
97 next_index: u32,
98 ) -> Vec<ElectrumScriptHash> {
99 if let Some(old_descriptor) = self
100 .descriptors
101 .insert(keychain.clone(), descriptor.clone())
102 {
103 if old_descriptor == descriptor {
104 return vec![];
105 }
106 self._clear_tracked_spks_of_keychain(keychain.clone());
107 }
108 (0_u32..=next_index + self.lookahead + 1)
109 .filter_map(|index| self._add_derived_spk(keychain.clone(), index))
110 .collect()
111 }
112
113 pub fn handle_script_status(
114 &mut self,
115 script_hash: ElectrumScriptHash,
116 ) -> Option<(K, u32, Vec<ElectrumScriptHash>)> {
117 let (k, mut next_index) = self.derived_spks_rev.get(&script_hash).cloned()?;
118 next_index += 1;
119
120 let mut spk_hashes = Vec::new();
121 for index in (next_index..=next_index + 1 + self.lookahead).rev() {
122 match self._add_derived_spk(k.clone(), index) {
123 Some(spk_hash) => spk_hashes.push(spk_hash),
124 None => break,
125 }
126 }
127 Some((k, next_index, spk_hashes))
128 }
129}
130
131#[derive(Debug)]
133pub struct Headers {
134 tip: CheckPoint,
135 headers: HashMap<BlockHash, Header>,
136}
137
138impl Headers {
139 pub fn new(tip: CheckPoint) -> Self {
140 Self {
141 tip,
142 headers: HashMap::new(),
143 }
144 }
145
146 pub fn tip(&self) -> CheckPoint {
147 self.tip.clone()
148 }
149
150 pub async fn update(
151 &mut self,
152 client: &Client,
153 tip_height: u32,
154 tip_hash: BlockHash,
155 ) -> anyhow::Result<Option<CheckPoint>> {
156 const ASSUME_FINAL_DEPTH: u32 = 8;
157 const CONSECUTIVE_THRESHOLD: usize = 3;
158
159 let start_height = tip_height.saturating_sub(ASSUME_FINAL_DEPTH - 1);
162 let headers_resp = client
163 .request(request::Headers {
164 start_height,
165 count: ASSUME_FINAL_DEPTH as _,
166 })
167 .await;
168 let mut new_headers = (start_height..)
169 .zip(headers_resp?.headers)
170 .collect::<BTreeMap<u32, Header>>();
171
172 if new_headers.get(&tip_height).map(|h| h.block_hash()) != Some(tip_hash) {
174 return Ok(None);
176 }
177
178 let mut consecutive_matches = 0_usize;
180 for cp in self.tip.iter() {
181 let height = cp.height();
182 let orig_hash = cp.hash();
183 let header = match new_headers.entry(height) {
184 btree_map::Entry::Vacant(e) => {
185 *e.insert(client.request(request::Header { height }).await?.header)
186 }
187 btree_map::Entry::Occupied(e) => *e.get(),
188 };
189 let hash = header.block_hash();
190 self.headers.insert(hash, header);
191 if header.block_hash() == orig_hash {
192 consecutive_matches += 1;
193 if consecutive_matches > CONSECUTIVE_THRESHOLD {
194 break;
195 }
196 } else {
197 consecutive_matches = 0;
198 }
199 }
200 for (height, header) in new_headers {
201 let hash = header.block_hash();
202 self.tip = self.tip.clone().insert(BlockId { height, hash });
203 }
204 Ok(Some(self.tip.clone()))
205 }
206
207 pub async fn ensure_heights(
208 &mut self,
209 client: &Client,
210 heights: BTreeSet<u32>,
211 ) -> anyhow::Result<HeaderCache> {
212 let mut header_cache = HeaderCache::new();
213
214 let tip_height = self.tip.height();
216
217 let mut heights_iter = heights.into_iter().filter(|&h| h <= tip_height).peekable();
218 let start_height = match heights_iter.peek() {
219 Some(&h) => h,
220 None => return Ok(header_cache),
221 };
222
223 let mut cp_tail = BTreeMap::<u32, BlockHash>::new();
224 let start_cp = {
225 let mut start_cp = Option::<CheckPoint>::None;
226 for cp in self.tip.iter() {
227 let BlockId { height, hash } = cp.block_id();
228 if height < start_height {
229 start_cp = Some(cp);
230 break;
231 }
232 cp_tail.insert(height, hash);
233 }
234 match start_cp {
235 Some(cp) => cp,
236 None => return Ok(header_cache),
238 }
239 };
240
241 for height in heights_iter {
243 let header_req = request::Header { height };
244 let header_opt = match cp_tail.entry(height) {
245 btree_map::Entry::Vacant(tail_e) => {
246 let header = client.request(header_req).await?.header;
247 let hash = header.block_hash();
248 self.headers.insert(hash, header);
249 tail_e.insert(hash);
250 Some((hash, header))
251 }
252 btree_map::Entry::Occupied(tail_e) => {
253 let hash = *tail_e.get();
254 match self.headers.entry(hash) {
256 hash_map::Entry::Occupied(header_e) => Some((hash, *header_e.get())),
257 hash_map::Entry::Vacant(header_e) => {
258 let header = client.request(header_req).await?.header;
259 if header.block_hash() == hash {
260 header_e.insert(header);
261 Some((hash, header))
262 } else {
263 None
265 }
266 }
267 }
268 }
269 };
270 if let Some(hash_and_header) = header_opt {
271 header_cache.insert(height, hash_and_header);
272 }
273 }
274
275 self.tip = start_cp
277 .extend(cp_tail.into_iter().map(Into::into))
278 .expect("must extend");
279 Ok(header_cache)
280 }
281}
282
283#[derive(Debug, Default)]
284pub struct Txs {
285 txs: HashMap<Txid, Arc<Transaction>>,
286}
287
288impl Txs {
289 pub fn new() -> Self {
290 Self::default()
291 }
292
293 pub fn insert_tx(&mut self, tx: impl Into<Arc<Transaction>>) {
294 let tx: Arc<Transaction> = tx.into();
295 self.txs.insert(tx.compute_txid(), tx);
296 }
297
298 pub async fn fetch_tx(
299 &mut self,
300 client: &Client,
301 txid: Txid,
302 ) -> anyhow::Result<Arc<Transaction>> {
303 match self.txs.entry(txid) {
304 hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
305 hash_map::Entry::Vacant(entry) => {
306 let tx = client.request(request::GetTx(txid)).await?.tx;
307 let arc_tx = entry.insert(Arc::new(tx)).clone();
308 Ok(arc_tx)
309 }
310 }
311 }
312
313 pub async fn fetch_txout(
314 &mut self,
315 client: &Client,
316 outpoint: OutPoint,
317 ) -> anyhow::Result<Option<TxOut>> {
318 let tx = self.fetch_tx(client, outpoint.txid).await?;
319 Ok(tx.output.get(outpoint.vout as usize).cloned())
320 }
321}
322
323pub async fn init<K>(client: &Client, spk_tracker: &mut DerivedSpkTracker<K>) -> anyhow::Result<()>
325where
326 K: Clone + Ord + Send + Sync + 'static,
327{
328 client.request_event(request::HeadersSubscribe)?;
329
330 for script_hash in spk_tracker.all_spk_hashes() {
332 client.request_event(request::ScriptHashSubscribe { script_hash })?;
333 }
334
335 Ok(())
336}
337
338pub async fn handle_event<K>(
340 client: &Client,
341 spk_tracker: &mut DerivedSpkTracker<K>,
342 headers: &mut Headers,
343 txs: &mut Txs,
344 broadcast_queue: &mut BroadcastQueue,
345 event: Event,
346) -> anyhow::Result<Option<Update<K>>>
347where
348 K: Clone + Ord + Send + Sync + 'static,
349{
350 match event {
351 Event::Response(SatisfiedRequest::Header { req, resp }) => Ok(headers
352 .update(client, req.height, resp.header.block_hash())
353 .await?
354 .map(|cp| Update {
355 chain_update: Some(cp),
356 ..Default::default()
357 })),
358 Event::Response(SatisfiedRequest::HeadersSubscribe { resp, .. }) => Ok(headers
359 .update(client, resp.height, resp.header.block_hash())
360 .await?
361 .map(|cp| Update {
362 chain_update: Some(cp),
363 ..Default::default()
364 })),
365 Event::Notification(Notification::Header(h)) => Ok(headers
366 .update(client, h.height(), h.header().block_hash())
367 .await?
368 .map(|cp| Update {
369 chain_update: Some(cp),
370 ..Default::default()
371 })),
372 Event::Response(SatisfiedRequest::ScriptHashSubscribe { req, resp }) => {
373 if resp.is_none() {
374 return Ok(None);
375 }
376 let (k, i) = match spk_tracker.handle_script_status(req.script_hash) {
377 Some((k, i, new_spk_hashes)) => {
378 for script_hash in new_spk_hashes {
379 client.request_event(request::ScriptHashSubscribe { script_hash })?;
380 }
381 (k, i)
382 }
383 None => return Ok(None),
384 };
385 let tx_update = script_hash_update(client, headers, txs, req.script_hash).await?;
386 let last_active_indices = core::iter::once((k, i)).collect();
387 let chain_update = Some(headers.tip());
388 Ok(Some(Update {
389 tx_update,
390 last_active_indices,
391 chain_update,
392 }))
393 }
394 Event::Notification(Notification::ScriptHash(inner)) => {
395 let (k, i) = match spk_tracker.handle_script_status(inner.script_hash()) {
396 Some((k, i, new_spk_hashes)) => {
397 for script_hash in new_spk_hashes {
398 client.request_event(request::ScriptHashSubscribe { script_hash })?;
399 }
400 (k, i)
401 }
402 None => return Ok(None),
403 };
404 let tx_update = script_hash_update(client, headers, txs, inner.script_hash()).await?;
405 let last_active_indices = core::iter::once((k, i)).collect();
406 let chain_update = Some(headers.tip());
407 Ok(Some(Update {
408 tx_update,
409 last_active_indices,
410 chain_update,
411 }))
412 }
413 Event::Response(SatisfiedRequest::BroadcastTx { resp, .. }) => {
414 broadcast_queue.handle_resp_ok(resp);
415 Ok(None)
416 }
417 Event::ResponseError(ErroredRequest::BroadcastTx { req, error }) => {
418 broadcast_queue.handle_resp_err(req.0.compute_txid(), error);
419 Ok(None)
420 }
421 Event::ResponseError(err) => Err(err.into()),
422 _ => Ok(None),
423 }
424}
425
426async fn script_hash_update(
427 client: &Client,
428 headers: &mut Headers,
429 txs: &mut Txs,
430 script_hash: ElectrumScriptHash,
431) -> anyhow::Result<TxUpdate<ConfirmationBlockTime>> {
432 let electrum_txs = client.request(request::GetHistory { script_hash }).await?;
433
434 let header_cache = headers
435 .ensure_heights(
436 client,
437 electrum_txs
438 .iter()
439 .filter_map(|tx| tx.confirmation_height().map(|h| h.to_consensus_u32()))
440 .collect(),
441 )
442 .await?;
443
444 let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();
445
446 for tx in electrum_txs {
447 let txid = tx.txid();
448 let full_tx = txs.fetch_tx(client, txid).await?;
449
450 for txin in &full_tx.input {
451 let op = txin.previous_output;
452 if let Some(txout) = txs.fetch_txout(client, op).await? {
453 tx_update.txouts.insert(op, txout);
454 }
455 }
456 tx_update.txs.push(full_tx);
457
458 if let Some(height) = tx.confirmation_height() {
459 let height = height.to_consensus_u32();
460 let merkle_res = client
461 .request(request::GetTxMerkle { txid, height })
462 .await?;
463 let (hash, header) = match header_cache.get(&height) {
464 Some(&hash_and_header) => hash_and_header,
465 None => continue,
466 };
467 if header.merkle_root != merkle_res.expected_merkle_root(txid) {
468 continue;
469 }
470 tx_update.anchors.insert((
471 ConfirmationBlockTime {
472 block_id: BlockId { height, hash },
473 confirmation_time: header.time as _,
474 },
475 txid,
476 ));
477 }
478 }
479
480 Ok(tx_update)
481}
482
483#[derive(Debug, Default)]
484pub struct BroadcastQueue {
485 queue: VecDeque<(Transaction, oneshot::Sender<Result<(), ResponseError>>)>,
486}
487
488impl BroadcastQueue {
489 pub fn txs(&self) -> impl Iterator<Item = Transaction> + '_ {
490 self.queue.iter().map(|(tx, _)| tx.clone())
491 }
492
493 pub fn add(&mut self, tx: Transaction, resp: oneshot::Sender<Result<(), ResponseError>>) {
494 self.queue.push_back((tx, resp));
495 }
496
497 pub fn handle_resp_ok(&mut self, txid: Txid) {
498 let i_opt = self.queue.iter().enumerate().find_map(|(i, (tx, _))| {
499 if tx.compute_txid() == txid {
500 Some(i)
501 } else {
502 None
503 }
504 });
505 if let Some(i) = i_opt {
506 let (_, resp_tx) = self.queue.remove(i).expect("must exist");
507 let _ = resp_tx.send(Ok(()));
508 }
509 }
510
511 pub fn handle_resp_err(&mut self, txid: Txid, err: ResponseError) {
512 let i_opt = self.queue.iter().enumerate().find_map(|(i, (tx, _))| {
513 if tx.compute_txid() == txid {
514 Some(i)
515 } else {
516 None
517 }
518 });
519 if let Some(i) = i_opt {
520 let (_, resp_tx) = self.queue.remove(i).expect("must exist");
521 let _ = resp_tx.send(Err(err));
522 }
523 }
524}
525
526#[derive(Debug)]
527pub struct Emitter<K: Clone + Ord + Send + Sync + 'static> {
528 spk_tracker: DerivedSpkTracker<K>,
529 header_cache: Headers,
530 tx_cache: Txs,
531
532 client: Arc<futures::lock::Mutex<Option<electrum_streaming::Client>>>,
533 cmd_rx: UnboundedReceiver<Cmd<K>>,
534 update_tx: mpsc::UnboundedSender<Update<K>>,
535 broadcast_queue: BroadcastQueue,
536}
537
538impl<K> Emitter<K>
539where
540 K: core::fmt::Debug + Clone + Ord + Send + Sync + 'static,
541{
542 pub fn new(
543 wallet_tip: CheckPoint,
544 lookahead: u32,
545 ) -> (Self, CmdSender<K>, UnboundedReceiver<Update<K>>) {
546 let (cmd_tx, cmd_rx) = mpsc::unbounded::<Cmd<K>>();
547 let (update_tx, update_rx) = mpsc::unbounded::<Update<K>>();
548 let client = Arc::new(futures::lock::Mutex::new(None));
549 (
550 Self {
551 spk_tracker: DerivedSpkTracker::new(lookahead),
552 header_cache: Headers::new(wallet_tip),
553 tx_cache: Txs::new(),
554 client: client.clone(),
555 cmd_rx,
556 update_tx,
557 broadcast_queue: BroadcastQueue::default(),
558 },
559 CmdSender { tx: cmd_tx, client },
560 update_rx,
561 )
562 }
563
564 pub fn insert_txs<Tx>(&mut self, txs: impl IntoIterator<Item = Tx>)
566 where
567 Tx: Into<Arc<Transaction>>,
568 {
569 for tx in txs {
570 self.tx_cache.insert_tx(tx);
571 }
572 }
573
574 pub async fn run<C>(&mut self, ping_delay: Duration, conn: C) -> anyhow::Result<()>
575 where
576 C: AsyncRead + AsyncWrite + Send,
577 {
578 let (client, mut event_rx, run_fut) = electrum_streaming::run(conn);
579 self.client.lock().await.replace(client.clone());
580
581 client.request_event(request::HeadersSubscribe)?;
582 for script_hash in self.spk_tracker.all_spk_hashes() {
583 client.request_event(request::ScriptHashSubscribe { script_hash })?;
584 }
585 for tx in self.broadcast_queue.txs() {
586 client.request_event(request::BroadcastTx(tx))?;
587 }
588
589 let spk_tracker = &mut self.spk_tracker;
590 let header_cache = &mut self.header_cache;
591 let tx_cache = &mut self.tx_cache;
592 let cmd_rx = &mut self.cmd_rx;
593 let update_tx = &mut self.update_tx;
594 let broadcast_queue = &mut self.broadcast_queue;
595
596 let process_fut = async move {
597 loop {
600 select! {
601 opt = event_rx.next() => match opt {
602 Some(event) => {
603 let update_opt =
604 handle_event(&client, spk_tracker, header_cache, tx_cache, broadcast_queue, event).await?;
605 if let Some(update) = update_opt {
606 if let Err(_err) = update_tx.unbounded_send(update) {
607 break;
608 }
609 }
610 },
611 None => break,
612 },
613 opt = cmd_rx.next() => match opt {
614 Some(Cmd::InsertDescriptor { keychain, descriptor, next_index }) => {
615 for script_hash in spk_tracker.insert_descriptor(keychain, descriptor, next_index) {
616 client.request_event(request::ScriptHashSubscribe { script_hash })?;
617 }
618 }
619 Some(Cmd::Broadcast { tx, resp_tx }) => {
620 broadcast_queue.add(tx.clone(), resp_tx);
621 client.request_event(request::BroadcastTx(tx))?;
622 }
623 Some(Cmd::Close) | None => break,
624 },
625 _ = Delay::new(ping_delay).fuse() => {
626 client.request_event(request::Ping)?;
627 }
628 }
629 }
630 anyhow::Ok(())
631 };
632
633 select! {
634 res = run_fut.fuse() => res?,
635 res = process_fut.fuse() => res?,
636 }
637 Ok(())
638 }
639}
640
641pub type CmdRx<K> = mpsc::UnboundedReceiver<Cmd<K>>;
642
643#[non_exhaustive]
644pub enum Cmd<K> {
645 InsertDescriptor {
646 keychain: K,
647 descriptor: Descriptor<DescriptorPublicKey>,
648 next_index: u32,
649 },
650 Broadcast {
651 tx: Transaction,
652 resp_tx: oneshot::Sender<Result<(), ResponseError>>,
653 },
654 Close,
655}
656
657#[derive(Debug, Clone)]
658pub struct CmdSender<K> {
659 tx: mpsc::UnboundedSender<Cmd<K>>,
660 client: Arc<futures::lock::Mutex<Option<electrum_streaming::Client>>>,
661}
662
663impl<K: Send + Sync + 'static> CmdSender<K> {
664 pub fn insert_descriptor(
665 &self,
666 keychain: K,
667 descriptor: Descriptor<DescriptorPublicKey>,
668 next_index: u32,
669 ) -> anyhow::Result<()> {
670 self.tx.unbounded_send(Cmd::InsertDescriptor {
671 keychain,
672 descriptor,
673 next_index,
674 })?;
675 Ok(())
676 }
677
678 pub async fn request<Req>(&self, request: Req) -> Result<Req::Response, RequestError>
679 where
680 Req: electrum_streaming::Request,
681 PendingRequestTuple<Req, Req::Response>:
682 Into<electrum_streaming::pending_request::PendingRequest>,
683 {
684 match self.client.lock().await.as_ref().cloned() {
685 Some(client) => client.request(request).await,
686 None => Err(RequestError::Canceled),
687 }
688 }
689
690 pub async fn broadcast_tx(&self, tx: Transaction) -> anyhow::Result<()> {
691 let (resp_tx, rx) = oneshot::channel();
692 self.tx.unbounded_send(Cmd::Broadcast { tx, resp_tx })?;
693 rx.await??;
694 Ok(())
695 }
696
697 pub async fn close(&self) -> anyhow::Result<()> {
698 self.tx.unbounded_send(Cmd::Close)?;
699 Ok(())
700 }
701}