1use crate::{LaserstreamConfig, LaserstreamError, config::CompressionEncoding as ConfigCompressionEncoding};
2use async_stream::stream;
3use futures::StreamExt;
4use futures_channel::mpsc as futures_mpsc;
5use futures_util::{sink::SinkExt, Stream};
6use std::{pin::Pin, time::Duration};
7use tokio::sync::mpsc;
8use tokio::time::sleep;
9use laserstream_core_proto::tonic::{
10 Status, Request, metadata::MetadataValue, transport::Endpoint, codec::CompressionEncoding,
11};
12use tracing::{error, instrument, warn};
13use uuid;
14use laserstream_core_client::{ClientTlsConfig, Interceptor};
15use laserstream_core_proto::prelude::{geyser_client::GeyserClient};
16use laserstream_core_proto::geyser::{
17 subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterSlots,
18 SubscribeRequestPing, SubscribeUpdate,
19 SubscribePreprocessedRequest, SubscribePreprocessedUpdate,
20};
21
/// Upper bound on consecutive failed connection attempts, regardless of what
/// the caller configures. Evaluates to 240; presumably 20 minutes' worth of
/// attempts at one attempt every 5 seconds.
const HARD_CAP_RECONNECT_ATTEMPTS: u32 = (20 * 60) / 5;

/// Fixed pause between reconnection attempts, in milliseconds.
const FIXED_RECONNECT_INTERVAL_MS: u64 = 5000;

/// Value sent in the `x-sdk-name` request metadata header.
const SDK_NAME: &str = "laserstream-rust";

