lnbits_rs/api/
websocket.rs

//! Websocket subscription that forwards incoming payment notifications.
2
3use futures_util::StreamExt;
4use serde::Deserialize;
5use tokio_tungstenite::connect_async;
6use tokio_tungstenite::tungstenite::protocol::Message;
7
8use crate::LNBitsClient;
9
10#[derive(Debug, Deserialize)]
11struct WebSocketPayment {
12    payment_hash: String,
13    amount: i64,
14}
15
16#[derive(Debug, Deserialize)]
17struct WebSocketMessage {
18    payment: Option<WebSocketPayment>,
19}
20
21impl LNBitsClient {
22    /// Subscribe to websocket updates
23    pub async fn subscribe_to_websocket(&self) -> anyhow::Result<()> {
24        let base_url = self
25            .lnbits_url
26            .to_string()
27            .trim_end_matches('/')
28            .replace("http", "ws");
29        let ws_url = format!("{}/api/v1/ws/{}", base_url, self.invoice_read_key);
30
31        let (ws_stream, _) = connect_async(ws_url).await?;
32        let (_, mut read) = ws_stream.split();
33
34        let sender = self.sender.clone();
35
36        // Handle incoming messages
37        tokio::spawn(async move {
38            while let Some(message) = read.next().await {
39                match message {
40                    Ok(msg) => {
41                        if let Message::Text(text) = msg {
42                            tracing::trace!("Received websocket message: {}", text);
43
44                            // Parse the message
45                            if let Ok(message) = serde_json::from_str::<WebSocketMessage>(&text) {
46                                if let Some(payment) = message.payment {
47                                    if payment.amount > 0 {
48                                        tracing::info!(
49                                            "Payment received: {}",
50                                            payment.payment_hash
51                                        );
52                                        if let Err(err) = sender.send(payment.payment_hash).await {
53                                            log::error!("Failed to send payment hash: {}", err);
54                                        }
55                                    }
56                                }
57                            }
58                        }
59                    }
60                    Err(e) => {
61                        tracing::error!("Error receiving websocket message: {}", e);
62                    }
63                }
64            }
65        });
66
67        Ok(())
68    }
69}