gemachain_client/
pubsub_client.rs

1use {
2    crate::{
3        rpc_config::{
4            RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter,
5        },
6        rpc_response::{
7            Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo, SlotUpdate,
8        },
9    },
10    log::*,
11    serde::de::DeserializeOwned,
12    serde_json::{
13        json,
14        value::Value::{Number, Object},
15        Map, Value,
16    },
17    gemachain_sdk::signature::Signature,
18    std::{
19        marker::PhantomData,
20        net::TcpStream,
21        sync::{
22            atomic::{AtomicBool, Ordering},
23            mpsc::{channel, Receiver},
24            Arc, RwLock,
25        },
26        thread::{sleep, JoinHandle},
27        time::Duration,
28    },
29    thiserror::Error,
30    tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
31    url::{ParseError, Url},
32};
33
34#[derive(Debug, Error)]
35pub enum PubsubClientError {
36    #[error("url parse error")]
37    UrlParseError(#[from] ParseError),
38
39    #[error("unable to connect to server")]
40    ConnectionError(#[from] tungstenite::Error),
41
42    #[error("json parse error")]
43    JsonParseError(#[from] serde_json::error::Error),
44
45    #[error("unexpected message format: {0}")]
46    UnexpectedMessageError(String),
47}
48
49pub struct PubsubClientSubscription<T>
50where
51    T: DeserializeOwned,
52{
53    message_type: PhantomData<T>,
54    operation: &'static str,
55    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
56    subscription_id: u64,
57    t_cleanup: Option<JoinHandle<()>>,
58    exit: Arc<AtomicBool>,
59}
60
61impl<T> Drop for PubsubClientSubscription<T>
62where
63    T: DeserializeOwned,
64{
65    fn drop(&mut self) {
66        self.send_unsubscribe()
67            .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
68        self.socket
69            .write()
70            .unwrap()
71            .close(None)
72            .unwrap_or_else(|_| warn!("unable to close websocket"));
73    }
74}
75
76impl<T> PubsubClientSubscription<T>
77where
78    T: DeserializeOwned,
79{
80    fn send_subscribe(
81        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
82        body: String,
83    ) -> Result<u64, PubsubClientError> {
84        writable_socket
85            .write()
86            .unwrap()
87            .write_message(Message::Text(body))?;
88        let message = writable_socket.write().unwrap().read_message()?;
89        Self::extract_subscription_id(message)
90    }
91
92    fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
93        let message_text = &message.into_text()?;
94        let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
95
96        if let Some(Number(x)) = json_msg.get("result") {
97            if let Some(x) = x.as_u64() {
98                return Ok(x);
99            }
100        }
101        // TODO: Add proper JSON RPC response/error handling...
102        Err(PubsubClientError::UnexpectedMessageError(format!(
103            "{:?}",
104            json_msg
105        )))
106    }
107
108    pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
109        let method = format!("{}Unsubscribe", self.operation);
110        self.socket
111            .write()
112            .unwrap()
113            .write_message(Message::Text(
114                json!({
115                "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
116                })
117                .to_string(),
118            ))
119            .map_err(|err| err.into())
120    }
121
122    fn read_message(
123        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
124    ) -> Result<T, PubsubClientError> {
125        let message = writable_socket.write().unwrap().read_message()?;
126        let message_text = &message.into_text().unwrap();
127        let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
128
129        if let Some(Object(params)) = json_msg.get("params") {
130            if let Some(result) = params.get("result") {
131                let x: T = serde_json::from_value::<T>(result.clone()).unwrap();
132                return Ok(x);
133            }
134        }
135
136        // TODO: Add proper JSON RPC response/error handling...
137        Err(PubsubClientError::UnexpectedMessageError(format!(
138            "{:?}",
139            json_msg
140        )))
141    }
142
143    pub fn shutdown(&mut self) -> std::thread::Result<()> {
144        if self.t_cleanup.is_some() {
145            info!("websocket thread - shutting down");
146            self.exit.store(true, Ordering::Relaxed);
147            let x = self.t_cleanup.take().unwrap().join();
148            info!("websocket thread - shut down.");
149            x
150        } else {
151            warn!("websocket thread - already shut down.");
152            Ok(())
153        }
154    }
155}
156
/// Pair returned by [`PubsubClient::logs_subscribe`]: the subscription handle
/// and the channel receiver that yields each logs notification.
pub type LogsSubscription = (
    PubsubClientSubscription<RpcResponse<RpcLogsResponse>>,
    Receiver<RpcResponse<RpcLogsResponse>>,
);
/// Pair returned by [`PubsubClient::slot_subscribe`]: the subscription handle
/// and the channel receiver that yields each [`SlotInfo`] notification.
pub type SlotsSubscription = (PubsubClientSubscription<SlotInfo>, Receiver<SlotInfo>);
/// Pair returned by [`PubsubClient::signature_subscribe`]: the subscription
/// handle and the channel receiver that yields each signature notification.
pub type SignatureSubscription = (
    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>,
    Receiver<RpcResponse<RpcSignatureResult>>,
);
166
/// Namespace type for the blocking pubsub subscription constructors; it holds
/// no state and all of its functions are associated functions.
pub struct PubsubClient {}
168
169fn connect_with_retry(
170    url: Url,
171) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
172    let mut connection_retries = 5;
173    loop {
174        let result = connect(url.clone()).map(|(socket, _)| socket);
175        if let Err(tungstenite::Error::Http(response)) = &result {
176            if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
177            {
178                let mut duration = Duration::from_millis(500);
179                if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
180                    if let Ok(retry_after) = retry_after.to_str() {
181                        if let Ok(retry_after) = retry_after.parse::<u64>() {
182                            if retry_after < 120 {
183                                duration = Duration::from_secs(retry_after);
184                            }
185                        }
186                    }
187                }
188
189                connection_retries -= 1;
190                debug!(
191                    "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
192                    response, connection_retries, duration
193                );
194
195                sleep(duration);
196                continue;
197            }
198        }
199        return result;
200    }
201}
202
203impl PubsubClient {
204    pub fn logs_subscribe(
205        url: &str,
206        filter: RpcTransactionLogsFilter,
207        config: RpcTransactionLogsConfig,
208    ) -> Result<LogsSubscription, PubsubClientError> {
209        let url = Url::parse(url)?;
210        let socket = connect_with_retry(url)?;
211        let (sender, receiver) = channel();
212
213        let socket = Arc::new(RwLock::new(socket));
214        let socket_clone = socket.clone();
215        let exit = Arc::new(AtomicBool::new(false));
216        let exit_clone = exit.clone();
217
218        let subscription_id =
219            PubsubClientSubscription::<RpcResponse<RpcLogsResponse>>::send_subscribe(
220                &socket_clone,
221                json!({
222                    "jsonrpc":"2.0","id":1,"method":"logsSubscribe","params":[filter, config]
223                })
224                .to_string(),
225            )?;
226
227        let t_cleanup = std::thread::spawn(move || {
228            loop {
229                if exit_clone.load(Ordering::Relaxed) {
230                    break;
231                }
232
233                match PubsubClientSubscription::read_message(&socket_clone) {
234                    Ok(message) => match sender.send(message) {
235                        Ok(_) => (),
236                        Err(err) => {
237                            info!("receive error: {:?}", err);
238                            break;
239                        }
240                    },
241                    Err(err) => {
242                        info!("receive error: {:?}", err);
243                        break;
244                    }
245                }
246            }
247
248            info!("websocket - exited receive loop");
249        });
250
251        let result = PubsubClientSubscription {
252            message_type: PhantomData,
253            operation: "logs",
254            socket,
255            subscription_id,
256            t_cleanup: Some(t_cleanup),
257            exit,
258        };
259
260        Ok((result, receiver))
261    }
262
263    pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
264        let url = Url::parse(url)?;
265        let socket = connect_with_retry(url)?;
266        let (sender, receiver) = channel::<SlotInfo>();
267
268        let socket = Arc::new(RwLock::new(socket));
269        let socket_clone = socket.clone();
270        let exit = Arc::new(AtomicBool::new(false));
271        let exit_clone = exit.clone();
272        let subscription_id = PubsubClientSubscription::<SlotInfo>::send_subscribe(
273            &socket_clone,
274            json!({
275                "jsonrpc":"2.0","id":1,"method":"slotSubscribe","params":[]
276            })
277            .to_string(),
278        )?;
279
280        let t_cleanup = std::thread::spawn(move || {
281            loop {
282                if exit_clone.load(Ordering::Relaxed) {
283                    break;
284                }
285                match PubsubClientSubscription::read_message(&socket_clone) {
286                    Ok(message) => match sender.send(message) {
287                        Ok(_) => (),
288                        Err(err) => {
289                            info!("receive error: {:?}", err);
290                            break;
291                        }
292                    },
293                    Err(err) => {
294                        info!("receive error: {:?}", err);
295                        break;
296                    }
297                }
298            }
299
300            info!("websocket - exited receive loop");
301        });
302
303        let result = PubsubClientSubscription {
304            message_type: PhantomData,
305            operation: "slot",
306            socket,
307            subscription_id,
308            t_cleanup: Some(t_cleanup),
309            exit,
310        };
311
312        Ok((result, receiver))
313    }
314
315    pub fn signature_subscribe(
316        url: &str,
317        signature: &Signature,
318        config: Option<RpcSignatureSubscribeConfig>,
319    ) -> Result<SignatureSubscription, PubsubClientError> {
320        let url = Url::parse(url)?;
321        let socket = connect_with_retry(url)?;
322        let (sender, receiver) = channel();
323
324        let socket = Arc::new(RwLock::new(socket));
325        let socket_clone = socket.clone();
326        let exit = Arc::new(AtomicBool::new(false));
327        let exit_clone = exit.clone();
328        let body = json!({
329            "jsonrpc":"2.0",
330            "id":1,
331            "method":"signatureSubscribe",
332            "params":[
333                signature.to_string(),
334                config
335            ]
336        })
337        .to_string();
338        let subscription_id =
339            PubsubClientSubscription::<RpcResponse<RpcSignatureResult>>::send_subscribe(
340                &socket_clone,
341                body,
342            )?;
343
344        let t_cleanup = std::thread::spawn(move || {
345            loop {
346                if exit_clone.load(Ordering::Relaxed) {
347                    break;
348                }
349
350                let message: Result<RpcResponse<RpcSignatureResult>, PubsubClientError> =
351                    PubsubClientSubscription::read_message(&socket_clone);
352
353                if let Ok(msg) = message {
354                    match sender.send(msg.clone()) {
355                        Ok(_) => (),
356                        Err(err) => {
357                            info!("receive error: {:?}", err);
358                            break;
359                        }
360                    }
361                } else {
362                    info!("receive error: {:?}", message);
363                    break;
364                }
365            }
366
367            info!("websocket - exited receive loop");
368        });
369
370        let result = PubsubClientSubscription {
371            message_type: PhantomData,
372            operation: "signature",
373            socket,
374            subscription_id,
375            t_cleanup: Some(t_cleanup),
376            exit,
377        };
378
379        Ok((result, receiver))
380    }
381
382    pub fn slot_updates_subscribe(
383        url: &str,
384        handler: impl Fn(SlotUpdate) + Send + 'static,
385    ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
386        let url = Url::parse(url)?;
387        let socket = connect_with_retry(url)?;
388
389        let socket = Arc::new(RwLock::new(socket));
390        let exit = Arc::new(AtomicBool::new(false));
391        let exit_clone = exit.clone();
392        let subscription_id = PubsubClientSubscription::<SlotUpdate>::send_subscribe(
393            &socket,
394            json!({
395                "jsonrpc":"2.0","id":1,"method":"slotsUpdatesSubscribe","params":[]
396            })
397            .to_string(),
398        )?;
399
400        let t_cleanup = {
401            let socket = socket.clone();
402            std::thread::spawn(move || {
403                loop {
404                    if exit_clone.load(Ordering::Relaxed) {
405                        break;
406                    }
407                    match PubsubClientSubscription::read_message(&socket) {
408                        Ok(message) => handler(message),
409                        Err(err) => {
410                            info!("receive error: {:?}", err);
411                            break;
412                        }
413                    }
414                }
415
416                info!("websocket - exited receive loop");
417            })
418        };
419
420        Ok(PubsubClientSubscription {
421            message_type: PhantomData,
422            operation: "slotsUpdates",
423            socket,
424            subscription_id,
425            t_cleanup: Some(t_cleanup),
426            exit,
427        })
428    }
429}
430
#[cfg(test)]
mod tests {
    // No unit tests are defined in this module.
    // see core/tests/client.rs#test_slot_subscription()
}