yellowstone_grpc_client/
lib.rs

1pub use tonic::{service::Interceptor, transport::ClientTlsConfig};
2use {
3    bytes::Bytes,
4    futures::{
5        channel::mpsc,
6        sink::{Sink, SinkExt},
7        stream::Stream,
8    },
9    std::time::Duration,
10    tonic::{
11        codec::{CompressionEncoding, Streaming},
12        metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue},
13        service::interceptor::InterceptedService,
14        transport::channel::{Channel, Endpoint},
15        Request, Response, Status,
16    },
17    tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse},
18    yellowstone_grpc_proto::prelude::{
19        geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest,
20        GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse,
21        GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse,
22        IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse,
23        SubscribeReplayInfoRequest, SubscribeReplayInfoResponse, SubscribeRequest, SubscribeUpdate,
24    },
25};
26
27#[derive(Debug, Clone)]
28pub struct InterceptorXToken {
29    pub x_token: Option<AsciiMetadataValue>,
30    pub x_request_snapshot: bool,
31}
32
33impl Interceptor for InterceptorXToken {
34    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
35        if let Some(x_token) = self.x_token.clone() {
36            request.metadata_mut().insert("x-token", x_token);
37        }
38        if self.x_request_snapshot {
39            request
40                .metadata_mut()
41                .insert("x-request-snapshot", MetadataValue::from_static("true"));
42        }
43        Ok(request)
44    }
45}
46
47#[derive(Debug, thiserror::Error)]
48pub enum GeyserGrpcClientError {
49    #[error("gRPC status: {0}")]
50    TonicStatus(#[from] Status),
51    #[error("Failed to send subscribe request: {0}")]
52    SubscribeSendError(#[from] mpsc::SendError),
53}
54
55pub type GeyserGrpcClientResult<T> = Result<T, GeyserGrpcClientError>;
56
57#[derive(Clone)]
58pub struct GeyserGrpcClient<F> {
59    pub health: HealthClient<InterceptedService<Channel, F>>,
60    pub geyser: GeyserClient<InterceptedService<Channel, F>>,
61}
62
63impl GeyserGrpcClient<()> {
64    pub fn build_from_shared(
65        endpoint: impl Into<Bytes>,
66    ) -> GeyserGrpcBuilderResult<GeyserGrpcBuilder> {
67        Ok(GeyserGrpcBuilder::new(Endpoint::from_shared(endpoint)?))
68    }
69
70    pub fn build_from_static(endpoint: &'static str) -> GeyserGrpcBuilder {
71        GeyserGrpcBuilder::new(Endpoint::from_static(endpoint))
72    }
73}
74
75impl<F: Interceptor> GeyserGrpcClient<F> {
76    pub const fn new(
77        health: HealthClient<InterceptedService<Channel, F>>,
78        geyser: GeyserClient<InterceptedService<Channel, F>>,
79    ) -> Self {
80        Self { health, geyser }
81    }
82
83    // Health
84    pub async fn health_check(&mut self) -> GeyserGrpcClientResult<HealthCheckResponse> {
85        let request = HealthCheckRequest {
86            service: "geyser.Geyser".to_owned(),
87        };
88        let response = self.health.check(request).await?;
89        Ok(response.into_inner())
90    }
91
92    pub async fn health_watch(
93        &mut self,
94    ) -> GeyserGrpcClientResult<impl Stream<Item = Result<HealthCheckResponse, Status>>> {
95        let request = HealthCheckRequest {
96            service: "geyser.Geyser".to_owned(),
97        };
98        let response = self.health.watch(request).await?;
99        Ok(response.into_inner())
100    }
101
102    // Subscribe
103    pub async fn subscribe(
104        &mut self,
105    ) -> GeyserGrpcClientResult<(
106        impl Sink<SubscribeRequest, Error = mpsc::SendError>,
107        impl Stream<Item = Result<SubscribeUpdate, Status>>,
108    )> {
109        self.subscribe_with_request(None).await
110    }
111
112    pub async fn subscribe_with_request(
113        &mut self,
114        request: Option<SubscribeRequest>,
115    ) -> GeyserGrpcClientResult<(
116        impl Sink<SubscribeRequest, Error = mpsc::SendError>,
117        impl Stream<Item = Result<SubscribeUpdate, Status>>,
118    )> {
119        let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
120        if let Some(request) = request {
121            subscribe_tx
122                .send(request)
123                .await
124                .map_err(GeyserGrpcClientError::SubscribeSendError)?;
125        }
126        let response: Response<Streaming<SubscribeUpdate>> =
127            self.geyser.subscribe(subscribe_rx).await?;
128        Ok((subscribe_tx, response.into_inner()))
129    }
130
131    pub async fn subscribe_once(
132        &mut self,
133        request: SubscribeRequest,
134    ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
135        self.subscribe_with_request(Some(request))
136            .await
137            .map(|(_sink, stream)| stream)
138    }
139
140    // RPC calls
141    pub async fn subscribe_replay_info(
142        &mut self,
143    ) -> GeyserGrpcClientResult<SubscribeReplayInfoResponse> {
144        let message = SubscribeReplayInfoRequest {};
145        let request = tonic::Request::new(message);
146        let response = self.geyser.subscribe_replay_info(request).await?;
147        Ok(response.into_inner())
148    }
149
150    pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult<PongResponse> {
151        let message = PingRequest { count };
152        let request = tonic::Request::new(message);
153        let response = self.geyser.ping(request).await?;
154        Ok(response.into_inner())
155    }
156
157    pub async fn get_latest_blockhash(
158        &mut self,
159        commitment: Option<CommitmentLevel>,
160    ) -> GeyserGrpcClientResult<GetLatestBlockhashResponse> {
161        let request = tonic::Request::new(GetLatestBlockhashRequest {
162            commitment: commitment.map(|value| value as i32),
163        });
164        let response = self.geyser.get_latest_blockhash(request).await?;
165        Ok(response.into_inner())
166    }
167
168    pub async fn get_block_height(
169        &mut self,
170        commitment: Option<CommitmentLevel>,
171    ) -> GeyserGrpcClientResult<GetBlockHeightResponse> {
172        let request = tonic::Request::new(GetBlockHeightRequest {
173            commitment: commitment.map(|value| value as i32),
174        });
175        let response = self.geyser.get_block_height(request).await?;
176        Ok(response.into_inner())
177    }
178
179    pub async fn get_slot(
180        &mut self,
181        commitment: Option<CommitmentLevel>,
182    ) -> GeyserGrpcClientResult<GetSlotResponse> {
183        let request = tonic::Request::new(GetSlotRequest {
184            commitment: commitment.map(|value| value as i32),
185        });
186        let response = self.geyser.get_slot(request).await?;
187        Ok(response.into_inner())
188    }
189
190    pub async fn is_blockhash_valid(
191        &mut self,
192        blockhash: String,
193        commitment: Option<CommitmentLevel>,
194    ) -> GeyserGrpcClientResult<IsBlockhashValidResponse> {
195        let request = tonic::Request::new(IsBlockhashValidRequest {
196            blockhash,
197            commitment: commitment.map(|value| value as i32),
198        });
199        let response = self.geyser.is_blockhash_valid(request).await?;
200        Ok(response.into_inner())
201    }
202
203    pub async fn get_version(&mut self) -> GeyserGrpcClientResult<GetVersionResponse> {
204        let request = tonic::Request::new(GetVersionRequest {});
205        let response = self.geyser.get_version(request).await?;
206        Ok(response.into_inner())
207    }
208}
209
210#[derive(Debug, thiserror::Error)]
211pub enum GeyserGrpcBuilderError {
212    #[error("Failed to parse x-token: {0}")]
213    MetadataValueError(#[from] InvalidMetadataValue),
214    #[error("gRPC transport error: {0}")]
215    TonicError(#[from] tonic::transport::Error),
216}
217
218pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
219
220#[derive(Debug)]
221pub struct GeyserGrpcBuilder {
222    pub endpoint: Endpoint,
223    pub x_token: Option<AsciiMetadataValue>,
224    pub x_request_snapshot: bool,
225    pub send_compressed: Option<CompressionEncoding>,
226    pub accept_compressed: Option<CompressionEncoding>,
227    pub max_decoding_message_size: Option<usize>,
228    pub max_encoding_message_size: Option<usize>,
229}
230
231impl GeyserGrpcBuilder {
232    // Create new builder
233    const fn new(endpoint: Endpoint) -> Self {
234        Self {
235            endpoint,
236            x_token: None,
237            x_request_snapshot: false,
238            send_compressed: None,
239            accept_compressed: None,
240            max_decoding_message_size: None,
241            max_encoding_message_size: None,
242        }
243    }
244
245    pub fn from_shared(endpoint: impl Into<Bytes>) -> GeyserGrpcBuilderResult<Self> {
246        Ok(Self::new(Endpoint::from_shared(endpoint)?))
247    }
248
249    pub fn from_static(endpoint: &'static str) -> Self {
250        Self::new(Endpoint::from_static(endpoint))
251    }
252
253    // Create client
254    fn build(
255        self,
256        channel: Channel,
257    ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor + Clone>> {
258        let interceptor = InterceptorXToken {
259            x_token: self.x_token,
260            x_request_snapshot: self.x_request_snapshot,
261        };
262
263        let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone());
264        if let Some(encoding) = self.send_compressed {
265            geyser = geyser.send_compressed(encoding);
266        }
267        if let Some(encoding) = self.accept_compressed {
268            geyser = geyser.accept_compressed(encoding);
269        }
270        if let Some(limit) = self.max_decoding_message_size {
271            geyser = geyser.max_decoding_message_size(limit);
272        }
273        if let Some(limit) = self.max_encoding_message_size {
274            geyser = geyser.max_encoding_message_size(limit);
275        }
276
277        Ok(GeyserGrpcClient::new(
278            HealthClient::with_interceptor(channel, interceptor),
279            geyser,
280        ))
281    }
282
283    pub async fn connect(
284        self,
285    ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor + Clone>> {
286        let channel = self.endpoint.connect().await?;
287        self.build(channel)
288    }
289
290    pub fn connect_lazy(
291        self,
292    ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor + Clone>> {
293        let channel = self.endpoint.connect_lazy();
294        self.build(channel)
295    }
296
297    // Set x-token
298    pub fn x_token<T>(self, x_token: Option<T>) -> GeyserGrpcBuilderResult<Self>
299    where
300        T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
301    {
302        Ok(Self {
303            x_token: x_token.map(|x_token| x_token.try_into()).transpose()?,
304            ..self
305        })
306    }
307
308    // Include `x-request-snapshot`
309    pub fn set_x_request_snapshot(self, value: bool) -> Self {
310        Self {
311            x_request_snapshot: value,
312            ..self
313        }
314    }
315
316    // Endpoint options
317    pub fn connect_timeout(self, dur: Duration) -> Self {
318        Self {
319            endpoint: self.endpoint.connect_timeout(dur),
320            ..self
321        }
322    }
323
324    pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
325        Self {
326            endpoint: self.endpoint.buffer_size(sz),
327            ..self
328        }
329    }
330
331    pub fn http2_adaptive_window(self, enabled: bool) -> Self {
332        Self {
333            endpoint: self.endpoint.http2_adaptive_window(enabled),
334            ..self
335        }
336    }
337
338    pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
339        Self {
340            endpoint: self.endpoint.http2_keep_alive_interval(interval),
341            ..self
342        }
343    }
344
345    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
346        Self {
347            endpoint: self.endpoint.initial_connection_window_size(sz),
348            ..self
349        }
350    }
351
352    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
353        Self {
354            endpoint: self.endpoint.initial_stream_window_size(sz),
355            ..self
356        }
357    }
358
359    pub fn keep_alive_timeout(self, duration: Duration) -> Self {
360        Self {
361            endpoint: self.endpoint.keep_alive_timeout(duration),
362            ..self
363        }
364    }
365
366    pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
367        Self {
368            endpoint: self.endpoint.keep_alive_while_idle(enabled),
369            ..self
370        }
371    }
372
373    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
374        Self {
375            endpoint: self.endpoint.tcp_keepalive(tcp_keepalive),
376            ..self
377        }
378    }
379
380    pub fn tcp_nodelay(self, enabled: bool) -> Self {
381        Self {
382            endpoint: self.endpoint.tcp_nodelay(enabled),
383            ..self
384        }
385    }
386
387    pub fn timeout(self, dur: Duration) -> Self {
388        Self {
389            endpoint: self.endpoint.timeout(dur),
390            ..self
391        }
392    }
393
394    pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult<Self> {
395        Ok(Self {
396            endpoint: self.endpoint.tls_config(tls_config)?,
397            ..self
398        })
399    }
400
401    // Geyser options
402    pub fn send_compressed(self, encoding: CompressionEncoding) -> Self {
403        Self {
404            send_compressed: Some(encoding),
405            ..self
406        }
407    }
408
409    pub fn accept_compressed(self, encoding: CompressionEncoding) -> Self {
410        Self {
411            accept_compressed: Some(encoding),
412            ..self
413        }
414    }
415
416    pub fn max_decoding_message_size(self, limit: usize) -> Self {
417        Self {
418            max_decoding_message_size: Some(limit),
419            ..self
420        }
421    }
422
423    pub fn max_encoding_message_size(self, limit: usize) -> Self {
424        Self {
425            max_encoding_message_size: Some(limit),
426            ..self
427        }
428    }
429}
430
#[cfg(test)]
mod tests {
    use super::{GeyserGrpcBuilderError, GeyserGrpcClient};

