1pub use tonic::{service::Interceptor, transport::ClientTlsConfig};
2use {
3 bytes::Bytes,
4 futures::{
5 channel::mpsc,
6 sink::{Sink, SinkExt},
7 stream::Stream,
8 },
9 std::time::Duration,
10 tonic::{
11 codec::{CompressionEncoding, Streaming},
12 metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue},
13 service::interceptor::InterceptedService,
14 transport::channel::{Channel, Endpoint},
15 Request, Response, Status,
16 },
17 tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse},
18 yellowstone_grpc_proto::prelude::{
19 geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest,
20 GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse,
21 GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse,
22 IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse,
23 SubscribeReplayInfoRequest, SubscribeReplayInfoResponse, SubscribeRequest, SubscribeUpdate,
24 },
25};
26
/// Request interceptor state: the metadata entries that its `Interceptor`
/// implementation adds to every outgoing request.
#[derive(Debug, Clone)]
pub struct InterceptorXToken {
    /// Value inserted under the `x-token` metadata key; nothing is inserted when `None`.
    pub x_token: Option<AsciiMetadataValue>,
    /// When `true`, the `x-request-snapshot` metadata key is set to `"true"`.
    pub x_request_snapshot: bool,
}
32
33impl Interceptor for InterceptorXToken {
34 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
35 if let Some(x_token) = self.x_token.clone() {
36 request.metadata_mut().insert("x-token", x_token);
37 }
38 if self.x_request_snapshot {
39 request
40 .metadata_mut()
41 .insert("x-request-snapshot", MetadataValue::from_static("true"));
42 }
43 Ok(request)
44 }
45}
46
/// Errors returned by the [`GeyserGrpcClient`] request methods.
#[derive(Debug, thiserror::Error)]
pub enum GeyserGrpcClientError {
    /// A gRPC call completed with a tonic `Status` error.
    #[error("gRPC status: {0}")]
    TonicStatus(#[from] Status),
    /// Pushing a `SubscribeRequest` into the outgoing subscription channel failed.
    #[error("Failed to send subscribe request: {0}")]
    SubscribeSendError(#[from] mpsc::SendError),
}
54
/// `Result` alias whose error type is [`GeyserGrpcClientError`].
pub type GeyserGrpcClientResult<T> = Result<T, GeyserGrpcClientError>;
56
/// Bundle of the health and geyser gRPC clients, both wrapped with the same
/// interceptor type `F` over a tonic `Channel`.
pub struct GeyserGrpcClient<F> {
    /// Client used by `health_check` and `health_watch`.
    pub health: HealthClient<InterceptedService<Channel, F>>,
    /// Client used for subscriptions and the unary geyser calls.
    pub geyser: GeyserClient<InterceptedService<Channel, F>>,
}
61
62impl GeyserGrpcClient<()> {
63 pub fn build_from_shared(
64 endpoint: impl Into<Bytes>,
65 ) -> GeyserGrpcBuilderResult<GeyserGrpcBuilder> {
66 Ok(GeyserGrpcBuilder::new(Endpoint::from_shared(endpoint)?))
67 }
68
69 pub fn build_from_static(endpoint: &'static str) -> GeyserGrpcBuilder {
70 GeyserGrpcBuilder::new(Endpoint::from_static(endpoint))
71 }
72}
73
74impl<F: Interceptor> GeyserGrpcClient<F> {
75 pub const fn new(
76 health: HealthClient<InterceptedService<Channel, F>>,
77 geyser: GeyserClient<InterceptedService<Channel, F>>,
78 ) -> Self {
79 Self { health, geyser }
80 }
81
82 pub async fn health_check(&mut self) -> GeyserGrpcClientResult<HealthCheckResponse> {
84 let request = HealthCheckRequest {
85 service: "geyser.Geyser".to_owned(),
86 };
87 let response = self.health.check(request).await?;
88 Ok(response.into_inner())
89 }
90
91 pub async fn health_watch(
92 &mut self,
93 ) -> GeyserGrpcClientResult<impl Stream<Item = Result<HealthCheckResponse, Status>>> {
94 let request = HealthCheckRequest {
95 service: "geyser.Geyser".to_owned(),
96 };
97 let response = self.health.watch(request).await?;
98 Ok(response.into_inner())
99 }
100
101 pub async fn subscribe(
103 &mut self,
104 ) -> GeyserGrpcClientResult<(
105 impl Sink<SubscribeRequest, Error = mpsc::SendError>,
106 impl Stream<Item = Result<SubscribeUpdate, Status>>,
107 )> {
108 self.subscribe_with_request(None).await
109 }
110
111 pub async fn subscribe_with_request(
112 &mut self,
113 request: Option<SubscribeRequest>,
114 ) -> GeyserGrpcClientResult<(
115 impl Sink<SubscribeRequest, Error = mpsc::SendError>,
116 impl Stream<Item = Result<SubscribeUpdate, Status>>,
117 )> {
118 let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
119 if let Some(request) = request {
120 subscribe_tx
121 .send(request)
122 .await
123 .map_err(GeyserGrpcClientError::SubscribeSendError)?;
124 }
125 let response: Response<Streaming<SubscribeUpdate>> =
126 self.geyser.subscribe(subscribe_rx).await?;
127 Ok((subscribe_tx, response.into_inner()))
128 }
129
130 pub async fn subscribe_once(
131 &mut self,
132 request: SubscribeRequest,
133 ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
134 self.subscribe_with_request(Some(request))
135 .await
136 .map(|(_sink, stream)| stream)
137 }
138
139 pub async fn subscribe_replay_info(
141 &mut self,
142 ) -> GeyserGrpcClientResult<SubscribeReplayInfoResponse> {
143 let message = SubscribeReplayInfoRequest {};
144 let request = tonic::Request::new(message);
145 let response = self.geyser.subscribe_replay_info(request).await?;
146 Ok(response.into_inner())
147 }
148
149 pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult<PongResponse> {
150 let message = PingRequest { count };
151 let request = tonic::Request::new(message);
152 let response = self.geyser.ping(request).await?;
153 Ok(response.into_inner())
154 }
155
156 pub async fn get_latest_blockhash(
157 &mut self,
158 commitment: Option<CommitmentLevel>,
159 ) -> GeyserGrpcClientResult<GetLatestBlockhashResponse> {
160 let request = tonic::Request::new(GetLatestBlockhashRequest {
161 commitment: commitment.map(|value| value as i32),
162 });
163 let response = self.geyser.get_latest_blockhash(request).await?;
164 Ok(response.into_inner())
165 }
166
167 pub async fn get_block_height(
168 &mut self,
169 commitment: Option<CommitmentLevel>,
170 ) -> GeyserGrpcClientResult<GetBlockHeightResponse> {
171 let request = tonic::Request::new(GetBlockHeightRequest {
172 commitment: commitment.map(|value| value as i32),
173 });
174 let response = self.geyser.get_block_height(request).await?;
175 Ok(response.into_inner())
176 }
177
178 pub async fn get_slot(
179 &mut self,
180 commitment: Option<CommitmentLevel>,
181 ) -> GeyserGrpcClientResult<GetSlotResponse> {
182 let request = tonic::Request::new(GetSlotRequest {
183 commitment: commitment.map(|value| value as i32),
184 });
185 let response = self.geyser.get_slot(request).await?;
186 Ok(response.into_inner())
187 }
188
189 pub async fn is_blockhash_valid(
190 &mut self,
191 blockhash: String,
192 commitment: Option<CommitmentLevel>,
193 ) -> GeyserGrpcClientResult<IsBlockhashValidResponse> {
194 let request = tonic::Request::new(IsBlockhashValidRequest {
195 blockhash,
196 commitment: commitment.map(|value| value as i32),
197 });
198 let response = self.geyser.is_blockhash_valid(request).await?;
199 Ok(response.into_inner())
200 }
201
202 pub async fn get_version(&mut self) -> GeyserGrpcClientResult<GetVersionResponse> {
203 let request = tonic::Request::new(GetVersionRequest {});
204 let response = self.geyser.get_version(request).await?;
205 Ok(response.into_inner())
206 }
207}
208
/// Errors produced while configuring or connecting a [`GeyserGrpcBuilder`].
#[derive(Debug, thiserror::Error)]
pub enum GeyserGrpcBuilderError {
    /// The supplied x-token could not be converted into an ASCII metadata value.
    #[error("Failed to parse x-token: {0}")]
    MetadataValueError(#[from] InvalidMetadataValue),
    /// Error reported by the tonic transport layer (endpoint parsing, TLS
    /// configuration, or connecting).
    #[error("gRPC transport error: {0}")]
    TonicError(#[from] tonic::transport::Error),
}
216
/// `Result` alias whose error type is [`GeyserGrpcBuilderError`].
pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
218
/// Builder holding the endpoint and client options from which a
/// [`GeyserGrpcClient`] is created.
#[derive(Debug)]
pub struct GeyserGrpcBuilder {
    /// Tonic endpoint the channel is created from.
    pub endpoint: Endpoint,
    /// Token handed to the interceptor for the `x-token` metadata key, if any.
    pub x_token: Option<AsciiMetadataValue>,
    /// Flag handed to the interceptor for the `x-request-snapshot` metadata key.
    pub x_request_snapshot: bool,
    /// Encoding passed to the geyser client's `send_compressed` when set.
    pub send_compressed: Option<CompressionEncoding>,
    /// Encoding passed to the geyser client's `accept_compressed` when set.
    pub accept_compressed: Option<CompressionEncoding>,
    /// Limit passed to the geyser client's `max_decoding_message_size` when set.
    pub max_decoding_message_size: Option<usize>,
    /// Limit passed to the geyser client's `max_encoding_message_size` when set.
    pub max_encoding_message_size: Option<usize>,
}
229
230impl GeyserGrpcBuilder {
231 const fn new(endpoint: Endpoint) -> Self {
233 Self {
234 endpoint,
235 x_token: None,
236 x_request_snapshot: false,
237 send_compressed: None,
238 accept_compressed: None,
239 max_decoding_message_size: None,
240 max_encoding_message_size: None,
241 }
242 }
243
244 pub fn from_shared(endpoint: impl Into<Bytes>) -> GeyserGrpcBuilderResult<Self> {
245 Ok(Self::new(Endpoint::from_shared(endpoint)?))
246 }
247
248 pub fn from_static(endpoint: &'static str) -> Self {
249 Self::new(Endpoint::from_static(endpoint))
250 }
251
252 fn build(
254 self,
255 channel: Channel,
256 ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
257 let interceptor = InterceptorXToken {
258 x_token: self.x_token,
259 x_request_snapshot: self.x_request_snapshot,
260 };
261
262 let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone());
263 if let Some(encoding) = self.send_compressed {
264 geyser = geyser.send_compressed(encoding);
265 }
266 if let Some(encoding) = self.accept_compressed {
267 geyser = geyser.accept_compressed(encoding);
268 }
269 if let Some(limit) = self.max_decoding_message_size {
270 geyser = geyser.max_decoding_message_size(limit);
271 }
272 if let Some(limit) = self.max_encoding_message_size {
273 geyser = geyser.max_encoding_message_size(limit);
274 }
275
276 Ok(GeyserGrpcClient::new(
277 HealthClient::with_interceptor(channel, interceptor),
278 geyser,
279 ))
280 }
281
282 pub async fn connect(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
283 let channel = self.endpoint.connect().await?;
284 self.build(channel)
285 }
286
287 pub fn connect_lazy(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
288 let channel = self.endpoint.connect_lazy();
289 self.build(channel)
290 }
291
292 pub fn x_token<T>(self, x_token: Option<T>) -> GeyserGrpcBuilderResult<Self>
294 where
295 T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
296 {
297 Ok(Self {
298 x_token: x_token.map(|x_token| x_token.try_into()).transpose()?,
299 ..self
300 })
301 }
302
303 pub fn set_x_request_snapshot(self, value: bool) -> Self {
305 Self {
306 x_request_snapshot: value,
307 ..self
308 }
309 }
310
311 pub fn connect_timeout(self, dur: Duration) -> Self {
313 Self {
314 endpoint: self.endpoint.connect_timeout(dur),
315 ..self
316 }
317 }
318
319 pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
320 Self {
321 endpoint: self.endpoint.buffer_size(sz),
322 ..self
323 }
324 }
325
326 pub fn http2_adaptive_window(self, enabled: bool) -> Self {
327 Self {
328 endpoint: self.endpoint.http2_adaptive_window(enabled),
329 ..self
330 }
331 }
332
333 pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
334 Self {
335 endpoint: self.endpoint.http2_keep_alive_interval(interval),
336 ..self
337 }
338 }
339
340 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
341 Self {
342 endpoint: self.endpoint.initial_connection_window_size(sz),
343 ..self
344 }
345 }
346
347 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
348 Self {
349 endpoint: self.endpoint.initial_stream_window_size(sz),
350 ..self
351 }
352 }
353
354 pub fn keep_alive_timeout(self, duration: Duration) -> Self {
355 Self {
356 endpoint: self.endpoint.keep_alive_timeout(duration),
357 ..self
358 }
359 }
360
361 pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
362 Self {
363 endpoint: self.endpoint.keep_alive_while_idle(enabled),
364 ..self
365 }
366 }
367
368 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
369 Self {
370 endpoint: self.endpoint.tcp_keepalive(tcp_keepalive),
371 ..self
372 }
373 }
374
375 pub fn tcp_nodelay(self, enabled: bool) -> Self {
376 Self {
377 endpoint: self.endpoint.tcp_nodelay(enabled),
378 ..self
379 }
380 }
381
382 pub fn timeout(self, dur: Duration) -> Self {
383 Self {
384 endpoint: self.endpoint.timeout(dur),
385 ..self
386 }
387 }
388
389 pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult<Self> {
390 Ok(Self {
391 endpoint: self.endpoint.tls_config(tls_config)?,
392 ..self
393 })
394 }
395
396 pub fn send_compressed(self, encoding: CompressionEncoding) -> Self {
398 Self {
399 send_compressed: Some(encoding),
400 ..self
401 }
402 }
403
404 pub fn accept_compressed(self, encoding: CompressionEncoding) -> Self {
405 Self {
406 accept_compressed: Some(encoding),
407 ..self
408 }
409 }
410
411 pub fn max_decoding_message_size(self, limit: usize) -> Self {
412 Self {
413 max_decoding_message_size: Some(limit),
414 ..self
415 }
416 }
417
418 pub fn max_encoding_message_size(self, limit: usize) -> Self {
419 Self {
420 max_encoding_message_size: Some(limit),
421 ..self
422 }
423 }
424}
425
#[cfg(test)]
mod tests {
    use super::GeyserGrpcClient;

