Skip to main content

Consumer

Struct Consumer 

Source
pub struct Consumer { /* private fields */ }
Expand description

A consumer for reading records from a LANCE topic stream.

The consumer automatically reconnects on transient failures (connection drops, timeouts, server errors) with exponential backoff and DNS re-resolution. The current offset is preserved across reconnections so no data is lost.

§Example

let config = ConsumerConfig::new(topic_id);
let mut consumer = Consumer::connect("lance.example.com:1992", config).await?;

// Receive batches — auto-reconnects on failure
while let Some(result) = consumer.next_batch().await? {
    process_data(&result.data);
    if result.end_of_stream {
        break;
    }
}

Implementations§

Source§

impl Consumer

Source

pub async fn connect( addr: &str, config: ConsumerConfig, ) -> Result<Self, ClientError>

Connect to a LANCE server and create a consumer with auto-reconnect.

The address can be either an IP:port or hostname:port. DNS is re-resolved on each reconnect for load-balanced endpoints.

Source

pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self

Create a new consumer with the given client and configuration.

The addr is stored for DNS re-resolution on reconnect.

Source

pub fn with_consumer_id( client: LanceClient, addr: &str, config: ConsumerConfig, consumer_id: u64, ) -> Self

Create a new consumer with a specific consumer ID

Source

pub fn with_offset_store( client: LanceClient, addr: &str, config: ConsumerConfig, consumer_id: u64, offset_store: Arc<dyn OffsetStore>, ) -> Result<Self, ClientError>

Create a consumer with an offset store for client-side offset persistence

The consumer will automatically load its starting offset from the store if one exists, otherwise it uses the configured start position.

§Arguments
  • client - The LANCE client connection
  • addr - Server address for reconnection
  • config - Consumer configuration
  • consumer_id - Unique identifier for this consumer instance
  • offset_store - The offset store for persistence
§Example
let store = Arc::new(LockFileOffsetStore::open(path, "my-consumer")?);
let consumer = Consumer::with_offset_store(client, "lance:1992", config, 12345, store)?;
Source

pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self

Create a consumer starting from the beginning of the stream

Source

pub fn from_offset( client: LanceClient, addr: &str, topic_id: u32, offset: u64, ) -> Self

Create a consumer starting from a specific offset

Source

pub fn current_offset(&self) -> u64

Get the current offset position

Source

pub fn topic_id(&self) -> u32

Get the topic ID being consumed

Source

pub async fn seek(&mut self, position: SeekPosition) -> Result<u64, ClientError>

Seek to a specific position in the stream

This allows rewinding to replay historical data or fast-forwarding to skip ahead.

§Arguments
  • position - The position to seek to
§Examples
// Rewind to the beginning
consumer.seek(SeekPosition::Beginning).await?;

// Seek to a specific offset
consumer.seek(SeekPosition::Offset(1000)).await?;

// Seek to the end (latest data)
consumer.seek(SeekPosition::End).await?;
Source

pub async fn rewind(&mut self) -> Result<(), ClientError>

Rewind to the beginning of the stream (offset 0)

Convenience method equivalent to seek(SeekPosition::Beginning)

Source

pub async fn seek_to_offset(&mut self, offset: u64) -> Result<(), ClientError>

Seek to a specific byte offset in the stream

Convenience method equivalent to seek(SeekPosition::Offset(offset))

Source

pub async fn seek_to_end(&mut self) -> Result<u64, ClientError>

Seek to the end of the stream (latest available data)

Convenience method equivalent to seek(SeekPosition::End)

Source

pub async fn next_batch(&mut self) -> Result<Option<PollResult>, ClientError>

Receive the next batch of records.

Returns Ok(Some(result)) if data was fetched, or Ok(None) if there was no new data available (end of stream reached).

The consumer’s offset is automatically advanced after each successful fetch. On transient errors, the consumer auto-reconnects and retries from the same offset.

Source

pub async fn consume(&mut self) -> Result<Option<PollResult>, ClientError>

Primary consume interface alias.

Source

pub async fn poll(&mut self) -> Result<Option<PollResult>, ClientError>

Compatibility wrapper for callers still using polling terminology.

Source

pub async fn poll_blocking(&mut self) -> Result<PollResult, ClientError>

Poll for records, blocking until data is available or timeout

Unlike poll(), this will return empty results instead of None, allowing the caller to continue waiting.

Source

pub async fn commit(&mut self) -> Result<(), ClientError>

Commit the current offset to the offset store

If an offset store is configured, this persists the current offset so the consumer can resume from this position on restart.

§Example
let records = consumer.next_batch().await?;
process(records);
consumer.commit().await?;  // Persist offset for crash recovery
Source

pub fn consumer_id(&self) -> u64

Get the consumer ID

Source

pub fn has_offset_store(&self) -> bool

Check if this consumer has an offset store configured

Source

pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient

Get mutable access to the underlying reconnecting client

Trait Implementations§

Source§

impl Debug for Consumer

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FutureExt for T

Source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more