§lnc-client
Async Rust client library for the LANCE streaming platform.
LANCE is a high-performance, non-blocking stream engine designed for 100Gbps sustained ingestion with sub-microsecond P99 latency.
§Features
- Async/await - Built on Tokio for high-performance async I/O
- Producer - Batch records with configurable batching and backpressure
- Consumer - Standalone (standalone) and grouped (grouped) consumer modes
- Connection pooling - Automatic reconnection and cluster-aware routing
- TLS support - Secure connections with rustls
- Zero-copy - Efficient record parsing with minimal allocations
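The batching behaviour mentioned for the producer can be pictured with a small count-based batcher. This is a minimal sketch using only the standard library, not the crate's implementation: `Batcher`, `max_records`, `push`, and `flush` are hypothetical names, and backpressure is not modelled here.

```rust
/// Hypothetical batcher (not part of lnc-client): collects records and hands
/// back a full batch once `max_records` records are pending.
struct Batcher {
    max_records: usize,
    pending: Vec<Vec<u8>>,
}

impl Batcher {
    /// Create a batcher; a threshold of 0 is treated as 1.
    fn new(max_records: usize) -> Self {
        Self {
            max_records: max_records.max(1),
            pending: Vec::new(),
        }
    }

    /// Queue one record. Returns `Some(batch)` when the threshold is reached,
    /// leaving the batcher empty for the next batch.
    fn push(&mut self, record: &[u8]) -> Option<Vec<Vec<u8>>> {
        self.pending.push(record.to_vec());
        if self.pending.len() >= self.max_records {
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }

    /// Drain whatever is pending, even if the batch is not full.
    fn flush(&mut self) -> Vec<Vec<u8>> {
        std::mem::take(&mut self.pending)
    }
}
```

A real producer would typically also flush on a timer or byte-size limit; the sketch only shows the count-based trigger and an explicit flush.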
§Quick Start
§Producer
use lnc_client::{Producer, ProducerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let producer = Producer::connect("127.0.0.1:1992", ProducerConfig::new()).await?;
    producer.send(1, b"Hello, LANCE!").await?;
    producer.flush().await?;
    producer.close().await?;
    Ok(())
}
§Consumer (Standalone)
For independent consumption with manual offset control:
use lnc_client::{StandaloneConsumer, StandaloneConfig};
use std::path::Path;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut consumer = StandaloneConsumer::connect(
        "127.0.0.1:1992",
        StandaloneConfig::new("my-consumer", 1)
            .with_offset_dir(Path::new("/var/lib/lance/offsets")),
    ).await?;
    loop {
        if let Some(records) = consumer.next_batch().await? {
            // Process records
            for chunk in records.data.chunks(256) {
                println!("Received {} bytes", chunk.len());
            }
            consumer.commit().await?;
        }
    }
}
§Consumer (Grouped)
use lnc_client::{AssignmentStrategy, GroupCoordinator, GroupConfig, GroupedConsumer, WorkerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Start coordinator (typically one per consumer group)
    let coordinator = GroupCoordinator::start(
        "127.0.0.1:1992",
        GroupConfig::new("my-group")
            .with_topics(vec![1, 2, 3])
            .with_assignment_strategy(AssignmentStrategy::RoundRobin),
    ).await?;
    // Workers join the group
    let mut worker = GroupedConsumer::join(
        "127.0.0.1:1992",
        coordinator.join_address(),
        WorkerConfig::new("worker-1"),
    ).await?;
    // Worker processes assigned topics
    loop {
        let topics: Vec<u32> = worker.assignments().to_vec();
        for topic_id in topics {
            if let Some(_records) = worker.next_batch(topic_id).await? {
                worker.commit(topic_id).await?;
            }
        }
    }
}
§Modules
- producer - High-level producer with batching and async send
- standalone - Standalone consumer for direct offset control
- grouped - Grouped consumer with automatic partition assignment
- connection - Connection pooling and cluster-aware routing
- offset - Offset storage backends (memory, file-based)
- record - Record encoding/decoding utilities
- tls - TLS configuration
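To make the grouped module's automatic assignment more concrete, here is a minimal sketch of how a round-robin strategy could distribute topic IDs across workers. It uses only the standard library; `assign_round_robin` is a hypothetical helper for illustration, and the coordinator's actual algorithm is not described in this documentation.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper (not part of lnc-client): hand out topic IDs to
/// workers in turn, so topic i goes to worker i % workers.len().
fn assign_round_robin(topics: &[u32], workers: &[&str]) -> BTreeMap<String, Vec<u32>> {
    // Start every worker with an empty assignment so idle workers still appear.
    let mut assignments: BTreeMap<String, Vec<u32>> = workers
        .iter()
        .map(|w| (w.to_string(), Vec::new()))
        .collect();

    // With no workers there is nobody to assign to.
    if workers.is_empty() {
        return assignments;
    }

    for (i, topic) in topics.iter().enumerate() {
        let worker = workers[i % workers.len()];
        if let Some(list) = assignments.get_mut(worker) {
            list.push(*topic);
        }
    }
    assignments
}
```

With topics 1, 2, 3 and two workers, the first worker receives topics 1 and 3 and the second receives topic 2.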
§Platform Support
The client library supports all major platforms:
- Linux (x86_64, aarch64)
- macOS (x86_64, aarch64)
- Windows (x86_64)
Note: The LANCE server requires Linux with io_uring support, but the client works everywhere.
Re-exports§
pub use connection::ClusterClient;
pub use connection::ConnectionPool;
pub use connection::ConnectionPoolConfig;
pub use connection::PoolStats;
pub use connection::PooledClient;
pub use connection::ReconnectingClient;
pub use grouped::AssignmentStrategy;
pub use grouped::GroupConfig;
pub use grouped::GroupCoordinator;
pub use grouped::GroupedConsumer;
pub use grouped::WorkerConfig;
pub use offset::CollectingCommitHook;
pub use offset::CommitInfo;
pub use offset::HookedOffsetStore;
pub use offset::LockFileOffsetStore;
pub use offset::LoggingCommitHook;
pub use offset::MemoryOffsetStore;
pub use offset::OffsetStore;
pub use offset::PostCommitHook;
pub use producer::MetricsSnapshot;
pub use producer::Producer;
pub use producer::ProducerConfig;
pub use producer::ProducerMetrics;
pub use producer::SendAck;
pub use record::Record;
pub use record::RecordIterator;
pub use record::RecordParseError;
pub use record::RecordParserConfig;
pub use record::RecordType;
pub use record::TLV_HEADER_SIZE;
pub use record::encode_record;
pub use record::parse_record;
pub use record::parse_records;
pub use standalone::StandaloneConfig;
pub use standalone::StandaloneConsumer;
pub use standalone::StandaloneConsumerBuilder;
pub use tls::TlsClientConfig;
Modules§
- connection - Connection Management and Resilience
- grouped - Grouped Consumer Mode
- offset - Client-side offset persistence for LANCE consumers
- producer - Producer Abstraction Layer
- record - TLV Record Parsing
- standalone - Standalone Consumer Mode
- tls - TLS Support for LANCE Client
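The record module is described as TLV (type-length-value) parsing. The sketch below illustrates the general technique with borrowed slices so the payload is never copied. The header layout here (1-byte type, 4-byte little-endian length) is an assumption chosen for illustration; the crate's real wire format and the value of `TLV_HEADER_SIZE` are not stated in this documentation, and `HEADER_SIZE`, `encode`, and `parse_one` are hypothetical names.

```rust
/// Assumed header for this sketch: 1-byte type tag + 4-byte little-endian
/// payload length. This is NOT taken from the lnc-client wire format.
const HEADER_SIZE: usize = 5;

/// Encode one record as type, length, value.
/// Payloads longer than u32::MAX bytes are out of scope for this sketch.
fn encode(record_type: u8, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(HEADER_SIZE + payload.len());
    buf.push(record_type);
    buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    buf.extend_from_slice(payload);
    buf
}

/// Parse one record from the front of `buf`.
/// Returns (type, payload, remaining bytes); both slices borrow from `buf`,
/// so no payload bytes are copied. Returns None if the buffer is truncated.
fn parse_one(buf: &[u8]) -> Option<(u8, &[u8], &[u8])> {
    if buf.len() < HEADER_SIZE {
        return None;
    }
    let record_type = buf[0];
    let len = u32::from_le_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    let end = HEADER_SIZE.checked_add(len)?;
    if buf.len() < end {
        return None;
    }
    Some((record_type, &buf[HEADER_SIZE..end], &buf[end..]))
}
```

Calling `parse_one` repeatedly on the returned remainder walks a buffer of concatenated records until it returns `None`.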
Structs§
- AuthConfig - Authentication configuration for client connections
- ClientConfig - Configuration for the LANCE client
- CommitResult - Result of a commit offset operation
- Consumer - A consumer for reading records from a LANCE topic stream.
- ConsumerConfig - Configuration for a consumer
- FetchResult - Result of a fetch operation
- LanceClient - LANCE protocol client for communicating with LANCE servers
- PollResult - Result of a poll operation
- StreamingConsumer - A streaming consumer that uses subscribe/unsubscribe signals
- StreamingConsumerConfig - Configuration for a streaming consumer
- SubscribeResult - Result of a subscribe operation
- TopicInfo - Information about a topic
Enums§
- ClientError - Errors that can occur during client operations
- ClientStream - Wrapper enum for TCP and TLS streams to avoid dynamic dispatch
- SeekPosition - Position specifier for seeking within a stream
Functions§
- parse_not_leader_error - Parse a NOT_LEADER error message and extract the redirect address if present
- validate_topic_name - Validate that a topic name contains only [a-zA-Z0-9-] characters.
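The character rule stated for validate_topic_name can be sketched in a few lines of standard-library Rust. This is an illustration of the rule, not the crate's function: `is_valid_topic_name` is a hypothetical name, the real signature and error type are not shown in this documentation, and rejecting the empty string is an assumption.

```rust
/// Hypothetical stand-in for the documented rule: every character must be
/// in [a-zA-Z0-9-]. The crate's actual function may return a Result instead.
fn is_valid_topic_name(name: &str) -> bool {
    // Assumption: an empty name is rejected; the documentation only states
    // the allowed character set.
    !name.is_empty()
        && name
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || b == b'-')
}
```

Checking bytes rather than chars is sufficient here because any non-ASCII character encodes to bytes outside the allowed set.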