lnbits_rs/api/
websocket.rs

1//! Websocket
2
3use futures_util::{SinkExt, StreamExt};
4use serde::Deserialize;
5use tokio_tungstenite::connect_async;
6use tokio_tungstenite::tungstenite::protocol::Message;
7
8use crate::LNBitsClient;
9
10#[derive(Debug, Deserialize)]
11struct WebSocketPayment {
12    payment_hash: String,
13    amount: i64,
14}
15
16#[derive(Debug, Deserialize)]
17struct WebSocketMessage {
18    payment: Option<WebSocketPayment>,
19}
20
21impl LNBitsClient {
22    /// Subscribe to websocket updates
23    pub async fn subscribe_to_websocket(&self) -> anyhow::Result<()> {
24        // Create a new channel for this connection
25        // This ensures old receivers will get None and new receivers will work
26        let (new_sender, new_receiver) = tokio::sync::mpsc::channel(8);
27
28        // Replace the receiver with the new one
29        *self.receiver.lock().await = new_receiver;
30
31        let base_url = self
32            .lnbits_url
33            .to_string()
34            .trim_end_matches('/')
35            .replace("http", "ws");
36        let ws_url = format!("{}/api/v1/ws/{}", base_url, self.invoice_read_key);
37
38        let (ws_stream, _) = connect_async(ws_url).await?;
39        let (mut write, mut read) = ws_stream.split();
40
41        // Move the sender into the task (don't store it in self.sender)
42        // This ensures when the task ends, the sender is dropped and receiver gets None
43        let sender = new_sender;
44
45        // Handle incoming messages with timeout detection
46        tokio::spawn(async move {
47            let mut last_message_time = std::time::Instant::now();
48            let timeout_duration = std::time::Duration::from_secs(60); // 60 second timeout
49
50            loop {
51                // Use timeout to detect dead connections
52                let message_result =
53                    tokio::time::timeout(std::time::Duration::from_secs(30), read.next()).await;
54
55                match message_result {
56                    Ok(Some(message)) => {
57                        last_message_time = std::time::Instant::now();
58                        match message {
59                            Ok(msg) => {
60                                match msg {
61                                    Message::Text(text) => {
62                                        tracing::trace!("Received websocket message: {}", text);
63
64                                        // Parse the message
65                                        if let Ok(message) =
66                                            serde_json::from_str::<WebSocketMessage>(&text)
67                                        {
68                                            if let Some(payment) = message.payment {
69                                                if payment.amount > 0 {
70                                                    tracing::info!(
71                                                        "Payment received: {}",
72                                                        payment.payment_hash
73                                                    );
74                                                    if let Err(err) =
75                                                        sender.send(payment.payment_hash).await
76                                                    {
77                                                        log::error!(
78                                                            "Failed to send payment hash: {}",
79                                                            err
80                                                        );
81                                                    }
82                                                }
83                                            }
84                                        }
85                                    }
86                                    Message::Ping(_) | Message::Pong(_) => {
87                                        // Keepalive messages
88                                        tracing::trace!("Received ping/pong");
89                                    }
90                                    Message::Close(_) => {
91                                        tracing::warn!("WebSocket closed by server");
92                                        break;
93                                    }
94                                    _ => {}
95                                }
96                            }
97                            Err(e) => {
98                                tracing::error!("Error receiving websocket message: {}", e);
99                                break;
100                            }
101                        }
102                    }
103                    Ok(None) => {
104                        // Stream ended
105                        tracing::warn!("WebSocket stream ended");
106                        break;
107                    }
108                    Err(_) => {
109                        // Timeout - check if we've exceeded the overall timeout
110                        if last_message_time.elapsed() > timeout_duration {
111                            tracing::warn!(
112                                "WebSocket timeout - no messages received for {:?}",
113                                timeout_duration
114                            );
115                            break;
116                        }
117                        // Send a ping to keep connection alive and detect dead connections
118                        if let Err(e) = write.send(Message::Ping(vec![].into())).await {
119                            tracing::error!("Failed to send ping: {}", e);
120                            break;
121                        }
122                        tracing::trace!("Sent ping to keep connection alive");
123                    }
124                }
125            }
126
127            tracing::info!("WebSocket task ending, sender will be dropped");
128            // Task ends, sender gets dropped, receiver will get None
129        });
130
131        Ok(())
132    }
133}