Skip to main content

helius_laserstream/
client.rs

1use crate::{LaserstreamConfig, LaserstreamError, config::CompressionEncoding as ConfigCompressionEncoding};
2use async_stream::stream;
3use futures::StreamExt;
4use futures_channel::mpsc as futures_mpsc;
5use futures_util::{sink::SinkExt, Stream};
6use std::{pin::Pin, time::Duration};
7use tokio::sync::mpsc;
8use tokio::time::sleep;
9use laserstream_core_proto::tonic::{
10    Status, Request, metadata::MetadataValue, transport::Endpoint, codec::CompressionEncoding,
11};
12use tracing::{error, instrument, warn};
13use uuid;
14use laserstream_core_client::{ClientTlsConfig, Interceptor};
15use laserstream_core_proto::prelude::{geyser_client::GeyserClient};
16use laserstream_core_proto::geyser::{
17    subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterSlots,
18    SubscribeRequestPing, SubscribeUpdate,
19    SubscribePreprocessedRequest, SubscribePreprocessedUpdate,
20};
21
22const HARD_CAP_RECONNECT_ATTEMPTS: u32 = (20 * 60) / 5; // 20 mins / 5 sec interval
23const FIXED_RECONNECT_INTERVAL_MS: u64 = 5000; // 5 seconds fixed interval
24const SDK_NAME: &str = "laserstream-rust";
25const SDK_VERSION: &str = "0.1.9";
26
27/// Custom interceptor that adds SDK metadata headers to all gRPC requests
28#[derive(Clone)]
29struct SdkMetadataInterceptor {
30    x_token: Option<laserstream_core_proto::tonic::metadata::AsciiMetadataValue>,
31}
32
33impl SdkMetadataInterceptor {
34    fn new(api_key: String) -> Result<Self, Status> {
35        let x_token = if !api_key.is_empty() {
36            Some(api_key.parse().map_err(|e| {
37                Status::invalid_argument(format!("Invalid API key: {}", e))
38            })?)
39        } else {
40            None
41        };
42        Ok(Self { x_token })
43    }
44}
45
46impl Interceptor for SdkMetadataInterceptor {
47    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
48        // Add x-token if present
49        if let Some(ref x_token) = self.x_token {
50            request.metadata_mut().insert("x-token", x_token.clone());
51        }
52
53        // Add SDK metadata headers
54        request.metadata_mut().insert("x-sdk-name", MetadataValue::from_static(SDK_NAME));
55        request.metadata_mut().insert("x-sdk-version", MetadataValue::from_static(SDK_VERSION));
56
57        Ok(request)
58    }
59}
60
61/// Handle for managing a bidirectional streaming subscription.
62#[derive(Clone)]
63pub struct StreamHandle {
64    write_tx: mpsc::UnboundedSender<SubscribeRequest>,
65}
66
67impl StreamHandle {
68    /// Send a new subscription request to update the active subscription.
69    pub async fn write(&self, request: SubscribeRequest) -> Result<(), LaserstreamError> {
70        self.write_tx
71            .send(request)
72            .map_err(|_| LaserstreamError::ConnectionError("Write channel closed".to_string()))
73    }
74}
75
76/// Establishes a gRPC connection, handles the subscription lifecycle,
77/// and provides a stream of updates. Automatically reconnects on failure.
78#[instrument(skip(config, request))]
79pub fn subscribe(
80    config: LaserstreamConfig,
81    request: SubscribeRequest,
82) -> (
83    impl Stream<Item = Result<SubscribeUpdate, LaserstreamError>>,
84    StreamHandle,
85) {
86    let (write_tx, mut write_rx) = mpsc::unbounded_channel::<SubscribeRequest>();
87    let handle = StreamHandle { write_tx };
88    let update_stream = stream! {
89        let mut reconnect_attempts = 0;
90        let mut tracked_slot: u64 = 0;
91
92        // Determine the effective max reconnect attempts
93        let effective_max_attempts = config
94            .max_reconnect_attempts
95            .unwrap_or(HARD_CAP_RECONNECT_ATTEMPTS) // Default to hard cap if not set
96            .min(HARD_CAP_RECONNECT_ATTEMPTS); // Enforce hard cap
97
98        // Keep original request for reconnection attempts
99        let mut current_request = request.clone();
100        let internal_slot_sub_id = format!("internal-{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap());
101        
102        // Get replay behavior from config
103        let replay_enabled = config.replay;
104        
105        // Add internal slot subscription only when replay is enabled
106        if replay_enabled {
107            current_request.slots.insert(
108                internal_slot_sub_id.clone(),
109                SubscribeRequestFilterSlots {
110                    filter_by_commitment: Some(true), // Use same commitment as user request
111                    ..Default::default()
112                }
113            );
114        }
115        
116        // Clear any user-provided from_slot if replay is disabled
117        if !replay_enabled {
118            current_request.from_slot = None;
119        }
120
121        let api_key_string = config.api_key.clone();
122
123        loop {
124            // Drain any pending write requests that arrived during reconnection delay.
125            // This ensures writes sent while disconnected are included in the next connection.
126            while let Ok(write_request) = write_rx.try_recv() {
127                merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id);
128            }
129
130            // Always update from_slot on current_request based on tracked_slot.
131            // This ensures reconnections always use the most recent slot, even after
132            // a successful connection that subsequently errors on the stream.
133            if tracked_slot > 0 && replay_enabled {
134                let commitment_level = current_request.commitment.unwrap_or(0);
135                let from_slot = match commitment_level {
136                    0 => tracked_slot.saturating_sub(31), // PROCESSED: rewind by 31 slots
137                    1 | 2 => tracked_slot,                 // CONFIRMED/FINALIZED: exact slot
138                    _ => tracked_slot.saturating_sub(31),  // Unknown: default to safe behavior
139                };
140                current_request.from_slot = Some(from_slot);
141            } else if !replay_enabled {
142                current_request.from_slot = None;
143            }
144
145            let attempt_request = current_request.clone();
146
147            match connect_and_subscribe_once(&config, attempt_request, api_key_string.clone()).await {
148                Ok((sender, stream)) => {
149                    // Successful connection – reset attempt counter so we don't hit the cap
150                    reconnect_attempts = 0;
151
152                    // Box sender and stream here before processing
153                    let mut sender: Pin<Box<dyn futures_util::Sink<SubscribeRequest, Error = futures_mpsc::SendError> + Send>> = Box::pin(sender);
154                    // Ensure the boxed stream yields Result<_, Status>
155                    let mut stream: Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>> + Send>> = Box::pin(stream);
156
157                    // Ping interval timer
158                    let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
159                    ping_interval.tick().await; // Skip first immediate tick
160                    let mut ping_id = 0i32;
161
162                    loop {
163                        tokio::select! {
164                            // Send periodic ping
165                            _ = ping_interval.tick() => {
166                                ping_id = ping_id.wrapping_add(1);
167                                let ping_request = SubscribeRequest {
168                                    ping: Some(SubscribeRequestPing { id: ping_id }),
169                                    ..Default::default()
170                                };
171                                let _ = sender.send(ping_request).await;
172                            },
173                            // Handle incoming messages from the server
174                            result = stream.next() => {
175                                if let Some(result) = result {
176                                    match result {
177                                        Ok(update) => {
178                                            
179                                            // Handle ping/pong
180                                            if matches!(&update.update_oneof, Some(UpdateOneof::Ping(_))) {
181                                                let pong_req = SubscribeRequest { ping: Some(SubscribeRequestPing { id: 1 }), ..Default::default() };
182                                                if let Err(e) = sender.send(pong_req).await {
183                                                    warn!(error = %e, "Failed to send pong");
184                                                    break;
185                                                }
186                                                continue;
187                                            }
188                                            
189                                            // Do not forward server 'Pong' updates to consumers either
190                                            if matches!(&update.update_oneof, Some(UpdateOneof::Pong(_))) {
191                                                continue;
192                                            }
193
194                                // Track the latest slot from any slot update (including internal subscription)
195                                if let Some(UpdateOneof::Slot(s)) = &update.update_oneof {
196                                    if replay_enabled {
197                                        tracked_slot = s.slot;
198                                    }
199                                    
200                                    // Skip if this slot update is EXCLUSIVELY from our internal subscription
201                                    if update.filters.len() == 1 && update.filters.contains(&internal_slot_sub_id) {
202                                        continue;
203                                    }
204                                }
205
206                                            // Filter out internal subscription from filters before yielding (only if replay is enabled)
207                                            let mut clean_update = update;
208                                            if replay_enabled {
209                                                clean_update.filters.retain(|f| f != &internal_slot_sub_id);
210                                                
211                                                // Only yield if there are still filters after cleaning
212                                                if !clean_update.filters.is_empty() {
213                                                    yield Ok(clean_update);
214                                                }
215                                            } else {
216                                                // When replay is disabled, yield all updates as-is
217                                                yield Ok(clean_update);
218                                            }
219                                        }
220                                        Err(status) => {
221                                            // Yield the error to consumer AND continue with reconnection
222                                            warn!(error = %status, "Stream error, will reconnect after 5s delay");
223                                            yield Err(LaserstreamError::Status(status.clone()));
224                                            break;
225                                        }
226                                    }
227                                } else {
228                                    // Stream ended
229                                    break;
230                                }
231                            }
232                            
233                            // Handle write requests from the user
234                            Some(write_request) = write_rx.recv() => {
235                                // Merge the write_request into current_request so it persists across reconnections
236                                merge_subscribe_requests(&mut current_request, &write_request, &internal_slot_sub_id);
237
238                                if let Err(e) = sender.send(write_request).await {
239                                    warn!(error = %e, "Failed to send write request");
240                                    break;
241                                }
242                            }
243                        }
244                    }
245                }
246                Err(err) => {
247                    // Increment reconnect attempts
248                    reconnect_attempts += 1;
249
250                    // Log error internally but don't yield to consumer until max attempts exhausted
251                    error!(error = %err, attempt = reconnect_attempts, max_attempts = effective_max_attempts, "Connection failed, will retry after 5s delay");
252
253                    // Check if exceeded max reconnect attempts
254                    if reconnect_attempts >= effective_max_attempts {
255                        error!(attempts = effective_max_attempts, "Max reconnection attempts reached");
256                        // Only report error to consumer after exhausting all retries
257                        yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled(
258                            format!("Connection failed after {} attempts", effective_max_attempts)
259                        )));
260                        return;
261                    }
262                }
263            }
264
265            // Wait 5s before retry
266            let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS);
267            sleep(delay).await;
268        }
269    };
270    
271    (update_stream, handle)
272}
273
274#[instrument(skip(config, request, api_key))]
275async fn connect_and_subscribe_once(
276    config: &LaserstreamConfig,
277    request: SubscribeRequest,
278    api_key: String,
279) -> Result<
280    (
281        impl futures_util::Sink<SubscribeRequest, Error = futures_mpsc::SendError> + Send,
282        impl Stream<Item = Result<SubscribeUpdate, laserstream_core_proto::tonic::Status>> + Send,
283    ),
284    Status,
285> {
286    let options = &config.channel_options;
287
288    // Create our custom interceptor with SDK metadata
289    let interceptor = SdkMetadataInterceptor::new(api_key)?;
290
291    // Build endpoint with all options
292    let mut endpoint = Endpoint::from_shared(config.endpoint.clone())
293        .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))?
294        .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10)))
295        .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30)))
296        .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30)))
297        .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(5)))
298        .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true))
299        .initial_stream_window_size(options.initial_stream_window_size.or(Some(1024 * 1024 * 4)))
300        .initial_connection_window_size(options.initial_connection_window_size.or(Some(1024 * 1024 * 8)))
301        .http2_adaptive_window(options.http2_adaptive_window.unwrap_or(true))
302        .tcp_nodelay(options.tcp_nodelay.unwrap_or(true))
303        .buffer_size(options.buffer_size.or(Some(1024 * 64)));
304
305    if let Some(tcp_keepalive_secs) = options.tcp_keepalive_secs {
306        endpoint = endpoint.tcp_keepalive(Some(Duration::from_secs(tcp_keepalive_secs)));
307    }
308
309    // Configure TLS
310    endpoint = endpoint
311        .tls_config(ClientTlsConfig::new().with_enabled_roots())
312        .map_err(|e| Status::internal(format!("TLS config error: {}", e)))?;
313
314    // Connect to create channel
315    let channel = endpoint
316        .connect()
317        .await
318        .map_err(|e| Status::unavailable(format!("Connection failed: {}", e)))?;
319
320    // Create geyser client with our custom interceptor
321    let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor);
322
323    // Configure message size limits
324    geyser_client = geyser_client
325        .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000))
326        .max_encoding_message_size(options.max_encoding_message_size.unwrap_or(32_000_000));
327
328    // Configure compression if specified
329    if let Some(send_comp) = options.send_compression {
330        let encoding = match send_comp {
331            ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
332            ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
333        };
334        geyser_client = geyser_client.send_compressed(encoding);
335    }
336
337    // Configure accepted compression encodings
338    if let Some(ref accept_comps) = options.accept_compression {
339        for comp in accept_comps {
340            let encoding = match comp {
341                ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
342                ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
343            };
344            geyser_client = geyser_client.accept_compressed(encoding);
345        }
346    }
347
348    // Create bidirectional stream
349    let (mut subscribe_tx, subscribe_rx) = futures_mpsc::unbounded();
350    subscribe_tx
351        .send(request)
352        .await
353        .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?;
354
355    let response = geyser_client
356        .subscribe(subscribe_rx)
357        .await
358        .map_err(|e| Status::internal(format!("Subscription failed: {}", e)))?;
359
360    Ok((subscribe_tx, response.into_inner()))
361}
362
363/// Handle for managing a preprocessed subscription (no write support).
364#[derive(Clone)]
365pub struct PreprocessedStreamHandle;
366
367/// Establishes a gRPC connection for preprocessed transactions and provides a stream of updates.
368/// Automatically reconnects on failure. No slot tracking or replay - just simple reconnection.
369#[instrument(skip(config, request))]
370pub fn subscribe_preprocessed(
371    config: LaserstreamConfig,
372    request: SubscribePreprocessedRequest,
373) -> (
374    impl Stream<Item = Result<SubscribePreprocessedUpdate, LaserstreamError>>,
375    PreprocessedStreamHandle,
376) {
377    let handle = PreprocessedStreamHandle;
378    let update_stream = stream! {
379        let mut reconnect_attempts = 0;
380
381        // Determine the effective max reconnect attempts
382        let effective_max_attempts = config
383            .max_reconnect_attempts
384            .unwrap_or(HARD_CAP_RECONNECT_ATTEMPTS)
385            .min(HARD_CAP_RECONNECT_ATTEMPTS);
386
387        loop {
388            let api_key = config.api_key.clone();
389            let request_clone = request.clone();
390
391            match connect_and_subscribe_preprocessed_once(&config, request_clone, api_key).await {
392                Ok(mut stream) => {
393                    reconnect_attempts = 0;
394
395                    while let Some(result) = stream.next().await {
396                        match result {
397                            Ok(update) => yield Ok(update),
398                            Err(e) => {
399                                warn!(error = %e, "Stream error received");
400                                break;
401                            }
402                        }
403                    }
404                }
405                Err(err) => {
406                    reconnect_attempts += 1;
407                    error!(error = %err, attempt = reconnect_attempts, max_attempts = effective_max_attempts, "Connection failed, will retry after 5s delay");
408
409                    if reconnect_attempts >= effective_max_attempts {
410                        error!(attempts = effective_max_attempts, "Max reconnection attempts reached");
411                        yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled(
412                            format!("Connection failed after {} attempts", effective_max_attempts)
413                        )));
414                        return;
415                    }
416                }
417            }
418
419            let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS);
420            sleep(delay).await;
421        }
422    };
423
424    (update_stream, handle)
425}
426
427#[instrument(skip(config, request, api_key))]
428async fn connect_and_subscribe_preprocessed_once(
429    config: &LaserstreamConfig,
430    request: SubscribePreprocessedRequest,
431    api_key: String,
432) -> Result<
433    impl Stream<Item = Result<SubscribePreprocessedUpdate, laserstream_core_proto::tonic::Status>> + Send,
434    Status,
435> {
436    let options = &config.channel_options;
437
438    // Create our custom interceptor with SDK metadata
439    let interceptor = SdkMetadataInterceptor::new(api_key)?;
440
441    // Build endpoint with all options
442    let mut endpoint = Endpoint::from_shared(config.endpoint.clone())
443        .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))?
444        .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10)))
445        .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30)))
446        .tcp_nodelay(options.tcp_nodelay.unwrap_or(true))
447        .tcp_keepalive(Some(Duration::from_secs(options.tcp_keepalive_secs.unwrap_or(30))))
448        .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30)))
449        .keep_alive_timeout(Duration::from_secs(options.keep_alive_timeout_secs.unwrap_or(10)))
450        .keep_alive_while_idle(options.keep_alive_while_idle.unwrap_or(true));
451
452    endpoint = endpoint
453        .tls_config(ClientTlsConfig::new().with_enabled_roots())
454        .map_err(|e| Status::internal(format!("Failed to configure TLS: {}", e)))?;
455
456    let channel = endpoint
457        .connect()
458        .await
459        .map_err(|e| Status::internal(format!("Failed to connect: {}", e)))?;
460
461    let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor)
462        .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000))
463        .max_encoding_message_size(options.max_encoding_message_size.unwrap_or(32_000_000));
464
465    // Apply compression if specified
466    if let Some(compression) = &options.send_compression {
467        let encoding = match compression {
468            ConfigCompressionEncoding::Gzip => CompressionEncoding::Gzip,
469            ConfigCompressionEncoding::Zstd => CompressionEncoding::Zstd,
470        };
471        geyser_client = geyser_client.send_compressed(encoding).accept_compressed(encoding);
472    }
473
474    let (mut subscribe_tx, subscribe_rx) = futures_mpsc::unbounded();
475
476    subscribe_tx
477        .send(request)
478        .await
479        .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?;
480
481    let response = geyser_client
482        .subscribe_preprocessed(subscribe_rx)
483        .await
484        .map_err(|e| Status::internal(format!("Preprocessed subscription failed: {}", e)))?;
485
486    Ok(response.into_inner())
487}
488
489/// Merges a write request into the current stored request so that subscription
490/// changes made via `write()` persist across reconnections.
491///
492/// Replaces all user subscription fields with the modification's values while
493/// preserving the internal slot tracker subscription used for replay.
494/// `from_slot` and `ping` are not replaced as they are connection-specific.
495fn merge_subscribe_requests(
496    current: &mut SubscribeRequest,
497    modification: &SubscribeRequest,
498    internal_slot_sub_id: &str,
499) {
500    // Save the internal slot tracker before replacing slots
501    let internal_tracker = current
502        .slots
503        .get(internal_slot_sub_id)
504        .cloned();
505
506    // Replace all subscription types (Yellowstone gRPC replaces, not merges)
507    current.accounts = modification.accounts.clone();
508    current.slots = modification.slots.clone();
509    current.transactions = modification.transactions.clone();
510    current.transactions_status = modification.transactions_status.clone();
511    current.blocks = modification.blocks.clone();
512    current.blocks_meta = modification.blocks_meta.clone();
513    current.entry = modification.entry.clone();
514    current.accounts_data_slice = modification.accounts_data_slice.clone();
515
516    // Restore the internal slot tracker if it existed
517    if let Some(value) = internal_tracker {
518        current
519            .slots
520            .insert(internal_slot_sub_id.to_string(), value);
521    }
522
523    // Update commitment if specified in the modification
524    if modification.commitment.is_some() {
525        current.commitment = modification.commitment;
526    }
527
528    // Note: from_slot and ping are not replaced as they are connection-specific
529}
530