lighter_rust/client/
ws_client.rs

1use crate::config::Config;
2use crate::error::{LighterError, Result};
3use futures::{SinkExt, StreamExt};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7use tokio::net::TcpStream;
8use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
9use tracing::{debug, error, info, warn};
10
11pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct WsRequest {
15    pub id: String,
16    pub method: String,
17    pub params: Option<Value>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct WsResponse {
22    pub id: Option<String>,
23    pub result: Option<Value>,
24    pub error: Option<WsError>,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct WsError {
29    pub code: i32,
30    pub message: String,
31    pub data: Option<Value>,
32}
33
34#[derive(Debug)]
35pub struct WebSocketClient {
36    config: Config,
37    stream: Option<WsStream>,
38    subscriptions: HashMap<String, String>,
39}
40
41impl WebSocketClient {
42    pub fn new(config: Config) -> Self {
43        Self {
44            config,
45            stream: None,
46            subscriptions: HashMap::new(),
47        }
48    }
49
50    pub async fn connect(&mut self) -> Result<()> {
51        info!("Connecting to WebSocket: {}", self.config.ws_url);
52
53        let (ws_stream, _response) = connect_async(&self.config.ws_url.to_string())
54            .await
55            .map_err(|e| LighterError::WebSocket(Box::new(e)))?;
56
57        info!("WebSocket connected successfully");
58        self.stream = Some(ws_stream);
59        Ok(())
60    }
61
62    pub async fn subscribe(&mut self, channel: &str, params: Option<Value>) -> Result<String> {
63        let request_id = uuid::Uuid::new_v4().to_string();
64        let request = WsRequest {
65            id: request_id.clone(),
66            method: "SUBSCRIBE".to_string(),
67            params,
68        };
69
70        self.send_request(&request).await?;
71        self.subscriptions
72            .insert(request_id.clone(), channel.to_string());
73
74        debug!("Subscribed to channel: {} with ID: {}", channel, request_id);
75        Ok(request_id)
76    }
77
78    pub async fn unsubscribe(&mut self, subscription_id: &str) -> Result<()> {
79        let request = WsRequest {
80            id: uuid::Uuid::new_v4().to_string(),
81            method: "UNSUBSCRIBE".to_string(),
82            params: Some(serde_json::json!({
83                "subscription_id": subscription_id
84            })),
85        };
86
87        self.send_request(&request).await?;
88        self.subscriptions.remove(subscription_id);
89
90        debug!("Unsubscribed from subscription ID: {}", subscription_id);
91        Ok(())
92    }
93
94    pub async fn send_request(&mut self, request: &WsRequest) -> Result<()> {
95        let stream = self.stream.as_mut().ok_or_else(|| {
96            LighterError::WebSocket(Box::new(tungstenite::Error::ConnectionClosed))
97        })?;
98
99        let message = serde_json::to_string(request).map_err(LighterError::Json)?;
100
101        stream
102            .send(Message::Text(message))
103            .await
104            .map_err(|e| LighterError::WebSocket(Box::new(e)))?;
105
106        debug!("Sent WebSocket request: {}", request.id);
107        Ok(())
108    }
109
110    pub async fn next_message(&mut self) -> Result<Option<Value>> {
111        let stream = self.stream.as_mut().ok_or_else(|| {
112            LighterError::WebSocket(Box::new(tungstenite::Error::ConnectionClosed))
113        })?;
114
115        match stream.next().await {
116            Some(Ok(Message::Text(text))) => {
117                debug!("Received WebSocket message: {}", text);
118                let value: Value = serde_json::from_str(&text).map_err(LighterError::Json)?;
119                Ok(Some(value))
120            }
121            Some(Ok(Message::Close(_))) => {
122                info!("WebSocket connection closed by server");
123                self.stream = None;
124                Ok(None)
125            }
126            Some(Ok(Message::Ping(payload))) => {
127                debug!("Received ping, sending pong");
128                stream
129                    .send(Message::Pong(payload))
130                    .await
131                    .map_err(|e| LighterError::WebSocket(Box::new(e)))?;
132                Ok(None)
133            }
134            Some(Ok(Message::Pong(_))) => {
135                debug!("Received pong");
136                Ok(None)
137            }
138            Some(Ok(_)) => {
139                warn!("Received unsupported message type");
140                Ok(None)
141            }
142            Some(Err(e)) => {
143                error!("WebSocket error: {}", e);
144                Err(LighterError::WebSocket(Box::new(e)))
145            }
146            None => {
147                info!("WebSocket stream ended");
148                self.stream = None;
149                Ok(None)
150            }
151        }
152    }
153
154    pub async fn close(&mut self) -> Result<()> {
155        if let Some(stream) = &mut self.stream {
156            stream
157                .close(None)
158                .await
159                .map_err(|e| LighterError::WebSocket(Box::new(e)))?;
160            info!("WebSocket connection closed");
161        }
162        self.stream = None;
163        Ok(())
164    }
165
166    pub fn is_connected(&self) -> bool {
167        self.stream.is_some()
168    }
169
170    pub fn get_subscriptions(&self) -> &HashMap<String, String> {
171        &self.subscriptions
172    }
173}