1pub use tonic::{service::Interceptor, transport::ClientTlsConfig};
2use {
3 bytes::Bytes,
4 futures::{
5 channel::mpsc,
6 sink::{Sink, SinkExt},
7 stream::Stream,
8 },
9 std::time::Duration,
10 tonic::{
11 codec::{CompressionEncoding, Streaming},
12 metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue},
13 service::interceptor::InterceptedService,
14 transport::channel::{Channel, Endpoint},
15 Request, Response, Status,
16 },
17 tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse},
18 yellowstone_grpc_proto::prelude::{
19 geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest,
20 GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse,
21 GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse,
22 IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse,
23 SubscribeReplayInfoRequest, SubscribeReplayInfoResponse, SubscribeRequest, SubscribeUpdate,
24 },
25};
26
27#[derive(Debug, Clone)]
28pub struct InterceptorXToken {
29 pub x_token: Option<AsciiMetadataValue>,
30 pub x_request_snapshot: bool,
31}
32
33impl Interceptor for InterceptorXToken {
34 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
35 if let Some(x_token) = self.x_token.clone() {
36 request.metadata_mut().insert("x-token", x_token);
37 }
38 if self.x_request_snapshot {
39 request
40 .metadata_mut()
41 .insert("x-request-snapshot", MetadataValue::from_static("true"));
42 }
43 Ok(request)
44 }
45}
46
/// Errors returned by the request methods of [`GeyserGrpcClient`].
#[derive(Debug, thiserror::Error)]
pub enum GeyserGrpcClientError {
    /// A gRPC call returned a [`Status`] instead of a response.
    #[error("gRPC status: {0}")]
    TonicStatus(#[from] Status),
    /// Sending a subscribe request into the request channel failed.
    #[error("Failed to send subscribe request: {0}")]
    SubscribeSendError(#[from] mpsc::SendError),
}

/// `Result` alias whose error type is [`GeyserGrpcClientError`].
pub type GeyserGrpcClientResult<T> = Result<T, GeyserGrpcClientError>;
56
57#[derive(Clone)]
58pub struct GeyserGrpcClient<F> {
59 pub health: HealthClient<InterceptedService<Channel, F>>,
60 pub geyser: GeyserClient<InterceptedService<Channel, F>>,
61}
62
63impl GeyserGrpcClient<()> {
64 pub fn build_from_shared(
65 endpoint: impl Into<Bytes>,
66 ) -> GeyserGrpcBuilderResult<GeyserGrpcBuilder> {
67 Ok(GeyserGrpcBuilder::new(Endpoint::from_shared(endpoint)?))
68 }
69
70 pub fn build_from_static(endpoint: &'static str) -> GeyserGrpcBuilder {
71 GeyserGrpcBuilder::new(Endpoint::from_static(endpoint))
72 }
73}
74
75impl<F: Interceptor> GeyserGrpcClient<F> {
76 pub const fn new(
77 health: HealthClient<InterceptedService<Channel, F>>,
78 geyser: GeyserClient<InterceptedService<Channel, F>>,
79 ) -> Self {
80 Self { health, geyser }
81 }
82
83 pub async fn health_check(&mut self) -> GeyserGrpcClientResult<HealthCheckResponse> {
85 let request = HealthCheckRequest {
86 service: "geyser.Geyser".to_owned(),
87 };
88 let response = self.health.check(request).await?;
89 Ok(response.into_inner())
90 }
91
92 pub async fn health_watch(
93 &mut self,
94 ) -> GeyserGrpcClientResult<impl Stream<Item = Result<HealthCheckResponse, Status>>> {
95 let request = HealthCheckRequest {
96 service: "geyser.Geyser".to_owned(),
97 };
98 let response = self.health.watch(request).await?;
99 Ok(response.into_inner())
100 }
101
102 pub async fn subscribe(
104 &mut self,
105 ) -> GeyserGrpcClientResult<(
106 impl Sink<SubscribeRequest, Error = mpsc::SendError>,
107 impl Stream<Item = Result<SubscribeUpdate, Status>>,
108 )> {
109 self.subscribe_with_request(None).await
110 }
111
112 pub async fn subscribe_with_request(
113 &mut self,
114 request: Option<SubscribeRequest>,
115 ) -> GeyserGrpcClientResult<(
116 impl Sink<SubscribeRequest, Error = mpsc::SendError>,
117 impl Stream<Item = Result<SubscribeUpdate, Status>>,
118 )> {
119 let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
120 if let Some(request) = request {
121 subscribe_tx
122 .send(request)
123 .await
124 .map_err(GeyserGrpcClientError::SubscribeSendError)?;
125 }
126 let response: Response<Streaming<SubscribeUpdate>> =
127 self.geyser.subscribe(subscribe_rx).await?;
128 Ok((subscribe_tx, response.into_inner()))
129 }
130
131 pub async fn subscribe_once(
132 &mut self,
133 request: SubscribeRequest,
134 ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
135 self.subscribe_with_request(Some(request))
136 .await
137 .map(|(_sink, stream)| stream)
138 }
139
140 pub async fn subscribe_replay_info(
142 &mut self,
143 ) -> GeyserGrpcClientResult<SubscribeReplayInfoResponse> {
144 let message = SubscribeReplayInfoRequest {};
145 let request = tonic::Request::new(message);
146 let response = self.geyser.subscribe_replay_info(request).await?;
147 Ok(response.into_inner())
148 }
149
150 pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult<PongResponse> {
151 let message = PingRequest { count };
152 let request = tonic::Request::new(message);
153 let response = self.geyser.ping(request).await?;
154 Ok(response.into_inner())
155 }
156
157 pub async fn get_latest_blockhash(
158 &mut self,
159 commitment: Option<CommitmentLevel>,
160 ) -> GeyserGrpcClientResult<GetLatestBlockhashResponse> {
161 let request = tonic::Request::new(GetLatestBlockhashRequest {
162 commitment: commitment.map(|value| value as i32),
163 });
164 let response = self.geyser.get_latest_blockhash(request).await?;
165 Ok(response.into_inner())
166 }
167
168 pub async fn get_block_height(
169 &mut self,
170 commitment: Option<CommitmentLevel>,
171 ) -> GeyserGrpcClientResult<GetBlockHeightResponse> {
172 let request = tonic::Request::new(GetBlockHeightRequest {
173 commitment: commitment.map(|value| value as i32),
174 });
175 let response = self.geyser.get_block_height(request).await?;
176 Ok(response.into_inner())
177 }
178
179 pub async fn get_slot(
180 &mut self,
181 commitment: Option<CommitmentLevel>,
182 ) -> GeyserGrpcClientResult<GetSlotResponse> {
183 let request = tonic::Request::new(GetSlotRequest {
184 commitment: commitment.map(|value| value as i32),
185 });
186 let response = self.geyser.get_slot(request).await?;
187 Ok(response.into_inner())
188 }
189
190 pub async fn is_blockhash_valid(
191 &mut self,
192 blockhash: String,
193 commitment: Option<CommitmentLevel>,
194 ) -> GeyserGrpcClientResult<IsBlockhashValidResponse> {
195 let request = tonic::Request::new(IsBlockhashValidRequest {
196 blockhash,
197 commitment: commitment.map(|value| value as i32),
198 });
199 let response = self.geyser.is_blockhash_valid(request).await?;
200 Ok(response.into_inner())
201 }
202
203 pub async fn get_version(&mut self) -> GeyserGrpcClientResult<GetVersionResponse> {
204 let request = tonic::Request::new(GetVersionRequest {});
205 let response = self.geyser.get_version(request).await?;
206 Ok(response.into_inner())
207 }
208}
209
/// Errors returned while configuring or connecting a [`GeyserGrpcBuilder`].
#[derive(Debug, thiserror::Error)]
pub enum GeyserGrpcBuilderError {
    /// The supplied x-token could not be converted into a metadata value.
    #[error("Failed to parse x-token: {0}")]
    MetadataValueError(#[from] InvalidMetadataValue),
    /// The tonic transport layer reported an error.
    #[error("gRPC transport error: {0}")]
    TonicError(#[from] tonic::transport::Error),
}

/// `Result` alias whose error type is [`GeyserGrpcBuilderError`].
pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
219
220#[derive(Debug)]
221pub struct GeyserGrpcBuilder {
222 pub endpoint: Endpoint,
223 pub x_token: Option<AsciiMetadataValue>,
224 pub x_request_snapshot: bool,
225 pub send_compressed: Option<CompressionEncoding>,
226 pub accept_compressed: Option<CompressionEncoding>,
227 pub max_decoding_message_size: Option<usize>,
228 pub max_encoding_message_size: Option<usize>,
229}
230
231impl GeyserGrpcBuilder {
232 const fn new(endpoint: Endpoint) -> Self {
234 Self {
235 endpoint,
236 x_token: None,
237 x_request_snapshot: false,
238 send_compressed: None,
239 accept_compressed: None,
240 max_decoding_message_size: None,
241 max_encoding_message_size: None,
242 }
243 }
244
245 pub fn from_shared(endpoint: impl Into<Bytes>) -> GeyserGrpcBuilderResult<Self> {
246 Ok(Self::new(Endpoint::from_shared(endpoint)?))
247 }
248
249 pub fn from_static(endpoint: &'static str) -> Self {
250 Self::new(Endpoint::from_static(endpoint))
251 }
252
253 fn build(
255 self,
256 channel: Channel,
257 ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor + Clone>> {
258 let interceptor = InterceptorXToken {
259 x_token: self.x_token,
260 x_request_snapshot: self.x_request_snapshot,
261 };
262
263 let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone());
264 if let Some(encoding) = self.send_compressed {
265 geyser = geyser.send_compressed(encoding);
266 }
267 if let Some(encoding) = self.accept_compressed {
268 geyser = geyser.accept_compressed(encoding);
269 }
270 if let Some(limit) = self.max_decoding_message_size {
271 geyser = geyser.max_decoding_message_size(limit);
272 }
273 if let Some(limit) = self.max_encoding_message_size {
274 geyser = geyser.max_encoding_message_size(limit);
275 }
276
277 Ok(GeyserGrpcClient::new(
278 HealthClient::with_interceptor(channel, interceptor),
279 geyser,
280 ))
281 }
282
283 pub async fn connect(
284 self,
285 ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor + Clone>> {
286 let channel = self.endpoint.connect().await?;
287 self.build(channel)
288 }
289
290 pub fn connect_lazy(
291 self,
292 ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor + Clone>> {
293 let channel = self.endpoint.connect_lazy();
294 self.build(channel)
295 }
296
297 pub fn x_token<T>(self, x_token: Option<T>) -> GeyserGrpcBuilderResult<Self>
299 where
300 T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
301 {
302 Ok(Self {
303 x_token: x_token.map(|x_token| x_token.try_into()).transpose()?,
304 ..self
305 })
306 }
307
308 pub fn set_x_request_snapshot(self, value: bool) -> Self {
310 Self {
311 x_request_snapshot: value,
312 ..self
313 }
314 }
315
316 pub fn connect_timeout(self, dur: Duration) -> Self {
318 Self {
319 endpoint: self.endpoint.connect_timeout(dur),
320 ..self
321 }
322 }
323
324 pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
325 Self {
326 endpoint: self.endpoint.buffer_size(sz),
327 ..self
328 }
329 }
330
331 pub fn http2_adaptive_window(self, enabled: bool) -> Self {
332 Self {
333 endpoint: self.endpoint.http2_adaptive_window(enabled),
334 ..self
335 }
336 }
337
338 pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
339 Self {
340 endpoint: self.endpoint.http2_keep_alive_interval(interval),
341 ..self
342 }
343 }
344
345 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
346 Self {
347 endpoint: self.endpoint.initial_connection_window_size(sz),
348 ..self
349 }
350 }
351
352 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
353 Self {
354 endpoint: self.endpoint.initial_stream_window_size(sz),
355 ..self
356 }
357 }
358
359 pub fn keep_alive_timeout(self, duration: Duration) -> Self {
360 Self {
361 endpoint: self.endpoint.keep_alive_timeout(duration),
362 ..self
363 }
364 }
365
366 pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
367 Self {
368 endpoint: self.endpoint.keep_alive_while_idle(enabled),
369 ..self
370 }
371 }
372
373 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
374 Self {
375 endpoint: self.endpoint.tcp_keepalive(tcp_keepalive),
376 ..self
377 }
378 }
379
380 pub fn tcp_nodelay(self, enabled: bool) -> Self {
381 Self {
382 endpoint: self.endpoint.tcp_nodelay(enabled),
383 ..self
384 }
385 }
386
387 pub fn timeout(self, dur: Duration) -> Self {
388 Self {
389 endpoint: self.endpoint.timeout(dur),
390 ..self
391 }
392 }
393
394 pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult<Self> {
395 Ok(Self {
396 endpoint: self.endpoint.tls_config(tls_config)?,
397 ..self
398 })
399 }
400
401 pub fn send_compressed(self, encoding: CompressionEncoding) -> Self {
403 Self {
404 send_compressed: Some(encoding),
405 ..self
406 }
407 }
408
409 pub fn accept_compressed(self, encoding: CompressionEncoding) -> Self {
410 Self {
411 accept_compressed: Some(encoding),
412 ..self
413 }
414 }
415
416 pub fn max_decoding_message_size(self, limit: usize) -> Self {
417 Self {
418 max_decoding_message_size: Some(limit),
419 ..self
420 }
421 }
422
423 pub fn max_encoding_message_size(self, limit: usize) -> Self {
424 Self {
425 max_encoding_message_size: Some(limit),
426 ..self
427 }
428 }
429}
430
#[cfg(test)]
mod tests {
    use super::GeyserGrpcClient;

