lnbits-rs 0.9.2

A Rust library for the LNbits API
Documentation
//! Websocket

use futures_util::{SinkExt, StreamExt};
use serde::Deserialize;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::Message;

use crate::LNBitsClient;

#[derive(Debug, Deserialize)]
struct WebSocketPayment {
    payment_hash: String,
    amount: i64,
}

#[derive(Debug, Deserialize)]
struct WebSocketMessage {
    payment: Option<WebSocketPayment>,
}

impl LNBitsClient {
    /// Subscribe to websocket updates
    pub async fn subscribe_to_websocket(&self) -> anyhow::Result<()> {
        // Create a new channel for this connection
        // This ensures old receivers will get None and new receivers will work
        let (new_sender, new_receiver) = tokio::sync::mpsc::channel(8);

        // Replace the receiver with the new one
        *self.receiver.lock().await = new_receiver;

        let base_url = self
            .lnbits_url
            .to_string()
            .trim_end_matches('/')
            .replace("http", "ws");
        let ws_url = format!("{}/api/v1/ws/{}", base_url, self.invoice_read_key);

        let (ws_stream, _) = connect_async(ws_url).await?;
        let (mut write, mut read) = ws_stream.split();

        // Move the sender into the task (don't store it in self.sender)
        // This ensures when the task ends, the sender is dropped and receiver gets None
        let sender = new_sender;

        // Handle incoming messages with timeout detection
        tokio::spawn(async move {
            let mut last_message_time = std::time::Instant::now();
            let timeout_duration = std::time::Duration::from_secs(60); // 60 second timeout

            loop {
                // Use timeout to detect dead connections
                let message_result =
                    tokio::time::timeout(std::time::Duration::from_secs(30), read.next()).await;

                match message_result {
                    Ok(Some(message)) => {
                        last_message_time = std::time::Instant::now();
                        match message {
                            Ok(msg) => {
                                match msg {
                                    Message::Text(text) => {
                                        tracing::trace!("Received websocket message: {}", text);

                                        // Parse the message
                                        if let Ok(message) =
                                            serde_json::from_str::<WebSocketMessage>(&text)
                                        {
                                            if let Some(payment) = message.payment {
                                                if payment.amount > 0 {
                                                    tracing::info!(
                                                        "Payment received: {}",
                                                        payment.payment_hash
                                                    );
                                                    if let Err(err) =
                                                        sender.send(payment.payment_hash).await
                                                    {
                                                        log::error!(
                                                            "Failed to send payment hash: {}",
                                                            err
                                                        );
                                                    }
                                                }
                                            }
                                        }
                                    }
                                    Message::Ping(payload) => {
                                        // Server sent us a ping, must respond with pong
                                        tracing::trace!("Received ping, sending pong");
                                        if let Err(e) = write.send(Message::Pong(payload)).await {
                                            tracing::error!("Failed to send pong response: {}", e);
                                            break;
                                        }
                                    }
                                    Message::Pong(_) => {
                                        // Response to our ping, just log it
                                        tracing::trace!("Received pong");
                                    }
                                    Message::Close(_) => {
                                        tracing::warn!("WebSocket closed by server");
                                        break;
                                    }
                                    _ => {}
                                }
                            }
                            Err(e) => {
                                // Log with both Display and Debug to get full error details
                                tracing::error!(
                                    "Error receiving websocket message: {} (Debug: {:?})",
                                    e,
                                    e
                                );

                                // Log specific protocol error details if available
                                use tokio_tungstenite::tungstenite::Error;
                                if let Error::Protocol(ref proto_err) = e {
                                    tracing::error!(
                                        "WebSocket protocol error details: {:?}",
                                        proto_err
                                    );
                                }

                                break;
                            }
                        }
                    }
                    Ok(None) => {
                        // Stream ended
                        tracing::warn!("WebSocket stream ended");
                        break;
                    }
                    Err(_) => {
                        // Timeout - check if we've exceeded the overall timeout
                        if last_message_time.elapsed() > timeout_duration {
                            tracing::warn!(
                                "WebSocket timeout - no messages received for {:?}",
                                timeout_duration
                            );
                            break;
                        }
                        // Send a ping to keep connection alive and detect dead connections
                        if let Err(e) = write.send(Message::Ping(vec![].into())).await {
                            tracing::error!("Failed to send ping: {}", e);
                            break;
                        }
                        tracing::trace!("Sent ping to keep connection alive");
                    }
                }
            }

            tracing::info!("WebSocket task ending, sender will be dropped");
            // Task ends, sender gets dropped, receiver will get None
        });

        Ok(())
    }
}