// mwck 0.1.0
//
// mempool.space wallet connector kit
// Documentation
use crate::compat;
use crate::wallet::address::Event as AddressEvent;

#[cfg(not(target_arch = "wasm32"))]
use super::native::{Message, Stream};
#[cfg(target_arch = "wasm32")]
use super::wasm::{Message, Stream, StreamError};

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use futures_util::StreamExt;
use serde::Deserialize;
use esplora_client::{ScriptBuf, Tx};

/// Events carried on the websocket broadcast channel.
///
/// NOTE(review): within this file only the `AddressEvent` variant is ever
/// constructed (by `Manager`). The remaining variants are presumably emitted
/// by connection-handling code elsewhere — confirm against their producers
/// before relying on the descriptions below.
#[derive(Debug, Clone)]
pub enum WebsocketEvent {
    /// A per-scriptpubkey transaction notification decoded from a websocket
    /// text message.
    AddressEvent(AddressEvent),
    /// Not produced in this file; presumably signals that the connection is
    /// considered offline — TODO confirm.
    Offline,
    /// Not produced in this file; presumably signals that the websocket was
    /// disconnected — TODO confirm.
    Disconnected,
    /// Not produced in this file; presumably signals that the websocket was
    /// (re)connected — TODO confirm.
    Connected,
    /// Not produced in this file; presumably signals a connection-level
    /// error — TODO confirm.
    Error,
}

/// The subset of an incoming websocket JSON message that this module reads.
///
/// Only the `multi-scriptpubkey-transactions` key is deserialized. Because the
/// field is an `Option`, messages without that key still parse successfully
/// and are then ignored by `Manager::handle_event`.
#[derive(Deserialize)]
struct WebsocketResponse {
    /// Transaction updates keyed by scriptpubkey; `None` when the message does
    /// not carry the `multi-scriptpubkey-transactions` key.
    #[serde(rename = "multi-scriptpubkey-transactions")]
    multi_scriptpubkey_transactions: Option<HashMap<ScriptBuf, WebsocketAddressTransactions>>,
}

/// Transactions reported for a single scriptpubkey, split into three lists.
///
/// None of the fields has a serde default, so all three keys must be present
/// in the JSON for the enclosing message to deserialize.
#[derive(Deserialize)]
struct WebsocketAddressTransactions {
    /// Transactions relayed as `AddressEvent::Mempool`.
    mempool: Vec<Tx>,
    /// Transactions relayed as `AddressEvent::Confirmed`.
    confirmed: Vec<Tx>,
    /// Transactions relayed as `AddressEvent::Removed`.
    removed: Vec<Tx>,
}

/// Drives the receive side of a websocket connection: reads messages from the
/// stream, decodes them and republishes them as `WebsocketEvent`s.
pub struct Manager {
    /// Stream of incoming websocket messages polled by `start`.
    ws_rx: Stream,
    /// Broadcast channel on which decoded events are published.
    event_sender: broadcast::Sender<WebsocketEvent>,
    /// Broadcast channel used both to listen for a disconnect signal (via a
    /// subscription taken in `start`) and to raise one when reading fails.
    disconnect_channel: broadcast::Sender<bool>,
    /// Shared timestamp overwritten with `compat::now()` every time the stream
    /// yields an item. NOTE(review): presumably read by a liveness check
    /// elsewhere — confirm.
    last_response: Arc<RwLock<Duration>>,
}

impl Manager {
    /// Creates a manager around an existing websocket receive stream.
    ///
    /// * `ws_rx` - stream of incoming websocket messages.
    /// * `event_sender` - broadcast channel on which decoded events are published.
    /// * `disconnect_channel` - broadcast channel used both to observe and to
    ///   raise disconnect signals.
    /// * `last_response` - shared timestamp refreshed whenever the stream
    ///   yields an item.
    pub fn new(
        ws_rx: Stream,
        event_sender: broadcast::Sender<WebsocketEvent>,
        disconnect_channel: broadcast::Sender<bool>,
        last_response: Arc<RwLock<Duration>>,
    ) -> Self {
        Self {
            ws_rx,
            event_sender,
            disconnect_channel,
            last_response,
        }
    }

