1use crate::{LaserstreamConfig, LaserstreamError, config::CompressionEncoding as ConfigCompressionEncoding};
2use async_stream::stream;
3use futures::StreamExt;
4use futures_channel::mpsc as futures_mpsc;
5use futures_util::{sink::SinkExt, Stream};
6use std::{pin::Pin, time::Duration};
7use tokio::sync::mpsc;
8use tokio::time::sleep;
9use laserstream_core_proto::tonic::{
10 Status, Request, metadata::MetadataValue, transport::Endpoint, codec::CompressionEncoding,
11};
12use tracing::{error, instrument, warn};
13use uuid;
14use laserstream_core_client::{ClientTlsConfig, Interceptor};
15use laserstream_core_proto::prelude::{geyser_client::GeyserClient};
16use laserstream_core_proto::geyser::{
17 subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterSlots,
18 SubscribeRequestPing, SubscribeUpdate,
19 SubscribePreprocessedRequest, SubscribePreprocessedUpdate,
20};
21
/// Upper bound on consecutive failed connection attempts, regardless of what the caller
/// configures. Evaluates to 240; at the fixed 5 s retry interval that is about 20 minutes
/// of retrying (not counting the time spent inside the connection attempts themselves).
const HARD_CAP_RECONNECT_ATTEMPTS: u32 = 20 * 60 / 5;

