bwk_electrum/
client.rs

1use crate::{
2    electrum::{
3        request::Request,
4        response::{
5            ErrorResponse, HistoryResult, Response, SHGetHistoryResponse, SHNotification,
6            SHSubscribeResponse, TxBroadcastResponse, TxGetResponse, TxGetResult,
7        },
8        types::ScriptHash,
9    },
10    raw_client::{self, Client as RawClient},
11};
12use bwk_backoff::Backoff;
13use hex_conservative::FromHex;
14use miniscript::bitcoin::{
15    consensus::{self, encode::serialize_hex, Decodable},
16    OutPoint, Script, ScriptBuf, Transaction, TxOut, Txid,
17};
18use std::{
19    collections::{BTreeMap, HashMap},
20    fmt::{Debug, Display},
21    sync::mpsc,
22    thread::{self},
23    time::Duration,
24};
25
26#[derive(Debug, Clone)]
27pub enum Error {
28    Electrum(String),
29    TxParsing,
30    WrongResponse,
31    WrongOutPoint,
32    TxDoesNotExists,
33}
34
35impl Display for Error {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            Error::Electrum(e) => write!(f, "{e:?}"),
39            Error::TxParsing => write!(f, "Fail to parse the transaction"),
40            Error::WrongResponse => write!(f, "Wrong response from electrum server"),
41            Error::WrongOutPoint => write!(f, "Requested outpoint did not exists"),
42            Error::TxDoesNotExists => write!(f, "Requested transaction did not exists"),
43        }
44    }
45}
46
47impl From<raw_client::Error> for Error {
48    fn from(value: raw_client::Error) -> Self {
49        Error::Electrum(format!("{value:?}"))
50    }
51}
52
53#[derive(Debug, Clone, Copy)]
54pub enum CoinStatus {
55    Unconfirmed,
56    Confirmed,
57    Spend,
58}
59
60pub fn short_hash(s: &ScriptBuf) -> String {
61    let s = ScriptHash::new(s).to_string();
62    short_string(s)
63}
64
65pub fn short_string(s: String) -> String {
66    let head = 4;
67    let tail = 4;
68    if s.len() <= head + tail + 2 {
69        // No need to truncate if string is short
70        return s.to_string();
71    }
72    format!("{}..{}", &s[..head], &s[s.len() - tail..])
73}
74
75#[derive(Clone)]
76pub enum CoinRequest {
77    Subscribe(Vec<ScriptBuf>),
78    History(Vec<ScriptBuf>),
79    Txs(Vec<Txid>),
80    Stop,
81}
82
83impl Debug for CoinRequest {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        match self {
86            Self::Subscribe(vec) => {
87                let hashes: Vec<_> = vec.iter().map(short_hash).collect();
88                f.debug_tuple("Subscribe").field(&hashes).finish()
89            }
90            Self::History(vec) => {
91                let hashes: Vec<_> = vec.iter().map(short_hash).collect();
92                f.debug_tuple("History").field(&hashes).finish()
93            }
94            Self::Txs(arg0) => f.debug_tuple("Txs").field(arg0).finish(),
95            Self::Stop => write!(f, "Stop"),
96        }
97    }
98}
99
100#[derive(Clone)]
101pub enum CoinResponse {
102    Status(BTreeMap<ScriptBuf, Option<String>>),
103    History(BTreeMap<ScriptBuf, Vec<(Txid, Option<u64> /* height */)>>),
104    Txs(Vec<Transaction>),
105    Stopped,
106    Error(String),
107}
108
109impl Debug for CoinResponse {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        match self {
112            Self::Txs(vec) => {
113                let txids: Vec<_> = vec.iter().map(|tx| tx.compute_txid()).collect();
114                f.debug_tuple("Txs").field(&txids).finish()
115            }
116            Self::Status(map) => {
117                let statuses: Vec<_> = map
118                    .iter()
119                    .map(|(spk, status)| {
120                        format!(
121                            "{} => {:?}",
122                            short_hash(spk),
123                            status.as_ref().map(|st| short_string(st.to_string()))
124                        )
125                    })
126                    .collect();
127                f.debug_tuple("Status").field(&statuses).finish()
128            }
129            Self::History(map) => {
130                let map: Vec<_> = map
131                    .iter()
132                    .map(|(spk, v)| {
133                        let conf: Vec<_> =
134                            v.iter().filter(|(_, height)| height.is_some()).collect();
135                        format!(
136                            "{} => conf: {}, total: {}",
137                            short_hash(spk),
138                            conf.len(),
139                            v.len()
140                        )
141                    })
142                    .collect();
143                f.debug_tuple("History").field(&map).finish()
144            }
145            Self::Stopped => write!(f, "Stopped"),
146            Self::Error(e) => write!(f, "Error({})", e),
147        }
148    }
149}
150
151#[derive(Debug)]
152pub struct Client {
153    inner: RawClient,
154    index: HashMap<usize, Request>,
155    last_id: usize,
156    url: String,
157    port: u16,
158}
159
160impl Clone for Client {
161    fn clone(&self) -> Self {
162        Client::new(&self.url, self.port).unwrap()
163    }
164}
165
166impl Client {
167    /// Create a new electrum client.
168    ///
169    /// # Arguments
170    /// * `address` - url/ip of the electrum server as String
171    /// * `port` - port of the electrum server
172    pub fn new(address: &str, port: u16) -> Result<Self, Error> {
173        let ssl = address.starts_with("ssl://");
174        let address = address.to_string().replace("ssl://", "");
175        let mut inner = RawClient::new_ssl_maybe(&address, port, ssl);
176        inner.try_connect()?;
177        Ok(Client {
178            inner,
179            index: HashMap::new(),
180            last_id: 0,
181            url: address,
182            port,
183        })
184    }
185
186    /// Create a new local electrum client: SSL certificate validation id disabled in
187    ///   order to be used with self-signed certificates.
188    ///
189    /// # Arguments
190    /// * `address` - url/ip of the electrum server as String
191    /// * `port` - port of the electrum server
192    pub fn new_local(address: &str, port: u16) -> Result<Self, Error> {
193        let ssl = address.starts_with("ssl://");
194        let address = address.to_string().replace("ssl://", "");
195        let mut inner = RawClient::new_ssl_maybe(&address, port, ssl).verif_certificate(false);
196        inner.try_connect()?;
197        Ok(Client {
198            inner,
199            index: HashMap::new(),
200            last_id: 0,
201            url: address,
202            port,
203        })
204    }
205
206    /// Generate a new request id
207    fn id(&mut self) -> usize {
208        self.last_id = self.last_id.wrapping_add(1);
209        self.last_id
210    }
211
212    fn register(&mut self, req: &mut Request) -> usize {
213        let id = self.id();
214        req.id = id;
215        self.index.insert(req.id, req.clone());
216        id
217    }
218
219    pub fn listen<RQ, RS>(self) -> (mpsc::Sender<RQ>, mpsc::Receiver<RS>)
220    where
221        RQ: Into<CoinRequest> + Debug + Send + 'static,
222        RS: From<CoinResponse> + Debug + Send + 'static,
223    {
224        let (sender, request) = mpsc::channel();
225        let (response, receiver) = mpsc::channel();
226        thread::spawn(move || self.listen_txs(response, request));
227
228        (sender, receiver)
229    }
230
231    fn listen_txs<RQ, RS>(mut self, send: mpsc::Sender<RS>, recv: mpsc::Receiver<RQ>)
232    where
233        RQ: Into<CoinRequest> + Debug + Send + 'static,
234        RS: From<CoinResponse> + Debug + Send + 'static,
235    {
236        log::debug!("Client::listen_txs()");
237        let mut req_id_spk_map = BTreeMap::new();
238        let mut watched_spks_sh = BTreeMap::<usize /* request_id */, ScriptHash>::new();
239        let mut sh_sbf_map = BTreeMap::<ScriptHash, ScriptBuf>::new();
240
241        let mut last_request = None;
242
243        fn responses_matches_requests(req: &[Request], resp: &[Response]) -> bool {
244            req.iter()
245                .all(|rq| resp.iter().any(|response| response.id() == Some(rq.id)))
246        }
247
248        let mut backoff = Backoff::new_ms(50);
249
250        loop {
251            let mut received = false;
252            // Handle requests from consumer
253            // NOTE: some server implementation (electrs for instance) will answer by an empty
254            // response if it receive a request while it has not yes sent its previous response
255            // so we need to make sure to not send a request before receiving the previous response
256            if last_request.is_none() {
257                match recv.try_recv() {
258                    Ok(rq) => {
259                        log::debug!("Client::listen_txs() recv request: {rq:#?}");
260                        received = true;
261                        let rq: CoinRequest = rq.into();
262                        match rq {
263                            CoinRequest::Subscribe(spks) => {
264                                let mut batch = vec![];
265                                for spk in spks {
266                                    let mut sub = Request::subscribe_sh(&spk);
267                                    let id = self.register(&mut sub);
268                                    log::debug!("Client::listen_txs() subscribe request: {sub:?}");
269                                    let sh = ScriptHash::new(&spk);
270                                    watched_spks_sh.insert(id, sh);
271                                    sh_sbf_map.insert(sh, spk);
272                                    batch.push(sub);
273                                }
274                                if !batch.is_empty() {
275                                    log::debug!(
276                                        "Client::listen_txs() last_request = {:?}",
277                                        batch.len()
278                                    );
279                                    last_request = Some(batch.clone());
280
281                                    let mut retry = 0usize;
282                                    while let Err(e) =
283                                        self.inner.try_send_batch(batch.iter().collect())
284                                    {
285                                        retry += 1;
286                                        if retry > 10 {
287                                            send.send(CoinResponse::Error(format!("electrum::Client::listen_txs() Fail to send bacth request: {:?}", e)).into()).expect("caller dropped");
288                                        }
289                                        thread::sleep(Duration::from_millis(50));
290                                    }
291                                }
292                            }
293                            CoinRequest::History(sbfs) => {
294                                let mut batch = vec![];
295                                for spk in sbfs {
296                                    let mut history = Request::sh_get_history(&spk);
297                                    let id = self.register(&mut history);
298                                    log::debug!(
299                                        "Client::listen_txs() history request: {history:?}"
300                                    );
301                                    req_id_spk_map.insert(id, spk);
302                                    batch.push(history);
303                                }
304                                if !batch.is_empty() {
305                                    log::debug!(
306                                        "Client::listen_txs() last_request = {:?}",
307                                        batch.len()
308                                    );
309                                    last_request = Some(batch.clone());
310
311                                    let mut retry = 0usize;
312                                    while let Err(e) =
313                                        self.inner.try_send_batch(batch.iter().collect())
314                                    {
315                                        retry += 1;
316                                        if retry > 10 {
317                                            send.send(CoinResponse::Error(format!("electrum::Client::listen_txs() Fail to send bacth request: {:?}", e)).into()).expect("caller dropped");
318                                        }
319                                        thread::sleep(Duration::from_millis(50));
320                                    }
321                                }
322                            }
323                            CoinRequest::Txs(txids) => {
324                                let mut batch = vec![];
325                                for txid in txids {
326                                    let mut tx = Request::tx_get(txid);
327                                    self.register(&mut tx);
328                                    log::debug!("Client::listen_txs() txs request: {tx:?}");
329                                    batch.push(tx);
330                                }
331                                if !batch.is_empty() {
332                                    log::debug!(
333                                        "Client::listen_txs() last_request = {:?}",
334                                        batch.len()
335                                    );
336                                    last_request = Some(batch.clone());
337
338                                    let mut retry = 0usize;
339                                    while let Err(e) =
340                                        self.inner.try_send_batch(batch.iter().collect())
341                                    {
342                                        retry += 1;
343                                        if retry > 10 && send.send(CoinResponse::Error(format!("electrum::Client::listen_txs() Fail to send bacth request: {:?}", e)).into()).is_err() {
344                                        // NOTE: caller has dropped the channel
345                                        // == Close request
346                                        return;
347                                                    }
348                                        thread::sleep(Duration::from_millis(50));
349                                    }
350                                }
351                            }
352                            CoinRequest::Stop => {
353                                send.send(CoinResponse::Stopped.into()).unwrap();
354                                return;
355                            }
356                        };
357                    }
358                    Err(e) => {
359                        match e {
360                            mpsc::TryRecvError::Empty => {}
361                            mpsc::TryRecvError::Disconnected => {
362                                // NOTE: caller has dropped the channel
363                                // == Close request
364                                return;
365                            }
366                        }
367                    }
368                }
369            }
370            // Handle responses from electrum server
371            match self.inner.try_recv(&self.index) {
372                Ok(Some(r)) => {
373                    log::debug!("Client::listen_txs() from electrum: {r:#?}");
374                    let r_match = if let Some(req) = &last_request {
375                        responses_matches_requests(req, &r)
376                    } else {
377                        false
378                    };
379                    if r_match {
380                        last_request = None;
381                    } else if let Some(last_req) = &last_request {
382                        log::debug!("Client::listen_txs() request not match resend last request");
383                        thread::sleep(Duration::from_millis(100));
384                        self.inner
385                            .try_send_batch(last_req.iter().collect())
386                            .unwrap();
387                    }
388
389                    received = true;
390                    let mut statuses = BTreeMap::new();
391                    let mut txs = Vec::new();
392                    // let mut txid_to_get = Vec::new();
393                    let mut histories = BTreeMap::new();
394                    for r in r {
395                        match r {
396                            Response::SHSubscribe(SHSubscribeResponse { result: status, id }) => {
397                                let sh = watched_spks_sh.get(&id).expect("already inserted");
398                                let sbf = sh_sbf_map.get(sh).expect("already inserted");
399                                statuses.insert(sbf.clone(), status);
400                            }
401                            Response::SHNotification(SHNotification {
402                                status: (sh, status),
403                                ..
404                            }) => {
405                                let sbf = sh_sbf_map.get(&sh).expect("already inserted");
406                                statuses.insert(sbf.clone(), status);
407                            }
408                            Response::SHGetHistory(SHGetHistoryResponse { history, id }) => {
409                                let spk =
410                                    req_id_spk_map.get(&id).expect("already inserted").clone();
411                                req_id_spk_map.remove(&id);
412                                let mut spk_hist = vec![];
413                                for tx in history {
414                                    let HistoryResult { txid, height, .. } = tx;
415                                    let height = if height < 1 {
416                                        None
417                                    } else {
418                                        Some(height as u64)
419                                    };
420                                    spk_hist.push((txid, height));
421                                }
422                                histories.insert(spk, spk_hist);
423                            }
424                            Response::TxGet(TxGetResponse {
425                                result: TxGetResult::Raw(raw_tx),
426                                ..
427                            }) => {
428                                let tx: Transaction =
429                            // TODO: do not unwrap
430                                    consensus::encode::deserialize_hex(&raw_tx).unwrap();
431                                txs.push(tx);
432                            }
433                            Response::Error(e) => {
434                                if send
435                                    .send(CoinResponse::Error(e.to_string()).into())
436                                    .is_err()
437                                {
438                                    // NOTE: caller has dropped the channel
439                                    // == Close request
440                                    return;
441                                }
442                            }
443                            _ => {}
444                        }
445                    }
446                    if !histories.is_empty() {
447                        let rsp = CoinResponse::History(histories);
448                        log::debug!("Client::listen_txs() send response: {rsp:#?}");
449                        send.send(rsp.into()).unwrap();
450                    }
451                    if !statuses.is_empty() {
452                        let rsp = CoinResponse::Status(statuses);
453                        log::debug!("Client::listen_txs() send response: {rsp:#?}");
454                        send.send(rsp.into()).unwrap();
455                    }
456                    // let mut txs = Vec::new();
457                    if !txs.is_empty() {
458                        let rsp = CoinResponse::Txs(txs);
459                        log::debug!("Client::listen_txs() send response: {rsp:#?}");
460                        send.send(rsp.into()).unwrap();
461                    }
462                }
463                Ok(None) => {}
464                Err(e) => {
465                    if send
466                        .send(CoinResponse::Error(e.to_string()).into())
467                        .is_err()
468                    {
469                        // NOTE: caller has dropped the channel
470                        // == Close request
471                        return;
472                    }
473                }
474            }
475            if received {
476                continue;
477            }
478            backoff.snooze();
479        }
480    }
481
482    /// Try to get a transaction by its txid
483    ///
484    /// # Errors
485    ///
486    /// This function will return an error if:
487    ///   - fail to send the request
488    ///   - parsing response fails
489    ///   - the response is not of expected type
490    ///   - the transaction does not exists
491    pub fn get_tx(&mut self, txid: Txid) -> Result<Transaction, Error> {
492        let request = Request::tx_get(txid).id(self.id());
493        self.inner.try_send(&request)?;
494        let req_id = request.id;
495        self.index.insert(request.id, request);
496        let resp = match self.inner.recv(&self.index) {
497            Ok(r) => r,
498            Err(e) => {
499                self.index.remove(&req_id);
500                return Err(e.into());
501            }
502        };
503        for r in resp {
504            if let Response::TxGet(TxGetResponse {
505                id,
506                result: TxGetResult::Raw(res),
507            }) = r
508            {
509                if req_id == id {
510                    self.index.remove(&req_id);
511                    let raw_tx = match Vec::<u8>::from_hex(&res) {
512                        Ok(raw) => raw,
513                        Err(_) => {
514                            return Err(Error::TxParsing);
515                        }
516                    };
517                    let tx: Result<Transaction, _> =
518                        Decodable::consensus_decode(&mut raw_tx.as_slice());
519                    return tx.map_err(|_| Error::TxParsing);
520                }
521            } else if let Response::Error(ErrorResponse { id, .. }) = r {
522                if req_id == id {
523                    self.index.remove(&req_id);
524                    // NOTE: it's very likely if we receive an error response from the server
525                    // it's because the txid does not match any Transaction, but maybe we can
526                    // do a better handling of the error case (for this we need check if responses
527                    // from all electrum server implementations are consistant).
528                    return Err(Error::TxDoesNotExists);
529                }
530            }
531        }
532        self.index.remove(&req_id);
533        Err(Error::WrongResponse)
534    }
535
536    /// Get coins that pay to the given spk and their related transaction.
537    /// This method will make several calls to the electrum server:
538    ///   - it will first request a list of all transactions txid that have
539    ///     an output paying to the spk.
540    ///   - it will then fetch all txs, store them and extract all the coins
541    ///     that pay to the given spk.
542    ///   - it will return a list of (TxOut, OutPoint) and a map of transactions.
543    ///
544    /// # Errors
545    ///
546    /// This function will return an error if:
547    ///   - a call to the electrum server fail
548    #[allow(clippy::type_complexity)]
549    pub fn get_coins_at(
550        &mut self,
551        script: &Script,
552    ) -> Result<(Vec<(TxOut, OutPoint)>, HashMap<Txid, Transaction>), Error> {
553        let mut txouts = Vec::new();
554        let mut transactions = HashMap::new();
555        let txs = self.get_coins_tx_at(script)?;
556        for txid in txs {
557            let tx = self.get_tx(txid)?;
558            for (i, txout) in tx.output.iter().enumerate() {
559                if *txout.script_pubkey == *script {
560                    let outpoint = OutPoint {
561                        txid,
562                        vout: i as u32,
563                    };
564                    txouts.push((txout.clone(), outpoint));
565                }
566            }
567            transactions.insert(txid, tx);
568        }
569        Ok((txouts, transactions))
570    }
571
572    /// Get a list of txid of all transaction that have an output paying to the
573    ///   given spk
574    ///
575    /// # Errors
576    ///
577    /// This function will return an error if:
578    ///   - fail sending the request
579    ///   - receive a wrong response
580    pub fn get_coins_tx_at(&mut self, script: &Script) -> Result<Vec<Txid>, Error> {
581        let request = Request::sh_get_history(script).id(self.id());
582        self.inner.try_send(&request)?;
583        let req_id = request.id;
584        self.index.insert(request.id, request);
585        let resp = match self.inner.recv(&self.index) {
586            Ok(r) => r,
587            Err(e) => {
588                self.index.remove(&req_id);
589                return Err(e.into());
590            }
591        };
592        for r in resp {
593            if let Response::SHGetHistory(SHGetHistoryResponse { id, history }) = r {
594                if req_id == id {
595                    self.index.remove(&req_id);
596                    let history: Vec<_> = history.into_iter().map(|r| r.txid).collect();
597                    return Ok(history);
598                }
599            }
600        }
601        self.index.remove(&req_id);
602        Err(Error::WrongResponse)
603    }
604
605    /// Broadcast the given transaction.
606    ///
607    /// # Errors
608    ///
609    /// This function will return an error if:
610    ///   - fail to send the request
611    ///   - get a wrong response
612    pub fn broadcast(&mut self, tx: &Transaction) -> Result<(), Error> {
613        let raw_tx = serialize_hex(tx);
614        log::debug!("electrum::Client().broadcast(): {:?}", raw_tx);
615        let request = Request::tx_broadcast(raw_tx);
616        self.inner.try_send(&request)?;
617        let req_id = request.id;
618        self.index.insert(request.id, request);
619        let resp = match self.inner.recv(&self.index) {
620            Ok(r) => r,
621            Err(e) => {
622                self.index.remove(&req_id);
623                return Err(e.into());
624            }
625        };
626        log::debug!(
627            "electrum::Client().broadcast(): receive response: {:?}",
628            resp
629        );
630        for r in resp {
631            if let Response::TxBroadcast(TxBroadcastResponse { id, .. }) = r {
632                if req_id == id {
633                    self.index.remove(&req_id);
634                    return Ok(());
635                }
636            }
637        }
638        self.index.remove(&req_id);
639        Err(Error::WrongResponse)
640    }
641
642    /// Returns the URL of the electrum client.
643    ///
644    /// # Returns
645    /// A `String` containing the URL of the electrum server.
646    pub fn url(&self) -> String {
647        self.url.clone()
648    }
649
650    /// Returns the port of the electrum client.
651    ///
652    /// # Returns
653    /// A `u16` containing the port of the electrum server.
654    pub fn port(&self) -> u16 {
655        self.port
656    }
657}