1use {
2 crate::{
3 rpc_config::{
4 RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter,
5 },
6 rpc_response::{
7 Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo, SlotUpdate,
8 },
9 },
10 log::*,
11 serde::de::DeserializeOwned,
12 serde_json::{
13 json,
14 value::Value::{Number, Object},
15 Map, Value,
16 },
17 gemachain_sdk::signature::Signature,
18 std::{
19 marker::PhantomData,
20 net::TcpStream,
21 sync::{
22 atomic::{AtomicBool, Ordering},
23 mpsc::{channel, Receiver},
24 Arc, RwLock,
25 },
26 thread::{sleep, JoinHandle},
27 time::Duration,
28 },
29 thiserror::Error,
30 tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
31 url::{ParseError, Url},
32};
33
34#[derive(Debug, Error)]
35pub enum PubsubClientError {
36 #[error("url parse error")]
37 UrlParseError(#[from] ParseError),
38
39 #[error("unable to connect to server")]
40 ConnectionError(#[from] tungstenite::Error),
41
42 #[error("json parse error")]
43 JsonParseError(#[from] serde_json::error::Error),
44
45 #[error("unexpected message format: {0}")]
46 UnexpectedMessageError(String),
47}
48
49pub struct PubsubClientSubscription<T>
50where
51 T: DeserializeOwned,
52{
53 message_type: PhantomData<T>,
54 operation: &'static str,
55 socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
56 subscription_id: u64,
57 t_cleanup: Option<JoinHandle<()>>,
58 exit: Arc<AtomicBool>,
59}
60
61impl<T> Drop for PubsubClientSubscription<T>
62where
63 T: DeserializeOwned,
64{
65 fn drop(&mut self) {
66 self.send_unsubscribe()
67 .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
68 self.socket
69 .write()
70 .unwrap()
71 .close(None)
72 .unwrap_or_else(|_| warn!("unable to close websocket"));
73 }
74}
75
76impl<T> PubsubClientSubscription<T>
77where
78 T: DeserializeOwned,
79{
80 fn send_subscribe(
81 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
82 body: String,
83 ) -> Result<u64, PubsubClientError> {
84 writable_socket
85 .write()
86 .unwrap()
87 .write_message(Message::Text(body))?;
88 let message = writable_socket.write().unwrap().read_message()?;
89 Self::extract_subscription_id(message)
90 }
91
92 fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
93 let message_text = &message.into_text()?;
94 let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
95
96 if let Some(Number(x)) = json_msg.get("result") {
97 if let Some(x) = x.as_u64() {
98 return Ok(x);
99 }
100 }
101 Err(PubsubClientError::UnexpectedMessageError(format!(
103 "{:?}",
104 json_msg
105 )))
106 }
107
108 pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
109 let method = format!("{}Unsubscribe", self.operation);
110 self.socket
111 .write()
112 .unwrap()
113 .write_message(Message::Text(
114 json!({
115 "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
116 })
117 .to_string(),
118 ))
119 .map_err(|err| err.into())
120 }
121
122 fn read_message(
123 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
124 ) -> Result<T, PubsubClientError> {
125 let message = writable_socket.write().unwrap().read_message()?;
126 let message_text = &message.into_text().unwrap();
127 let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
128
129 if let Some(Object(params)) = json_msg.get("params") {
130 if let Some(result) = params.get("result") {
131 let x: T = serde_json::from_value::<T>(result.clone()).unwrap();
132 return Ok(x);
133 }
134 }
135
136 Err(PubsubClientError::UnexpectedMessageError(format!(
138 "{:?}",
139 json_msg
140 )))
141 }
142
143 pub fn shutdown(&mut self) -> std::thread::Result<()> {
144 if self.t_cleanup.is_some() {
145 info!("websocket thread - shutting down");
146 self.exit.store(true, Ordering::Relaxed);
147 let x = self.t_cleanup.take().unwrap().join();
148 info!("websocket thread - shut down.");
149 x
150 } else {
151 warn!("websocket thread - already shut down.");
152 Ok(())
153 }
154 }
155}
156
157pub type LogsSubscription = (
158 PubsubClientSubscription<RpcResponse<RpcLogsResponse>>,
159 Receiver<RpcResponse<RpcLogsResponse>>,
160);
161pub type SlotsSubscription = (PubsubClientSubscription<SlotInfo>, Receiver<SlotInfo>);
162pub type SignatureSubscription = (
163 PubsubClientSubscription<RpcResponse<RpcSignatureResult>>,
164 Receiver<RpcResponse<RpcSignatureResult>>,
165);
166
167pub struct PubsubClient {}
168
169fn connect_with_retry(
170 url: Url,
171) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
172 let mut connection_retries = 5;
173 loop {
174 let result = connect(url.clone()).map(|(socket, _)| socket);
175 if let Err(tungstenite::Error::Http(response)) = &result {
176 if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
177 {
178 let mut duration = Duration::from_millis(500);
179 if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
180 if let Ok(retry_after) = retry_after.to_str() {
181 if let Ok(retry_after) = retry_after.parse::<u64>() {
182 if retry_after < 120 {
183 duration = Duration::from_secs(retry_after);
184 }
185 }
186 }
187 }
188
189 connection_retries -= 1;
190 debug!(
191 "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
192 response, connection_retries, duration
193 );
194
195 sleep(duration);
196 continue;
197 }
198 }
199 return result;
200 }
201}
202
203impl PubsubClient {
204 pub fn logs_subscribe(
205 url: &str,
206 filter: RpcTransactionLogsFilter,
207 config: RpcTransactionLogsConfig,
208 ) -> Result<LogsSubscription, PubsubClientError> {
209 let url = Url::parse(url)?;
210 let socket = connect_with_retry(url)?;
211 let (sender, receiver) = channel();
212
213 let socket = Arc::new(RwLock::new(socket));
214 let socket_clone = socket.clone();
215 let exit = Arc::new(AtomicBool::new(false));
216 let exit_clone = exit.clone();
217
218 let subscription_id =
219 PubsubClientSubscription::<RpcResponse<RpcLogsResponse>>::send_subscribe(
220 &socket_clone,
221 json!({
222 "jsonrpc":"2.0","id":1,"method":"logsSubscribe","params":[filter, config]
223 })
224 .to_string(),
225 )?;
226
227 let t_cleanup = std::thread::spawn(move || {
228 loop {
229 if exit_clone.load(Ordering::Relaxed) {
230 break;
231 }
232
233 match PubsubClientSubscription::read_message(&socket_clone) {
234 Ok(message) => match sender.send(message) {
235 Ok(_) => (),
236 Err(err) => {
237 info!("receive error: {:?}", err);
238 break;
239 }
240 },
241 Err(err) => {
242 info!("receive error: {:?}", err);
243 break;
244 }
245 }
246 }
247
248 info!("websocket - exited receive loop");
249 });
250
251 let result = PubsubClientSubscription {
252 message_type: PhantomData,
253 operation: "logs",
254 socket,
255 subscription_id,
256 t_cleanup: Some(t_cleanup),
257 exit,
258 };
259
260 Ok((result, receiver))
261 }
262
263 pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
264 let url = Url::parse(url)?;
265 let socket = connect_with_retry(url)?;
266 let (sender, receiver) = channel::<SlotInfo>();
267
268 let socket = Arc::new(RwLock::new(socket));
269 let socket_clone = socket.clone();
270 let exit = Arc::new(AtomicBool::new(false));
271 let exit_clone = exit.clone();
272 let subscription_id = PubsubClientSubscription::<SlotInfo>::send_subscribe(
273 &socket_clone,
274 json!({
275 "jsonrpc":"2.0","id":1,"method":"slotSubscribe","params":[]
276 })
277 .to_string(),
278 )?;
279
280 let t_cleanup = std::thread::spawn(move || {
281 loop {
282 if exit_clone.load(Ordering::Relaxed) {
283 break;
284 }
285 match PubsubClientSubscription::read_message(&socket_clone) {
286 Ok(message) => match sender.send(message) {
287 Ok(_) => (),
288 Err(err) => {
289 info!("receive error: {:?}", err);
290 break;
291 }
292 },
293 Err(err) => {
294 info!("receive error: {:?}", err);
295 break;
296 }
297 }
298 }
299
300 info!("websocket - exited receive loop");
301 });
302
303 let result = PubsubClientSubscription {
304 message_type: PhantomData,
305 operation: "slot",
306 socket,
307 subscription_id,
308 t_cleanup: Some(t_cleanup),
309 exit,
310 };
311
312 Ok((result, receiver))
313 }
314
315 pub fn signature_subscribe(
316 url: &str,
317 signature: &Signature,
318 config: Option<RpcSignatureSubscribeConfig>,
319 ) -> Result<SignatureSubscription, PubsubClientError> {
320 let url = Url::parse(url)?;
321 let socket = connect_with_retry(url)?;
322 let (sender, receiver) = channel();
323
324 let socket = Arc::new(RwLock::new(socket));
325 let socket_clone = socket.clone();
326 let exit = Arc::new(AtomicBool::new(false));
327 let exit_clone = exit.clone();
328 let body = json!({
329 "jsonrpc":"2.0",
330 "id":1,
331 "method":"signatureSubscribe",
332 "params":[
333 signature.to_string(),
334 config
335 ]
336 })
337 .to_string();
338 let subscription_id =
339 PubsubClientSubscription::<RpcResponse<RpcSignatureResult>>::send_subscribe(
340 &socket_clone,
341 body,
342 )?;
343
344 let t_cleanup = std::thread::spawn(move || {
345 loop {
346 if exit_clone.load(Ordering::Relaxed) {
347 break;
348 }
349
350 let message: Result<RpcResponse<RpcSignatureResult>, PubsubClientError> =
351 PubsubClientSubscription::read_message(&socket_clone);
352
353 if let Ok(msg) = message {
354 match sender.send(msg.clone()) {
355 Ok(_) => (),
356 Err(err) => {
357 info!("receive error: {:?}", err);
358 break;
359 }
360 }
361 } else {
362 info!("receive error: {:?}", message);
363 break;
364 }
365 }
366
367 info!("websocket - exited receive loop");
368 });
369
370 let result = PubsubClientSubscription {
371 message_type: PhantomData,
372 operation: "signature",
373 socket,
374 subscription_id,
375 t_cleanup: Some(t_cleanup),
376 exit,
377 };
378
379 Ok((result, receiver))
380 }
381
382 pub fn slot_updates_subscribe(
383 url: &str,
384 handler: impl Fn(SlotUpdate) + Send + 'static,
385 ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
386 let url = Url::parse(url)?;
387 let socket = connect_with_retry(url)?;
388
389 let socket = Arc::new(RwLock::new(socket));
390 let exit = Arc::new(AtomicBool::new(false));
391 let exit_clone = exit.clone();
392 let subscription_id = PubsubClientSubscription::<SlotUpdate>::send_subscribe(
393 &socket,
394 json!({
395 "jsonrpc":"2.0","id":1,"method":"slotsUpdatesSubscribe","params":[]
396 })
397 .to_string(),
398 )?;
399
400 let t_cleanup = {
401 let socket = socket.clone();
402 std::thread::spawn(move || {
403 loop {
404 if exit_clone.load(Ordering::Relaxed) {
405 break;
406 }
407 match PubsubClientSubscription::read_message(&socket) {
408 Ok(message) => handler(message),
409 Err(err) => {
410 info!("receive error: {:?}", err);
411 break;
412 }
413 }
414 }
415
416 info!("websocket - exited receive loop");
417 })
418 };
419
420 Ok(PubsubClientSubscription {
421 message_type: PhantomData,
422 operation: "slotsUpdates",
423 socket,
424 subscription_id,
425 t_cleanup: Some(t_cleanup),
426 exit,
427 })
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 }