lighter_rust/client/
ws_client.rs1use crate::config::Config;
2use crate::error::{LighterError, Result};
3use futures::{SinkExt, StreamExt};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7use tokio::net::TcpStream;
8use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
9use tracing::{debug, error, info, warn};
10
11pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct WsRequest {
15 pub id: String,
16 pub method: String,
17 pub params: Option<Value>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct WsResponse {
22 pub id: Option<String>,
23 pub result: Option<Value>,
24 pub error: Option<WsError>,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct WsError {
29 pub code: i32,
30 pub message: String,
31 pub data: Option<Value>,
32}
33
34#[derive(Debug)]
35pub struct WebSocketClient {
36 config: Config,
37 stream: Option<WsStream>,
38 subscriptions: HashMap<String, String>,
39}
40
41impl WebSocketClient {
42 pub fn new(config: Config) -> Self {
43 Self {
44 config,
45 stream: None,
46 subscriptions: HashMap::new(),
47 }
48 }
49
50 pub async fn connect(&mut self) -> Result<()> {
51 info!("Connecting to WebSocket: {}", self.config.ws_url);
52
53 let (ws_stream, _response) = connect_async(&self.config.ws_url.to_string())
54 .await
55 .map_err(|e| LighterError::WebSocket(Box::new(e)))?;
56
57 info!("WebSocket connected successfully");
58 self.stream = Some(ws_stream);
59 Ok(())
60 }
61
62 pub async fn subscribe(&mut self, channel: &str, params: Option<Value>) -> Result<String> {
63 let request_id = uuid::Uuid::new_v4().to_string();
64 let request = WsRequest {
65 id: request_id.clone(),
66 method: "SUBSCRIBE".to_string(),
67 params,
68 };
69
70 self.send_request(&request).await?;
71 self.subscriptions
72 .insert(request_id.clone(), channel.to_string());
73
74 debug!("Subscribed to channel: {} with ID: {}", channel, request_id);
75 Ok(request_id)
76 }
77
78 pub async fn unsubscribe(&mut self, subscription_id: &str) -> Result<()> {
79 let request = WsRequest {
80 id: uuid::Uuid::new_v4().to_string(),
81 method: "UNSUBSCRIBE".to_string(),
82 params: Some(serde_json::json!({
83 "subscription_id": subscription_id
84 })),
85 };
86
87 self.send_request(&request).await?;
88 self.subscriptions.remove(subscription_id);
89
90 debug!("Unsubscribed from subscription ID: {}", subscription_id);
91 Ok(())
92 }
93
94 pub async fn send_request(&mut self, request: &WsRequest) -> Result<()> {
95 let stream = self.stream.as_mut().ok_or_else(|| {
96 LighterError::WebSocket(Box::new(tungstenite::Error::ConnectionClosed))
97 })?;
98
99 let message = serde_json::to_string(request).map_err(LighterError::Json)?;
100
101 stream
102 .send(Message::Text(message))
103 .await
104 .map_err(|e| LighterError::WebSocket(Box::new(e)))?;
105
106 debug!("Sent WebSocket request: {}", request.id);
107 Ok(())
108 }
109
110 pub async fn next_message(&mut self) -> Result<Option<Value>> {
111 let stream = self.stream.as_mut().ok_or_else(|| {
112 LighterError::WebSocket(Box::new(tungstenite::Error::ConnectionClosed))
113 })?;
114
115 match stream.next().await {
116 Some(Ok(Message::Text(text))) => {
117 debug!("Received WebSocket message: {}", text);
118 let value: Value = serde_json::from_str(&text).map_err(LighterError::Json)?;
119 Ok(Some(value))
120 }
121 Some(Ok(Message::Close(_))) => {
122 info!("WebSocket connection closed by server");
123 self.stream = None;
124 Ok(None)
125 }
126 Some(Ok(Message::Ping(payload))) => {
127 debug!("Received ping, sending pong");
128 stream
129 .send(Message::Pong(payload))
130 .await
131 .map_err(|e| LighterError::WebSocket(Box::new(e)))?;
132 Ok(None)
133 }
134 Some(Ok(Message::Pong(_))) => {
135 debug!("Received pong");
136 Ok(None)
137 }
138 Some(Ok(_)) => {
139 warn!("Received unsupported message type");
140 Ok(None)
141 }
142 Some(Err(e)) => {
143 error!("WebSocket error: {}", e);
144 Err(LighterError::WebSocket(Box::new(e)))
145 }
146 None => {
147 info!("WebSocket stream ended");
148 self.stream = None;
149 Ok(None)
150 }
151 }
152 }
153
154 pub async fn close(&mut self) -> Result<()> {
155 if let Some(stream) = &mut self.stream {
156 stream
157 .close(None)
158 .await
159 .map_err(|e| LighterError::WebSocket(Box::new(e)))?;
160 info!("WebSocket connection closed");
161 }
162 self.stream = None;
163 Ok(())
164 }
165
166 pub fn is_connected(&self) -> bool {
167 self.stream.is_some()
168 }
169
170 pub fn get_subscriptions(&self) -> &HashMap<String, String> {
171 &self.subscriptions
172 }
173}