Skip to main content

StreamingConsumer

Struct StreamingConsumer 

Source
pub struct StreamingConsumer { /* private fields */ }
Expand description

A streaming consumer that uses subscribe/unsubscribe signals

Unlike the poll-based Consumer, StreamingConsumer explicitly signals to the server when it starts and stops consuming, and reports position updates for consumer group tracking.

§Lifecycle

  1. Subscribe - Call start() to signal the server to begin streaming
  2. Consume - Call poll() to receive batches of data
  3. Commit - Call commit() to checkpoint progress (or use auto-commit)
  4. Unsubscribe - Call stop() or drop the consumer to signal completion

§Example

let client = LanceClient::connect_to("127.0.0.1:1992").await?;
let config = StreamingConsumerConfig::new(topic_id)
    .with_consumer_group("my-group");
let mut consumer = StreamingConsumer::new(client, config);

// Start streaming
consumer.start().await?;

// Process records
while let Some(result) = consumer.poll().await? {
    process_data(&result.data);
    consumer.commit().await?; // Checkpoint progress
}

// Stop streaming
consumer.stop().await?;

Implementations§

Source§

impl StreamingConsumer

Source

pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self

Create a new streaming consumer

Source

pub async fn start(&mut self) -> Result<(), ClientError>

Start streaming from the topic

Sends a Subscribe signal to the server indicating this consumer wants to start receiving data from the configured position.

Source

pub async fn stop(&mut self) -> Result<(), ClientError>

Stop streaming from the topic

Sends an Unsubscribe signal to the server. This should be called when the consumer is done processing to free server resources.

Source

pub async fn poll(&mut self) -> Result<Option<PollResult>, ClientError>

Poll for the next batch of records

Returns Ok(Some(result)) if data was fetched, or Ok(None) if no new data is available.

Source

pub async fn commit(&mut self) -> Result<(), ClientError>

Commit the current offset to the server

This checkpoints the consumer’s position so that if it restarts, it can resume from this point.

Source

pub async fn seek(&mut self, position: SeekPosition) -> Result<u64, ClientError>

Seek to a new position

Note: This will commit the current offset and restart the subscription at the new position.

Source

pub fn current_offset(&self) -> u64

Get the current offset

Source

pub fn committed_offset(&self) -> u64

Get the last committed offset

Source

pub fn consumer_id(&self) -> u64

Get the consumer ID

Source

pub fn is_subscribed(&self) -> bool

Check if currently subscribed

Source

pub fn client(&self) -> &LanceClient

Get access to the underlying client

Source

pub async fn into_client(self) -> Result<LanceClient, ClientError>

Consume this consumer and return the underlying client

Trait Implementations§

Source§

impl Debug for StreamingConsumer

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FutureExt for T

Source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more