    /// An `https` endpoint with a token builds and lazily connects without error.
    #[tokio::test]
    async fn test_channel_https_success() {
        let builder = GeyserGrpcClient::build_from_shared("https://ams17.rpcpool.com:443");
        assert!(builder.is_ok());

        let builder = builder
            .unwrap()
            .x_token(Some("1000000000000000000000000007"));
        assert!(builder.is_ok());

        let client = builder.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// An `http` endpoint with a token builds and lazily connects without error.
    #[tokio::test]
    async fn test_channel_http_success() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let builder = builder
            .unwrap()
            .x_token(Some("1234567891012141618202224268"));
        assert!(builder.is_ok());

        let client = builder.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// An empty token string is accepted by `x_token`.
    #[tokio::test]
    async fn test_channel_empty_token_some() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let builder = builder.unwrap().x_token(Some(""));
        assert!(builder.is_ok());
    }

    /// Passing no token at all still builds and lazily connects without error.
    #[tokio::test]
    async fn test_channel_invalid_token_none() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let builder = builder.unwrap().x_token::<String>(None);
        assert!(builder.is_ok());

        let client = builder.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// A string that is not a URI is rejected with the transport's invalid-URI error.
    #[tokio::test]
    async fn test_channel_invalid_uri() {
        let outcome = GeyserGrpcClient::build_from_shared("sites/files/images/picture.png");
        let rendered = format!("{:?}", outcome);
        assert_eq!(
            rendered,
            "Err(TonicError(tonic::transport::Error(InvalidUri, InvalidUri(InvalidFormat))))"
        );
    }
}