/// Value sent in the `x-sdk-version` request metadata header.
const SDK_VERSION: &str = "0.1.9";
26
27#[derive(Clone)]
29struct SdkMetadataInterceptor {
30 x_token: Option<laserstream_core_proto::tonic::metadata::AsciiMetadataValue>,
31}
32
33impl SdkMetadataInterceptor {
34 fn new(api_key: String) -> Result<Self, Status> {
35 let x_token = if !api_key.is_empty() {
36 Some(api_key.parse().map_err(|e| {
37 Status::invalid_argument(format!("Invalid API key: {}", e))
38 })?)
39 } else {
40 None
41 };
42 Ok(Self { x_token })
43 }
44}
45
46impl Interceptor for SdkMetadataInterceptor {
47 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
48 if let Some(ref x_token) = self.x_token {
50 request.metadata_mut().insert("x-token", x_token.clone());
51 }
52
53 request.metadata_mut().insert("x-sdk-name", MetadataValue::from_static(SDK_NAME));
55 request.metadata_mut().insert("x-sdk-version", MetadataValue::from_static(SDK_VERSION));
56
57 Ok(request)
58 }
59}
60
61#[derive(Clone)]
63pub struct StreamHandle {
64 write_tx: mpsc::UnboundedSender<SubscribeRequest>,
65}
66
67impl StreamHandle {
68 pub async fn write(&self, request: SubscribeRequest) -> Result<(), LaserstreamError> {
70 self.write_tx
71 .send(request)
72 .map_err(|_| LaserstreamError::ConnectionError("Write channel closed".to_string()))
73 }
74}
75
76#[instrument(skip(config, request))]
79pub fn subscribe(
80 config: LaserstreamConfig,
81 request: SubscribeRequest,
82) -> (
83 impl Stream<Item = Result<SubscribeUpdate, LaserstreamError>>,
84 StreamHandle,
85) {
86 let (write_tx, mut write_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
87 let handle = StreamHandle { write_tx };
88 let update_stream = stream! {
89 let mut reconnect_attempts = 0;
90 let mut tracked_slot: u64 = 0;
91
92 let effective_max_attempts = config
94 .max_reconnect_attempts
95 .unwrap_or(HARD_CAP_RECONNECT_ATTEMPTS) .min(HARD_CAP_RECONNECT_ATTEMPTS); let mut current_request = request.clone();
100 let internal_slot_sub_id = format!("internal-{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap());
101
102 let replay_enabled = config.replay;
104
105 if replay_enabled {
107 current_request.slots.insert(
108 internal_slot_sub_id.clone(),
109 SubscribeRequestFilterSlots {
110 filter_by_commitment: Some(true), ..Default::default()
112 }
113 );
114 }
115
116 if !replay_enabled {
118 current_request.from_slot = None;
119 }
120
121 let api_key_string = config.api_key.clone();
122
123 loop {
124 while let Ok(write_request) = write_rx.try_recv() {
127 merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id);
128 }
129
130 if tracked_slot > 0 && replay_enabled {
134 let commitment_level = current_request.commitment.unwrap_or(0);
135 let from_slot = match commitment_level {
136 0 => tracked_slot.saturating_sub(31), 1 | 2 => tracked_slot, _ => tracked_slot.saturating_sub(31), };
140 current_request.from_slot = Some(from_slot);
141 } else if !replay_enabled {
142 current_request.from_slot = None;
143 }
144
145 let attempt_request = current_request.clone();
146
147 match connect_and_subscribe_once(&config, attempt_request, api_key_string.clone()).await {
148 Ok((sender, stream)) => {
149 reconnect_attempts = 0;
151
152 let mut sender: Pin<Box<dyn futures_util::Sink<SubscribeRequest, Error = futures_mpsc::SendError> + Send>> = Box::pin(sender);
154 let mut stream: Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>> + Send>> = Box::pin(stream);
156
157 let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
159 ping_interval.tick().await; let mut ping_id = 0i32;
161
162 loop {
163 tokio::select! {
164 _ = ping_interval.tick() => {
166 ping_id = ping_id.wrapping_add(1);
167 let ping_request = SubscribeRequest {
168 ping: Some(SubscribeRequestPing { id: ping_id }),
169 ..Default::default()
170 };
171 let _ = sender.send(ping_request).await;
172 },
173 result = stream.next() => {
175 if let Some(result) = result {
176 match result {
177 Ok(update) => {
178
179 if matches!(&update.update_oneof, Some(UpdateOneof::Ping(_))) {
181 let pong_req = SubscribeRequest { ping: Some(SubscribeRequestPing { id: 1 }), ..Default::default() };
182 if let Err(e) = sender.send(pong_req).await {
183 warn!(error = %e, "Failed to send pong");
184 break;
185 }
186 continue;
187 }
188
189 if matches!(&update.update_oneof, Some(UpdateOneof::Pong(_))) {
191 continue;
192 }
193
194 if let Some(UpdateOneof::Slot(s)) = &update.update_oneof {
196 if replay_enabled {
197 tracked_slot = s.slot;
198 }
199
200 if update.filters.len() == 1 && update.filters.contains(&internal_slot_sub_id) {
202 continue;
203 }
204 }
205
206 let mut clean_update = update;
208 if replay_enabled {
209 clean_update.filters.retain(|f| f != &internal_slot_sub_id);
210
211 if !clean_update.filters.is_empty() {
213 yield Ok(clean_update);
214 }
215 } else {
216 yield Ok(clean_update);
218 }
219 }
220 Err(status) => {
221 warn!(error = %status, "Stream error, will reconnect after 5s delay");
223 yield Err(LaserstreamError::Status(status.clone()));
224 break;
225 }
226 }
227 } else {
228 break;
230 }
231 }
232
233 Some(write_request) = write_rx.recv() => {
235 merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id);
237
238 if let Err(e) = sender.send(write_request).await {
239 warn!(error = %e, "Failed to send write request");
240 break;
241 }
242 }
243 }
244 }
245 }
246 Err(err) => {
247 reconnect_attempts += 1;
249
250 error!(error = %err, attempt = reconnect_attempts, max_attempts = effective_max_attempts, "Connection failed, will retry after 5s delay");
252
253 if reconnect_attempts >= effective_max_attempts {
255 error!(attempts = effective_max_attempts, "Max reconnection attempts reached");
256 yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled(
258 format!("Connection failed after {} attempts", effective_max_attempts)
259 )));
260 return;
261 }
262 }
263 }
264
265 let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS);
267 sleep(delay).await;
268 }
269 };
270
271 (update_stream, handle)
272}
273
274#[instrument(skip(config, request, api_key))]
275async fn connect_and_subscribe_once(
276 config: &LaserstreamConfig,
277 request: SubscribeRequest,
278 api_key: String,
279) -> Result<
280 (
281 impl futures_util::Sink<SubscribeRequest, Error = futures_mpsc::SendError> + Send,
282 impl Stream<Item = Result<SubscribeUpdate, laserstream_core_proto::tonic::Status>> + Send,
283 ),
284 Status,
285> {
286 let options = &config.channel_options;
287
288 let interceptor = SdkMetadataInterceptor::new(api_key)?;
290
291 let mut endpoint = Endpoint::from_shared(config.endpoint.clone())
293 .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))?
294 .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10)))
295 .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30)))
296 .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30)))
297 .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(5)))
298 .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true))
299 .initial_stream_window_size(options.initial_stream_window_size.or(Some(1024 * 1024 * 4)))
300 .initial_connection_window_size(options.initial_connection_window_size.or(Some(1024 * 1024 * 8)))
301 .http2_adaptive_window(options.http2_adaptive_window.unwrap_or(true))
302 .tcp_nodelay(options.tcp_nodelay.unwrap_or(true))
303 .buffer_size(options.buffer_size.or(Some(1024 * 64)));
304
305 if let Some(tcp_keepalive_secs) = options.tcp_keepalive_secs {
306 endpoint = endpoint.tcp_keepalive(Some(Duration::from_secs(tcp_keepalive_secs)));
307 }
308
309 endpoint = endpoint
311 .tls_config(ClientTlsConfig::new().with_enabled_roots())
312 .map_err(|e| Status::internal(format!("TLS config error: {}", e)))?;
313
314 let channel = endpoint
316 .connect()
317 .await
318 .map_err(|e| Status::unavailable(format!("Connection failed: {}", e)))?;
319
320 let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor);
322
323 geyser_client = geyser_client
325 .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000))
326 .max_encoding_message_size(options.max_encoding_message_size.unwrap_or(32_000_000));
327
328 if let Some(send_comp) = options.send_compression {
330 let encoding = match send_comp {
331 ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
332 ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
333 };
334 geyser_client = geyser_client.send_compressed(encoding);
335 }
336
337 if let Some(ref accept_comps) = options.accept_compression {
339 for comp in accept_comps {
340 let encoding = match comp {
341 ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
342 ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
343 };
344 geyser_client = geyser_client.accept_compressed(encoding);
345 }
346 }
347
348 let (mut subscribe_tx, subscribe_rx) = futures_mpsc::unbounded();
350 subscribe_tx
351 .send(request)
352 .await
353 .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?;
354
355 let response = geyser_client
356 .subscribe(subscribe_rx)
357 .await
358 .map_err(|e| Status::internal(format!("Subscription failed: {}", e)))?;
359
360 Ok((subscribe_tx, response.into_inner()))
361}
362
/// Handle returned alongside the stream by [`subscribe_preprocessed`].
///
/// It is a unit struct: it carries no state and exposes no operations here.
#[derive(Clone)]
pub struct PreprocessedStreamHandle;
366
367#[instrument(skip(config, request))]
370pub fn subscribe_preprocessed(
371 config: LaserstreamConfig,
372 request: SubscribePreprocessedRequest,
373) -> (
374 impl Stream<Item = Result<SubscribePreprocessedUpdate, LaserstreamError>>,
375 PreprocessedStreamHandle,
376) {
377 let handle = PreprocessedStreamHandle;
378 let update_stream = stream! {
379 let mut reconnect_attempts = 0;
380
381 let effective_max_attempts = config
383 .max_reconnect_attempts
384 .unwrap_or(HARD_CAP_RECONNECT_ATTEMPTS)
385 .min(HARD_CAP_RECONNECT_ATTEMPTS);
386
387 loop {
388 let api_key = config.api_key.clone();
389 let request_clone = request.clone();
390
391 match connect_and_subscribe_preprocessed_once(&config, request_clone, api_key).await {
392 Ok(mut stream) => {
393 reconnect_attempts = 0;
394
395 while let Some(result) = stream.next().await {
396 match result {
397 Ok(update) => yield Ok(update),
398 Err(e) => {
399 warn!(error = %e, "Stream error received");
400 break;
401 }
402 }
403 }
404 }
405 Err(err) => {
406 reconnect_attempts += 1;
407 error!(error = %err, attempt = reconnect_attempts, max_attempts = effective_max_attempts, "Connection failed, will retry after 5s delay");
408
409 if reconnect_attempts >= effective_max_attempts {
410 error!(attempts = effective_max_attempts, "Max reconnection attempts reached");
411 yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled(
412 format!("Connection failed after {} attempts", effective_max_attempts)
413 )));
414 return;
415 }
416 }
417 }
418
419 let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS);
420 sleep(delay).await;
421 }
422 };
423
424 (update_stream, handle)
425}
426
427#[instrument(skip(config, request, api_key))]
428async fn connect_and_subscribe_preprocessed_once(
429 config: &LaserstreamConfig,
430 request: SubscribePreprocessedRequest,
431 api_key: String,
432) -> Result<
433 impl Stream<Item = Result<SubscribePreprocessedUpdate, laserstream_core_proto::tonic::Status>> + Send,
434 Status,
435> {
436 let options = &config.channel_options;
437
438 let interceptor = SdkMetadataInterceptor::new(api_key)?;
440
441 let mut endpoint = Endpoint::from_shared(config.endpoint.clone())
443 .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))?
444 .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10)))
445 .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30)))
446 .tcp_nodelay(options.tcp_nodelay.unwrap_or(true))
447 .tcp_keepalive(Some(Duration::from_secs(options.tcp_keepalive_secs.unwrap_or(30))))
448 .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30)))
449 .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(10)))
450 .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true));
451
452 endpoint = endpoint
453 .tls_config(ClientTlsConfig::new().with_enabled_roots())
454 .map_err(|e| Status::internal(format!("Failed to configure TLS: {}", e)))?;
455
456 let channel = endpoint
457 .connect()
458 .await
459 .map_err(|e| Status::internal(format!("Failed to connect: {}", e)))?;
460
461 let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor)
462 .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000))
463 .max_encoding_message_size(options.max_encoding_message_size.unwrap_or(32_000_000));
464
465 if let Some(compression) = &options.send_compression {
467 let encoding = match compression {
468 ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
469 ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
470 };
471 geyser_client = geyser_client.send_compressed(encoding).accept_compressed(encoding);
472 }
473
474 let (mut subscribe_tx, subscribe_rx) = futures_mpsc::unbounded();
475
476 subscribe_tx
477 .send(request)
478 .await
479 .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?;
480
481 let response = geyser_client
482 .subscribe_preprocessed(subscribe_rx)
483 .await
484 .map_err(|e| Status::internal(format!("Preprocessed subscription failed: {}", e)))?;
485
486 Ok(response.into_inner())
487}
488
489fn merge_subscribe_requests(
496 current: &mut SubscribeRequest,
497 modification: &SubscribeRequest,
498 internal_slot_sub_id: &str,
499) {
500 let internal_tracker = current
502 .slots
503 .get(internal_slot_sub_id)
504 .cloned();
505
506 current.accounts = modification.accounts.clone();
508 current.slots = modification.slots.clone();
509 current.transactions = modification.transactions.clone();
510 current.transactions_status = modification.transactions_status.clone();
511 current.blocks = modification.blocks.clone();
512 current.blocks_meta = modification.blocks_meta.clone();
513 current.entry = modification.entry.clone();
514 current.accounts_data_slice = modification.accounts_data_slice.clone();
515
516 if let Some(value) = internal_tracker {
518 current
519 .slots
520 .insert(internal_slot_sub_id.to_string(), value);
521 }
522
523 if modification.commitment.is_some() {
525 current.commitment = modification.commitment;
526 }
527
528 }
530