helius-laserstream 0.0.1

Rust client for Helius LaserStream gRPC with robust reconnection and slot tracking
Documentation

Rust

helius-laserstream-rust

Rust client for Helius LaserStream. It features automatic reconnection with slot tracking: if the connection is lost, the client automatically reconnects and resumes streaming from the last processed slot, so that no data is missed.

Installation

Add the dependency to your Cargo.toml:

[dependencies]
helius-laserstream = "0.0.1" # Or the latest version

Or use cargo add:

cargo add helius-laserstream

Usage Example

use helius_laserstream::{
    grpc::{
        CommitmentLevel, SubscribeRequest, SubscribeRequestFilterTransactions,
    },
    subscribe, LaserstreamConfig,
};
use std::collections::HashMap;
use futures_util::StreamExt; // Required for stream.next()

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = LaserstreamConfig {
        api_key: "your-api-key".to_string(),
        endpoint: "your-endpoint".parse()?,
    };

    // Subscribe to all confirmed non-vote transactions involving the Token program
    let mut transactions_filter = HashMap::new();
    transactions_filter.insert(
        "token_txs".to_string(),
        SubscribeRequestFilterTransactions {
            account_include: vec!["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string()],
            vote: Some(false),
            failed: Some(false),
            ..Default::default()
        },
    );

    let request = SubscribeRequest {
        transactions: transactions_filter,
        commitment: Some(CommitmentLevel::Confirmed as i32),
        // Empty maps/vecs for unused subscription types
        accounts: HashMap::new(),
        slots: HashMap::new(),
        transactions_status: HashMap::new(),
        entry: HashMap::new(),
        blocks: HashMap::new(),
        blocks_meta: HashMap::new(),
        accounts_data_slice: vec![],
        ping: None,
        from_slot: None, // Start from current slot
    };

    println!("Connecting and subscribing...");

    // Client handles disconnections automatically:
    // - Reconnects on network issues
    // - Resumes from last processed slot
    // - Maintains subscription state
    let stream = subscribe(config, request);
    futures::pin_mut!(stream); // Pin the stream to the stack

    while let Some(result) = stream.next().await {
        match result {
            Ok(update) => {
                println!("Received update: {:?}", update);
            }
            Err(e) => {
                eprintln!("Stream error: {}. Will attempt reconnection.", e);
                // The underlying subscribe function handles reconnection automatically.
                // You might want to add logic here for fatal errors (e.g., MaxReconnectAttempts)
            }
        }
    }

    println!("Stream finished (likely due to max reconnection attempts or manual stop).");
    Ok(())
}

License

MIT