    /// Runs the receive loop until a disconnect is signalled or the stream ends.
    ///
    /// Each iteration waits for whichever happens first:
    ///
    /// * anything arriving on the disconnect channel (including a receive
    ///   error on that channel) ends the loop;
    /// * the websocket stream yielding an item, which refreshes
    ///   `last_response` and is then dispatched: text frames go to
    ///   `handle_event`, a read error raises a disconnect signal, and any
    ///   other message is only logged;
    /// * the websocket stream terminating, which raises a disconnect signal
    ///   and ends the loop.
    ///
    /// `id` is used purely to label log lines.
    pub async fn start(&mut self, id: u32) {
        log::trace!("starting event loop {}", id);
        let mut disconnect_receiver = self.disconnect_channel.subscribe();
        loop {
            log::trace!("...event loop... {}", id);
            tokio::select! {
                _ = disconnect_receiver.recv() => {
                    log::trace!("disconnect signal received! breaking event loop {}", id);
                    break;
                }

                next_item = self.ws_rx.next() => {
                    // Bind the raw `Option` instead of pattern-matching `Some(..)` in the
                    // select arm: a refutable pattern would silently disable this arm once
                    // the stream ends, leaving the loop parked on the disconnect channel
                    // without anyone being told that the socket is gone.
                    let msg = match next_item {
                        Some(msg) => msg,
                        None => {
                            log::trace!("DISCONNECT websocket stream ended in event loop {}", id);
                            // Best effort: sending fails only when there are no receivers.
                            let _ = self.disconnect_channel.send(true);
                            break;
                        }
                    };
                    // The wasm stream yields bare messages; wrap them so both targets
                    // share the `Result`-based dispatch below.
                    #[cfg(target_arch = "wasm32")]
                    let msg = Ok::<Message, StreamError>(msg);
                    {
                        // Scope the write guard so it is released before dispatching.
                        let mut response_time = self.last_response.write().await;
                        *response_time = compat::now();
                    }
                    match msg {
                        Ok(Message::Text(text)) => {
                            log::trace!("handling websocket event {}", id);
                            self.handle_event(text.as_str());
                        }

                        Err(e) => {
                            log::trace!("DISCONNECT message error in websocket event loop {:?} {}", e, id);
                            // The loop exits once its own receiver observes this signal.
                            let _ = self.disconnect_channel.send(true);
                        }

                        x => {
                            log::trace!("unexpected ws message received {:?} {}", x, id);
                        }
                    }
                }
            }
        }
        log::trace!("ending event loop {}", id);
    }

    /// Parses one websocket text payload and forwards any scriptpubkey
    /// transaction updates it contains.
    ///
    /// Payloads that fail to parse are logged at error level and dropped;
    /// payloads without the `multi-scriptpubkey-transactions` key are ignored.
    fn handle_event(&self, json_message: &str) {
        match serde_json::from_str::<WebsocketResponse>(json_message) {
            Ok(message) => {
                if let Some(payload) = message.multi_scriptpubkey_transactions {
                    log::trace!("broadcasting multi-spk transactions event");
                    self.notify_spk_transactions(&payload);
                }
            }
            Err(e) => {
                log::error!("failed to parse websocket response {:?}", e);
            }
        }
    }

    /// Publishes events for every scriptpubkey in `spk_transactions`.
    ///
    /// For each scriptpubkey the removed transactions are published first,
    /// then the mempool ones, then the confirmed ones.
    fn notify_spk_transactions(&self, spk_transactions: &HashMap<ScriptBuf, WebsocketAddressTransactions>) {
        for (scriptpubkey, txs) in spk_transactions {
            self.notify_transations_for_spk(
                AddressEvent::Removed,
                scriptpubkey,
                &txs.removed,
            );
            self.notify_transations_for_spk(
                AddressEvent::Mempool,
                scriptpubkey,
                &txs.mempool,
            );
            self.notify_transations_for_spk(
                AddressEvent::Confirmed,
                scriptpubkey,
                &txs.confirmed,
            );
        }
    }

    /// Publishes one `WebsocketEvent::AddressEvent` per transaction in `txs`.
    ///
    /// * `event` - constructor turning a `(scriptpubkey, tx)` pair into the
    ///   desired `AddressEvent` (callers pass an `AddressEvent` variant).
    /// * `scriptpubkey` - the script the transactions relate to; cloned per event.
    /// * `txs` - transactions to publish, in order.
    ///
    /// Send failures are deliberately ignored.
    fn notify_transations_for_spk(
        &self,
        event: impl Fn(ScriptBuf, Tx) -> AddressEvent,
        scriptpubkey: &ScriptBuf,
        txs: &[Tx],
    ) {
        for tx in txs {
            log::trace!(
                "broadcasting websocket event involving scriptpubky {} and tx {}",
                scriptpubkey,
                tx.txid
            );
            let _ = self.event_sender.send(WebsocketEvent::AddressEvent(event(
                scriptpubkey.clone(),
                tx.clone(),
            )));
        }
    }
}