// bdk_electrum_streaming/lib.rs

//! Streaming Electrum chain source for BDK: tracks derived script pubkeys, caches block headers
//! and transactions, and emits wallet updates.
2
3use bdk_core::spk_client::FullScanResponse;
4/// Re-export.
5pub use electrum_streaming;
6use electrum_streaming::client::RequestError;
7use electrum_streaming::notification::Notification;
8use electrum_streaming::pending_request::{ErroredRequest, PendingRequestTuple, SatisfiedRequest};
9use electrum_streaming::{request, Client, ElectrumScriptHash, Event, ResponseError};
10use futures::channel::mpsc::{self, UnboundedReceiver};
11use futures::channel::oneshot;
12use futures::{select, AsyncRead, AsyncWrite, FutureExt, StreamExt};
13use futures_timer::Delay;
14
15use std::collections::{btree_map, hash_map, BTreeMap, BTreeSet, VecDeque};
16use std::sync::Arc;
17use std::time::Duration;
18
19use bdk_core::bitcoin::block::Header;
20use bdk_core::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, Txid};
21use bdk_core::{collections::HashMap, CheckPoint};
22use bdk_core::{BlockId, ConfirmationBlockTime, TxUpdate};
23use miniscript::{Descriptor, DescriptorPublicKey};
24
/// Wallet update emitted to consumers: a BDK full-scan response anchored by confirmation time.
pub type Update<K> = FullScanResponse<K, ConfirmationBlockTime>;
/// Maps block height to the block's hash and full header.
pub type HeaderCache = HashMap<u32, (BlockHash, Header)>;
27
28/// Keeps track of spks.
29///
30/// This manages subscriptions to spk histories.
31/// * When we reconnect with the Electrum server, we wish to resubscribe to all spks.
32/// * When we receive history for a spk, we wish to ensure `lookahead` number of spks above are
33///   also tracked.
34/// * When we receive history for a spk, we wish to include a `last_active_index` update in case
35///   `KeychainTxOutIndex` is not up-to-date.
#[derive(Debug, Clone)]
pub struct DerivedSpkTracker<K: Clone + Ord + Send + Sync + 'static> {
    // Number of extra spks kept derived above the last known-active index of each keychain.
    lookahead: u32,
    // Descriptor of each keychain; used to derive spks on demand.
    descriptors: BTreeMap<K, Descriptor<DescriptorPublicKey>>,
    // Forward map: (keychain, derivation index) -> Electrum script hash.
    derived_spks: BTreeMap<(K, u32), ElectrumScriptHash>,
    // Reverse of `derived_spks`; kept in sync by `_add_derived_spk`.
    derived_spks_rev: HashMap<ElectrumScriptHash, (K, u32)>,
}
43
44impl<K: Clone + Ord + Send + Sync + 'static> DerivedSpkTracker<K> {
45    pub fn new(lookahead: u32) -> Self {
46        Self {
47            lookahead,
48            descriptors: BTreeMap::new(),
49            derived_spks: BTreeMap::new(),
50            derived_spks_rev: HashMap::new(),
51        }
52    }
53
54    pub fn all_spk_hashes(&self) -> impl Iterator<Item = ElectrumScriptHash> + '_ {
55        self.derived_spks.values().copied()
56    }
57
58    fn _add_derived_spk(&mut self, keychain: K, index: u32) -> Option<ElectrumScriptHash> {
59        if let btree_map::Entry::Vacant(spk_hash_entry) =
60            self.derived_spks.entry((keychain.clone(), index))
61        {
62            let descriptor = self
63                .descriptors
64                .get(&keychain)
65                .expect("keychain must have associated descriptor");
66            let spk = descriptor
67                .at_derivation_index(index)
68                .expect("descriptor must derive")
69                .script_pubkey();
70            let script_hash = ElectrumScriptHash::new(&spk);
71            spk_hash_entry.insert(script_hash);
72            assert!(self
73                .derived_spks_rev
74                .insert(script_hash, (keychain, index))
75                .is_none());
76            return Some(script_hash);
77        }
78        None
79    }
80
81    fn _clear_tracked_spks_of_keychain(&mut self, keychain: K) {
82        let split = {
83            let mut split = self.derived_spks.split_off(&(keychain.clone(), 0));
84            let to_add_back = split.split_off(&(keychain, u32::MAX)); // `u32::MAX` is never derived
85            self.derived_spks.extend(to_add_back);
86            split
87        };
88        for script_hash in split.into_values() {
89            self.derived_spks_rev.remove(&script_hash);
90        }
91    }
92
93    pub fn insert_descriptor(
94        &mut self,
95        keychain: K,
96        descriptor: Descriptor<DescriptorPublicKey>,
97        next_index: u32,
98    ) -> Vec<ElectrumScriptHash> {
99        if let Some(old_descriptor) = self
100            .descriptors
101            .insert(keychain.clone(), descriptor.clone())
102        {
103            if old_descriptor == descriptor {
104                return vec![];
105            }
106            self._clear_tracked_spks_of_keychain(keychain.clone());
107        }
108        (0_u32..=next_index + self.lookahead + 1)
109            .filter_map(|index| self._add_derived_spk(keychain.clone(), index))
110            .collect()
111    }
112
113    pub fn handle_script_status(
114        &mut self,
115        script_hash: ElectrumScriptHash,
116    ) -> Option<(K, u32, Vec<ElectrumScriptHash>)> {
117        let (k, mut next_index) = self.derived_spks_rev.get(&script_hash).cloned()?;
118        next_index += 1;
119
120        let mut spk_hashes = Vec::new();
121        for index in (next_index..=next_index + 1 + self.lookahead).rev() {
122            match self._add_derived_spk(k.clone(), index) {
123                Some(spk_hash) => spk_hashes.push(spk_hash),
124                None => break,
125            }
126        }
127        Some((k, next_index, spk_hashes))
128    }
129}
130
/// This is a placeholder until we can put headers in checkpoints.
///
/// Tracks the local best-chain tip as a [`CheckPoint`] plus a cache of full block headers keyed
/// by block hash.
#[derive(Debug)]
pub struct Headers {
    // Local view of the best chain.
    tip: CheckPoint,
    // Cache of fetched headers, keyed by block hash.
    headers: HashMap<BlockHash, Header>,
}
137
138impl Headers {
139    pub fn new(tip: CheckPoint) -> Self {
140        Self {
141            tip,
142            headers: HashMap::new(),
143        }
144    }
145
146    pub fn tip(&self) -> CheckPoint {
147        self.tip.clone()
148    }
149
150    pub async fn update(
151        &mut self,
152        client: &Client,
153        tip_height: u32,
154        tip_hash: BlockHash,
155    ) -> anyhow::Result<Option<CheckPoint>> {
156        const ASSUME_FINAL_DEPTH: u32 = 8;
157        const CONSECUTIVE_THRESHOLD: usize = 3;
158
159        // TODO: Get rid of `ASSUME_FINAL_DEPTH`. Instead, get headers one by one and stop when we
160        // connect with a checkpoint.
161        let start_height = tip_height.saturating_sub(ASSUME_FINAL_DEPTH - 1);
162        let headers_resp = client
163            .request(request::Headers {
164                start_height,
165                count: ASSUME_FINAL_DEPTH as _,
166            })
167            .await;
168        let mut new_headers = (start_height..)
169            .zip(headers_resp?.headers)
170            .collect::<BTreeMap<u32, Header>>();
171
172        // Check that the tip is still the same.
173        if new_headers.get(&tip_height).map(|h| h.block_hash()) != Some(tip_hash) {
174            // It is safe to ignore this update and wait for the next notification.
175            return Ok(None);
176        }
177
178        // Ensure local recent headers are still in the best chain.
179        let mut consecutive_matches = 0_usize;
180        for cp in self.tip.iter() {
181            let height = cp.height();
182            let orig_hash = cp.hash();
183            let header = match new_headers.entry(height) {
184                btree_map::Entry::Vacant(e) => {
185                    *e.insert(client.request(request::Header { height }).await?.header)
186                }
187                btree_map::Entry::Occupied(e) => *e.get(),
188            };
189            let hash = header.block_hash();
190            self.headers.insert(hash, header);
191            if header.block_hash() == orig_hash {
192                consecutive_matches += 1;
193                if consecutive_matches > CONSECUTIVE_THRESHOLD {
194                    break;
195                }
196            } else {
197                consecutive_matches = 0;
198            }
199        }
200        for (height, header) in new_headers {
201            let hash = header.block_hash();
202            self.tip = self.tip.clone().insert(BlockId { height, hash });
203        }
204        Ok(Some(self.tip.clone()))
205    }
206
207    pub async fn ensure_heights(
208        &mut self,
209        client: &Client,
210        heights: BTreeSet<u32>,
211    ) -> anyhow::Result<HeaderCache> {
212        let mut header_cache = HeaderCache::new();
213
214        // Anything above this height is ignored.
215        let tip_height = self.tip.height();
216
217        let mut heights_iter = heights.into_iter().filter(|&h| h <= tip_height).peekable();
218        let start_height = match heights_iter.peek() {
219            Some(&h) => h,
220            None => return Ok(header_cache),
221        };
222
223        let mut cp_tail = BTreeMap::<u32, BlockHash>::new();
224        let start_cp = {
225            let mut start_cp = Option::<CheckPoint>::None;
226            for cp in self.tip.iter() {
227                let BlockId { height, hash } = cp.block_id();
228                if height < start_height {
229                    start_cp = Some(cp);
230                    break;
231                }
232                cp_tail.insert(height, hash);
233            }
234            match start_cp {
235                Some(cp) => cp,
236                // TODO: Is this the correct thing to do when we don't have a start cp?
237                None => return Ok(header_cache),
238            }
239        };
240
241        // Ensure `heights` are all in `cp_tail`.
242        for height in heights_iter {
243            let header_req = request::Header { height };
244            let header_opt = match cp_tail.entry(height) {
245                btree_map::Entry::Vacant(tail_e) => {
246                    let header = client.request(header_req).await?.header;
247                    let hash = header.block_hash();
248                    self.headers.insert(hash, header);
249                    tail_e.insert(hash);
250                    Some((hash, header))
251                }
252                btree_map::Entry::Occupied(tail_e) => {
253                    let hash = *tail_e.get();
254                    // Try ensure we also have the header.
255                    match self.headers.entry(hash) {
256                        hash_map::Entry::Occupied(header_e) => Some((hash, *header_e.get())),
257                        hash_map::Entry::Vacant(header_e) => {
258                            let header = client.request(header_req).await?.header;
259                            if header.block_hash() == hash {
260                                header_e.insert(header);
261                                Some((hash, header))
262                            } else {
263                                // TODO: What to do here?
264                                None
265                            }
266                        }
267                    }
268                }
269            };
270            if let Some(hash_and_header) = header_opt {
271                header_cache.insert(height, hash_and_header);
272            }
273        }
274
275        // Create new cp.
276        self.tip = start_cp
277            .extend(cp_tail.into_iter().map(Into::into))
278            .expect("must extend");
279        Ok(header_cache)
280    }
281}
282
/// Cache of full transactions, keyed by txid.
#[derive(Debug, Default)]
pub struct Txs {
    txs: HashMap<Txid, Arc<Transaction>>,
}
287
288impl Txs {
289    pub fn new() -> Self {
290        Self::default()
291    }
292
293    pub fn insert_tx(&mut self, tx: impl Into<Arc<Transaction>>) {
294        let tx: Arc<Transaction> = tx.into();
295        self.txs.insert(tx.compute_txid(), tx);
296    }
297
298    pub async fn fetch_tx(
299        &mut self,
300        client: &Client,
301        txid: Txid,
302    ) -> anyhow::Result<Arc<Transaction>> {
303        match self.txs.entry(txid) {
304            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
305            hash_map::Entry::Vacant(entry) => {
306                let tx = client.request(request::GetTx(txid)).await?.tx;
307                let arc_tx = entry.insert(Arc::new(tx)).clone();
308                Ok(arc_tx)
309            }
310        }
311    }
312
313    pub async fn fetch_txout(
314        &mut self,
315        client: &Client,
316        outpoint: OutPoint,
317    ) -> anyhow::Result<Option<TxOut>> {
318        let tx = self.fetch_tx(client, outpoint.txid).await?;
319        Ok(tx.output.get(outpoint.vout as usize).cloned())
320    }
321}
322
323/// Initiate emitter by subscribing to headers and scripts.
324pub async fn init<K>(client: &Client, spk_tracker: &mut DerivedSpkTracker<K>) -> anyhow::Result<()>
325where
326    K: Clone + Ord + Send + Sync + 'static,
327{
328    client.request_event(request::HeadersSubscribe)?;
329
330    // Assume `spk_tracker` is initiated.
331    for script_hash in spk_tracker.all_spk_hashes() {
332        client.request_event(request::ScriptHashSubscribe { script_hash })?;
333    }
334
335    Ok(())
336}
337
338/// [`init`] should be called beforehand.
339pub async fn handle_event<K>(
340    client: &Client,
341    spk_tracker: &mut DerivedSpkTracker<K>,
342    headers: &mut Headers,
343    txs: &mut Txs,
344    broadcast_queue: &mut BroadcastQueue,
345    event: Event,
346) -> anyhow::Result<Option<Update<K>>>
347where
348    K: Clone + Ord + Send + Sync + 'static,
349{
350    match event {
351        Event::Response(SatisfiedRequest::Header { req, resp }) => Ok(headers
352            .update(client, req.height, resp.header.block_hash())
353            .await?
354            .map(|cp| Update {
355                chain_update: Some(cp),
356                ..Default::default()
357            })),
358        Event::Response(SatisfiedRequest::HeadersSubscribe { resp, .. }) => Ok(headers
359            .update(client, resp.height, resp.header.block_hash())
360            .await?
361            .map(|cp| Update {
362                chain_update: Some(cp),
363                ..Default::default()
364            })),
365        Event::Notification(Notification::Header(h)) => Ok(headers
366            .update(client, h.height(), h.header().block_hash())
367            .await?
368            .map(|cp| Update {
369                chain_update: Some(cp),
370                ..Default::default()
371            })),
372        Event::Response(SatisfiedRequest::ScriptHashSubscribe { req, resp }) => {
373            if resp.is_none() {
374                return Ok(None);
375            }
376            let (k, i) = match spk_tracker.handle_script_status(req.script_hash) {
377                Some((k, i, new_spk_hashes)) => {
378                    for script_hash in new_spk_hashes {
379                        client.request_event(request::ScriptHashSubscribe { script_hash })?;
380                    }
381                    (k, i)
382                }
383                None => return Ok(None),
384            };
385            let tx_update = script_hash_update(client, headers, txs, req.script_hash).await?;
386            let last_active_indices = core::iter::once((k, i)).collect();
387            let chain_update = Some(headers.tip());
388            Ok(Some(Update {
389                tx_update,
390                last_active_indices,
391                chain_update,
392            }))
393        }
394        Event::Notification(Notification::ScriptHash(inner)) => {
395            let (k, i) = match spk_tracker.handle_script_status(inner.script_hash()) {
396                Some((k, i, new_spk_hashes)) => {
397                    for script_hash in new_spk_hashes {
398                        client.request_event(request::ScriptHashSubscribe { script_hash })?;
399                    }
400                    (k, i)
401                }
402                None => return Ok(None),
403            };
404            let tx_update = script_hash_update(client, headers, txs, inner.script_hash()).await?;
405            let last_active_indices = core::iter::once((k, i)).collect();
406            let chain_update = Some(headers.tip());
407            Ok(Some(Update {
408                tx_update,
409                last_active_indices,
410                chain_update,
411            }))
412        }
413        Event::Response(SatisfiedRequest::BroadcastTx { resp, .. }) => {
414            broadcast_queue.handle_resp_ok(resp);
415            Ok(None)
416        }
417        Event::ResponseError(ErroredRequest::BroadcastTx { req, error }) => {
418            broadcast_queue.handle_resp_err(req.0.compute_txid(), error);
419            Ok(None)
420        }
421        Event::ResponseError(err) => Err(err.into()),
422        _ => Ok(None),
423    }
424}
425
/// Build a [`TxUpdate`] from the history of `script_hash`.
///
/// For every history entry this fetches the full transaction (plus the txouts it spends), and,
/// for confirmed entries, verifies the server-provided merkle proof against the cached block
/// header before anchoring the tx.
async fn script_hash_update(
    client: &Client,
    headers: &mut Headers,
    txs: &mut Txs,
    script_hash: ElectrumScriptHash,
) -> anyhow::Result<TxUpdate<ConfirmationBlockTime>> {
    let electrum_txs = client.request(request::GetHistory { script_hash }).await?;

    // Make sure we have a checkpoint and header for every confirmation height mentioned in the
    // history, so anchors can be created below.
    let header_cache = headers
        .ensure_heights(
            client,
            electrum_txs
                .iter()
                .filter_map(|tx| tx.confirmation_height().map(|h| h.to_consensus_u32()))
                .collect(),
        )
        .await?;

    let mut tx_update = TxUpdate::<ConfirmationBlockTime>::default();

    for tx in electrum_txs {
        let txid = tx.txid();
        let full_tx = txs.fetch_tx(client, txid).await?;

        // Also fetch the txouts spent by this tx so the update carries the prevouts.
        for txin in &full_tx.input {
            let op = txin.previous_output;
            if let Some(txout) = txs.fetch_txout(client, op).await? {
                tx_update.txouts.insert(op, txout);
            }
        }
        tx_update.txs.push(full_tx);

        // Anchor confirmed transactions after checking the merkle proof.
        if let Some(height) = tx.confirmation_height() {
            let height = height.to_consensus_u32();
            let merkle_res = client
                .request(request::GetTxMerkle { txid, height })
                .await?;
            let (hash, header) = match header_cache.get(&height) {
                Some(&hash_and_header) => hash_and_header,
                // `ensure_heights` may have skipped this height (e.g. above the local tip);
                // leave the tx unanchored.
                None => continue,
            };
            if header.merkle_root != merkle_res.expected_merkle_root(txid) {
                // Merkle proof does not match the header we trust; skip anchoring.
                continue;
            }
            tx_update.anchors.insert((
                ConfirmationBlockTime {
                    block_id: BlockId { height, hash },
                    confirmation_time: header.time as _,
                },
                txid,
            ));
        }
    }

    Ok(tx_update)
}
482
/// Transactions submitted for broadcast, each paired with a oneshot channel used to report the
/// broadcast result back to the caller.
#[derive(Debug, Default)]
pub struct BroadcastQueue {
    queue: VecDeque<(Transaction, oneshot::Sender<Result<(), ResponseError>>)>,
}
487
488impl BroadcastQueue {
489    pub fn txs(&self) -> impl Iterator<Item = Transaction> + '_ {
490        self.queue.iter().map(|(tx, _)| tx.clone())
491    }
492
493    pub fn add(&mut self, tx: Transaction, resp: oneshot::Sender<Result<(), ResponseError>>) {
494        self.queue.push_back((tx, resp));
495    }
496
497    pub fn handle_resp_ok(&mut self, txid: Txid) {
498        let i_opt = self.queue.iter().enumerate().find_map(|(i, (tx, _))| {
499            if tx.compute_txid() == txid {
500                Some(i)
501            } else {
502                None
503            }
504        });
505        if let Some(i) = i_opt {
506            let (_, resp_tx) = self.queue.remove(i).expect("must exist");
507            let _ = resp_tx.send(Ok(()));
508        }
509    }
510
511    pub fn handle_resp_err(&mut self, txid: Txid, err: ResponseError) {
512        let i_opt = self.queue.iter().enumerate().find_map(|(i, (tx, _))| {
513            if tx.compute_txid() == txid {
514                Some(i)
515            } else {
516                None
517            }
518        });
519        if let Some(i) = i_opt {
520            let (_, resp_tx) = self.queue.remove(i).expect("must exist");
521            let _ = resp_tx.send(Err(err));
522        }
523    }
524}
525
/// Drives an Electrum connection: processes server events and [`Cmd`]s, and emits [`Update`]s.
#[derive(Debug)]
pub struct Emitter<K: Clone + Ord + Send + Sync + 'static> {
    // Which spks are derived/subscribed, per keychain.
    spk_tracker: DerivedSpkTracker<K>,
    // Local chain tip plus header cache.
    header_cache: Headers,
    // Full-transaction cache.
    tx_cache: Txs,

    // Shared with `CmdSender`; `run` installs the connected client here.
    client: Arc<futures::lock::Mutex<Option<electrum_streaming::Client>>>,
    // Commands coming from `CmdSender`.
    cmd_rx: UnboundedReceiver<Cmd<K>>,
    // Outgoing wallet updates.
    update_tx: mpsc::UnboundedSender<Update<K>>,
    // Broadcasts awaiting a server response; re-sent on reconnect in `run`.
    broadcast_queue: BroadcastQueue,
}
537
impl<K> Emitter<K>
where
    K: core::fmt::Debug + Clone + Ord + Send + Sync + 'static,
{
    /// Construct an emitter starting from `wallet_tip`, returning the emitter itself, a
    /// [`CmdSender`] for issuing commands, and the receiving end of the update channel.
    pub fn new(
        wallet_tip: CheckPoint,
        lookahead: u32,
    ) -> (Self, CmdSender<K>, UnboundedReceiver<Update<K>>) {
        let (cmd_tx, cmd_rx) = mpsc::unbounded::<Cmd<K>>();
        let (update_tx, update_rx) = mpsc::unbounded::<Update<K>>();
        // Shared with `CmdSender` so direct requests can use whichever client `run` installs.
        let client = Arc::new(futures::lock::Mutex::new(None));
        (
            Self {
                spk_tracker: DerivedSpkTracker::new(lookahead),
                header_cache: Headers::new(wallet_tip),
                tx_cache: Txs::new(),
                client: client.clone(),
                cmd_rx,
                update_tx,
                broadcast_queue: BroadcastQueue::default(),
            },
            CmdSender { tx: cmd_tx, client },
            update_rx,
        )
    }

    /// Populate tx cache.
    pub fn insert_txs<Tx>(&mut self, txs: impl IntoIterator<Item = Tx>)
    where
        Tx: Into<Arc<Transaction>>,
    {
        for tx in txs {
            self.tx_cache.insert_tx(tx);
        }
    }

    /// Run the emitter over `conn`, processing server events and commands until the event
    /// stream ends, the command channel closes, [`Cmd::Close`] is received, or the update
    /// receiver is dropped.
    ///
    /// On entry this (re)subscribes to headers and all tracked script hashes and re-sends any
    /// queued broadcasts, so `run` may be called again after a disconnect to resume.
    pub async fn run<C>(&mut self, ping_delay: Duration, conn: C) -> anyhow::Result<()>
    where
        C: AsyncRead + AsyncWrite + Send,
    {
        let (client, mut event_rx, run_fut) = electrum_streaming::run(conn);
        // Make the fresh client available to `CmdSender::request`.
        self.client.lock().await.replace(client.clone());

        client.request_event(request::HeadersSubscribe)?;
        for script_hash in self.spk_tracker.all_spk_hashes() {
            client.request_event(request::ScriptHashSubscribe { script_hash })?;
        }
        for tx in self.broadcast_queue.txs() {
            client.request_event(request::BroadcastTx(tx))?;
        }

        // Re-borrow fields individually so the async block below can borrow them disjointly.
        let spk_tracker = &mut self.spk_tracker;
        let header_cache = &mut self.header_cache;
        let tx_cache = &mut self.tx_cache;
        let cmd_rx = &mut self.cmd_rx;
        let update_tx = &mut self.update_tx;
        let broadcast_queue = &mut self.broadcast_queue;

        let process_fut = async move {
            // TODO: We should not await in the select branches. Instead, have a struct that keeps
            // state and mutate this state in the `handle_event` logic (which should be inlined).
            loop {
                select! {
                    // Server events: may produce an update to forward to the consumer.
                    opt = event_rx.next() => match opt {
                        Some(event) => {
                            let update_opt =
                                handle_event(&client, spk_tracker, header_cache, tx_cache, broadcast_queue, event).await?;
                            if let Some(update) = update_opt {
                                // Send failure means the consumer hung up; stop processing.
                                if let Err(_err) = update_tx.unbounded_send(update) {
                                    break;
                                }
                            }
                        },
                        None => break,
                    },
                    // Commands from `CmdSender`.
                    opt = cmd_rx.next() => match opt {
                        Some(Cmd::InsertDescriptor { keychain, descriptor, next_index }) => {
                            for script_hash in spk_tracker.insert_descriptor(keychain, descriptor, next_index) {
                                client.request_event(request::ScriptHashSubscribe { script_hash })?;
                            }
                        }
                        Some(Cmd::Broadcast { tx, resp_tx }) => {
                            broadcast_queue.add(tx.clone(), resp_tx);
                            client.request_event(request::BroadcastTx(tx))?;
                        }
                        Some(Cmd::Close) | None => break,
                    },
                    // Keep-alive ping.
                    // NOTE(review): the `Delay` future is re-created on every loop iteration, so
                    // this branch only fires after `ping_delay` of *inactivity* — confirm pings
                    // are not required during sustained event traffic.
                    _ = Delay::new(ping_delay).fuse() => {
                        client.request_event(request::Ping)?;
                    }
                }
            }
            anyhow::Ok(())
        };

        // Drive the connection I/O and the processing loop together; finish when either ends.
        select! {
            res = run_fut.fuse() => res?,
            res = process_fut.fuse() => res?,
        }
        Ok(())
    }
}
640
/// Receiving half of the [`Cmd`] channel.
pub type CmdRx<K> = mpsc::UnboundedReceiver<Cmd<K>>;
642
/// Commands sent from a [`CmdSender`] to the emitter's run loop.
#[non_exhaustive]
pub enum Cmd<K> {
    /// Track a descriptor for `keychain`, deriving spks up to `next_index` plus lookahead.
    InsertDescriptor {
        keychain: K,
        descriptor: Descriptor<DescriptorPublicKey>,
        next_index: u32,
    },
    /// Broadcast `tx`; the result is reported back through `resp_tx`.
    Broadcast {
        tx: Transaction,
        resp_tx: oneshot::Sender<Result<(), ResponseError>>,
    },
    /// Stop the emitter's run loop.
    Close,
}
656
/// Handle for sending [`Cmd`]s to a running [`Emitter`] and issuing direct client requests.
#[derive(Debug, Clone)]
pub struct CmdSender<K> {
    tx: mpsc::UnboundedSender<Cmd<K>>,
    // Shared with the emitter; `None` until `Emitter::run` installs a connected client.
    client: Arc<futures::lock::Mutex<Option<electrum_streaming::Client>>>,
}
662
663impl<K: Send + Sync + 'static> CmdSender<K> {
664    pub fn insert_descriptor(
665        &self,
666        keychain: K,
667        descriptor: Descriptor<DescriptorPublicKey>,
668        next_index: u32,
669    ) -> anyhow::Result<()> {
670        self.tx.unbounded_send(Cmd::InsertDescriptor {
671            keychain,
672            descriptor,
673            next_index,
674        })?;
675        Ok(())
676    }
677
678    pub async fn request<Req>(&self, request: Req) -> Result<Req::Response, RequestError>
679    where
680        Req: electrum_streaming::Request,
681        PendingRequestTuple<Req, Req::Response>:
682            Into<electrum_streaming::pending_request::PendingRequest>,
683    {
684        match self.client.lock().await.as_ref().cloned() {
685            Some(client) => client.request(request).await,
686            None => Err(RequestError::Canceled),
687        }
688    }
689
690    pub async fn broadcast_tx(&self, tx: Transaction) -> anyhow::Result<()> {
691        let (resp_tx, rx) = oneshot::channel();
692        self.tx.unbounded_send(Cmd::Broadcast { tx, resp_tx })?;
693        rx.await??;
694        Ok(())
695    }
696
697    pub async fn close(&self) -> anyhow::Result<()> {
698        self.tx.unbounded_send(Cmd::Close)?;
699        Ok(())
700    }
701}