laserstream_core_client/
lib.rs

1pub use tonic::{service::Interceptor, transport::ClientTlsConfig};
2use laserstream_core_proto::geyser::{SubscribePreprocessedRequest, SubscribePreprocessedUpdate};
3use {
4    bytes::Bytes,
5    futures::{
6        channel::mpsc,
7        sink::{Sink, SinkExt},
8        stream::Stream,
9    },
10    std::time::Duration,
11    tonic::{
12        codec::{CompressionEncoding, Streaming},
13        metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue},
14        service::interceptor::InterceptedService,
15        transport::channel::{Channel, Endpoint},
16        Request, Response, Status,
17    },
18    tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse},
19    laserstream_core_proto::prelude::{
20        geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest,
21        GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse,
22        GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse,
23        IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse,
24        SubscribeReplayInfoRequest, SubscribeReplayInfoResponse, SubscribeRequest, SubscribeUpdate,
25    },
26};
27
28#[derive(Debug, Clone)]
29pub struct InterceptorXToken {
30    pub x_token: Option<AsciiMetadataValue>,
31    pub x_request_snapshot: bool,
32}
33
34impl Interceptor for InterceptorXToken {
35    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
36        if let Some(x_token) = self.x_token.clone() {
37            request.metadata_mut().insert("x-token", x_token);
38        }
39        if self.x_request_snapshot {
40            request
41                .metadata_mut()
42                .insert("x-request-snapshot", MetadataValue::from_static("true"));
43        }
44        Ok(request)
45    }
46}
47
48#[derive(Debug, thiserror::Error)]
49pub enum GeyserGrpcClientError {
50    #[error("gRPC status: {0}")]
51    TonicStatus(#[from] Status),
52    #[error("Failed to send subscribe request: {0}")]
53    SubscribeSendError(#[from] mpsc::SendError),
54}
55
56pub type GeyserGrpcClientResult<T> = Result<T, GeyserGrpcClientError>;
57
58pub struct GeyserGrpcClient<F> {
59    pub health: HealthClient<InterceptedService<Channel, F>>,
60    pub geyser: GeyserClient<InterceptedService<Channel, F>>,
61}
62
63impl GeyserGrpcClient<()> {
64    pub fn build_from_shared(
65        endpoint: impl Into<Bytes>,
66    ) -> GeyserGrpcBuilderResult<GeyserGrpcBuilder> {
67        Ok(GeyserGrpcBuilder::new(Endpoint::from_shared(endpoint)?))
68    }
69
70    pub fn build_from_static(endpoint: &'static str) -> GeyserGrpcBuilder {
71        GeyserGrpcBuilder::new(Endpoint::from_static(endpoint))
72    }
73}
74
75impl<F: Interceptor> GeyserGrpcClient<F> {
76    pub const fn new(
77        health: HealthClient<InterceptedService<Channel, F>>,
78        geyser: GeyserClient<InterceptedService<Channel, F>>,
79    ) -> Self {
80        Self { health, geyser }
81    }
82
83    // Health
84    pub async fn health_check(&mut self) -> GeyserGrpcClientResult<HealthCheckResponse> {
85        let request = HealthCheckRequest {
86            service: "geyser.Geyser".to_owned(),
87        };
88        let response = self.health.check(request).await?;
89        Ok(response.into_inner())
90    }
91
92    pub async fn health_watch(
93        &mut self,
94    ) -> GeyserGrpcClientResult<impl Stream<Item = Result<HealthCheckResponse, Status>>> {
95        let request = HealthCheckRequest {
96            service: "geyser.Geyser".to_owned(),
97        };
98        let response = self.health.watch(request).await?;
99        Ok(response.into_inner())
100    }
101
102    // Subscribe
103    pub async fn subscribe(
104        &mut self,
105    ) -> GeyserGrpcClientResult<(
106        impl Sink<SubscribeRequest, Error = mpsc::SendError>,
107        impl Stream<Item = Result<SubscribeUpdate, Status>>,
108    )> {
109        self.subscribe_with_request(None).await
110    }
111
112    pub async fn subscribe_with_request(
113        &mut self,
114        request: Option<SubscribeRequest>,
115    ) -> GeyserGrpcClientResult<(
116        impl Sink<SubscribeRequest, Error = mpsc::SendError>,
117        impl Stream<Item = Result<SubscribeUpdate, Status>>,
118    )> {
119        let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
120        if let Some(request) = request {
121            subscribe_tx
122                .send(request)
123                .await
124                .map_err(GeyserGrpcClientError::SubscribeSendError)?;
125        }
126        let response: Response<Streaming<SubscribeUpdate>> =
127            self.geyser.subscribe(subscribe_rx).await?;
128        Ok((subscribe_tx, response.into_inner()))
129    }
130
131    pub async fn subscribe_once(
132        &mut self,
133        request: SubscribeRequest,
134    ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
135        self.subscribe_with_request(Some(request))
136            .await
137            .map(|(_sink, stream)| stream)
138    }
139
140    pub async fn subscribe_preprocessed(
141        &mut self,
142    ) -> GeyserGrpcClientResult<(
143        impl Sink<SubscribePreprocessedRequest, Error = mpsc::SendError>,
144        impl Stream<Item = Result<SubscribePreprocessedUpdate, Status>>,
145    )> {
146        self.subscribe_preprocessed_with_request(None).await
147    }
148
149    pub async fn subscribe_preprocessed_with_request(
150        &mut self,
151        request: Option<SubscribePreprocessedRequest>,
152    ) -> GeyserGrpcClientResult<(
153        impl Sink<SubscribePreprocessedRequest, Error = mpsc::SendError>,
154        impl Stream<Item = Result<SubscribePreprocessedUpdate, Status>>,
155    )> {
156        let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
157        if let Some(request) = request {
158            subscribe_tx
159                .send(request)
160                .await
161                .map_err(GeyserGrpcClientError::SubscribeSendError)?;
162        }
163        let response: Response<Streaming<SubscribePreprocessedUpdate>> =
164            self.geyser.subscribe_preprocessed(subscribe_rx).await?;
165        Ok((subscribe_tx, response.into_inner()))
166    }
167
168    pub async fn subscribe_preprocessed_once(
169        &mut self,
170        request: SubscribePreprocessedRequest,
171    ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribePreprocessedUpdate, Status>>> {
172        self.subscribe_preprocessed_with_request(Some(request))
173            .await
174            .map(|(_sink, stream)| stream)
175    }
176    // RPC calls
177    pub async fn subscribe_replay_info(
178        &mut self,
179    ) -> GeyserGrpcClientResult<SubscribeReplayInfoResponse> {
180        let message = SubscribeReplayInfoRequest {};
181        let request = tonic::Request::new(message);
182        let response = self.geyser.subscribe_replay_info(request).await?;
183        Ok(response.into_inner())
184    }
185
186    pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult<PongResponse> {
187        let message = PingRequest { count };
188        let request = tonic::Request::new(message);
189        let response = self.geyser.ping(request).await?;
190        Ok(response.into_inner())
191    }
192
193    pub async fn get_latest_blockhash(
194        &mut self,
195        commitment: Option<CommitmentLevel>,
196    ) -> GeyserGrpcClientResult<GetLatestBlockhashResponse> {
197        let request = tonic::Request::new(GetLatestBlockhashRequest {
198            commitment: commitment.map(|value| value as i32),
199        });
200        let response = self.geyser.get_latest_blockhash(request).await?;
201        Ok(response.into_inner())
202    }
203
204    pub async fn get_block_height(
205        &mut self,
206        commitment: Option<CommitmentLevel>,
207    ) -> GeyserGrpcClientResult<GetBlockHeightResponse> {
208        let request = tonic::Request::new(GetBlockHeightRequest {
209            commitment: commitment.map(|value| value as i32),
210        });
211        let response = self.geyser.get_block_height(request).await?;
212        Ok(response.into_inner())
213    }
214
215    pub async fn get_slot(
216        &mut self,
217        commitment: Option<CommitmentLevel>,
218    ) -> GeyserGrpcClientResult<GetSlotResponse> {
219        let request = tonic::Request::new(GetSlotRequest {
220            commitment: commitment.map(|value| value as i32),
221        });
222        let response = self.geyser.get_slot(request).await?;
223        Ok(response.into_inner())
224    }
225
226    pub async fn is_blockhash_valid(
227        &mut self,
228        blockhash: String,
229        commitment: Option<CommitmentLevel>,
230    ) -> GeyserGrpcClientResult<IsBlockhashValidResponse> {
231        let request = tonic::Request::new(IsBlockhashValidRequest {
232            blockhash,
233            commitment: commitment.map(|value| value as i32),
234        });
235        let response = self.geyser.is_blockhash_valid(request).await?;
236        Ok(response.into_inner())
237    }
238
239    pub async fn get_version(&mut self) -> GeyserGrpcClientResult<GetVersionResponse> {
240        let request = tonic::Request::new(GetVersionRequest {});
241        let response = self.geyser.get_version(request).await?;
242        Ok(response.into_inner())
243    }
244}
245
246#[derive(Debug, thiserror::Error)]
247pub enum GeyserGrpcBuilderError {
248    #[error("Failed to parse x-token: {0}")]
249    MetadataValueError(#[from] InvalidMetadataValue),
250    #[error("gRPC transport error: {0}")]
251    TonicError(#[from] tonic::transport::Error),
252}
253
254pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
255
256#[derive(Debug)]
257pub struct GeyserGrpcBuilder {
258    pub endpoint: Endpoint,
259    pub x_token: Option<AsciiMetadataValue>,
260    pub x_request_snapshot: bool,
261    pub send_compressed: Option<CompressionEncoding>,
262    pub accept_compressed: Option<CompressionEncoding>,
263    pub max_decoding_message_size: Option<usize>,
264    pub max_encoding_message_size: Option<usize>,
265}
266
267impl GeyserGrpcBuilder {
268    // Create new builder
269    const fn new(endpoint: Endpoint) -> Self {
270        Self {
271            endpoint,
272            x_token: None,
273            x_request_snapshot: false,
274            send_compressed: None,
275            accept_compressed: None,
276            max_decoding_message_size: None,
277            max_encoding_message_size: None,
278        }
279    }
280
281    pub fn from_shared(endpoint: impl Into<Bytes>) -> GeyserGrpcBuilderResult<Self> {
282        Ok(Self::new(Endpoint::from_shared(endpoint)?))
283    }
284
285    pub fn from_static(endpoint: &'static str) -> Self {
286        Self::new(Endpoint::from_static(endpoint))
287    }
288
289    // Create client
290    fn build(
291        self,
292        channel: Channel,
293    ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
294        let interceptor = InterceptorXToken {
295            x_token: self.x_token,
296            x_request_snapshot: self.x_request_snapshot,
297        };
298
299        let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone());
300        if let Some(encoding) = self.send_compressed {
301            geyser = geyser.send_compressed(encoding);
302        }
303        if let Some(encoding) = self.accept_compressed {
304            geyser = geyser.accept_compressed(encoding);
305        }
306        if let Some(limit) = self.max_decoding_message_size {
307            geyser = geyser.max_decoding_message_size(limit);
308        }
309        if let Some(limit) = self.max_encoding_message_size {
310            geyser = geyser.max_encoding_message_size(limit);
311        }
312
313        Ok(GeyserGrpcClient::new(
314            HealthClient::with_interceptor(channel, interceptor),
315            geyser,
316        ))
317    }
318
319    pub async fn connect(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
320        let channel = self.endpoint.connect().await?;
321        self.build(channel)
322    }
323
324    pub fn connect_lazy(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
325        let channel = self.endpoint.connect_lazy();
326        self.build(channel)
327    }
328
329    // Set x-token
330    pub fn x_token<T>(self, x_token: Option<T>) -> GeyserGrpcBuilderResult<Self>
331    where
332        T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
333    {
334        Ok(Self {
335            x_token: x_token.map(|x_token| x_token.try_into()).transpose()?,
336            ..self
337        })
338    }
339
340    // Include `x-request-snapshot`
341    pub fn set_x_request_snapshot(self, value: bool) -> Self {
342        Self {
343            x_request_snapshot: value,
344            ..self
345        }
346    }
347
348    // Endpoint options
349    pub fn connect_timeout(self, dur: Duration) -> Self {
350        Self {
351            endpoint: self.endpoint.connect_timeout(dur),
352            ..self
353        }
354    }
355
356    pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
357        Self {
358            endpoint: self.endpoint.buffer_size(sz),
359            ..self
360        }
361    }
362
363    pub fn http2_adaptive_window(self, enabled: bool) -> Self {
364        Self {
365            endpoint: self.endpoint.http2_adaptive_window(enabled),
366            ..self
367        }
368    }
369
370    pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
371        Self {
372            endpoint: self.endpoint.http2_keep_alive_interval(interval),
373            ..self
374        }
375    }
376
377    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
378        Self {
379            endpoint: self.endpoint.initial_connection_window_size(sz),
380            ..self
381        }
382    }
383
384    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
385        Self {
386            endpoint: self.endpoint.initial_stream_window_size(sz),
387            ..self
388        }
389    }
390
391    pub fn keep_alive_timeout(self, duration: Duration) -> Self {
392        Self {
393            endpoint: self.endpoint.keep_alive_timeout(duration),
394            ..self
395        }
396    }
397
398    pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
399        Self {
400            endpoint: self.endpoint.keep_alive_while_idle(enabled),
401            ..self
402        }
403    }
404
405    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
406        Self {
407            endpoint: self.endpoint.tcp_keepalive(tcp_keepalive),
408            ..self
409        }
410    }
411
412    pub fn tcp_nodelay(self, enabled: bool) -> Self {
413        Self {
414            endpoint: self.endpoint.tcp_nodelay(enabled),
415            ..self
416        }
417    }
418
419    pub fn timeout(self, dur: Duration) -> Self {
420        Self {
421            endpoint: self.endpoint.timeout(dur),
422            ..self
423        }
424    }
425
426    pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult<Self> {
427        Ok(Self {
428            endpoint: self.endpoint.tls_config(tls_config)?,
429            ..self
430        })
431    }
432
433    // Geyser options
434    pub fn send_compressed(self, encoding: CompressionEncoding) -> Self {
435        Self {
436            send_compressed: Some(encoding),
437            ..self
438        }
439    }
440
441    pub fn accept_compressed(self, encoding: CompressionEncoding) -> Self {
442        Self {
443            accept_compressed: Some(encoding),
444            ..self
445        }
446    }
447
448    pub fn max_decoding_message_size(self, limit: usize) -> Self {
449        Self {
450            max_decoding_message_size: Some(limit),
451            ..self
452        }
453    }
454
455    pub fn max_encoding_message_size(self, limit: usize) -> Self {
456        Self {
457            max_encoding_message_size: Some(limit),
458            ..self
459        }
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::GeyserGrpcClient;

    /// An `https` endpoint with a token builds a lazily connected client.
    #[tokio::test]
    async fn test_channel_https_success() {
        let builder = GeyserGrpcClient::build_from_shared("https://ams17.rpcpool.com:443");
        assert!(builder.is_ok());

        let with_token = builder
            .unwrap()
            .x_token(Some("1000000000000000000000000007"));
        assert!(with_token.is_ok());

        let client = with_token.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// A plain `http` endpoint with a token builds a lazily connected client.
    #[tokio::test]
    async fn test_channel_http_success() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let with_token = builder
            .unwrap()
            .x_token(Some("1234567891012141618202224268"));
        assert!(with_token.is_ok());

        let client = with_token.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// An empty string is accepted as a token value.
    #[tokio::test]
    async fn test_channel_empty_token_some() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let with_token = builder.unwrap().x_token(Some(""));
        assert!(with_token.is_ok());
    }

    /// Passing `None` as the token is accepted and the client still builds.
    #[tokio::test]
    async fn test_channel_invalid_token_none() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let without_token = builder.unwrap().x_token::<String>(None);
        assert!(without_token.is_ok());

        let client = without_token.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// A string that is not a valid URI is reported as a transport error.
    #[tokio::test]
    async fn test_channel_invalid_uri() {
        let outcome = GeyserGrpcClient::build_from_shared("sites/files/images/picture.png");
        let rendered = format!("{:?}", outcome);
        assert_eq!(
            rendered,
            "Err(TonicError(tonic::transport::Error(InvalidUri, InvalidUri(InvalidFormat))))"
        );
    }
}