lnbits_rs/api/
websocket.rs1use futures_util::StreamExt;
4use serde::Deserialize;
5use tokio_tungstenite::connect_async;
6use tokio_tungstenite::tungstenite::protocol::Message;
7
8use crate::LNBitsClient;
9
10#[derive(Debug, Deserialize)]
11struct WebSocketPayment {
12 payment_hash: String,
13 amount: i64,
14}
15
16#[derive(Debug, Deserialize)]
17struct WebSocketMessage {
18 payment: Option<WebSocketPayment>,
19}
20
21impl LNBitsClient {
22 pub async fn subscribe_to_websocket(&self) -> anyhow::Result<()> {
24 let base_url = self
25 .lnbits_url
26 .to_string()
27 .trim_end_matches('/')
28 .replace("http", "ws");
29 let ws_url = format!("{}/api/v1/ws/{}", base_url, self.invoice_read_key);
30
31 let (ws_stream, _) = connect_async(ws_url).await?;
32 let (_, mut read) = ws_stream.split();
33
34 let sender = self.sender.clone();
35
36 tokio::spawn(async move {
38 while let Some(message) = read.next().await {
39 match message {
40 Ok(msg) => {
41 if let Message::Text(text) = msg {
42 tracing::trace!("Received websocket message: {}", text);
43
44 if let Ok(message) = serde_json::from_str::<WebSocketMessage>(&text) {
46 if let Some(payment) = message.payment {
47 if payment.amount > 0 {
48 tracing::info!(
49 "Payment received: {}",
50 payment.payment_hash
51 );
52 if let Err(err) = sender.send(payment.payment_hash).await {
53 log::error!("Failed to send payment hash: {}", err);
54 }
55 }
56 }
57 }
58 }
59 }
60 Err(e) => {
61 tracing::error!("Error receiving websocket message: {}", e);
62 }
63 }
64 }
65 });
66
67 Ok(())
68 }
69}