Skip to main content

helius_laserstream/
client.rs

1use crate::{LaserstreamConfig, LaserstreamError, config::CompressionEncoding as ConfigCompressionEncoding};
2use async_stream::stream;
3use futures::StreamExt;
4use futures_channel::mpsc as futures_mpsc;
5use futures_util::{sink::SinkExt, Stream};
6use std::{pin::Pin, time::Duration};
7use tokio::sync::mpsc;
8use tokio::time::sleep;
9use laserstream_core_proto::tonic::{
10    Status, Request, metadata::MetadataValue, transport::Endpoint, codec::CompressionEncoding,
11};
12use tracing::{error, instrument, warn};
13use uuid;
14use laserstream_core_client::{ClientTlsConfig, Interceptor};
15use laserstream_core_proto::prelude::{geyser_client::GeyserClient};
16use laserstream_core_proto::geyser::{
17    subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterSlots,
18    SubscribeRequestPing, SubscribeUpdate,
19    SubscribePreprocessedRequest, SubscribePreprocessedUpdate,
20};
21
22const HARD_CAP_RECONNECT_ATTEMPTS: u32 = (20 * 60) / 5; // 20 mins / 5 sec interval
23const FIXED_RECONNECT_INTERVAL_MS: u64 = 5000; // 5 seconds fixed interval
24const SDK_NAME: &str = "laserstream-rust";
25const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
26
27/// Custom interceptor that adds SDK metadata headers to all gRPC requests
28#[derive(Clone)]
29struct SdkMetadataInterceptor {
30    x_token: Option<laserstream_core_proto::tonic::metadata::AsciiMetadataValue>,
31}
32
33impl SdkMetadataInterceptor {
34    fn new(api_key: String) -> Result<Self, Status> {
35        let x_token = if !api_key.is_empty() {
36            Some(api_key.parse().map_err(|e| {
37                Status::invalid_argument(format!("Invalid API key: {}", e))
38            })?)
39        } else {
40            None
41        };
42        Ok(Self { x_token })
43    }
44}
45
46impl Interceptor for SdkMetadataInterceptor {
47    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
48        // Add x-token if present
49        if let Some(ref x_token) = self.x_token {
50            request.metadata_mut().insert("x-token", x_token.clone());
51        }
52
53        // Add SDK metadata headers
54        request.metadata_mut().insert("x-sdk-name", MetadataValue::from_static(SDK_NAME));
55        request.metadata_mut().insert("x-sdk-version", MetadataValue::from_static(SDK_VERSION));
56
57        Ok(request)
58    }
59}
60
61/// Handle for managing a bidirectional streaming subscription.
62#[derive(Clone)]
63pub struct StreamHandle {
64    write_tx: mpsc::UnboundedSender<SubscribeRequest>,
65}
66
67impl StreamHandle {
68    /// Send a new subscription request to update the active subscription.
69    pub async fn write(&self, request: SubscribeRequest) -> Result<(), LaserstreamError> {
70        self.write_tx
71            .send(request)
72            .map_err(|_| LaserstreamError::ConnectionError("Write channel closed".to_string()))
73    }
74}
75
76/// Establishes a gRPC connection, handles the subscription lifecycle,
77/// and provides a stream of updates. Automatically reconnects on failure.
78#[instrument(skip(config, request))]
79pub fn subscribe(
80    config: LaserstreamConfig,
81    request: SubscribeRequest,
82) -> (
83    impl Stream<Item = Result<SubscribeUpdate, LaserstreamError>>,
84    StreamHandle,
85) {
86    let (write_tx, mut write_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
87    let handle = StreamHandle { write_tx };
88    let update_stream = stream! {
89        let mut reconnect_attempts = 0;
90        let mut tracked_slot: u64 = 0;
91
92        // Determine the effective max reconnect attempts
93        let effective_max_attempts = config
94            .max_reconnect_attempts
95            .unwrap_or(HARD_CAP_RECONNECT_ATTEMPTS) // Default to hard cap if not set
96            .min(HARD_CAP_RECONNECT_ATTEMPTS); // Enforce hard cap
97
98        // Keep original request for reconnection attempts
99        let mut current_request = request.clone();
100        let internal_slot_sub_id = format!("internal-{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap());
101        
102        // Get replay behavior from config
103        let replay_enabled = config.replay;
104        
105        // Add internal slot subscription only when replay is enabled
106        if replay_enabled {
107            current_request.slots.insert(
108                internal_slot_sub_id.clone(),
109                SubscribeRequestFilterSlots {
110                    filter_by_commitment: Some(true), // Use same commitment as user request
111                    ..Default::default()
112                }
113            );
114        }
115        
116        // Clear any user-provided from_slot if replay is disabled
117        if !replay_enabled {
118            current_request.from_slot = None;
119        }
120
121        let api_key_string = config.api_key.clone();
122
123        loop {
124            // Drain any pending write requests that arrived during reconnection delay.
125            // This ensures writes sent while disconnected are included in the next connection.
126            while let Ok(write_request) = write_rx.try_recv() {
127                merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id);
128            }
129
130            // Always update from_slot on current_request based on tracked_slot.
131            // This ensures reconnections always use the most recent slot, even after
132            // a successful connection that subsequently errors on the stream.
133            if tracked_slot > 0 && replay_enabled {
134                let commitment_level = current_request.commitment.unwrap_or(0);
135                let from_slot = match commitment_level {
136                    0 => tracked_slot.saturating_sub(31), // PROCESSED: rewind by 31 slots
137                    1 | 2 => tracked_slot,                 // CONFIRMED/FINALIZED: exact slot
138                    _ => tracked_slot.saturating_sub(31),  // Unknown: default to safe behavior
139                };
140                current_request.from_slot = Some(from_slot);
141            } else if !replay_enabled {
142                current_request.from_slot = None;
143            }
144
145            let attempt_request = current_request.clone();
146
147            match connect_and_subscribe_once(&config, attempt_request, api_key_string.clone()).await {
148                Ok((sender, stream)) => {
149                    // Successful connection – reset attempt counter so we don't hit the cap
150                    reconnect_attempts = 0;
151
152                    // Box sender and stream here before processing
153                    let mut sender: Pin<Box<dyn futures_util::Sink<SubscribeRequest, Error = futures_mpsc::SendError> + Send>> = Box::pin(sender);
154                    // Ensure the boxed stream yields Result<_, Status>
155                    let mut stream: Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>> + Send>> = Box::pin(stream);
156
157                    // Ping interval timer
158                    let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
159                    ping_interval.tick().await; // Skip first immediate tick
160                    let mut ping_id = 0i32;
161
162                    loop {
163                        tokio::select! {
164                            // Send periodic ping
165                            _ = ping_interval.tick() => {
166                                ping_id = ping_id.wrapping_add(1);
167                                let ping_request = SubscribeRequest {
168                                    ping: Some(SubscribeRequestPing { id: ping_id }),
169                                    ..Default::default()
170                                };
171                                let _ = sender.send(ping_request).await;
172                            },
173                            // Handle incoming messages from the server
174                            result = stream.next() => {
175                                if let Some(result) = result {
176                                    match result {
177                                        Ok(update) => {
178                                            
179                                            // Handle ping/pong
180                                            if matches!(&update.update_oneof, Some(UpdateOneof::Ping(_))) {
181                                                let pong_req = SubscribeRequest { ping: Some(SubscribeRequestPing { id: 1 }), ..Default::default() };
182                                                if let Err(e) = sender.send(pong_req).await {
183                                                    warn!(error = %e, "Failed to send pong");
184                                                    break;
185                                                }
186                                                continue;
187                                            }
188                                            
189                                            // Do not forward server 'Pong' updates to consumers either
190                                            if matches!(&update.update_oneof, Some(UpdateOneof::Pong(_))) {
191                                                continue;
192                                            }
193
194                                // Track the latest slot from any slot update (including internal subscription)
195                                if let Some(UpdateOneof::Slot(s)) = &update.update_oneof {
196                                    if replay_enabled {
197                                        tracked_slot = s.slot;
198                                    }
199                                    
200                                    // Skip if this slot update is EXCLUSIVELY from our internal subscription
201                                    if update.filters.len() == 1 && update.filters.contains(&internal_slot_sub_id) {
202                                        continue;
203                                    }
204                                }
205
206                                            // Filter out internal subscription from filters before yielding (only if replay is enabled)
207                                            let mut clean_update = update;
208                                            if replay_enabled {
209                                                clean_update.filters.retain(|f| f != &internal_slot_sub_id);
210                                                
211                                                // Only yield if there are still filters after cleaning
212                                                if !clean_update.filters.is_empty() {
213                                                    yield Ok(clean_update);
214                                                }
215                                            } else {
216                                                // When replay is disabled, yield all updates as-is
217                                                yield Ok(clean_update);
218                                            }
219                                        }
220                                        Err(status) => {
221                                            // Transient error: reconnect silently. Surfaced to the consumer
222                                            // only on terminal failure (max attempts) — see the Err arm below.
223                                            warn!(error = %status, "Stream error, will reconnect after 5s delay");
224                                            break;
225                                        }
226                                    }
227                                } else {
228                                    // Stream ended
229                                    break;
230                                }
231                            }
232                            
233                            // Handle write requests from the user
234                            Some(write_request) = write_rx.recv() => {
235                                // Merge the write_request into current_request so it persists across reconnections
236                                merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id);
237
238                                // Send the merged current_request (which preserves the internal slot
239                                // tracker) instead of the raw write_request. Yellowstone gRPC replaces
240                                // all subscriptions on each write, so the raw request would drop the
241                                // internal slot tracker and cause tracked_slot to go stale.
242                                let mut send_req = current_request.clone();
243                                send_req.from_slot = None;
244                                send_req.ping = None;
245
246                                if let Err(e) = sender.send(send_req).await {
247                                    warn!(error = %e, "Failed to send write request");
248                                    break;
249                                }
250                            }
251                        }
252                    }
253                }
254                Err(err) => {
255                    // Increment reconnect attempts
256                    reconnect_attempts += 1;
257
258                    // Log error internally but don't yield to consumer until max attempts exhausted
259                    error!(error = %err, attempt = reconnect_attempts, max_attempts = effective_max_attempts, "Connection failed, will retry after 5s delay");
260
261                    // Check if exceeded max reconnect attempts
262                    if reconnect_attempts >= effective_max_attempts {
263                        error!(attempts = effective_max_attempts, "Max reconnection attempts reached");
264                        // Only report error to consumer after exhausting all retries
265                        yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled(
266                            format!("Connection failed after {} attempts", effective_max_attempts)
267                        )));
268                        return;
269                    }
270                }
271            }
272
273            // Wait 5s before retry
274            let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS);
275            sleep(delay).await;
276        }
277    };
278    
279    (update_stream, handle)
280}
281
282#[instrument(skip(config, request, api_key))]
283async fn connect_and_subscribe_once(
284    config: &LaserstreamConfig,
285    request: SubscribeRequest,
286    api_key: String,
287) -> Result<
288    (
289        impl futures_util::Sink<SubscribeRequest, Error = futures_mpsc::SendError> + Send,
290        impl Stream<Item = Result<SubscribeUpdate, laserstream_core_proto::tonic::Status>> + Send,
291    ),
292    Status,
293> {
294    let options = &config.channel_options;
295
296    // Create our custom interceptor with SDK metadata
297    let interceptor = SdkMetadataInterceptor::new(api_key)?;
298
299    // Build endpoint with all options
300    let mut endpoint = Endpoint::from_shared(config.endpoint.clone())
301        .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))?
302        .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10)))
303        .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30)))
304        .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30)))
305        .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(5)))
306        .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true))
307        .initial_stream_window_size(options.initial_stream_window_size.or(Some(1024 * 1024 * 4)))
308        .initial_connection_window_size(options.initial_connection_window_size.or(Some(1024 * 1024 * 8)))
309        .http2_adaptive_window(options.http2_adaptive_window.unwrap_or(true))
310        .tcp_nodelay(options.tcp_nodelay.unwrap_or(true))
311        .buffer_size(options.buffer_size.or(Some(1024 * 64)));
312
313    if let Some(tcp_keepalive_secs) = options.tcp_keepalive_secs {
314        endpoint = endpoint.tcp_keepalive(Some(Duration::from_secs(tcp_keepalive_secs)));
315    }
316
317    // Configure TLS
318    endpoint = endpoint
319        .tls_config(ClientTlsConfig::new().with_enabled_roots())
320        .map_err(|e| Status::internal(format!("TLS config error: {}", e)))?;
321
322    // Connect to create channel
323    let channel = endpoint
324        .connect()
325        .await
326        .map_err(|e| Status::unavailable(format!("Connection failed: {}", e)))?;
327
328    // Create geyser client with our custom interceptor
329    let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor);
330
331    // Configure message size limits
332    geyser_client = geyser_client
333        .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000))
334        .max_encoding_message_size(options.max_encoding_message_size.unwrap_or(64 * 1024 * 1024));
335
336    // Configure compression if specified
337    if let Some(send_comp) = options.send_compression {
338        let encoding = match send_comp {
339            ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
340            ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
341        };
342        geyser_client = geyser_client.send_compressed(encoding);
343    }
344
345    // Configure accepted compression encodings
346    if let Some(ref accept_comps) = options.accept_compression {
347        for comp in accept_comps {
348            let encoding = match comp {
349                ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
350                ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
351            };
352            geyser_client = geyser_client.accept_compressed(encoding);
353        }
354    }
355
356    // Create bidirectional stream
357    let (mut subscribe_tx, subscribe_rx) = futures_mpsc::unbounded();
358    subscribe_tx
359        .send(request)
360        .await
361        .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?;
362
363    let response = geyser_client
364        .subscribe(subscribe_rx)
365        .await
366        .map_err(|e| Status::internal(format!("Subscription failed: {}", e)))?;
367
368    Ok((subscribe_tx, response.into_inner()))
369}
370
371/// Handle for managing a preprocessed subscription (no write support).
372#[derive(Clone)]
373pub struct PreprocessedStreamHandle;
374
375/// Establishes a gRPC connection for preprocessed transactions and provides a stream of updates.
376/// Automatically reconnects on failure. No slot tracking or replay - just simple reconnection.
377#[instrument(skip(config, request))]
378pub fn subscribe_preprocessed(
379    config: LaserstreamConfig,
380    request: SubscribePreprocessedRequest,
381) -> (
382    impl Stream<Item = Result<SubscribePreprocessedUpdate, LaserstreamError>>,
383    PreprocessedStreamHandle,
384) {
385    let handle = PreprocessedStreamHandle;
386    let update_stream = stream! {
387        let mut reconnect_attempts = 0;
388
389        // Determine the effective max reconnect attempts
390        let effective_max_attempts = config
391            .max_reconnect_attempts
392            .unwrap_or(HARD_CAP_RECONNECT_ATTEMPTS)
393            .min(HARD_CAP_RECONNECT_ATTEMPTS);
394
395        loop {
396            let api_key = config.api_key.clone();
397            let request_clone = request.clone();
398
399            match connect_and_subscribe_preprocessed_once(&config, request_clone, api_key).await {
400                Ok(mut stream) => {
401                    reconnect_attempts = 0;
402
403                    while let Some(result) = stream.next().await {
404                        match result {
405                            Ok(update) => yield Ok(update),
406                            Err(e) => {
407                                warn!(error = %e, "Stream error received");
408                                break;
409                            }
410                        }
411                    }
412                }
413                Err(err) => {
414                    reconnect_attempts += 1;
415                    error!(error = %err, attempt = reconnect_attempts, max_attempts = effective_max_attempts, "Connection failed, will retry after 5s delay");
416
417                    if reconnect_attempts >= effective_max_attempts {
418                        error!(attempts = effective_max_attempts, "Max reconnection attempts reached");
419                        yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled(
420                            format!("Connection failed after {} attempts", effective_max_attempts)
421                        )));
422                        return;
423                    }
424                }
425            }
426
427            let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS);
428            sleep(delay).await;
429        }
430    };
431
432    (update_stream, handle)
433}
434
435#[instrument(skip(config, request, api_key))]
436async fn connect_and_subscribe_preprocessed_once(
437    config: &LaserstreamConfig,
438    request: SubscribePreprocessedRequest,
439    api_key: String,
440) -> Result<
441    impl Stream<Item = Result<SubscribePreprocessedUpdate, laserstream_core_proto::tonic::Status>> + Send,
442    Status,
443> {
444    let options = &config.channel_options;
445
446    // Create our custom interceptor with SDK metadata
447    let interceptor = SdkMetadataInterceptor::new(api_key)?;
448
449    // Build endpoint with all options
450    let mut endpoint = Endpoint::from_shared(config.endpoint.clone())
451        .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))?
452        .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10)))
453        .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30)))
454        .tcp_nodelay(options.tcp_nodelay.unwrap_or(true))
455        .tcp_keepalive(Some(Duration::from_secs(options.tcp_keepalive_secs.unwrap_or(30))))
456        .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30)))
457        .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(10)))
458        .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true));
459
460    endpoint = endpoint
461        .tls_config(ClientTlsConfig::new().with_enabled_roots())
462        .map_err(|e| Status::internal(format!("Failed to configure TLS: {}", e)))?;
463
464    let channel = endpoint
465        .connect()
466        .await
467        .map_err(|e| Status::internal(format!("Failed to connect: {}", e)))?;
468
469    let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor)
470        .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000))
471        .max_encoding_message_size(options.max_encoding_message_size.unwrap_or(64 * 1024 * 1024));
472
473    // Apply compression if specified
474    if let Some(compression) = &options.send_compression {
475        let encoding = match compression {
476            ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
477            ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
478        };
479        geyser_client = geyser_client.send_compressed(encoding).accept_compressed(encoding);
480    }
481
482    let (mut subscribe_tx, subscribe_rx) = futures_mpsc::unbounded();
483
484    subscribe_tx
485        .send(request)
486        .await
487        .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?;
488
489    let response = geyser_client
490        .subscribe_preprocessed(subscribe_rx)
491        .await
492        .map_err(|e| Status::internal(format!("Preprocessed subscription failed: {}", e)))?;
493
494    Ok(response.into_inner())
495}
496
497/// Merges a write request into the current stored request so that subscription
498/// changes made via `write()` persist across reconnections.
499///
500/// Replaces all user subscription fields with the modification's values while
501/// preserving the internal slot tracker subscription used for replay.
502/// `from_slot` and `ping` are not replaced as they are connection-specific.
503fn merge_subscribe_requests(
504    current: &mut SubscribeRequest,
505    modification: &SubscribeRequest,
506    internal_slot_sub_id: &str,
507) {
508    // Save the internal slot tracker before replacing slots
509    let internal_tracker = current
510        .slots
511        .get(internal_slot_sub_id)
512        .cloned();
513
514    // Replace all subscription types (Yellowstone gRPC replaces, not merges)
515    current.accounts = modification.accounts.clone();
516    current.slots = modification.slots.clone();
517    current.transactions = modification.transactions.clone();
518    current.transactions_status = modification.transactions_status.clone();
519    current.blocks = modification.blocks.clone();
520    current.blocks_meta = modification.blocks_meta.clone();
521    current.entry = modification.entry.clone();
522    current.accounts_data_slice = modification.accounts_data_slice.clone();
523
524    // Restore the internal slot tracker if it existed
525    if let Some(value) = internal_tracker {
526        current
527            .slots
528            .insert(internal_slot_sub_id.to_string(), value);
529    }
530
531    // Update commitment if specified in the modification
532    if modification.commitment.is_some() {
533        current.commitment = modification.commitment;
534    }
535
536    // Note: from_slot and ping are not replaced as they are connection-specific
537}
538