/// Pause between two reconnection attempts, in milliseconds.
const FIXED_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// SDK identifier sent in the `x-sdk-name` request metadata entry.
const SDK_NAME: &str = "laserstream-rust";
/// Crate version, read from Cargo's `CARGO_PKG_VERSION` at compile time; sent in the
/// `x-sdk-version` request metadata entry.
const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
26
27#[derive(Clone)]
29struct SdkMetadataInterceptor {
30 x_token: Option<laserstream_core_proto::tonic::metadata::AsciiMetadataValue>,
31}
32
33impl SdkMetadataInterceptor {
34 fn new(api_key: String) -> Result<Self, Status> {
35 let x_token = if !api_key.is_empty() {
36 Some(api_key.parse().map_err(|e| {
37 Status::invalid_argument(format!("Invalid API key: {}", e))
38 })?)
39 } else {
40 None
41 };
42 Ok(Self { x_token })
43 }
44}
45
46impl Interceptor for SdkMetadataInterceptor {
47 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
48 if let Some(ref x_token) = self.x_token {
50 request.metadata_mut().insert("x-token", x_token.clone());
51 }
52
53 request.metadata_mut().insert("x-sdk-name", MetadataValue::from_static(SDK_NAME));
55 request.metadata_mut().insert("x-sdk-version", MetadataValue::from_static(SDK_VERSION));
56
57 Ok(request)
58 }
59}
60
61#[derive(Clone)]
63pub struct StreamHandle {
64 write_tx: mpsc::UnboundedSender<SubscribeRequest>,
65}
66
67impl StreamHandle {
68 pub async fn write(&self, request: SubscribeRequest) -> Result<(), LaserstreamError> {
70 self.write_tx
71 .send(request)
72 .map_err(|_| LaserstreamError::ConnectionError("Write channel closed".to_string()))
73 }
74}
75
76#[instrument(skip(config, request))]
79pub fn subscribe(
80 config: LaserstreamConfig,
81 request: SubscribeRequest,
82) -> (
83 impl Stream<Item = Result<SubscribeUpdate, LaserstreamError>>,
84 StreamHandle,
85) {
86 let (write_tx, mut write_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
87 let handle = StreamHandle { write_tx };
88 let update_stream = stream! {
89 let mut reconnect_attempts = 0;
90 let mut tracked_slot: u64 = 0;
91
92 let effective_max_attempts = config
94 .max_reconnect_attempts
95 .unwrap_or(HARD_CAP_RECONNECT_ATTEMPTS) .min(HARD_CAP_RECONNECT_ATTEMPTS); let mut current_request = request.clone();
100 let internal_slot_sub_id = format!("internal-{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap());
101
102 let replay_enabled = config.replay;
104
105 if replay_enabled {
107 current_request.slots.insert(
108 internal_slot_sub_id.clone(),
109 SubscribeRequestFilterSlots {
110 filter_by_commitment: Some(true), ..Default::default()
112 }
113 );
114 }
115
116 if !replay_enabled {
118 current_request.from_slot = None;
119 }
120
121 let api_key_string = config.api_key.clone();
122
123 loop {
124 while let Ok(write_request) = write_rx.try_recv() {
127 merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id);
128 }
129
130 if tracked_slot > 0 && replay_enabled {
134 let commitment_level = current_request.commitment.unwrap_or(0);
135 let from_slot = match commitment_level {
136 0 => tracked_slot.saturating_sub(31), 1 | 2 => tracked_slot, _ => tracked_slot.saturating_sub(31), };
140 current_request.from_slot = Some(from_slot);
141 } else if !replay_enabled {
142 current_request.from_slot = None;
143 }
144
145 let attempt_request = current_request.clone();
146
147 match connect_and_subscribe_once(&config, attempt_request, api_key_string.clone()).await {
148 Ok((sender, stream)) => {
149 reconnect_attempts = 0;
151
152 let mut sender: Pin<Box<dyn futures_util::Sink<SubscribeRequest, Error = futures_mpsc::SendError> + Send>> = Box::pin(sender);
154 let mut stream: Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>> + Send>> = Box::pin(stream);
156
157 let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
159 ping_interval.tick().await; let mut ping_id = 0i32;
161
162 loop {
163 tokio::select! {
164 _ = ping_interval.tick() => {
166 ping_id = ping_id.wrapping_add(1);
167 let ping_request = SubscribeRequest {
168 ping: Some(SubscribeRequestPing { id: ping_id }),
169 ..Default::default()
170 };
171 let _ = sender.send(ping_request).await;
172 },
173 result = stream.next() => {
175 if let Some(result) = result {
176 match result {
177 Ok(update) => {
178
179 if matches!(&update.update_oneof, Some(UpdateOneof::Ping(_))) {
181 let pong_req = SubscribeRequest { ping: Some(SubscribeRequestPing { id: 1 }), ..Default::default() };
182 if let Err(e) = sender.send(pong_req).await {
183 warn!(error = %e, "Failed to send pong");
184 break;
185 }
186 continue;
187 }
188
189 if matches!(&update.update_oneof, Some(UpdateOneof::Pong(_))) {
191 continue;
192 }
193
194 if let Some(UpdateOneof::Slot(s)) = &update.update_oneof {
196 if replay_enabled {
197 tracked_slot = s.slot;
198 }
199
200 if update.filters.len() == 1 && update.filters.contains(&internal_slot_sub_id) {
202 continue;
203 }
204 }
205
206 let mut clean_update = update;
208 if replay_enabled {
209 clean_update.filters.retain(|f| f != &internal_slot_sub_id);
210
211 if !clean_update.filters.is_empty() {
213 yield Ok(clean_update);
214 }
215 } else {
216 yield Ok(clean_update);
218 }
219 }
220 Err(status) => {
221 warn!(error = %status, "Stream error, will reconnect after 5s delay");
224 break;
225 }
226 }
227 } else {
228 break;
230 }
231 }
232
233 Some(write_request) = write_rx.recv() => {
235 merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id);
237
238 let mut send_req = current_request.clone();
243 send_req.from_slot = None;
244 send_req.ping = None;
245
246 if let Err(e) = sender.send(send_req).await {
247 warn!(error = %e, "Failed to send write request");
248 break;
249 }
250 }
251 }
252 }
253 }
254 Err(err) => {
255 reconnect_attempts += 1;
257
258 error!(error = %err, attempt = reconnect_attempts, max_attempts = effective_max_attempts, "Connection failed, will retry after 5s delay");
260
261 if reconnect_attempts >= effective_max_attempts {
263 error!(attempts = effective_max_attempts, "Max reconnection attempts reached");
264 yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled(
266 format!("Connection failed after {} attempts", effective_max_attempts)
267 )));
268 return;
269 }
270 }
271 }
272
273 let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS);
275 sleep(delay).await;
276 }
277 };
278
279 (update_stream, handle)
280}
281
282#[instrument(skip(config, request, api_key))]
283async fn connect_and_subscribe_once(
284 config: &LaserstreamConfig,
285 request: SubscribeRequest,
286 api_key: String,
287) -> Result<
288 (
289 impl futures_util::Sink<SubscribeRequest, Error = futures_mpsc::SendError> + Send,
290 impl Stream<Item = Result<SubscribeUpdate, laserstream_core_proto::tonic::Status>> + Send,
291 ),
292 Status,
293> {
294 let options = &config.channel_options;
295
296 let interceptor = SdkMetadataInterceptor::new(api_key)?;
298
299 let mut endpoint = Endpoint::from_shared(config.endpoint.clone())
301 .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))?
302 .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10)))
303 .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30)))
304 .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30)))
305 .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(5)))
306 .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true))
307 .initial_stream_window_size(options.initial_stream_window_size.or(Some(1024 * 1024 * 4)))
308 .initial_connection_window_size(options.initial_connection_window_size.or(Some(1024 * 1024 * 8)))
309 .http2_adaptive_window(options.http2_adaptive_window.unwrap_or(true))
310 .tcp_nodelay(options.tcp_nodelay.unwrap_or(true))
311 .buffer_size(options.buffer_size.or(Some(1024 * 64)));
312
313 if let Some(tcp_keepalive_secs) = options.tcp_keepalive_secs {
314 endpoint = endpoint.tcp_keepalive(Some(Duration::from_secs(tcp_keepalive_secs)));
315 }
316
317 endpoint = endpoint
319 .tls_config(ClientTlsConfig::new().with_enabled_roots())
320 .map_err(|e| Status::internal(format!("TLS config error: {}", e)))?;
321
322 let channel = endpoint
324 .connect()
325 .await
326 .map_err(|e| Status::unavailable(format!("Connection failed: {}", e)))?;
327
328 let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor);
330
331 geyser_client = geyser_client
333 .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000))
334 .max_encoding_message_size(options.max_encoding_message_size.unwrap_or(64 * 1024 * 1024));
335
336 if let Some(send_comp) = options.send_compression {
338 let encoding = match send_comp {
339 ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
340 ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
341 };
342 geyser_client = geyser_client.send_compressed(encoding);
343 }
344
345 if let Some(ref accept_comps) = options.accept_compression {
347 for comp in accept_comps {
348 let encoding = match comp {
349 ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
350 ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
351 };
352 geyser_client = geyser_client.accept_compressed(encoding);
353 }
354 }
355
356 let (mut subscribe_tx, subscribe_rx) = futures_mpsc::unbounded();
358 subscribe_tx
359 .send(request)
360 .await
361 .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?;
362
363 let response = geyser_client
364 .subscribe(subscribe_rx)
365 .await
366 .map_err(|e| Status::internal(format!("Subscription failed: {}", e)))?;
367
368 Ok((subscribe_tx, response.into_inner()))
369}
370
/// Handle returned alongside the stream by `subscribe_preprocessed`.
///
/// It is a unit struct: it carries no channel, and no methods are defined for it in the
/// code visible here, so it cannot be used to modify a running subscription.
#[derive(Clone)]
pub struct PreprocessedStreamHandle;
374
375#[instrument(skip(config, request))]
378pub fn subscribe_preprocessed(
379 config: LaserstreamConfig,
380 request: SubscribePreprocessedRequest,
381) -> (
382 impl Stream<Item = Result<SubscribePreprocessedUpdate, LaserstreamError>>,
383 PreprocessedStreamHandle,
384) {
385 let handle = PreprocessedStreamHandle;
386 let update_stream = stream! {
387 let mut reconnect_attempts = 0;
388
389 let effective_max_attempts = config
391 .max_reconnect_attempts
392 .unwrap_or(HARD_CAP_RECONNECT_ATTEMPTS)
393 .min(HARD_CAP_RECONNECT_ATTEMPTS);
394
395 loop {
396 let api_key = config.api_key.clone();
397 let request_clone = request.clone();
398
399 match connect_and_subscribe_preprocessed_once(&config, request_clone, api_key).await {
400 Ok(mut stream) => {
401 reconnect_attempts = 0;
402
403 while let Some(result) = stream.next().await {
404 match result {
405 Ok(update) => yield Ok(update),
406 Err(e) => {
407 warn!(error = %e, "Stream error received");
408 break;
409 }
410 }
411 }
412 }
413 Err(err) => {
414 reconnect_attempts += 1;
415 error!(error = %err, attempt = reconnect_attempts, max_attempts = effective_max_attempts, "Connection failed, will retry after 5s delay");
416
417 if reconnect_attempts >= effective_max_attempts {
418 error!(attempts = effective_max_attempts, "Max reconnection attempts reached");
419 yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled(
420 format!("Connection failed after {} attempts", effective_max_attempts)
421 )));
422 return;
423 }
424 }
425 }
426
427 let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS);
428 sleep(delay).await;
429 }
430 };
431
432 (update_stream, handle)
433}
434
435#[instrument(skip(config, request, api_key))]
436async fn connect_and_subscribe_preprocessed_once(
437 config: &LaserstreamConfig,
438 request: SubscribePreprocessedRequest,
439 api_key: String,
440) -> Result<
441 impl Stream<Item = Result<SubscribePreprocessedUpdate, laserstream_core_proto::tonic::Status>> + Send,
442 Status,
443> {
444 let options = &config.channel_options;
445
446 let interceptor = SdkMetadataInterceptor::new(api_key)?;
448
449 let mut endpoint = Endpoint::from_shared(config.endpoint.clone())
451 .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))?
452 .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10)))
453 .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30)))
454 .tcp_nodelay(options.tcp_nodelay.unwrap_or(true))
455 .tcp_keepalive(Some(Duration::from_secs(options.tcp_keepalive_secs.unwrap_or(30))))
456 .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30)))
457 .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(10)))
458 .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true));
459
460 endpoint = endpoint
461 .tls_config(ClientTlsConfig::new().with_enabled_roots())
462 .map_err(|e| Status::internal(format!("Failed to configure TLS: {}", e)))?;
463
464 let channel = endpoint
465 .connect()
466 .await
467 .map_err(|e| Status::internal(format!("Failed to connect: {}", e)))?;
468
469 let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor)
470 .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000))
471 .max_encoding_message_size(options.max_encoding_message_size.unwrap_or(64 * 1024 * 1024));
472
473 if let Some(compression) = &options.send_compression {
475 let encoding = match compression {
476 ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
477 ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
478 };
479 geyser_client = geyser_client.send_compressed(encoding).accept_compressed(encoding);
480 }
481
482 let (mut subscribe_tx, subscribe_rx) = futures_mpsc::unbounded();
483
484 subscribe_tx
485 .send(request)
486 .await
487 .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?;
488
489 let response = geyser_client
490 .subscribe_preprocessed(subscribe_rx)
491 .await
492 .map_err(|e| Status::internal(format!("Preprocessed subscription failed: {}", e)))?;
493
494 Ok(response.into_inner())
495}
496
497fn merge_subscribe_requests(
504 current: &mut SubscribeRequest,
505 modification: &SubscribeRequest,
506 internal_slot_sub_id: &str,
507) {
508 let internal_tracker = current
510 .slots
511 .get(internal_slot_sub_id)
512 .cloned();
513
514 current.accounts = modification.accounts.clone();
516 current.slots = modification.slots.clone();
517 current.transactions = modification.transactions.clone();
518 current.transactions_status = modification.transactions_status.clone();
519 current.blocks = modification.blocks.clone();
520 current.blocks_meta = modification.blocks_meta.clone();
521 current.entry = modification.entry.clone();
522 current.accounts_data_slice = modification.accounts_data_slice.clone();
523
524 if let Some(value) = internal_tracker {
526 current
527 .slots
528 .insert(internal_slot_sub_id.to_string(), value);
529 }
530
531 if modification.commitment.is_some() {
533 current.commitment = modification.commitment;
534 }
535
536 }
538