yellowstone_grpc_client/
lib.rs

1pub use tonic::{service::Interceptor, transport::ClientTlsConfig};
2use {
3    bytes::Bytes,
4    futures::{
5        channel::mpsc,
6        sink::{Sink, SinkExt},
7        stream::Stream,
8    },
9    std::time::Duration,
10    tonic::{
11        codec::{CompressionEncoding, Streaming},
12        metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue},
13        service::interceptor::InterceptedService,
14        transport::channel::{Channel, Endpoint},
15        Request, Response, Status,
16    },
17    tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse},
18    yellowstone_grpc_proto::prelude::{
19        geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest,
20        GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse,
21        GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse,
22        IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse,
23        SubscribeReplayInfoRequest, SubscribeReplayInfoResponse, SubscribeRequest, SubscribeUpdate,
24    },
25};
26
27#[derive(Debug, Clone)]
28pub struct InterceptorXToken {
29    pub x_token: Option<AsciiMetadataValue>,
30    pub x_request_snapshot: bool,
31}
32
33impl Interceptor for InterceptorXToken {
34    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
35        if let Some(x_token) = self.x_token.clone() {
36            request.metadata_mut().insert("x-token", x_token);
37        }
38        if self.x_request_snapshot {
39            request
40                .metadata_mut()
41                .insert("x-request-snapshot", MetadataValue::from_static("true"));
42        }
43        Ok(request)
44    }
45}
46
47#[derive(Debug, thiserror::Error)]
48pub enum GeyserGrpcClientError {
49    #[error("gRPC status: {0}")]
50    TonicStatus(#[from] Status),
51    #[error("Failed to send subscribe request: {0}")]
52    SubscribeSendError(#[from] mpsc::SendError),
53}
54
55pub type GeyserGrpcClientResult<T> = Result<T, GeyserGrpcClientError>;
56
57pub struct GeyserGrpcClient<F> {
58    pub health: HealthClient<InterceptedService<Channel, F>>,
59    pub geyser: GeyserClient<InterceptedService<Channel, F>>,
60}
61
62impl GeyserGrpcClient<()> {
63    pub fn build_from_shared(
64        endpoint: impl Into<Bytes>,
65    ) -> GeyserGrpcBuilderResult<GeyserGrpcBuilder> {
66        Ok(GeyserGrpcBuilder::new(Endpoint::from_shared(endpoint)?))
67    }
68
69    pub fn build_from_static(endpoint: &'static str) -> GeyserGrpcBuilder {
70        GeyserGrpcBuilder::new(Endpoint::from_static(endpoint))
71    }
72}
73
74impl<F: Interceptor> GeyserGrpcClient<F> {
75    pub const fn new(
76        health: HealthClient<InterceptedService<Channel, F>>,
77        geyser: GeyserClient<InterceptedService<Channel, F>>,
78    ) -> Self {
79        Self { health, geyser }
80    }
81
82    // Health
83    pub async fn health_check(&mut self) -> GeyserGrpcClientResult<HealthCheckResponse> {
84        let request = HealthCheckRequest {
85            service: "geyser.Geyser".to_owned(),
86        };
87        let response = self.health.check(request).await?;
88        Ok(response.into_inner())
89    }
90
91    pub async fn health_watch(
92        &mut self,
93    ) -> GeyserGrpcClientResult<impl Stream<Item = Result<HealthCheckResponse, Status>>> {
94        let request = HealthCheckRequest {
95            service: "geyser.Geyser".to_owned(),
96        };
97        let response = self.health.watch(request).await?;
98        Ok(response.into_inner())
99    }
100
101    // Subscribe
102    pub async fn subscribe(
103        &mut self,
104    ) -> GeyserGrpcClientResult<(
105        impl Sink<SubscribeRequest, Error = mpsc::SendError>,
106        impl Stream<Item = Result<SubscribeUpdate, Status>>,
107    )> {
108        self.subscribe_with_request(None).await
109    }
110
111    pub async fn subscribe_with_request(
112        &mut self,
113        request: Option<SubscribeRequest>,
114    ) -> GeyserGrpcClientResult<(
115        impl Sink<SubscribeRequest, Error = mpsc::SendError>,
116        impl Stream<Item = Result<SubscribeUpdate, Status>>,
117    )> {
118        let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
119        if let Some(request) = request {
120            subscribe_tx
121                .send(request)
122                .await
123                .map_err(GeyserGrpcClientError::SubscribeSendError)?;
124        }
125        let response: Response<Streaming<SubscribeUpdate>> =
126            self.geyser.subscribe(subscribe_rx).await?;
127        Ok((subscribe_tx, response.into_inner()))
128    }
129
130    pub async fn subscribe_once(
131        &mut self,
132        request: SubscribeRequest,
133    ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
134        self.subscribe_with_request(Some(request))
135            .await
136            .map(|(_sink, stream)| stream)
137    }
138
139    // RPC calls
140    pub async fn subscribe_replay_info(
141        &mut self,
142    ) -> GeyserGrpcClientResult<SubscribeReplayInfoResponse> {
143        let message = SubscribeReplayInfoRequest {};
144        let request = tonic::Request::new(message);
145        let response = self.geyser.subscribe_replay_info(request).await?;
146        Ok(response.into_inner())
147    }
148
149    pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult<PongResponse> {
150        let message = PingRequest { count };
151        let request = tonic::Request::new(message);
152        let response = self.geyser.ping(request).await?;
153        Ok(response.into_inner())
154    }
155
156    pub async fn get_latest_blockhash(
157        &mut self,
158        commitment: Option<CommitmentLevel>,
159    ) -> GeyserGrpcClientResult<GetLatestBlockhashResponse> {
160        let request = tonic::Request::new(GetLatestBlockhashRequest {
161            commitment: commitment.map(|value| value as i32),
162        });
163        let response = self.geyser.get_latest_blockhash(request).await?;
164        Ok(response.into_inner())
165    }
166
167    pub async fn get_block_height(
168        &mut self,
169        commitment: Option<CommitmentLevel>,
170    ) -> GeyserGrpcClientResult<GetBlockHeightResponse> {
171        let request = tonic::Request::new(GetBlockHeightRequest {
172            commitment: commitment.map(|value| value as i32),
173        });
174        let response = self.geyser.get_block_height(request).await?;
175        Ok(response.into_inner())
176    }
177
178    pub async fn get_slot(
179        &mut self,
180        commitment: Option<CommitmentLevel>,
181    ) -> GeyserGrpcClientResult<GetSlotResponse> {
182        let request = tonic::Request::new(GetSlotRequest {
183            commitment: commitment.map(|value| value as i32),
184        });
185        let response = self.geyser.get_slot(request).await?;
186        Ok(response.into_inner())
187    }
188
189    pub async fn is_blockhash_valid(
190        &mut self,
191        blockhash: String,
192        commitment: Option<CommitmentLevel>,
193    ) -> GeyserGrpcClientResult<IsBlockhashValidResponse> {
194        let request = tonic::Request::new(IsBlockhashValidRequest {
195            blockhash,
196            commitment: commitment.map(|value| value as i32),
197        });
198        let response = self.geyser.is_blockhash_valid(request).await?;
199        Ok(response.into_inner())
200    }
201
202    pub async fn get_version(&mut self) -> GeyserGrpcClientResult<GetVersionResponse> {
203        let request = tonic::Request::new(GetVersionRequest {});
204        let response = self.geyser.get_version(request).await?;
205        Ok(response.into_inner())
206    }
207}
208
209#[derive(Debug, thiserror::Error)]
210pub enum GeyserGrpcBuilderError {
211    #[error("Failed to parse x-token: {0}")]
212    MetadataValueError(#[from] InvalidMetadataValue),
213    #[error("gRPC transport error: {0}")]
214    TonicError(#[from] tonic::transport::Error),
215}
216
217pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
218
219#[derive(Debug)]
220pub struct GeyserGrpcBuilder {
221    pub endpoint: Endpoint,
222    pub x_token: Option<AsciiMetadataValue>,
223    pub x_request_snapshot: bool,
224    pub send_compressed: Option<CompressionEncoding>,
225    pub accept_compressed: Option<CompressionEncoding>,
226    pub max_decoding_message_size: Option<usize>,
227    pub max_encoding_message_size: Option<usize>,
228}
229
230impl GeyserGrpcBuilder {
231    // Create new builder
232    const fn new(endpoint: Endpoint) -> Self {
233        Self {
234            endpoint,
235            x_token: None,
236            x_request_snapshot: false,
237            send_compressed: None,
238            accept_compressed: None,
239            max_decoding_message_size: None,
240            max_encoding_message_size: None,
241        }
242    }
243
244    pub fn from_shared(endpoint: impl Into<Bytes>) -> GeyserGrpcBuilderResult<Self> {
245        Ok(Self::new(Endpoint::from_shared(endpoint)?))
246    }
247
248    pub fn from_static(endpoint: &'static str) -> Self {
249        Self::new(Endpoint::from_static(endpoint))
250    }
251
252    // Create client
253    fn build(
254        self,
255        channel: Channel,
256    ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
257        let interceptor = InterceptorXToken {
258            x_token: self.x_token,
259            x_request_snapshot: self.x_request_snapshot,
260        };
261
262        let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone());
263        if let Some(encoding) = self.send_compressed {
264            geyser = geyser.send_compressed(encoding);
265        }
266        if let Some(encoding) = self.accept_compressed {
267            geyser = geyser.accept_compressed(encoding);
268        }
269        if let Some(limit) = self.max_decoding_message_size {
270            geyser = geyser.max_decoding_message_size(limit);
271        }
272        if let Some(limit) = self.max_encoding_message_size {
273            geyser = geyser.max_encoding_message_size(limit);
274        }
275
276        Ok(GeyserGrpcClient::new(
277            HealthClient::with_interceptor(channel, interceptor),
278            geyser,
279        ))
280    }
281
282    pub async fn connect(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
283        let channel = self.endpoint.connect().await?;
284        self.build(channel)
285    }
286
287    pub fn connect_lazy(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
288        let channel = self.endpoint.connect_lazy();
289        self.build(channel)
290    }
291
292    // Set x-token
293    pub fn x_token<T>(self, x_token: Option<T>) -> GeyserGrpcBuilderResult<Self>
294    where
295        T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
296    {
297        Ok(Self {
298            x_token: x_token.map(|x_token| x_token.try_into()).transpose()?,
299            ..self
300        })
301    }
302
303    // Include `x-request-snapshot`
304    pub fn set_x_request_snapshot(self, value: bool) -> Self {
305        Self {
306            x_request_snapshot: value,
307            ..self
308        }
309    }
310
311    // Endpoint options
312    pub fn connect_timeout(self, dur: Duration) -> Self {
313        Self {
314            endpoint: self.endpoint.connect_timeout(dur),
315            ..self
316        }
317    }
318
319    pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
320        Self {
321            endpoint: self.endpoint.buffer_size(sz),
322            ..self
323        }
324    }
325
326    pub fn http2_adaptive_window(self, enabled: bool) -> Self {
327        Self {
328            endpoint: self.endpoint.http2_adaptive_window(enabled),
329            ..self
330        }
331    }
332
333    pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
334        Self {
335            endpoint: self.endpoint.http2_keep_alive_interval(interval),
336            ..self
337        }
338    }
339
340    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
341        Self {
342            endpoint: self.endpoint.initial_connection_window_size(sz),
343            ..self
344        }
345    }
346
347    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
348        Self {
349            endpoint: self.endpoint.initial_stream_window_size(sz),
350            ..self
351        }
352    }
353
354    pub fn keep_alive_timeout(self, duration: Duration) -> Self {
355        Self {
356            endpoint: self.endpoint.keep_alive_timeout(duration),
357            ..self
358        }
359    }
360
361    pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
362        Self {
363            endpoint: self.endpoint.keep_alive_while_idle(enabled),
364            ..self
365        }
366    }
367
368    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
369        Self {
370            endpoint: self.endpoint.tcp_keepalive(tcp_keepalive),
371            ..self
372        }
373    }
374
375    pub fn tcp_nodelay(self, enabled: bool) -> Self {
376        Self {
377            endpoint: self.endpoint.tcp_nodelay(enabled),
378            ..self
379        }
380    }
381
382    pub fn timeout(self, dur: Duration) -> Self {
383        Self {
384            endpoint: self.endpoint.timeout(dur),
385            ..self
386        }
387    }
388
389    pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult<Self> {
390        Ok(Self {
391            endpoint: self.endpoint.tls_config(tls_config)?,
392            ..self
393        })
394    }
395
396    // Geyser options
397    pub fn send_compressed(self, encoding: CompressionEncoding) -> Self {
398        Self {
399            send_compressed: Some(encoding),
400            ..self
401        }
402    }
403
404    pub fn accept_compressed(self, encoding: CompressionEncoding) -> Self {
405        Self {
406            accept_compressed: Some(encoding),
407            ..self
408        }
409    }
410
411    pub fn max_decoding_message_size(self, limit: usize) -> Self {
412        Self {
413            max_decoding_message_size: Some(limit),
414            ..self
415        }
416    }
417
418    pub fn max_encoding_message_size(self, limit: usize) -> Self {
419        Self {
420            max_encoding_message_size: Some(limit),
421            ..self
422        }
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use super::GeyserGrpcClient;
429
430    #[tokio::test]
431    async fn test_channel_https_success() {
432        let endpoint = "https://ams17.rpcpool.com:443";
433        let x_token = "1000000000000000000000000007";
434
435        let res = GeyserGrpcClient::build_from_shared(endpoint);
436        assert!(res.is_ok());
437
438        let res = res.unwrap().x_token(Some(x_token));
439        assert!(res.is_ok());
440
441        let res = res.unwrap().connect_lazy();
442        assert!(res.is_ok());
443    }
444
445    #[tokio::test]
446    async fn test_channel_http_success() {
447        let endpoint = "http://127.0.0.1:10000";
448        let x_token = "1234567891012141618202224268";
449
450        let res = GeyserGrpcClient::build_from_shared(endpoint);
451        assert!(res.is_ok());
452
453        let res = res.unwrap().x_token(Some(x_token));
454        assert!(res.is_ok());
455
456        let res = res.unwrap().connect_lazy();
457        assert!(res.is_ok());
458    }
459
460    #[tokio::test]
461    async fn test_channel_empty_token_some() {
462        let endpoint = "http://127.0.0.1:10000";
463        let x_token = "";
464
465        let res = GeyserGrpcClient::build_from_shared(endpoint);
466        assert!(res.is_ok());
467
468        let res = res.unwrap().x_token(Some(x_token));
469        assert!(res.is_ok());
470    }
471
472    #[tokio::test]
473    async fn test_channel_invalid_token_none() {
474        let endpoint = "http://127.0.0.1:10000";
475
476        let res = GeyserGrpcClient::build_from_shared(endpoint);
477        assert!(res.is_ok());
478
479        let res = res.unwrap().x_token::<String>(None);
480        assert!(res.is_ok());
481
482        let res = res.unwrap().connect_lazy();
483        assert!(res.is_ok());
484    }
485
486    #[tokio::test]
487    async fn test_channel_invalid_uri() {
488        let endpoint = "sites/files/images/picture.png";
489
490        let res = GeyserGrpcClient::build_from_shared(endpoint);
491        assert_eq!(
492            format!("{:?}", res),
493            "Err(TonicError(tonic::transport::Error(InvalidUri, InvalidUri(InvalidFormat))))"
494                .to_owned()
495        );
496    }
497}