Crate lnc_client

§lnc-client

Async Rust client library for the LANCE streaming platform.

LANCE is a high-performance, non-blocking stream engine designed for 100Gbps sustained ingestion with sub-microsecond P99 latency.

§Features

  • Async/await - Built on Tokio for high-performance async I/O
  • Producer - Batch records with configurable batching and backpressure
  • Consumer - Standalone and grouped consumer modes (see the standalone and grouped modules)
  • Connection pooling - Automatic reconnection and cluster-aware routing
  • TLS support - Secure connections with rustls
  • Zero-copy - Efficient record parsing with minimal allocations

§Quick Start

§Producer

use lnc_client::{Producer, ProducerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let producer = Producer::connect("127.0.0.1:1992", ProducerConfig::new()).await?;
     
    producer.send(1, b"Hello, LANCE!").await?;
    producer.flush().await?;
    producer.close().await?;
     
    Ok(())
}

§Consumer (Standalone)

For independent consumption with manual offset control:

use lnc_client::{StandaloneConsumer, StandaloneConfig};
use std::path::Path;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut consumer = StandaloneConsumer::connect(
        "127.0.0.1:1992",
        StandaloneConfig::new("my-consumer", 1)
            .with_offset_dir(Path::new("/var/lib/lance/offsets")),
    ).await?;
     
    loop {
        if let Some(records) = consumer.next_batch().await? {
            // Process records
            for chunk in records.data.chunks(256) {
                println!("Received {} bytes", chunk.len());
            }
            consumer.commit().await?;
        }
    }
}

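§Consumer (Grouped)

For coordinated consumption, where a coordinator assigns topics to the workers that join its group: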

use lnc_client::{AssignmentStrategy, GroupCoordinator, GroupConfig, GroupedConsumer, WorkerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Start coordinator (typically one per consumer group)
    let coordinator = GroupCoordinator::start(
        "127.0.0.1:1992",
        GroupConfig::new("my-group")
            .with_topics(vec![1, 2, 3])
            .with_assignment_strategy(AssignmentStrategy::RoundRobin),
    ).await?;

    // Workers join the group
    let mut worker = GroupedConsumer::join(
        "127.0.0.1:1992",
        coordinator.join_address(),
        WorkerConfig::new("worker-1"),
    ).await?;

    // Worker processes assigned topics
    loop {
        let topics: Vec<u32> = worker.assignments().to_vec();
        for topic_id in topics {
            if let Some(_records) = worker.next_batch(topic_id).await? {
                worker.commit(topic_id).await?;
            }
        }
    }
}

§Modules

  • producer - High-level producer with batching and async send
  • standalone - Standalone consumer for direct offset control
  • grouped - Grouped consumer with automatic partition assignment
  • connection - Connection pooling and cluster-aware routing
  • offset - Offset storage backends (memory, file-based)
  • record - Record encoding/decoding utilities
  • tls - TLS configuration
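
The offset module is described as providing memory and file-based storage backends. As a rough illustration of what a memory backend involves, here is a minimal sketch. The trait `SketchOffsetStore`, the type `InMemoryStore`, and their method signatures are hypothetical and are not the crate's `OffsetStore` API. The `u64` offset type is also an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical trait for illustration only; the crate's `OffsetStore`
/// trait may look quite different.
trait SketchOffsetStore {
    /// Returns the last saved offset for this consumer and topic, if any.
    fn load(&self, consumer: &str, topic_id: u32) -> Option<u64>;
    /// Records the offset for this consumer and topic, replacing any old value.
    fn save(&mut self, consumer: &str, topic_id: u32, offset: u64);
}

/// In-memory backend: offsets are lost when the process exits.
#[derive(Default)]
struct InMemoryStore {
    offsets: HashMap<(String, u32), u64>,
}

impl SketchOffsetStore for InMemoryStore {
    fn load(&self, consumer: &str, topic_id: u32) -> Option<u64> {
        self.offsets.get(&(consumer.to_string(), topic_id)).copied()
    }

    fn save(&mut self, consumer: &str, topic_id: u32, offset: u64) {
        self.offsets.insert((consumer.to_string(), topic_id), offset);
    }
}
```

A file-based backend would implement the same trait but write each saved offset to disk, so that a restarted consumer can resume where it left off.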

§Platform Support

The client library supports all major platforms:

  • Linux (x86_64, aarch64)
  • macOS (x86_64, aarch64)
  • Windows (x86_64)

Note: The LANCE server requires Linux with io_uring support, but the client runs on all of the platforms listed above.

Re-exports§

pub use connection::ClusterClient;
pub use connection::ConnectionPool;
pub use connection::ConnectionPoolConfig;
pub use connection::PoolStats;
pub use connection::PooledClient;
pub use connection::ReconnectingClient;
pub use grouped::AssignmentStrategy;
pub use grouped::GroupConfig;
pub use grouped::GroupCoordinator;
pub use grouped::GroupedConsumer;
pub use grouped::WorkerConfig;
pub use offset::CollectingCommitHook;
pub use offset::CommitInfo;
pub use offset::HookedOffsetStore;
pub use offset::LockFileOffsetStore;
pub use offset::LoggingCommitHook;
pub use offset::MemoryOffsetStore;
pub use offset::OffsetStore;
pub use offset::PostCommitHook;
pub use producer::MetricsSnapshot;
pub use producer::Producer;
pub use producer::ProducerConfig;
pub use producer::ProducerMetrics;
pub use producer::SendAck;
pub use record::Record;
pub use record::RecordIterator;
pub use record::RecordParseError;
pub use record::RecordParserConfig;
pub use record::RecordType;
pub use record::TLV_HEADER_SIZE;
pub use record::encode_record;
pub use record::parse_record;
pub use record::parse_records;
pub use standalone::StandaloneConfig;
pub use standalone::StandaloneConsumer;
pub use standalone::StandaloneConsumerBuilder;
pub use tls::TlsClientConfig;

Modules§

connection
Connection Management and Resilience
grouped
Grouped Consumer Mode
offset
Client-side offset persistence for LANCE consumers
producer
Producer Abstraction Layer
record
TLV Record Parsing
standalone
Standalone Consumer Mode
tls
TLS Support for LANCE Client
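
The record module handles TLV (type-length-value) records. The wire layout is not specified on this page, so the sketch below assumes a 1-byte type followed by a 4-byte little-endian length and then the payload. The names `SKETCH_HEADER_SIZE`, `encode_tlv`, and `parse_tlv` are hypothetical and do not correspond to the crate's `TLV_HEADER_SIZE`, `encode_record`, or `parse_record`.

```rust
/// Assumed header size: 1 type byte + 4 length bytes.
/// This is an illustration, not the value of the crate's TLV_HEADER_SIZE.
const SKETCH_HEADER_SIZE: usize = 5;

/// Encodes one record as type, little-endian u32 length, then payload.
fn encode_tlv(record_type: u8, payload: &[u8]) -> Vec<u8> {
    assert!(payload.len() <= u32::MAX as usize, "payload too large");
    let mut out = Vec::with_capacity(SKETCH_HEADER_SIZE + payload.len());
    out.push(record_type);
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(payload);
    out
}

/// Parses one record from the front of `buf`.
/// Returns (type, payload, remaining bytes), or None if `buf` is truncated.
/// The payload borrows from `buf`, so no bytes are copied.
fn parse_tlv(buf: &[u8]) -> Option<(u8, &[u8], &[u8])> {
    if buf.len() < SKETCH_HEADER_SIZE {
        return None;
    }
    let record_type = buf[0];
    let len = u32::from_le_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    let end = SKETCH_HEADER_SIZE.checked_add(len)?;
    if buf.len() < end {
        return None;
    }
    Some((record_type, &buf[SKETCH_HEADER_SIZE..end], &buf[end..]))
}
```

Calling `parse_tlv` repeatedly on the returned remainder walks a buffer of concatenated records without allocating.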

Structs§

AuthConfig
Authentication configuration for client connections
ClientConfig
Configuration for the LANCE client
CommitResult
Result of a commit offset operation
Consumer
A consumer for reading records from a LANCE topic stream.
ConsumerConfig
Configuration for a consumer
FetchResult
Result of a fetch operation
LanceClient
LANCE protocol client for communicating with LANCE servers
PollResult
Result of a poll operation
StreamingConsumer
A streaming consumer that uses subscribe/unsubscribe signals
StreamingConsumerConfig
Configuration for a streaming consumer
SubscribeResult
Result of a subscribe operation
TopicInfo
Information about a topic

Enums§

ClientError
Errors that can occur during client operations
ClientStream
Wrapper enum for TCP and TLS streams to avoid dynamic dispatch
SeekPosition
Position specifier for seeking within a stream
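
ClientStream is described as a wrapper enum that avoids dynamic dispatch. The general pattern can be sketched as follows, using in-memory stand-ins instead of real sockets. `PlainStream`, `SecureStream`, and `Stream` are hypothetical names, and the XOR step is only a placeholder that makes the two variants behave differently. It is not TLS.

```rust
use std::io::{self, Cursor, Read};

/// Stand-in for a plain TCP stream (hypothetical; reads from memory).
struct PlainStream(Cursor<Vec<u8>>);

/// Stand-in for a TLS stream (hypothetical; XOR is a toy transform, not TLS).
struct SecureStream {
    inner: Cursor<Vec<u8>>,
    key: u8,
}

impl Read for PlainStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl Read for SecureStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        for b in &mut buf[..n] {
            *b ^= self.key;
        }
        Ok(n)
    }
}

/// One concrete type covering both transports. Each call is resolved by a
/// `match` rather than through a `Box<dyn Read>` vtable.
enum Stream {
    Plain(PlainStream),
    Secure(SecureStream),
}

impl Read for Stream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            Stream::Plain(s) => s.read(buf),
            Stream::Secure(s) => s.read(buf),
        }
    }
}

/// Callers handle either variant through the same `Read` interface.
fn read_all(mut stream: Stream) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    stream.read_to_end(&mut out)?;
    Ok(out)
}
```

The trade-off is that the set of transports is closed: adding a new one means adding an enum variant and a match arm, whereas a trait object accepts any implementor.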

Functions§

parse_not_leader_error
Parse a NOT_LEADER error message and extract the redirect address if present
validate_topic_name
Validate that a topic name contains only [a-zA-Z0-9-] characters.
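
The character rule stated for validate_topic_name can be sketched as a standalone check. `is_valid_topic_name` is a hypothetical helper that returns a `bool`. The real function's signature and error type are not shown on this page, and rejecting the empty string is an assumption.

```rust
/// Hypothetical stand-in, not the crate's `validate_topic_name`.
/// Accepts only ASCII letters, ASCII digits, and '-'.
/// Assumption: an empty name is rejected (the docs do not say either way).
fn is_valid_topic_name(name: &str) -> bool {
    !name.is_empty()
        && name
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || b == b'-')
}
```

Under this sketch, `"orders-v2"` passes, while `"orders_v2"` (underscore) and `"café"` (non-ASCII) are rejected.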