1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
//! Websocket
use futures_util::{SinkExt, StreamExt};
use serde::Deserialize;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::Message;
use crate::LNBitsClient;
#[derive(Debug, Deserialize)]
struct WebSocketPayment {
payment_hash: String,
amount: i64,
}
#[derive(Debug, Deserialize)]
struct WebSocketMessage {
payment: Option<WebSocketPayment>,
}
impl LNBitsClient {
/// Subscribe to websocket updates
pub async fn subscribe_to_websocket(&self) -> anyhow::Result<()> {
// Create a new channel for this connection
// This ensures old receivers will get None and new receivers will work
let (new_sender, new_receiver) = tokio::sync::mpsc::channel(8);
// Replace the receiver with the new one
*self.receiver.lock().await = new_receiver;
let base_url = self
.lnbits_url
.to_string()
.trim_end_matches('/')
.replace("http", "ws");
let ws_url = format!("{}/api/v1/ws/{}", base_url, self.invoice_read_key);
let (ws_stream, _) = connect_async(ws_url).await?;
let (mut write, mut read) = ws_stream.split();
// Move the sender into the task (don't store it in self.sender)
// This ensures when the task ends, the sender is dropped and receiver gets None
let sender = new_sender;
// Handle incoming messages with timeout detection
tokio::spawn(async move {
let mut last_message_time = std::time::Instant::now();
let timeout_duration = std::time::Duration::from_secs(60); // 60 second timeout
loop {
// Use timeout to detect dead connections
let message_result =
tokio::time::timeout(std::time::Duration::from_secs(30), read.next()).await;
match message_result {
Ok(Some(message)) => {
last_message_time = std::time::Instant::now();
match message {
Ok(msg) => {
match msg {
Message::Text(text) => {
tracing::trace!("Received websocket message: {}", text);
// Parse the message
if let Ok(message) =
serde_json::from_str::<WebSocketMessage>(&text)
{
if let Some(payment) = message.payment {
if payment.amount > 0 {
tracing::info!(
"Payment received: {}",
payment.payment_hash
);
if let Err(err) =
sender.send(payment.payment_hash).await
{
log::error!(
"Failed to send payment hash: {}",
err
);
}
}
}
}
}
Message::Ping(payload) => {
// Server sent us a ping, must respond with pong
tracing::trace!("Received ping, sending pong");
if let Err(e) = write.send(Message::Pong(payload)).await {
tracing::error!("Failed to send pong response: {}", e);
break;
}
}
Message::Pong(_) => {
// Response to our ping, just log it
tracing::trace!("Received pong");
}
Message::Close(_) => {
tracing::warn!("WebSocket closed by server");
break;
}
_ => {}
}
}
Err(e) => {
// Log with both Display and Debug to get full error details
tracing::error!(
"Error receiving websocket message: {} (Debug: {:?})",
e,
e
);
// Log specific protocol error details if available
use tokio_tungstenite::tungstenite::Error;
if let Error::Protocol(ref proto_err) = e {
tracing::error!(
"WebSocket protocol error details: {:?}",
proto_err
);
}
break;
}
}
}
Ok(None) => {
// Stream ended
tracing::warn!("WebSocket stream ended");
break;
}
Err(_) => {
// Timeout - check if we've exceeded the overall timeout
if last_message_time.elapsed() > timeout_duration {
tracing::warn!(
"WebSocket timeout - no messages received for {:?}",
timeout_duration
);
break;
}
// Send a ping to keep connection alive and detect dead connections
if let Err(e) = write.send(Message::Ping(vec![].into())).await {
tracing::error!("Failed to send ping: {}", e);
break;
}
tracing::trace!("Sent ping to keep connection alive");
}
}
}
tracing::info!("WebSocket task ending, sender will be dropped");
// Task ends, sender gets dropped, receiver will get None
});
Ok(())
}
}