1use crate::{
2 electrum::{
3 request::Request,
4 response::{
5 ErrorResponse, HistoryResult, Response, SHGetHistoryResponse, SHNotification,
6 SHSubscribeResponse, TxBroadcastResponse, TxGetResponse, TxGetResult,
7 },
8 types::ScriptHash,
9 },
10 raw_client::{self, Client as RawClient},
11};
12use bwk_backoff::Backoff;
13use hex_conservative::FromHex;
14use miniscript::bitcoin::{
15 consensus::{self, encode::serialize_hex, Decodable},
16 OutPoint, Script, ScriptBuf, Transaction, TxOut, Txid,
17};
18use std::{
19 collections::{BTreeMap, HashMap},
20 fmt::{Debug, Display},
21 sync::mpsc,
22 thread::{self},
23 time::Duration,
24};
25
/// Errors returned by the Electrum [`Client`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// A failure reported by the underlying raw client; the payload is the textual rendering
    /// produced by the `From<raw_client::Error>` conversion.
    Electrum(String),
    /// The transaction returned by the server could not be decoded.
    TxParsing,
    /// No response matching the request was received.
    WrongResponse,
    /// The requested outpoint does not exist (not produced anywhere in this file).
    WrongOutPoint,
    /// The server answered a transaction request with an error response.
    TxDoesNotExists,
}

impl Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The payload is already a rendered message: print it verbatim. Using `{:?}` here
            // would wrap it in quotes and escape every quote it contains.
            Error::Electrum(e) => write!(f, "{e}"),
            Error::TxParsing => write!(f, "Fail to parse the transaction"),
            Error::WrongResponse => write!(f, "Wrong response from electrum server"),
            Error::WrongOutPoint => write!(f, "Requested outpoint did not exists"),
            Error::TxDoesNotExists => write!(f, "Requested transaction did not exists"),
        }
    }
}