    /// An `https` endpoint with a token builds and yields a lazily-connected client.
    #[tokio::test]
    async fn test_channel_https_success() {
        let endpoint = "https://ams17.rpcpool.com:443";
        let x_token = "1000000000000000000000000007";

        let builder = GeyserGrpcClient::build_from_shared(endpoint)
            .expect("https endpoint should be accepted")
            .x_token(Some(x_token))
            .expect("x-token should be accepted");

        assert!(builder.connect_lazy().is_ok());
    }

    /// A plain `http` endpoint with a token builds and yields a lazily-connected client.
    #[tokio::test]
    async fn test_channel_http_success() {
        let endpoint = "http://127.0.0.1:10000";
        let x_token = "1234567891012141618202224268";

        let builder = GeyserGrpcClient::build_from_shared(endpoint)
            .expect("http endpoint should be accepted")
            .x_token(Some(x_token))
            .expect("x-token should be accepted");

        assert!(builder.connect_lazy().is_ok());
    }

    /// An empty string is accepted as an x-token value.
    #[tokio::test]
    async fn test_channel_empty_token_some() {
        let endpoint = "http://127.0.0.1:10000";
        let x_token = "";

        let builder = GeyserGrpcClient::build_from_shared(endpoint)
            .expect("http endpoint should be accepted");

        assert!(builder.x_token(Some(x_token)).is_ok());
    }

    /// Passing `None` as the x-token is accepted and the client can still be built.
    #[tokio::test]
    async fn test_channel_invalid_token_none() {
        let endpoint = "http://127.0.0.1:10000";

        let builder = GeyserGrpcClient::build_from_shared(endpoint)
            .expect("http endpoint should be accepted")
            .x_token::<String>(None)
            .expect("a missing x-token should be accepted");

        assert!(builder.connect_lazy().is_ok());
    }

    /// A string that is not a valid URI is reported as a transport error.
    #[tokio::test]
    async fn test_channel_invalid_uri() {
        let endpoint = "sites/files/images/picture.png";

        let res = GeyserGrpcClient::build_from_shared(endpoint);

        // Assert on the error variant rather than on the `Debug` text of the
        // wrapped tonic error, whose exact rendering belongs to the dependency.
        assert!(
            matches!(res, Err(GeyserGrpcBuilderError::TonicError(_))),
            "expected a transport error, got: {res:?}"
        );
    }
}