1pub use tonic::{service::Interceptor, transport::ClientTlsConfig};
2use laserstream_core_proto::geyser::{SubscribePreprocessedRequest, SubscribePreprocessedUpdate};
3use {
4 bytes::Bytes,
5 futures::{
6 channel::mpsc,
7 sink::{Sink, SinkExt},
8 stream::Stream,
9 },
10 std::time::Duration,
11 tonic::{
12 codec::{CompressionEncoding, Streaming},
13 metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue},
14 service::interceptor::InterceptedService,
15 transport::channel::{Channel, Endpoint},
16 Request, Response, Status,
17 },
18 tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse},
19 laserstream_core_proto::prelude::{
20 geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest,
21 GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse,
22 GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse,
23 IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse,
24 SubscribeReplayInfoRequest, SubscribeReplayInfoResponse, SubscribeRequest, SubscribeUpdate,
25 },
26};
27
28#[derive(Debug, Clone)]
29pub struct InterceptorXToken {
30 pub x_token: Option<AsciiMetadataValue>,
31 pub x_request_snapshot: bool,
32}
33
34impl Interceptor for InterceptorXToken {
35 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
36 if let Some(x_token) = self.x_token.clone() {
37 request.metadata_mut().insert("x-token", x_token);
38 }
39 if self.x_request_snapshot {
40 request
41 .metadata_mut()
42 .insert("x-request-snapshot", MetadataValue::from_static("true"));
43 }
44 Ok(request)
45 }
46}
47
48#[derive(Debug, thiserror::Error)]
49pub enum GeyserGrpcClientError {
50 #[error("gRPC status: {0}")]
51 TonicStatus(#[from] Status),
52 #[error("Failed to send subscribe request: {0}")]
53 SubscribeSendError(#[from] mpsc::SendError),
54}
55
56pub type GeyserGrpcClientResult<T> = Result<T, GeyserGrpcClientError>;
57
58pub struct GeyserGrpcClient<F> {
59 pub health: HealthClient<InterceptedService<Channel, F>>,
60 pub geyser: GeyserClient<InterceptedService<Channel, F>>,
61}
62
63impl GeyserGrpcClient<()> {
64 pub fn build_from_shared(
65 endpoint: impl Into<Bytes>,
66 ) -> GeyserGrpcBuilderResult<GeyserGrpcBuilder> {
67 Ok(GeyserGrpcBuilder::new(Endpoint::from_shared(endpoint)?))
68 }
69
70 pub fn build_from_static(endpoint: &'static str) -> GeyserGrpcBuilder {
71 GeyserGrpcBuilder::new(Endpoint::from_static(endpoint))
72 }
73}
74
75impl<F: Interceptor> GeyserGrpcClient<F> {
76 pub const fn new(
77 health: HealthClient<InterceptedService<Channel, F>>,
78 geyser: GeyserClient<InterceptedService<Channel, F>>,
79 ) -> Self {
80 Self { health, geyser }
81 }
82
83 pub async fn health_check(&mut self) -> GeyserGrpcClientResult<HealthCheckResponse> {
85 let request = HealthCheckRequest {
86 service: "geyser.Geyser".to_owned(),
87 };
88 let response = self.health.check(request).await?;
89 Ok(response.into_inner())
90 }
91
92 pub async fn health_watch(
93 &mut self,
94 ) -> GeyserGrpcClientResult<impl Stream<Item = Result<HealthCheckResponse, Status>>> {
95 let request = HealthCheckRequest {
96 service: "geyser.Geyser".to_owned(),
97 };
98 let response = self.health.watch(request).await?;
99 Ok(response.into_inner())
100 }
101
102 pub async fn subscribe(
104 &mut self,
105 ) -> GeyserGrpcClientResult<(
106 impl Sink<SubscribeRequest, Error = mpsc::SendError>,
107 impl Stream<Item = Result<SubscribeUpdate, Status>>,
108 )> {
109 self.subscribe_with_request(None).await
110 }
111
112 pub async fn subscribe_with_request(
113 &mut self,
114 request: Option<SubscribeRequest>,
115 ) -> GeyserGrpcClientResult<(
116 impl Sink<SubscribeRequest, Error = mpsc::SendError>,
117 impl Stream<Item = Result<SubscribeUpdate, Status>>,
118 )> {
119 let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
120 if let Some(request) = request {
121 subscribe_tx
122 .send(request)
123 .await
124 .map_err(GeyserGrpcClientError::SubscribeSendError)?;
125 }
126 let response: Response<Streaming<SubscribeUpdate>> =
127 self.geyser.subscribe(subscribe_rx).await?;
128 Ok((subscribe_tx, response.into_inner()))
129 }
130
131 pub async fn subscribe_once(
132 &mut self,
133 request: SubscribeRequest,
134 ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
135 self.subscribe_with_request(Some(request))
136 .await
137 .map(|(_sink, stream)| stream)
138 }
139
140 pub async fn subscribe_preprocessed(
141 &mut self,
142 ) -> GeyserGrpcClientResult<(
143 impl Sink<SubscribePreprocessedRequest, Error = mpsc::SendError>,
144 impl Stream<Item = Result<SubscribePreprocessedUpdate, Status>>,
145 )> {
146 self.subscribe_preprocessed_with_request(None).await
147 }
148
149 pub async fn subscribe_preprocessed_with_request(
150 &mut self,
151 request: Option<SubscribePreprocessedRequest>,
152 ) -> GeyserGrpcClientResult<(
153 impl Sink<SubscribePreprocessedRequest, Error = mpsc::SendError>,
154 impl Stream<Item = Result<SubscribePreprocessedUpdate, Status>>,
155 )> {
156 let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
157 if let Some(request) = request {
158 subscribe_tx
159 .send(request)
160 .await
161 .map_err(GeyserGrpcClientError::SubscribeSendError)?;
162 }
163 let response: Response<Streaming<SubscribePreprocessedUpdate>> =
164 self.geyser.subscribe_preprocessed(subscribe_rx).await?;
165 Ok((subscribe_tx, response.into_inner()))
166 }
167
168 pub async fn subscribe_preprocessed_once(
169 &mut self,
170 request: SubscribePreprocessedRequest,
171 ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribePreprocessedUpdate, Status>>> {
172 self.subscribe_preprocessed_with_request(Some(request))
173 .await
174 .map(|(_sink, stream)| stream)
175 }
176 pub async fn subscribe_replay_info(
178 &mut self,
179 ) -> GeyserGrpcClientResult<SubscribeReplayInfoResponse> {
180 let message = SubscribeReplayInfoRequest {};
181 let request = tonic::Request::new(message);
182 let response = self.geyser.subscribe_replay_info(request).await?;
183 Ok(response.into_inner())
184 }
185
186 pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult<PongResponse> {
187 let message = PingRequest { count };
188 let request = tonic::Request::new(message);
189 let response = self.geyser.ping(request).await?;
190 Ok(response.into_inner())
191 }
192
193 pub async fn get_latest_blockhash(
194 &mut self,
195 commitment: Option<CommitmentLevel>,
196 ) -> GeyserGrpcClientResult<GetLatestBlockhashResponse> {
197 let request = tonic::Request::new(GetLatestBlockhashRequest {
198 commitment: commitment.map(|value| value as i32),
199 });
200 let response = self.geyser.get_latest_blockhash(request).await?;
201 Ok(response.into_inner())
202 }
203
204 pub async fn get_block_height(
205 &mut self,
206 commitment: Option<CommitmentLevel>,
207 ) -> GeyserGrpcClientResult<GetBlockHeightResponse> {
208 let request = tonic::Request::new(GetBlockHeightRequest {
209 commitment: commitment.map(|value| value as i32),
210 });
211 let response = self.geyser.get_block_height(request).await?;
212 Ok(response.into_inner())
213 }
214
215 pub async fn get_slot(
216 &mut self,
217 commitment: Option<CommitmentLevel>,
218 ) -> GeyserGrpcClientResult<GetSlotResponse> {
219 let request = tonic::Request::new(GetSlotRequest {
220 commitment: commitment.map(|value| value as i32),
221 });
222 let response = self.geyser.get_slot(request).await?;
223 Ok(response.into_inner())
224 }
225
226 pub async fn is_blockhash_valid(
227 &mut self,
228 blockhash: String,
229 commitment: Option<CommitmentLevel>,
230 ) -> GeyserGrpcClientResult<IsBlockhashValidResponse> {
231 let request = tonic::Request::new(IsBlockhashValidRequest {
232 blockhash,
233 commitment: commitment.map(|value| value as i32),
234 });
235 let response = self.geyser.is_blockhash_valid(request).await?;
236 Ok(response.into_inner())
237 }
238
239 pub async fn get_version(&mut self) -> GeyserGrpcClientResult<GetVersionResponse> {
240 let request = tonic::Request::new(GetVersionRequest {});
241 let response = self.geyser.get_version(request).await?;
242 Ok(response.into_inner())
243 }
244}
245
246#[derive(Debug, thiserror::Error)]
247pub enum GeyserGrpcBuilderError {
248 #[error("Failed to parse x-token: {0}")]
249 MetadataValueError(#[from] InvalidMetadataValue),
250 #[error("gRPC transport error: {0}")]
251 TonicError(#[from] tonic::transport::Error),
252}
253
254pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
255
256#[derive(Debug)]
257pub struct GeyserGrpcBuilder {
258 pub endpoint: Endpoint,
259 pub x_token: Option<AsciiMetadataValue>,
260 pub x_request_snapshot: bool,
261 pub send_compressed: Option<CompressionEncoding>,
262 pub accept_compressed: Option<CompressionEncoding>,
263 pub max_decoding_message_size: Option<usize>,
264 pub max_encoding_message_size: Option<usize>,
265}
266
267impl GeyserGrpcBuilder {
268 const fn new(endpoint: Endpoint) -> Self {
270 Self {
271 endpoint,
272 x_token: None,
273 x_request_snapshot: false,
274 send_compressed: None,
275 accept_compressed: None,
276 max_decoding_message_size: None,
277 max_encoding_message_size: None,
278 }
279 }
280
281 pub fn from_shared(endpoint: impl Into<Bytes>) -> GeyserGrpcBuilderResult<Self> {
282 Ok(Self::new(Endpoint::from_shared(endpoint)?))
283 }
284
285 pub fn from_static(endpoint: &'static str) -> Self {
286 Self::new(Endpoint::from_static(endpoint))
287 }
288
289 fn build(
291 self,
292 channel: Channel,
293 ) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
294 let interceptor = InterceptorXToken {
295 x_token: self.x_token,
296 x_request_snapshot: self.x_request_snapshot,
297 };
298
299 let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone());
300 if let Some(encoding) = self.send_compressed {
301 geyser = geyser.send_compressed(encoding);
302 }
303 if let Some(encoding) = self.accept_compressed {
304 geyser = geyser.accept_compressed(encoding);
305 }
306 if let Some(limit) = self.max_decoding_message_size {
307 geyser = geyser.max_decoding_message_size(limit);
308 }
309 if let Some(limit) = self.max_encoding_message_size {
310 geyser = geyser.max_encoding_message_size(limit);
311 }
312
313 Ok(GeyserGrpcClient::new(
314 HealthClient::with_interceptor(channel, interceptor),
315 geyser,
316 ))
317 }
318
319 pub async fn connect(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
320 let channel = self.endpoint.connect().await?;
321 self.build(channel)
322 }
323
324 pub fn connect_lazy(self) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
325 let channel = self.endpoint.connect_lazy();
326 self.build(channel)
327 }
328
329 pub fn x_token<T>(self, x_token: Option<T>) -> GeyserGrpcBuilderResult<Self>
331 where
332 T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
333 {
334 Ok(Self {
335 x_token: x_token.map(|x_token| x_token.try_into()).transpose()?,
336 ..self
337 })
338 }
339
340 pub fn set_x_request_snapshot(self, value: bool) -> Self {
342 Self {
343 x_request_snapshot: value,
344 ..self
345 }
346 }
347
348 pub fn connect_timeout(self, dur: Duration) -> Self {
350 Self {
351 endpoint: self.endpoint.connect_timeout(dur),
352 ..self
353 }
354 }
355
356 pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
357 Self {
358 endpoint: self.endpoint.buffer_size(sz),
359 ..self
360 }
361 }
362
363 pub fn http2_adaptive_window(self, enabled: bool) -> Self {
364 Self {
365 endpoint: self.endpoint.http2_adaptive_window(enabled),
366 ..self
367 }
368 }
369
370 pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
371 Self {
372 endpoint: self.endpoint.http2_keep_alive_interval(interval),
373 ..self
374 }
375 }
376
377 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
378 Self {
379 endpoint: self.endpoint.initial_connection_window_size(sz),
380 ..self
381 }
382 }
383
384 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
385 Self {
386 endpoint: self.endpoint.initial_stream_window_size(sz),
387 ..self
388 }
389 }
390
391 pub fn keep_alive_timeout(self, duration: Duration) -> Self {
392 Self {
393 endpoint: self.endpoint.keep_alive_timeout(duration),
394 ..self
395 }
396 }
397
398 pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
399 Self {
400 endpoint: self.endpoint.keep_alive_while_idle(enabled),
401 ..self
402 }
403 }
404
405 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
406 Self {
407 endpoint: self.endpoint.tcp_keepalive(tcp_keepalive),
408 ..self
409 }
410 }
411
412 pub fn tcp_nodelay(self, enabled: bool) -> Self {
413 Self {
414 endpoint: self.endpoint.tcp_nodelay(enabled),
415 ..self
416 }
417 }
418
419 pub fn timeout(self, dur: Duration) -> Self {
420 Self {
421 endpoint: self.endpoint.timeout(dur),
422 ..self
423 }
424 }
425
426 pub fn tls_config(self, tls_config: ClientTlsConfig) -> GeyserGrpcBuilderResult<Self> {
427 Ok(Self {
428 endpoint: self.endpoint.tls_config(tls_config)?,
429 ..self
430 })
431 }
432
433 pub fn send_compressed(self, encoding: CompressionEncoding) -> Self {
435 Self {
436 send_compressed: Some(encoding),
437 ..self
438 }
439 }
440
441 pub fn accept_compressed(self, encoding: CompressionEncoding) -> Self {
442 Self {
443 accept_compressed: Some(encoding),
444 ..self
445 }
446 }
447
448 pub fn max_decoding_message_size(self, limit: usize) -> Self {
449 Self {
450 max_decoding_message_size: Some(limit),
451 ..self
452 }
453 }
454
455 pub fn max_encoding_message_size(self, limit: usize) -> Self {
456 Self {
457 max_encoding_message_size: Some(limit),
458 ..self
459 }
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::GeyserGrpcClient;

    /// An `https` endpoint with a token builds and lazily connects.
    #[tokio::test]
    async fn test_channel_https_success() {
        let builder = GeyserGrpcClient::build_from_shared("https://ams17.rpcpool.com:443");
        assert!(builder.is_ok());

        let builder = builder
            .unwrap()
            .x_token(Some("1000000000000000000000000007"));
        assert!(builder.is_ok());

        let client = builder.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// An `http` endpoint with a token builds and lazily connects.
    #[tokio::test]
    async fn test_channel_http_success() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let builder = builder
            .unwrap()
            .x_token(Some("1234567891012141618202224268"));
        assert!(builder.is_ok());

        let client = builder.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// An empty token wrapped in `Some` is accepted by `x_token`.
    #[tokio::test]
    async fn test_channel_empty_token_some() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let builder = builder.unwrap().x_token(Some(""));
        assert!(builder.is_ok());
    }

    /// Passing no token (`None`) is accepted and the client still builds.
    #[tokio::test]
    async fn test_channel_invalid_token_none() {
        let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000");
        assert!(builder.is_ok());

        let builder = builder.unwrap().x_token::<String>(None);
        assert!(builder.is_ok());

        let client = builder.unwrap().connect_lazy();
        assert!(client.is_ok());
    }

    /// A string that is not a URI is rejected with a transport error.
    #[tokio::test]
    async fn test_channel_invalid_uri() {
        let outcome = GeyserGrpcClient::build_from_shared("sites/files/images/picture.png");
        let rendered = format!("{:?}", outcome);
        assert_eq!(
            rendered,
            "Err(TonicError(tonic::transport::Error(InvalidUri, InvalidUri(InvalidFormat))))"
        );
    }
}