    /// An `https` endpoint with a token builds a lazily connected client.
    #[tokio::test]
    async fn test_channel_https_success() {
        let builder = GeyserGrpcClient::build_from_shared("https://ams17.rpcpool.com:443")
            .expect("endpoint should be accepted");
        let builder = builder
            .x_token(Some("1000000000000000000000000007"))
            .expect("token should be accepted");

        assert!(builder.connect_lazy().is_ok());
    }

    /// An `http` endpoint with a token builds a lazily connected client.
    #[tokio::test]
    async fn test_channel_http_success() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000")
            .expect("endpoint should be accepted");
        let builder = builder
            .x_token(Some("1234567891012141618202224268"))
            .expect("token should be accepted");

        assert!(builder.connect_lazy().is_ok());
    }

    /// An empty string is accepted as a token value.
    #[tokio::test]
    async fn test_channel_empty_token_some() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000")
            .expect("endpoint should be accepted");

        assert!(builder.x_token(Some("")).is_ok());
    }

    /// Passing no token is accepted and still yields a client.
    #[tokio::test]
    async fn test_channel_invalid_token_none() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000")
            .expect("endpoint should be accepted");
        let builder = builder
            .x_token::<String>(None)
            .expect("missing token should be accepted");

        assert!(builder.connect_lazy().is_ok());
    }

    /// A value that is not a URI is rejected with a transport error.
    #[tokio::test]
    async fn test_channel_invalid_uri() {
        let outcome = GeyserGrpcClient::build_from_shared("sites/files/images/picture.png");
        let rendered = format!("{:?}", outcome);

        assert_eq!(
            rendered,
            "Err(TonicError(tonic::transport::Error(InvalidUri, InvalidUri(InvalidFormat))))"
        );
    }
}