pub struct Consumer { /* private fields */ }
A consumer for reading records from a LANCE topic stream.
The consumer automatically reconnects on transient failures (connection drops, timeouts, server errors) with exponential backoff and DNS re-resolution. The current offset is preserved across reconnections so no data is lost.
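The backoff schedule used by the client is not documented on this page. As a minimal sketch of capped exponential backoff in general, the helper below doubles a base delay per attempt and clamps it to a maximum. `backoff_delay`, the base delay, and the cap are hypothetical names and values, not part of the LANCE API.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before reconnect attempt `attempt` (0-based),
/// doubling from `base` on each attempt and never exceeding `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt, saturating so very large attempt counts cannot overflow.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // If the multiplication itself overflows, fall back to the cap.
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With a 100 ms base and a 5 s cap, attempts 0, 1, 2, 3 would wait 100 ms, 200 ms, 400 ms, and 800 ms, and later attempts settle at the cap. Production implementations often add random jitter, which this sketch omits.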
§Example
let config = ConsumerConfig::new(topic_id);
let mut consumer = Consumer::connect("lance.example.com:1992", config).await?;
// Receive batches — auto-reconnects on failure
while let Some(result) = consumer.next_batch().await? {
    process_data(&result.data);
    if result.end_of_stream {
        break;
    }
}
Implementations§
impl Consumer
pub async fn connect(
    addr: &str,
    config: ConsumerConfig,
) -> Result<Self, ClientError>
Connect to a LANCE server and create a consumer with auto-reconnect.
The address can be either an IP:port or hostname:port. DNS is re-resolved on each reconnect for load-balanced endpoints.
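To illustrate what re-resolving a `host:port` string on every reconnect can look like, here is a sketch using only the standard library's `ToSocketAddrs`. `resolve_fresh` is a hypothetical helper for illustration; how the client performs resolution internally is not shown on this page.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};

/// Hypothetical helper: resolve `addr` ("hostname:port" or "ip:port") afresh,
/// as one might do before each reconnect attempt.
fn resolve_fresh(addr: &str) -> io::Result<Vec<SocketAddr>> {
    // For host names this asks the system resolver on every call;
    // IP literals are simply parsed.
    Ok(addr.to_socket_addrs()?.collect())
}
```

Keeping the original string rather than a resolved `SocketAddr` is what lets a load-balanced hostname map to a different backend after a reconnect.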
pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self
Create a new consumer with the given client and configuration.
The addr is stored for DNS re-resolution on reconnect.
pub fn with_consumer_id(
    client: LanceClient,
    addr: &str,
    config: ConsumerConfig,
    consumer_id: u64,
) -> Self
Create a new consumer with a specific consumer ID
pub fn with_offset_store(
    client: LanceClient,
    addr: &str,
    config: ConsumerConfig,
    consumer_id: u64,
    offset_store: Arc<dyn OffsetStore>,
) -> Result<Self, ClientError>
Create a consumer with an offset store for client-side offset persistence
The consumer will automatically load its starting offset from the store if one exists, otherwise it uses the configured start position.
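The "stored offset if present, otherwise the configured start" rule can be sketched as follows. `SketchOffsetStore`, `MemoryStore`, and `starting_offset` are hypothetical stand-ins written for this illustration; the real `OffsetStore` trait may expose different methods.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an offset store (not the real `OffsetStore`).
trait SketchOffsetStore {
    /// Return the last persisted offset for `consumer_id`, if any.
    fn load(&self, consumer_id: u64) -> Option<u64>;
}

/// In-memory store used only for this illustration.
struct MemoryStore(HashMap<u64, u64>);

impl SketchOffsetStore for MemoryStore {
    fn load(&self, consumer_id: u64) -> Option<u64> {
        self.0.get(&consumer_id).copied()
    }
}

/// Prefer the stored offset; fall back to the configured start otherwise.
fn starting_offset(
    store: &dyn SketchOffsetStore,
    consumer_id: u64,
    configured_start: u64,
) -> u64 {
    store.load(consumer_id).unwrap_or(configured_start)
}
```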
§Arguments
client - The LANCE client connection
addr - Server address for reconnection
config - Consumer configuration
consumer_id - Unique identifier for this consumer instance
offset_store - The offset store for persistence
§Example
let store = Arc::new(LockFileOffsetStore::open(path, "my-consumer")?);
let consumer = Consumer::with_offset_store(client, "lance:1992", config, 12345, store)?;
pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self
Create a consumer starting from the beginning of the stream
pub fn from_offset(
    client: LanceClient,
    addr: &str,
    topic_id: u32,
    offset: u64,
) -> Self
Create a consumer starting from a specific offset
pub fn current_offset(&self) -> u64
Get the current offset position
pub async fn seek(&mut self, position: SeekPosition) -> Result<u64, ClientError>
Seek to a specific position in the stream
This allows rewinding to replay historical data or fast-forwarding to skip ahead.
§Arguments
position - The position to seek to
§Examples
// Rewind to the beginning
consumer.seek(SeekPosition::Beginning).await?;
// Seek to a specific offset
consumer.seek(SeekPosition::Offset(1000)).await?;
// Seek to the end (latest data)
consumer.seek(SeekPosition::End).await?;
pub async fn rewind(&mut self) -> Result<(), ClientError>
Rewind to the beginning of the stream (offset 0)
Convenience method equivalent to seek(SeekPosition::Beginning)
pub async fn seek_to_offset(&mut self, offset: u64) -> Result<(), ClientError>
Seek to a specific byte offset in the stream
Convenience method equivalent to seek(SeekPosition::Offset(offset))
pub async fn seek_to_end(&mut self) -> Result<u64, ClientError>
Seek to the end of the stream (latest available data)
Convenience method equivalent to seek(SeekPosition::End)
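As a rough mental model of how the three positions used by `seek`, `rewind`, `seek_to_offset`, and `seek_to_end` map to a concrete offset, consider the sketch below. `SketchSeek` is a local mirror of the variants named above and `resolve` is a hypothetical function; the real `SeekPosition` may have more variants, and the server may validate offsets in ways not modeled here.

```rust
/// Local mirror of the seek positions named above, for illustration only.
enum SketchSeek {
    Beginning,
    Offset(u64),
    End,
}

/// Resolve a position to a concrete offset, given the current end of stream.
fn resolve(position: SketchSeek, end_offset: u64) -> u64 {
    match position {
        // Rewinding means offset 0.
        SketchSeek::Beginning => 0,
        // An explicit offset is taken as-is (no validation in this sketch).
        SketchSeek::Offset(n) => n,
        // The end is wherever the latest available data stops.
        SketchSeek::End => end_offset,
    }
}
```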
pub async fn next_batch(&mut self) -> Result<Option<PollResult>, ClientError>
Receive the next batch of records.
Returns Ok(Some(result)) if data was fetched, or Ok(None) if there was no new data available (end of stream reached).
The consumer’s offset is automatically advanced after each successful fetch. On transient errors, the consumer auto-reconnects and retries from the same offset.
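The "advance only on success, retry from the same offset" behavior can be sketched synchronously as below. `FetchError`, `fetch_with_retry`, and the retry limit are hypothetical and written for this illustration. Advancing the offset by the number of bytes received is also an assumption of the sketch, and the reconnect and backoff steps are reduced to a comment.

```rust
/// Hypothetical error classification for the sketch.
enum FetchError {
    Transient,
    Fatal,
}

/// Fetch one batch starting at `*offset`, retrying transient failures up to
/// `max_retries` times. The offset only moves after a successful fetch.
fn fetch_with_retry<F>(
    offset: &mut u64,
    max_retries: u32,
    mut fetch: F,
) -> Result<Vec<u8>, FetchError>
where
    F: FnMut(u64) -> Result<Vec<u8>, FetchError>,
{
    let mut attempts = 0;
    loop {
        match fetch(*offset) {
            Ok(data) => {
                // Success: advance past the bytes just received.
                *offset += data.len() as u64;
                return Ok(data);
            }
            Err(FetchError::Transient) if attempts < max_retries => {
                // A real client would reconnect and back off here;
                // `*offset` is untouched, so the retry asks for the same data.
                attempts += 1;
            }
            // Fatal errors, or transient ones past the limit, are returned.
            Err(e) => return Err(e),
        }
    }
}
```

Because the offset is left unchanged on failure, a retry after reconnecting requests exactly the data that was not delivered.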
pub async fn consume(&mut self) -> Result<Option<PollResult>, ClientError>
Primary consume interface alias.
pub async fn poll(&mut self) -> Result<Option<PollResult>, ClientError>
Compatibility wrapper for callers still using polling terminology.
pub async fn poll_blocking(&mut self) -> Result<PollResult, ClientError>
Poll for records, blocking until data is available or the timeout elapses.
Unlike poll(), this returns an empty result instead of None, allowing the caller to continue waiting.
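The difference in return shape between the two calls can be pictured as mapping "no data" to an empty value instead of `None`. `SketchBatch` and `or_empty` below are hypothetical stand-ins; the fields of the real `PollResult` are not assumed here.

```rust
/// Hypothetical stand-in for a poll result; empty by default.
#[derive(Debug, Default, PartialEq)]
struct SketchBatch {
    data: Vec<u8>,
}

/// Turn the `Option` shape into the "always a result" shape:
/// `None` becomes an empty batch, so a caller can keep looping.
fn or_empty(polled: Option<SketchBatch>) -> SketchBatch {
    polled.unwrap_or_default()
}
```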
pub async fn commit(&mut self) -> Result<(), ClientError>
Commit the current offset to the offset store
If an offset store is configured, this persists the current offset so the consumer can resume from this position on restart.
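To show why committing after processing matters for restarts, here is a small in-memory model. `SketchStore` and `SketchConsumer` are hypothetical types for this illustration and do not mirror the real `OffsetStore` or `Consumer` internals. The point is only that a restarted consumer resumes from the last committed offset, not from the last position read.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory offset store keyed by consumer ID.
#[derive(Default)]
struct SketchStore {
    committed: HashMap<u64, u64>,
}

/// Hypothetical consumer state: an ID plus an in-memory position.
struct SketchConsumer {
    id: u64,
    offset: u64,
}

impl SketchConsumer {
    /// Resume from the committed offset, or from 0 if none was stored.
    fn resume(id: u64, store: &SketchStore) -> Self {
        let offset = store.committed.get(&id).copied().unwrap_or(0);
        Self { id, offset }
    }

    /// Pretend to read `n` bytes; only the in-memory offset moves.
    fn advance(&mut self, n: u64) {
        self.offset += n;
    }

    /// Persist the current offset so a restart can pick it up.
    fn commit(&self, store: &mut SketchStore) {
        store.committed.insert(self.id, self.offset);
    }
}
```

In this model, data read after the last commit is read again after a crash, so processing should tolerate replays of the uncommitted tail.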
§Example
let records = consumer.next_batch().await?;
process(records);
consumer.commit().await?; // Persist offset for crash recovery
pub fn consumer_id(&self) -> u64
Get the consumer ID
pub fn has_offset_store(&self) -> bool
Check if this consumer has an offset store configured
pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient
Get mutable access to the underlying reconnecting client
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for Consumer
impl !RefUnwindSafe for Consumer
impl Send for Consumer
impl Sync for Consumer
impl Unpin for Consumer
impl UnsafeUnpin for Consumer
impl !UnwindSafe for Consumer
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request