pub struct StreamingConsumer { /* private fields */ }
A streaming consumer that uses subscribe/unsubscribe signals.
Unlike the poll-based Consumer, StreamingConsumer explicitly signals
to the server when it starts and stops consuming, and reports position
updates for consumer group tracking.
§Lifecycle
- Subscribe - Call start() to signal the server to begin streaming
- Consume - Call poll() to receive batches of data
- Commit - Call commit() to checkpoint progress (or use auto-commit)
- Unsubscribe - Call stop() or drop the consumer to signal completion
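The lifecycle above can be pictured as a small state machine. The sketch below is a hypothetical, synchronous stand-in (the names LifecycleModel and Signal are invented for illustration) that only records which signals would be sent; it performs no network I/O and is not the real StreamingConsumer. Treating repeated start() or stop() calls as no-ops is an assumption, not documented behavior.

```rust
// Hypothetical stand-in used only to illustrate the lifecycle.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Signal {
    Subscribe,
    Unsubscribe,
}

#[derive(Default)]
struct LifecycleModel {
    subscribed: bool,
    // Signals that would have been sent to the server, in order.
    sent: Vec<Signal>,
}

impl LifecycleModel {
    // Mirrors start(): record a Subscribe signal and mark as subscribed.
    // Ignoring a second start() is an assumption made for this sketch.
    fn start(&mut self) {
        if !self.subscribed {
            self.sent.push(Signal::Subscribe);
            self.subscribed = true;
        }
    }

    // Mirrors stop(): record an Unsubscribe signal only if subscribed.
    fn stop(&mut self) {
        if self.subscribed {
            self.sent.push(Signal::Unsubscribe);
            self.subscribed = false;
        }
    }

    // Mirrors is_subscribed().
    fn is_subscribed(&self) -> bool {
        self.subscribed
    }
}
```

In this model, polling and committing would only make sense between start() and stop(), which matches the order of the lifecycle steps.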
§Example
let client = LanceClient::connect_to("127.0.0.1:1992").await?;
let config = StreamingConsumerConfig::new(topic_id)
    .with_consumer_group("my-group");
let mut consumer = StreamingConsumer::new(client, config);
// Start streaming
consumer.start().await?;
// Process records
while let Some(result) = consumer.poll().await? {
    process_data(&result.data);
    consumer.commit().await?; // Checkpoint progress
}
// Stop streaming
consumer.stop().await?;
Implementations§
impl StreamingConsumer
pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self
Create a new streaming consumer
pub async fn start(&mut self) -> Result<(), ClientError>
Start streaming from the topic
Sends a Subscribe signal to the server indicating this consumer wants to start receiving data from the configured position.
pub async fn stop(&mut self) -> Result<(), ClientError>
Stop streaming from the topic
Sends an Unsubscribe signal to the server. This should be called when the consumer is done processing to free server resources.
pub async fn poll(&mut self) -> Result<Option<PollResult>, ClientError>
Poll for the next batch of records
Returns Ok(Some(result)) if data was fetched, or Ok(None) if
no new data is available.
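The three outcomes of a poll (a batch, no new data, or an error) map naturally onto a `while let` loop with `?`. The helper below is a sketch under assumptions: drain_batches and its poll_once closure are hypothetical and synchronous, standing in for an awaited call to poll(), and the batch type Vec<u8> is chosen only for illustration.

```rust
// Drains batches from any poll-like source until it reports no new data.
// `poll_once` is a hypothetical stand-in for a call such as consumer.poll().
fn drain_batches<E>(
    mut poll_once: impl FnMut() -> Result<Option<Vec<u8>>, E>,
    mut handle: impl FnMut(&[u8]),
) -> Result<usize, E> {
    let mut batches = 0;
    // Ok(Some(_)) carries a batch, Ok(None) ends the loop,
    // and Err is returned to the caller by `?`.
    while let Some(data) = poll_once()? {
        handle(&data);
        batches += 1;
    }
    Ok(batches)
}
```

Because Ok(None) only means that no new data is available right now, a long-running consumer would probably wait and poll again rather than treat it as the end of the stream.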
pub async fn commit(&mut self) -> Result<(), ClientError>
Commit the current offset to the server
This checkpoints the consumer’s position so that if it restarts, it can resume from this point.
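The relationship between the current and committed offsets can be illustrated with a small in-memory model. This is a hypothetical sketch (OffsetModel, advance, and restart are invented names), and it assumes that a restarted consumer resumes exactly at the last committed offset; the unit of an offset is not specified here.

```rust
// Hypothetical in-memory model of the two offsets; not the real consumer.
#[derive(Debug, Default)]
struct OffsetModel {
    current: u64,   // advanced as data is polled
    committed: u64, // last position checkpointed by commit()
}

impl OffsetModel {
    // Pretend a poll returned `len` units of data and advance the position.
    fn advance(&mut self, len: u64) {
        self.current += len;
    }

    // Checkpoint: the committed offset catches up with the current one.
    fn commit(&mut self) {
        self.committed = self.current;
    }

    // Assumed restart behavior: resume from the committed position, so any
    // progress made after the last commit is processed again.
    fn restart(&self) -> OffsetModel {
        OffsetModel {
            current: self.committed,
            committed: self.committed,
        }
    }
}
```

Under this model, committing less often reduces round trips but increases the amount of data reprocessed after a restart.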
pub async fn seek(&mut self, position: SeekPosition) -> Result<u64, ClientError>
Seek to a new position
Note: This will commit the current offset and restart the subscription at the new position.
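The side effects in the note can be sketched as two steps: checkpoint the old position, then resubscribe at the new one. The model below is hypothetical (SeekModel is an invented name); it takes an absolute offset because the variants of SeekPosition are not shown here, and it assumes the returned u64 is the new offset.

```rust
// Hypothetical model of the documented seek side effects.
#[derive(Debug, Default)]
struct SeekModel {
    current: u64,
    committed: u64,
    // Counts how many times the subscription would have been restarted.
    resubscriptions: u32,
}

impl SeekModel {
    // `target` is an assumed absolute offset standing in for SeekPosition.
    fn seek(&mut self, target: u64) -> u64 {
        // 1. Commit the position reached so far.
        self.committed = self.current;
        // 2. Restart the subscription at the new position.
        self.resubscriptions += 1;
        self.current = target;
        // Returning the new offset is an assumption about the u64 result.
        self.current
    }
}
```

One consequence worth noting: right after a seek in this model, the committed offset still reflects the old position until the next commit.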
pub fn current_offset(&self) -> u64
Get the current offset
pub fn committed_offset(&self) -> u64
Get the last committed offset
pub fn consumer_id(&self) -> u64
Get the consumer ID
pub fn is_subscribed(&self) -> bool
Check if currently subscribed
pub fn client(&self) -> &LanceClient
Get access to the underlying client
pub async fn into_client(self) -> Result<LanceClient, ClientError>
Consume this consumer and return the underlying client
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for StreamingConsumer
impl !RefUnwindSafe for StreamingConsumer
impl Send for StreamingConsumer
impl Sync for StreamingConsumer
impl Unpin for StreamingConsumer
impl !UnwindSafe for StreamingConsumer
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request