chaincodec-stream 0.2.0

Real-time blockchain event streaming engine for ChainCodec (Tokio-based)
Documentation

chaincodec-stream

Real-time blockchain event streaming for ChainCodec — Tokio-based WebSocket listener with automatic reconnection.

crates.io docs.rs license

chaincodec-stream connects to an Ethereum JSON-RPC WebSocket endpoint, subscribes to eth_subscribe("logs", filter), and emits RawEvent items through an async channel. The WebSocket connection and resubscription are handled automatically on disconnect.


Features

  • Live EVM log streaming — subscribes to eth_subscribe and emits RawEvent items
  • Automatic reconnect — the channel stays open across disconnects; caller never needs to re-subscribe
  • Address filtering — subscribe to one or many contract addresses
  • Async channels — events arrive via futures::channel::mpsc, composable with any Tokio runtime
  • Connection state — query is_connected() at any time for health checks
  • Removed log filtering — reorged / removed logs are dropped before reaching your handler

Installation

[dependencies]
chaincodec-stream = "0.1"
chaincodec-evm    = "0.1"
chaincodec-core   = "0.1"
tokio             = { version = "1", features = ["full"] }
futures           = "0.3"

Quick start

use std::sync::Arc;
use futures::StreamExt;
use chaincodec_stream::{ws_listener::EvmWsListener, listener::BlockListener};
use chaincodec_evm::EvmDecoder;
use chaincodec_registry::MemoryRegistry;
use chaincodec_core::{chain::chains, decoder::ChainDecoder, schema::SchemaRegistry};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. Load schemas
    let mut registry = MemoryRegistry::new();
    registry.load_directory("schemas/")?;
    let decoder = EvmDecoder::new();

    // 2. Create a WebSocket listener for Ethereum mainnet
    let listener = EvmWsListener::new(
        chains::ethereum(),
        "wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY",
    )
    .with_address("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48") // USDC
    .with_address("0xdac17f958d2ee523a2206206994597c13d831ec7"); // USDT

    // 3. Subscribe and decode events
    let mut stream = listener.subscribe().await?;

    println!("Listening for events...");
    while let Some(result) = stream.next().await {
        match result {
            Ok(raw) => {
                let fp = decoder.fingerprint(&raw);
                if let Some(schema) = registry.get_by_fingerprint(&fp) {
                    if let Ok(event) = decoder.decode_event(&raw, &schema) {
                        println!(
                            "[block {}] {}{:?}",
                            raw.block_number, event.schema_name, event.fields
                        );
                    }
                }
            }
            Err(e) => eprintln!("stream error: {}", e),
        }
    }

    Ok(())
}

Filter by multiple addresses

let listener = EvmWsListener::new(chains::ethereum(), "wss://...")
    .with_addresses([
        "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", // USDC
        "0xdac17f958d2ee523a2206206994597c13d831ec7", // USDT
        "0x6b175474e89094c44da98b954eedeac495271d0f", // DAI
    ]);

// Omit .with_address() entirely to receive ALL logs on the chain
let listener_all = EvmWsListener::new(chains::ethereum(), "wss://...");

Monitor connection health

let listener = Arc::new(EvmWsListener::new(chains::ethereum(), "wss://..."));

let monitor = Arc::clone(&listener);
tokio::spawn(async move {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        if !monitor.is_connected() {
            tracing::warn!("WebSocket disconnected — waiting for reconnect");
        }
    }
});

let mut stream = listener.subscribe().await?;

Compact decode pipeline

while let Some(Ok(raw)) = stream.next().await {
    let fp = decoder.fingerprint(&raw);
    let Some(schema) = registry.get_by_fingerprint(&fp) else { continue };
    let Ok(event) = decoder.decode_event(&raw, &schema) else { continue };

    // Forward to database, message queue, webhook, etc.
    store_event(&event).await?;
}

Stream error variants

use chaincodec_core::error::StreamError;

StreamError::ConnectionFailed { url, reason }  // could not connect
StreamError::Closed                            // connection dropped
StreamError::Timeout { ms }                    // subscription timed out
StreamError::Decode(err)                       // decode pipeline error
StreamError::Other(msg)                        // other runtime error

Ecosystem

Crate Purpose
chaincodec-core Traits, types, primitives
chaincodec-evm EVM ABI event & call decoder
chaincodec-registry CSDL schema registry
chaincodec-batch Historical batch decode
chaincodec-stream Live WebSocket event streaming (this crate)

License

MIT — see LICENSE