// Lets the type be used with `Box<dyn std::error::Error>` and error-handling helpers.
impl std::error::Error for Error {}
46
47impl From<raw_client::Error> for Error {
48 fn from(value: raw_client::Error) -> Self {
49 Error::Electrum(format!("{value:?}"))
50 }
51}
52
/// Lifecycle state of a coin.
///
/// The type is not referenced elsewhere in this file; the variant descriptions below follow
/// the variant names only.
// NOTE(review): `Spend` presumably means "spent" — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CoinStatus {
    /// Presumably: seen but not yet included in a block.
    Unconfirmed,
    /// Presumably: included in a block.
    Confirmed,
    /// Presumably: already spent.
    Spend,
}
59
60pub fn short_hash(s: &ScriptBuf) -> String {
61 let s = ScriptHash::new(s).to_string();
62 short_string(s)
63}
64
/// Abbreviates `s` to its first four and last four characters joined by `..`.
///
/// Strings of at most ten characters are returned unchanged, since abbreviating them would
/// not make them shorter.
///
/// Lengths are counted in characters, not bytes, so the function never slices in the middle
/// of a multi-byte character and therefore never panics, whatever the input. For ASCII input
/// (hex hashes, the usual case) the result is the same as with byte offsets.
pub fn short_string(s: String) -> String {
    const HEAD: usize = 4;
    const TAIL: usize = 4;

    if s.chars().count() <= HEAD + TAIL + 2 {
        return s;
    }

    // The string holds more than HEAD + TAIL + 2 characters here, so both lookups succeed;
    // the fallbacks only keep the expressions total.
    let head_end = s.char_indices().nth(HEAD).map_or(s.len(), |(idx, _)| idx);
    let tail_start = s
        .char_indices()
        .rev()
        .nth(TAIL - 1)
        .map_or(0, |(idx, _)| idx);

    format!("{}..{}", &s[..head_end], &s[tail_start..])
}
74
/// Requests accepted by the listener thread started with `Client::listen`.
#[derive(Clone)]
pub enum CoinRequest {
    /// Subscribe to each of the given scripts (one `Request::subscribe_sh` per script).
    Subscribe(Vec<ScriptBuf>),
    /// Fetch the history of each of the given scripts (one `Request::sh_get_history` per
    /// script).
    History(Vec<ScriptBuf>),
    /// Fetch each of the given transactions (one `Request::tx_get` per txid).
    Txs(Vec<Txid>),
    /// Ask the listener to reply with `CoinResponse::Stopped` and terminate.
    Stop,
}
82
83impl Debug for CoinRequest {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 match self {
86 Self::Subscribe(vec) => {
87 let hashes: Vec<_> = vec.iter().map(short_hash).collect();
88 f.debug_tuple("Subscribe").field(&hashes).finish()
89 }
90 Self::History(vec) => {
91 let hashes: Vec<_> = vec.iter().map(short_hash).collect();
92 f.debug_tuple("History").field(&hashes).finish()
93 }
94 Self::Txs(arg0) => f.debug_tuple("Txs").field(arg0).finish(),
95 Self::Stop => write!(f, "Stop"),
96 }
97 }
98}
99
/// Responses emitted by the listener thread started with `Client::listen`.
#[derive(Clone)]
pub enum CoinResponse {
    /// Latest status per script, taken from subscription responses and notifications.
    Status(BTreeMap<ScriptBuf, Option<String>>),
    /// History per script as `(txid, height)` pairs; the height is `None` when the server
    /// reported a height lower than 1.
    History(BTreeMap<ScriptBuf, Vec<(Txid, Option<u64> )>>),
    /// Transactions decoded from the server's answers to a `CoinRequest::Txs`.
    Txs(Vec<Transaction>),
    /// Sent in reply to `CoinRequest::Stop`, right before the listener terminates.
    Stopped,
    /// An error message, either from the server or from the connection.
    Error(String),
}
108
109impl Debug for CoinResponse {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 match self {
112 Self::Txs(vec) => {
113 let txids: Vec<_> = vec.iter().map(|tx| tx.compute_txid()).collect();
114 f.debug_tuple("Txs").field(&txids).finish()
115 }
116 Self::Status(map) => {
117 let statuses: Vec<_> = map
118 .iter()
119 .map(|(spk, status)| {
120 format!(
121 "{} => {:?}",
122 short_hash(spk),
123 status.as_ref().map(|st| short_string(st.to_string()))
124 )
125 })
126 .collect();
127 f.debug_tuple("Status").field(&statuses).finish()
128 }
129 Self::History(map) => {
130 let map: Vec<_> = map
131 .iter()
132 .map(|(spk, v)| {
133 let conf: Vec<_> =
134 v.iter().filter(|(_, height)| height.is_some()).collect();
135 format!(
136 "{} => conf: {}, total: {}",
137 short_hash(spk),
138 conf.len(),
139 v.len()
140 )
141 })
142 .collect();
143 f.debug_tuple("History").field(&map).finish()
144 }
145 Self::Stopped => write!(f, "Stopped"),
146 Self::Error(e) => write!(f, "Error({})", e),
147 }
148 }
149}
150
/// Electrum client built on top of the raw client connection.
#[derive(Debug)]
pub struct Client {
    /// The underlying connection.
    inner: RawClient,
    /// Requests keyed by their id; handed to the raw client whenever responses are read.
    index: HashMap<usize, Request>,
    /// Last request id handed out by `id()`.
    last_id: usize,
    /// Server address as stored by the constructors, i.e. with any `ssl://` removed.
    url: String,
    /// Server port.
    port: u16,
}
159
160impl Clone for Client {
161 fn clone(&self) -> Self {
162 Client::new(&self.url, self.port).unwrap()
163 }
164}
165
166impl Client {
167 pub fn new(address: &str, port: u16) -> Result<Self, Error> {
173 let ssl = address.starts_with("ssl://");
174 let address = address.to_string().replace("ssl://", "");
175 let mut inner = RawClient::new_ssl_maybe(&address, port, ssl);
176 inner.try_connect()?;
177 Ok(Client {
178 inner,
179 index: HashMap::new(),
180 last_id: 0,
181 url: address,
182 port,
183 })
184 }
185
186 pub fn new_local(address: &str, port: u16) -> Result<Self, Error> {
193 let ssl = address.starts_with("ssl://");
194 let address = address.to_string().replace("ssl://", "");
195 let mut inner = RawClient::new_ssl_maybe(&address, port, ssl).verif_certificate(false);
196 inner.try_connect()?;
197 Ok(Client {
198 inner,
199 index: HashMap::new(),
200 last_id: 0,
201 url: address,
202 port,
203 })
204 }
205
206 fn id(&mut self) -> usize {
208 self.last_id = self.last_id.wrapping_add(1);
209 self.last_id
210 }
211
212 fn register(&mut self, req: &mut Request) -> usize {
213 let id = self.id();
214 req.id = id;
215 self.index.insert(req.id, req.clone());
216 id
217 }
218
219 pub fn listen<RQ, RS>(self) -> (mpsc::Sender<RQ>, mpsc::Receiver<RS>)
220 where
221 RQ: Into<CoinRequest> + Debug + Send + 'static,
222 RS: From<CoinResponse> + Debug + Send + 'static,
223 {
224 let (sender, request) = mpsc::channel();
225 let (response, receiver) = mpsc::channel();
226 thread::spawn(move || self.listen_txs(response, request));
227
228 (sender, receiver)
229 }
230
231 fn listen_txs<RQ, RS>(mut self, send: mpsc::Sender<RS>, recv: mpsc::Receiver<RQ>)
232 where
233 RQ: Into<CoinRequest> + Debug + Send + 'static,
234 RS: From<CoinResponse> + Debug + Send + 'static,
235 {
236 log::debug!("Client::listen_txs()");
237 let mut req_id_spk_map = BTreeMap::new();
238 let mut watched_spks_sh = BTreeMap::<usize , ScriptHash>::new();
239 let mut sh_sbf_map = BTreeMap::<ScriptHash, ScriptBuf>::new();
240
241 let mut last_request = None;
242
243 fn responses_matches_requests(req: &[Request], resp: &[Response]) -> bool {
244 req.iter()
245 .all(|rq| resp.iter().any(|response| response.id() == Some(rq.id)))
246 }
247
248 let mut backoff = Backoff::new_ms(50);
249
250 loop {
251 let mut received = false;
252 if last_request.is_none() {
257 match recv.try_recv() {
258 Ok(rq) => {
259 log::debug!("Client::listen_txs() recv request: {rq:#?}");
260 received = true;
261 let rq: CoinRequest = rq.into();
262 match rq {
263 CoinRequest::Subscribe(spks) => {
264 let mut batch = vec![];
265 for spk in spks {
266 let mut sub = Request::subscribe_sh(&spk);
267 let id = self.register(&mut sub);
268 log::debug!("Client::listen_txs() subscribe request: {sub:?}");
269 let sh = ScriptHash::new(&spk);
270 watched_spks_sh.insert(id, sh);
271 sh_sbf_map.insert(sh, spk);
272 batch.push(sub);
273 }
274 if !batch.is_empty() {
275 log::debug!(
276 "Client::listen_txs() last_request = {:?}",
277 batch.len()
278 );
279 last_request = Some(batch.clone());
280
281 let mut retry = 0usize;
282 while let Err(e) =
283 self.inner.try_send_batch(batch.iter().collect())
284 {
285 retry += 1;
286 if retry > 10 {
287 send.send(CoinResponse::Error(format!("electrum::Client::listen_txs() Fail to send bacth request: {:?}", e)).into()).expect("caller dropped");
288 }
289 thread::sleep(Duration::from_millis(50));
290 }
291 }
292 }
293 CoinRequest::History(sbfs) => {
294 let mut batch = vec![];
295 for spk in sbfs {
296 let mut history = Request::sh_get_history(&spk);
297 let id = self.register(&mut history);
298 log::debug!(
299 "Client::listen_txs() history request: {history:?}"
300 );
301 req_id_spk_map.insert(id, spk);
302 batch.push(history);
303 }
304 if !batch.is_empty() {
305 log::debug!(
306 "Client::listen_txs() last_request = {:?}",
307 batch.len()
308 );
309 last_request = Some(batch.clone());
310
311 let mut retry = 0usize;
312 while let Err(e) =
313 self.inner.try_send_batch(batch.iter().collect())
314 {
315 retry += 1;
316 if retry > 10 {
317 send.send(CoinResponse::Error(format!("electrum::Client::listen_txs() Fail to send bacth request: {:?}", e)).into()).expect("caller dropped");
318 }
319 thread::sleep(Duration::from_millis(50));
320 }
321 }
322 }
323 CoinRequest::Txs(txids) => {
324 let mut batch = vec![];
325 for txid in txids {
326 let mut tx = Request::tx_get(txid);
327 self.register(&mut tx);
328 log::debug!("Client::listen_txs() txs request: {tx:?}");
329 batch.push(tx);
330 }
331 if !batch.is_empty() {
332 log::debug!(
333 "Client::listen_txs() last_request = {:?}",
334 batch.len()
335 );
336 last_request = Some(batch.clone());
337
338 let mut retry = 0usize;
339 while let Err(e) =
340 self.inner.try_send_batch(batch.iter().collect())
341 {
342 retry += 1;
343 if retry > 10 && send.send(CoinResponse::Error(format!("electrum::Client::listen_txs() Fail to send bacth request: {:?}", e)).into()).is_err() {
344 return;
347 }
348 thread::sleep(Duration::from_millis(50));
349 }
350 }
351 }
352 CoinRequest::Stop => {
353 send.send(CoinResponse::Stopped.into()).unwrap();
354 return;
355 }
356 };
357 }
358 Err(e) => {
359 match e {
360 mpsc::TryRecvError::Empty => {}
361 mpsc::TryRecvError::Disconnected => {
362 return;
365 }
366 }
367 }
368 }
369 }
370 match self.inner.try_recv(&self.index) {
372 Ok(Some(r)) => {
373 log::debug!("Client::listen_txs() from electrum: {r:#?}");
374 let r_match = if let Some(req) = &last_request {
375 responses_matches_requests(req, &r)
376 } else {
377 false
378 };
379 if r_match {
380 last_request = None;
381 } else if let Some(last_req) = &last_request {
382 log::debug!("Client::listen_txs() request not match resend last request");
383 thread::sleep(Duration::from_millis(100));
384 self.inner
385 .try_send_batch(last_req.iter().collect())
386 .unwrap();
387 }
388
389 received = true;
390 let mut statuses = BTreeMap::new();
391 let mut txs = Vec::new();
392 let mut histories = BTreeMap::new();
394 for r in r {
395 match r {
396 Response::SHSubscribe(SHSubscribeResponse { result: status, id }) => {
397 let sh = watched_spks_sh.get(&id).expect("already inserted");
398 let sbf = sh_sbf_map.get(sh).expect("already inserted");
399 statuses.insert(sbf.clone(), status);
400 }
401 Response::SHNotification(SHNotification {
402 status: (sh, status),
403 ..
404 }) => {
405 let sbf = sh_sbf_map.get(&sh).expect("already inserted");
406 statuses.insert(sbf.clone(), status);
407 }
408 Response::SHGetHistory(SHGetHistoryResponse { history, id }) => {
409 let spk =
410 req_id_spk_map.get(&id).expect("already inserted").clone();
411 req_id_spk_map.remove(&id);
412 let mut spk_hist = vec![];
413 for tx in history {
414 let HistoryResult { txid, height, .. } = tx;
415 let height = if height < 1 {
416 None
417 } else {
418 Some(height as u64)
419 };
420 spk_hist.push((txid, height));
421 }
422 histories.insert(spk, spk_hist);
423 }
424 Response::TxGet(TxGetResponse {
425 result: TxGetResult::Raw(raw_tx),
426 ..
427 }) => {
428 let tx: Transaction =
429 consensus::encode::deserialize_hex(&raw_tx).unwrap();
431 txs.push(tx);
432 }
433 Response::Error(e) => {
434 if send
435 .send(CoinResponse::Error(e.to_string()).into())
436 .is_err()
437 {
438 return;
441 }
442 }
443 _ => {}
444 }
445 }
446 if !histories.is_empty() {
447 let rsp = CoinResponse::History(histories);
448 log::debug!("Client::listen_txs() send response: {rsp:#?}");
449 send.send(rsp.into()).unwrap();
450 }
451 if !statuses.is_empty() {
452 let rsp = CoinResponse::Status(statuses);
453 log::debug!("Client::listen_txs() send response: {rsp:#?}");
454 send.send(rsp.into()).unwrap();
455 }
456 if !txs.is_empty() {
458 let rsp = CoinResponse::Txs(txs);
459 log::debug!("Client::listen_txs() send response: {rsp:#?}");
460 send.send(rsp.into()).unwrap();
461 }
462 }
463 Ok(None) => {}
464 Err(e) => {
465 if send
466 .send(CoinResponse::Error(e.to_string()).into())
467 .is_err()
468 {
469 return;
472 }
473 }
474 }
475 if received {
476 continue;
477 }
478 backoff.snooze();
479 }
480 }
481
482 pub fn get_tx(&mut self, txid: Txid) -> Result<Transaction, Error> {
492 let request = Request::tx_get(txid).id(self.id());
493 self.inner.try_send(&request)?;
494 let req_id = request.id;
495 self.index.insert(request.id, request);
496 let resp = match self.inner.recv(&self.index) {
497 Ok(r) => r,
498 Err(e) => {
499 self.index.remove(&req_id);
500 return Err(e.into());
501 }
502 };
503 for r in resp {
504 if let Response::TxGet(TxGetResponse {
505 id,
506 result: TxGetResult::Raw(res),
507 }) = r
508 {
509 if req_id == id {
510 self.index.remove(&req_id);
511 let raw_tx = match Vec::<u8>::from_hex(&res) {
512 Ok(raw) => raw,
513 Err(_) => {
514 return Err(Error::TxParsing);
515 }
516 };
517 let tx: Result<Transaction, _> =
518 Decodable::consensus_decode(&mut raw_tx.as_slice());
519 return tx.map_err(|_| Error::TxParsing);
520 }
521 } else if let Response::Error(ErrorResponse { id, .. }) = r {
522 if req_id == id {
523 self.index.remove(&req_id);
524 return Err(Error::TxDoesNotExists);
529 }
530 }
531 }
532 self.index.remove(&req_id);
533 Err(Error::WrongResponse)
534 }
535
536 #[allow(clippy::type_complexity)]
549 pub fn get_coins_at(
550 &mut self,
551 script: &Script,
552 ) -> Result<(Vec<(TxOut, OutPoint)>, HashMap<Txid, Transaction>), Error> {
553 let mut txouts = Vec::new();
554 let mut transactions = HashMap::new();
555 let txs = self.get_coins_tx_at(script)?;
556 for txid in txs {
557 let tx = self.get_tx(txid)?;
558 for (i, txout) in tx.output.iter().enumerate() {
559 if *txout.script_pubkey == *script {
560 let outpoint = OutPoint {
561 txid,
562 vout: i as u32,
563 };
564 txouts.push((txout.clone(), outpoint));
565 }
566 }
567 transactions.insert(txid, tx);
568 }
569 Ok((txouts, transactions))
570 }
571
572 pub fn get_coins_tx_at(&mut self, script: &Script) -> Result<Vec<Txid>, Error> {
581 let request = Request::sh_get_history(script).id(self.id());
582 self.inner.try_send(&request)?;
583 let req_id = request.id;
584 self.index.insert(request.id, request);
585 let resp = match self.inner.recv(&self.index) {
586 Ok(r) => r,
587 Err(e) => {
588 self.index.remove(&req_id);
589 return Err(e.into());
590 }
591 };
592 for r in resp {
593 if let Response::SHGetHistory(SHGetHistoryResponse { id, history }) = r {
594 if req_id == id {
595 self.index.remove(&req_id);
596 let history: Vec<_> = history.into_iter().map(|r| r.txid).collect();
597 return Ok(history);
598 }
599 }
600 }
601 self.index.remove(&req_id);
602 Err(Error::WrongResponse)
603 }
604
605 pub fn broadcast(&mut self, tx: &Transaction) -> Result<(), Error> {
613 let raw_tx = serialize_hex(tx);
614 log::debug!("electrum::Client().broadcast(): {:?}", raw_tx);
615 let request = Request::tx_broadcast(raw_tx);
616 self.inner.try_send(&request)?;
617 let req_id = request.id;
618 self.index.insert(request.id, request);
619 let resp = match self.inner.recv(&self.index) {
620 Ok(r) => r,
621 Err(e) => {
622 self.index.remove(&req_id);
623 return Err(e.into());
624 }
625 };
626 log::debug!(
627 "electrum::Client().broadcast(): receive response: {:?}",
628 resp
629 );
630 for r in resp {
631 if let Response::TxBroadcast(TxBroadcastResponse { id, .. }) = r {
632 if req_id == id {
633 self.index.remove(&req_id);
634 return Ok(());
635 }
636 }
637 }
638 self.index.remove(&req_id);
639 Err(Error::WrongResponse)
640 }
641
642 pub fn url(&self) -> String {
647 self.url.clone()
648 }
649
650 pub fn port(&self) -> u16 {
655 self.port
656 }
657}