use std::sync::Arc;
use std::time::Duration;
use crate::client::LanceClient;
use crate::connection::ReconnectingClient;
use crate::error::{ClientError, Result};
use crate::offset::OffsetStore;
/// A logical position inside a topic that a consumer can start from or seek to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekPosition {
    /// The start of the topic; the consumers in this file translate it to offset `0`.
    Beginning,
    /// The current end of the topic; stored as the `u64::MAX` sentinel until it
    /// is resolved against the server.
    End,
    /// An explicit offset, used verbatim.
    Offset(u64),
}
/// Settings that control how a [`Consumer`] reads from a topic.
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Identifier of the topic to read.
    pub topic_id: u32,
    /// Size limit handed to every fetch request.
    pub max_fetch_bytes: u32,
    /// Position the consumer starts from when no stored offset overrides it.
    pub start_position: SeekPosition,
}
impl Default for ConsumerConfig {
    /// Topic `0`, a 64 KiB fetch limit, reading from the beginning.
    fn default() -> Self {
        const DEFAULT_MAX_FETCH_BYTES: u32 = 64 * 1024;
        Self {
            start_position: SeekPosition::Beginning,
            max_fetch_bytes: DEFAULT_MAX_FETCH_BYTES,
            topic_id: 0,
        }
    }
}
impl ConsumerConfig {
    /// Creates a configuration for `topic_id`, taking every other setting from
    /// [`Default`].
    pub fn new(topic_id: u32) -> Self {
        let defaults = Self::default();
        Self {
            topic_id,
            ..defaults
        }
    }

    /// Returns the configuration with the per-fetch size limit replaced by `bytes`.
    pub fn with_max_fetch_bytes(self, bytes: u32) -> Self {
        Self {
            max_fetch_bytes: bytes,
            ..self
        }
    }

    /// Returns the configuration with the starting position replaced by `position`.
    pub fn with_start_position(self, position: SeekPosition) -> Self {
        Self {
            start_position: position,
            ..self
        }
    }
}
/// One batch handed back by a poll.
#[derive(Debug, Clone)]
pub struct PollResult {
    /// Payload bytes returned by the fetch.
    pub data: bytes::Bytes,
    /// The `next_offset` reported by the fetch that produced this batch.
    pub current_offset: u64,
    /// Record count reported by the fetch.
    pub record_count: u32,
    /// `true` when the fetch returned no data or did not advance the offset.
    pub end_of_stream: bool,
}
impl PollResult {
    /// Returns `true` when the batch carries no payload bytes.
    pub fn is_empty(&self) -> bool {
        let Self { data, .. } = self;
        data.is_empty()
    }
}
/// Polling consumer that fetches batches from a single topic over a
/// [`ReconnectingClient`].
pub struct Consumer {
    /// Connection wrapper used for every fetch.
    client: ReconnectingClient,
    /// Topic, fetch limit and start position.
    config: ConsumerConfig,
    /// Offset the next fetch is issued from; `u64::MAX` means "end, not yet resolved".
    current_offset: u64,
    /// Last `next_offset` seen from the server, reused when resolving the end offset.
    cached_end_offset: Option<u64>,
    /// Optional persistence for committed offsets.
    offset_store: Option<Arc<dyn OffsetStore>>,
    /// Identifier used as the key when loading/saving offsets.
    consumer_id: u64,
}
impl Consumer {
/// Connects to `addr` and builds a consumer with consumer id `0`.
///
/// The underlying [`ReconnectingClient`] is configured with unlimited
/// retries, a 500 ms base delay and a 30 s maximum delay.
///
/// # Errors
/// Returns the error from `ReconnectingClient::connect` if the initial
/// connection fails.
pub async fn connect(addr: &str, config: ConsumerConfig) -> Result<Self> {
    let base_delay = Duration::from_millis(500);
    let max_delay = Duration::from_secs(30);

    let connected = ReconnectingClient::connect(addr).await?;
    let resilient = connected
        .with_unlimited_retries()
        .with_base_delay(base_delay)
        .with_max_delay(max_delay);

    Ok(Self::from_reconnecting_client(resilient, config, 0))
}
/// Wraps an already-connected `client` in a consumer with consumer id `0`.
///
/// `addr` is forwarded to [`Consumer::with_consumer_id`].
pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self {
    const DEFAULT_CONSUMER_ID: u64 = 0;
    Self::with_consumer_id(client, addr, config, DEFAULT_CONSUMER_ID)
}
/// Wraps an already-connected `client` in a consumer that identifies itself
/// as `consumer_id`.
///
/// The client is adopted via `ReconnectingClient::from_existing(client, addr)`.
pub fn with_consumer_id(
    client: LanceClient,
    addr: &str,
    config: ConsumerConfig,
    consumer_id: u64,
) -> Self {
    Self::from_reconnecting_client(
        ReconnectingClient::from_existing(client, addr),
        config,
        consumer_id,
    )
}
/// Shared constructor: derives the initial offset from
/// `config.start_position` and starts with no cached end offset and no
/// offset store.
fn from_reconnecting_client(
    client: ReconnectingClient,
    config: ConsumerConfig,
    consumer_id: u64,
) -> Self {
    let current_offset = match config.start_position {
        SeekPosition::Offset(explicit) => explicit,
        // Sentinel: resolved to the real end offset on the first poll.
        SeekPosition::End => u64::MAX,
        SeekPosition::Beginning => 0,
    };

    Self {
        consumer_id,
        current_offset,
        cached_end_offset: None,
        offset_store: None,
        config,
        client,
    }
}
/// Builds a consumer whose offsets are persisted in `offset_store`.
///
/// An offset already stored for `(config.topic_id, consumer_id)` takes
/// precedence; otherwise the initial offset comes from
/// `config.start_position`.
///
/// # Errors
/// Propagates any error returned by `offset_store.load`.
pub fn with_offset_store(
    client: LanceClient,
    addr: &str,
    config: ConsumerConfig,
    consumer_id: u64,
    offset_store: Arc<dyn OffsetStore>,
) -> Result<Self> {
    let current_offset = match offset_store.load(config.topic_id, consumer_id)? {
        Some(persisted) => persisted,
        None => match config.start_position {
            SeekPosition::Offset(explicit) => explicit,
            // Sentinel: resolved to the real end offset on the first poll.
            SeekPosition::End => u64::MAX,
            SeekPosition::Beginning => 0,
        },
    };

    Ok(Self {
        client: ReconnectingClient::from_existing(client, addr),
        config,
        current_offset,
        cached_end_offset: None,
        offset_store: Some(offset_store),
        consumer_id,
    })
}
/// Convenience constructor: default configuration for `topic_id`, which
/// starts at the beginning of the topic.
pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self {
    let config = ConsumerConfig::new(topic_id);
    Self::new(client, addr, config)
}
/// Convenience constructor: default configuration for `topic_id`, starting
/// at the explicit `offset`.
pub fn from_offset(client: LanceClient, addr: &str, topic_id: u32, offset: u64) -> Self {
    let start = SeekPosition::Offset(offset);
    Self::new(
        client,
        addr,
        ConsumerConfig::new(topic_id).with_start_position(start),
    )
}
/// Offset the next fetch will be issued from (`u64::MAX` while an "end"
/// start position is still unresolved).
pub fn current_offset(&self) -> u64 {
    self.current_offset
}
/// Identifier of the topic this consumer reads from.
pub fn topic_id(&self) -> u32 {
    self.config.topic_id
}
/// Moves the consumer to `position` and returns the resulting offset.
///
/// `Beginning` and `Offset` are purely local. `End` is resolved through
/// `discover_end_offset`, which may contact the server.
///
/// NOTE(review): `discover_end_offset` returns `cached_end_offset` when it is
/// set, and polling stores each fetch's `next_offset` there — so after a
/// poll, `End` may resolve to the last fetched position rather than the live
/// end of the topic. Confirm this is intended.
///
/// # Errors
/// Only the `End` case can fail; the offset is left unchanged in that case.
pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
    let target = match position {
        SeekPosition::Beginning => 0,
        SeekPosition::Offset(explicit) => explicit,
        SeekPosition::End => self.discover_end_offset().await?,
    };
    self.current_offset = target;
    Ok(target)
}
/// Seeks back to the beginning of the topic.
pub async fn rewind(&mut self) -> Result<()> {
    self.seek(SeekPosition::Beginning).await.map(|_| ())
}
/// Seeks to the explicit `offset`, discarding the returned position.
pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
    self.seek(SeekPosition::Offset(offset)).await.map(|_| ())
}
/// Seeks to the end of the topic and returns the offset that was reached.
pub async fn seek_to_end(&mut self) -> Result<u64> {
    let end = SeekPosition::End;
    self.seek(end).await
}
/// Fetches the next batch, returning `None` when the fetch produced no data.
///
/// Performs exactly the steps of [`Consumer::poll_blocking`] (sentinel
/// resolution, fetch with retry, offset and cache bookkeeping) and then
/// maps an empty batch to `None`.
///
/// # Errors
/// Propagates errors from end-offset discovery and from the fetch.
pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
    let batch = self.poll_blocking().await?;
    Ok(if batch.is_empty() { None } else { Some(batch) })
}
#[inline]
pub async fn consume(&mut self) -> Result<Option<PollResult>> {
self.next_batch().await
}
#[inline]
pub async fn poll(&mut self) -> Result<Option<PollResult>> {
self.next_batch().await
}
/// Fetches the next batch and always returns it, even when it is empty.
///
/// Steps:
/// 1. If the offset is still the `u64::MAX` "end" sentinel, resolve it.
/// 2. Fetch with retry from the current offset.
/// 3. Advance the current offset only when data was returned.
/// 4. Remember the server's `next_offset` in the end-offset cache.
///
/// # Errors
/// Propagates errors from end-offset discovery and from the fetch.
pub async fn poll_blocking(&mut self) -> Result<PollResult> {
    if self.current_offset == u64::MAX {
        self.current_offset = self.discover_end_offset().await?;
    }

    let fetched = self.fetch_with_retry().await?;
    let next_offset = fetched.next_offset;
    let got_data = !fetched.data.is_empty();

    // Evaluated against the offset we asked for, i.e. before it is advanced.
    let end_of_stream = !got_data || next_offset == self.current_offset;

    if got_data {
        self.current_offset = next_offset;
    }
    self.cached_end_offset = Some(next_offset);

    Ok(PollResult {
        data: fetched.data,
        current_offset: next_offset,
        record_count: fetched.record_count,
        end_of_stream,
    })
}
/// Fetches from `current_offset`, retrying failures that look transient.
///
/// Every pass obtains a client via `self.client.client()` and issues one
/// `fetch`; failing to obtain the client is handled exactly like a failed
/// fetch. Outcomes per pass:
///
/// * `Ok` is returned immediately.
/// * `ClientError::ServerCatchingUp` sleeps a fixed `CATCHING_UP_BACKOFF` and
///   retries. If the server reports the same `server_offset`
///   `STALE_ALERT_THRESHOLD` times in a row, the connection is marked failed
///   and the error is returned.
/// * Any other error for which `is_retryable()` is true marks the connection
///   failed, sleeps with a doubling backoff capped at `MAX_BACKOFF`, and
///   retries.
/// * Anything else — or any error once `MAX_RETRIES` attempts are used up —
///   is returned to the caller.
///
/// `attempt` is shared by both retry paths. `current_offset` is never
/// modified here.
async fn fetch_with_retry(&mut self) -> Result<crate::client::FetchResult> {
const MAX_RETRIES: u32 = 30;
const CATCHING_UP_BACKOFF: Duration = Duration::from_secs(5);
const STALE_ALERT_THRESHOLD: u32 = 3;
let mut attempt = 0u32;
let mut backoff = Duration::from_millis(500);
const MAX_BACKOFF: Duration = Duration::from_secs(30);
// Last `server_offset` seen in a catching-up reply and how many consecutive
// replies carried that same value.
let mut last_server_offset: Option<u64> = None;
let mut stale_count: u32 = 0;
loop {
// A failure to obtain the client is folded into `result` so it goes through
// the same classification as a fetch failure.
let result = match self.client.client().await {
Ok(c) => {
c.fetch(
self.config.topic_id,
self.current_offset,
self.config.max_fetch_bytes,
)
.await
},
Err(e) => Err(e),
};
match &result {
Ok(_) => return result,
Err(ClientError::ServerCatchingUp { server_offset }) => {
attempt += 1;
// Count consecutive replies reporting an unchanged server offset; a
// different offset restarts the count at 1.
if last_server_offset == Some(*server_offset) {
stale_count += 1;
} else {
stale_count = 1;
last_server_offset = Some(*server_offset);
}
// Server is not making progress: give up and hand the error back.
// NOTE(review): `mark_failed` presumably forces a reconnect on the next
// `client()` call — confirm against `ReconnectingClient`.
if stale_count == STALE_ALERT_THRESHOLD {
tracing::warn!(
topic_id = self.config.topic_id,
requested_offset = self.current_offset,
server_offset,
"Server offset stagnant while catching up; preserving consumer offset to avoid duplicate replay"
);
self.client.mark_failed();
return result;
}
if attempt >= MAX_RETRIES {
return result;
}
tracing::info!(
topic_id = self.config.topic_id,
requested_offset = self.current_offset,
server_offset,
attempt,
"Server catching up, backing off {}s",
CATCHING_UP_BACKOFF.as_secs()
);
// Fixed delay here; the exponential `backoff` is only used for the
// generic retryable path below.
tokio::time::sleep(CATCHING_UP_BACKOFF).await;
},
Err(e) if e.is_retryable() && attempt < MAX_RETRIES => {
attempt += 1;
self.client.mark_failed();
tokio::time::sleep(backoff).await;
// Double the delay for the next retry, capped at MAX_BACKOFF.
backoff = (backoff * 2).min(MAX_BACKOFF);
},
// Non-retryable error, or retry budget exhausted.
_ => return result,
}
}
}
/// Resolves the end offset of the topic.
///
/// Returns the cached value when one exists. Otherwise issues a single
/// 1-byte fetch at offset `u64::MAX`, treats the reply's `next_offset` as the
/// end offset, and caches it. This probe does not go through
/// `fetch_with_retry`.
///
/// # Errors
/// Propagates failures to obtain a client and failures of the probe fetch.
async fn discover_end_offset(&mut self) -> Result<u64> {
    match self.cached_end_offset {
        Some(cached) => Ok(cached),
        None => {
            let conn = self.client.client().await?;
            let probe = conn.fetch(self.config.topic_id, u64::MAX, 1).await?;
            let end_offset = probe.next_offset;
            self.cached_end_offset = Some(end_offset);
            Ok(end_offset)
        },
    }
}
pub async fn commit(&mut self) -> Result<()> {
if let Some(ref store) = self.offset_store {
store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
}
Ok(())
}
/// Identifier this consumer uses when loading and saving offsets.
pub fn consumer_id(&self) -> u64 {
    self.consumer_id
}
/// Returns `true` when an offset store is attached, i.e. when
/// [`Consumer::commit`] actually persists something.
pub fn has_offset_store(&self) -> bool {
    self.offset_store.is_some()
}
/// Mutable access to the underlying [`ReconnectingClient`].
pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient {
    &mut self.client
}
}
impl std::fmt::Debug for Consumer {
    /// Prints the topic id, current offset and fetch limit; the client and
    /// offset store are left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Consumer");
        out.field("topic_id", &self.config.topic_id);
        out.field("current_offset", &self.current_offset);
        out.field("max_fetch_bytes", &self.config.max_fetch_bytes);
        out.finish()
    }
}
/// Settings that control how a [`StreamingConsumer`] subscribes and commits.
#[derive(Debug, Clone)]
pub struct StreamingConsumerConfig {
    /// Identifier of the topic to subscribe to.
    pub topic_id: u32,
    /// Size limit passed to `subscribe` and to every fetch.
    pub max_batch_bytes: u32,
    /// Position the consumer asks to start from.
    pub start_position: SeekPosition,
    /// Optional consumer-group name. NOTE(review): nothing in this file reads
    /// it — confirm where it is consumed.
    pub consumer_group: Option<String>,
    /// Minimum time between automatic commits, in milliseconds; `0` disables
    /// auto-commit.
    pub auto_commit_interval_ms: u64,
}
impl Default for StreamingConsumerConfig {
    /// Topic `0`, 64 KiB batches, reading from the beginning, no consumer
    /// group, auto-commit every 5000 ms.
    fn default() -> Self {
        const DEFAULT_MAX_BATCH_BYTES: u32 = 64 * 1024;
        const DEFAULT_AUTO_COMMIT_INTERVAL_MS: u64 = 5000;
        Self {
            auto_commit_interval_ms: DEFAULT_AUTO_COMMIT_INTERVAL_MS,
            consumer_group: None,
            start_position: SeekPosition::Beginning,
            max_batch_bytes: DEFAULT_MAX_BATCH_BYTES,
            topic_id: 0,
        }
    }
}
impl StreamingConsumerConfig {
    /// Creates a configuration for `topic_id`, taking every other setting from
    /// [`Default`].
    pub fn new(topic_id: u32) -> Self {
        let defaults = Self::default();
        Self {
            topic_id,
            ..defaults
        }
    }

    /// Returns the configuration with the batch size limit replaced by `bytes`.
    pub fn with_max_batch_bytes(self, bytes: u32) -> Self {
        Self {
            max_batch_bytes: bytes,
            ..self
        }
    }

    /// Returns the configuration with the starting position replaced by `position`.
    pub fn with_start_position(self, position: SeekPosition) -> Self {
        Self {
            start_position: position,
            ..self
        }
    }

    /// Returns the configuration with the consumer group set to `group`.
    pub fn with_consumer_group(self, group: impl Into<String>) -> Self {
        Self {
            consumer_group: Some(group.into()),
            ..self
        }
    }

    /// Returns the configuration with the auto-commit interval set to
    /// `interval_ms` milliseconds; `0` disables auto-commit.
    pub fn with_auto_commit_interval(self, interval_ms: u64) -> Self {
        Self {
            auto_commit_interval_ms: interval_ms,
            ..self
        }
    }
}
/// Subscription-based consumer that tracks and commits its offset through
/// the server.
pub struct StreamingConsumer {
    /// Connection used for subscribe, fetch, commit and unsubscribe calls.
    client: LanceClient,
    /// Topic, batch limit, start position and auto-commit settings.
    config: StreamingConsumerConfig,
    /// Identifier generated for this consumer at construction time.
    consumer_id: u64,
    /// Offset the next fetch is issued from.
    current_offset: u64,
    /// Last offset the server acknowledged as committed.
    committed_offset: u64,
    /// Whether `start` has succeeded without a matching `stop`.
    is_subscribed: bool,
    /// When the last successful commit (or `start`) happened; drives auto-commit.
    last_commit_time: std::time::Instant,
}
impl StreamingConsumer {
/// Creates an unsubscribed consumer with a freshly generated consumer id.
///
/// The initial offset comes from `config.start_position` (`End` is stored as
/// the `u64::MAX` sentinel); the committed offset starts at `0`.
pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
    let consumer_id = Self::generate_consumer_id();
    let current_offset = match config.start_position {
        SeekPosition::Offset(explicit) => explicit,
        SeekPosition::End => u64::MAX,
        SeekPosition::Beginning => 0,
    };

    Self {
        client,
        config,
        consumer_id,
        current_offset,
        committed_offset: 0,
        is_subscribed: false,
        last_commit_time: std::time::Instant::now(),
    }
}
/// Generates a consumer identifier that is unique with overwhelming
/// probability, both within this process and across processes.
///
/// Three inputs are fed through a randomly keyed standard-library hasher:
/// the wall-clock time in nanoseconds, the calling thread's `ThreadId`, and a
/// process-wide monotonically increasing counter.
///
/// The counter guarantees that two calls never hash identical input, even
/// when they land on the same clock tick or the clock read fails. The
/// previous implementation mixed in only the *length* of the thread id's
/// debug string, which is the same for practically every thread.
fn generate_consumer_id() -> u64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Distinguishes ids created within the same clock tick.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    // A clock set before the epoch degrades to 0; the other inputs still
    // keep the ids apart.
    let nanos_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);

    let mut hasher = RandomState::new().build_hasher();
    nanos_since_epoch.hash(&mut hasher);
    std::thread::current().id().hash(&mut hasher);
    SEQUENCE.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
    hasher.finish()
}
/// Subscribes to the topic; does nothing if already subscribed.
///
/// On success the current offset is replaced by the `start_offset` the
/// server returned and the auto-commit timer is reset.
///
/// # Errors
/// Propagates the error from `subscribe`; the consumer stays unsubscribed.
pub async fn start(&mut self) -> Result<()> {
    if !self.is_subscribed {
        let subscription = self
            .client
            .subscribe(
                self.config.topic_id,
                self.current_offset,
                self.config.max_batch_bytes,
                self.consumer_id,
            )
            .await?;

        self.current_offset = subscription.start_offset;
        self.is_subscribed = true;
        self.last_commit_time = std::time::Instant::now();
    }
    Ok(())
}
pub async fn stop(&mut self) -> Result<()> {
if !self.is_subscribed {
return Ok(());
}
if self.current_offset > self.committed_offset {
let _ = self.commit().await;
}
self.client
.unsubscribe(self.config.topic_id, self.consumer_id)
.await?;
self.is_subscribed = false;
Ok(())
}
/// Fetches the next batch, returning `None` when the fetch produced no data.
///
/// Runs the auto-commit check first, then fetches from the current offset.
/// The current offset is always replaced by the server's `next_offset`,
/// whether or not data came back.
///
/// # Errors
/// Returns `ClientError::ProtocolError` if the consumer is not subscribed,
/// and propagates errors from the auto-commit and from the fetch.
pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
    if !self.is_subscribed {
        let reason = "Consumer not subscribed - call start() first".to_string();
        return Err(ClientError::ProtocolError(reason));
    }

    self.maybe_auto_commit().await?;

    let fetched = self
        .client
        .fetch(
            self.config.topic_id,
            self.current_offset,
            self.config.max_batch_bytes,
        )
        .await?;

    let next_offset = fetched.next_offset;
    let no_data = fetched.data.is_empty();
    // Evaluated against the offset we asked for, i.e. before it is replaced.
    let end_of_stream = no_data || next_offset == self.current_offset;
    self.current_offset = next_offset;

    if no_data {
        return Ok(None);
    }
    Ok(Some(PollResult {
        data: fetched.data,
        current_offset: next_offset,
        record_count: fetched.record_count,
        end_of_stream,
    }))
}
#[inline]
pub async fn consume(&mut self) -> Result<Option<PollResult>> {
self.next_batch().await
}
#[inline]
pub async fn poll(&mut self) -> Result<Option<PollResult>> {
self.next_batch().await
}
/// Commits the current offset to the server when it is ahead of the last
/// committed offset; otherwise does nothing.
///
/// On success the committed offset is taken from the server's reply and the
/// auto-commit timer is reset.
///
/// # Errors
/// Propagates the error from `commit_offset`.
pub async fn commit(&mut self) -> Result<()> {
    if self.current_offset > self.committed_offset {
        let ack = self
            .client
            .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
            .await?;
        self.committed_offset = ack.committed_offset;
        self.last_commit_time = std::time::Instant::now();
    }
    Ok(())
}
async fn maybe_auto_commit(&mut self) -> Result<()> {
if self.config.auto_commit_interval_ms == 0 {
return Ok(()); }
let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
if elapsed >= self.config.auto_commit_interval_ms {
self.commit().await?;
}
Ok(())
}
/// Moves the consumer to `position` and returns the resulting offset.
///
/// A subscribed consumer is stopped first (which may commit its pre-seek
/// offset) and restarted afterwards, so the returned value is the
/// `start_offset` the server handed back. An unsubscribed consumer only has
/// its local offset updated, with `End` stored as the `u64::MAX` sentinel.
///
/// # Errors
/// Propagates errors from `stop` and `start`. If `start` fails the consumer
/// is left unsubscribed at the new offset.
pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
    let resubscribe = self.is_subscribed;
    if resubscribe {
        self.stop().await?;
    }

    self.current_offset = match position {
        SeekPosition::Offset(explicit) => explicit,
        SeekPosition::End => u64::MAX,
        SeekPosition::Beginning => 0,
    };

    if resubscribe {
        self.start().await?;
    }
    Ok(self.current_offset)
}
/// Offset the next fetch will be issued from.
pub fn current_offset(&self) -> u64 {
    self.current_offset
}
/// Last offset the server acknowledged as committed (`0` before any commit).
pub fn committed_offset(&self) -> u64 {
    self.committed_offset
}
/// Identifier generated for this consumer when it was created.
pub fn consumer_id(&self) -> u64 {
    self.consumer_id
}
/// Returns `true` between a successful `start` and the next successful `stop`.
pub fn is_subscribed(&self) -> bool {
    self.is_subscribed
}
/// Shared access to the underlying [`LanceClient`].
pub fn client(&self) -> &LanceClient {
    &self.client
}
pub async fn into_client(mut self) -> Result<LanceClient> {
self.stop().await?;
Ok(self.client)
}
}
impl std::fmt::Debug for StreamingConsumer {
    /// Prints the topic id, consumer id, both offsets and the subscription
    /// flag; the client and the remaining configuration are left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StreamingConsumer");
        out.field("topic_id", &self.config.topic_id);
        out.field("consumer_id", &self.consumer_id);
        out.field("current_offset", &self.current_offset);
        out.field("committed_offset", &self.committed_offset);
        out.field("is_subscribed", &self.is_subscribed);
        out.finish()
    }
}
// Unit tests for the configuration builders and plain value types. Nothing
// here opens a connection; the consumers themselves are not exercised.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
// Defaults: topic 0, 64 KiB fetch limit, start at the beginning.
#[test]
fn test_consumer_config_default() {
let config = ConsumerConfig::default();
assert_eq!(config.topic_id, 0);
assert_eq!(config.max_fetch_bytes, 64 * 1024);
assert_eq!(config.start_position, SeekPosition::Beginning);
}
// Each builder method overrides exactly its own field.
#[test]
fn test_consumer_config_builder() {
let config = ConsumerConfig::new(42)
.with_max_fetch_bytes(128 * 1024)
.with_start_position(SeekPosition::Offset(1000));
assert_eq!(config.topic_id, 42);
assert_eq!(config.max_fetch_bytes, 128 * 1024);
assert_eq!(config.start_position, SeekPosition::Offset(1000));
}
// `is_empty` depends only on the payload bytes, not on the other fields.
#[test]
fn test_poll_result_is_empty() {
let empty = PollResult {
data: bytes::Bytes::new(),
current_offset: 0,
record_count: 0,
end_of_stream: true,
};
assert!(empty.is_empty());
let non_empty = PollResult {
data: bytes::Bytes::from_static(&[1, 2, 3]),
current_offset: 3,
record_count: 1,
end_of_stream: false,
};
assert!(!non_empty.is_empty());
}
// Defaults: topic 0, 64 KiB batches, beginning, no group, 5000 ms auto-commit.
#[test]
fn test_streaming_consumer_config_default() {
let config = StreamingConsumerConfig::default();
assert_eq!(config.topic_id, 0);
assert_eq!(config.max_batch_bytes, 64 * 1024);
assert_eq!(config.start_position, SeekPosition::Beginning);
assert!(config.consumer_group.is_none());
assert_eq!(config.auto_commit_interval_ms, 5000);
}
// Each builder method overrides exactly its own field.
#[test]
fn test_streaming_consumer_config_builder() {
let config = StreamingConsumerConfig::new(42)
.with_max_batch_bytes(128 * 1024)
.with_start_position(SeekPosition::Offset(5000))
.with_consumer_group("my-group")
.with_auto_commit_interval(10000);
assert_eq!(config.topic_id, 42);
assert_eq!(config.max_batch_bytes, 128 * 1024);
assert_eq!(config.start_position, SeekPosition::Offset(5000));
assert_eq!(config.consumer_group, Some("my-group".to_string()));
assert_eq!(config.auto_commit_interval_ms, 10000);
}
// An interval of 0 is stored as-is; `maybe_auto_commit` treats it as "disabled".
#[test]
fn test_streaming_consumer_config_disable_auto_commit() {
let config = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);
assert_eq!(config.auto_commit_interval_ms, 0);
}
// Every `SeekPosition` variant round-trips through the builder.
#[test]
fn test_streaming_consumer_config_seek_positions() {
let beginning =
StreamingConsumerConfig::new(1).with_start_position(SeekPosition::Beginning);
assert_eq!(beginning.start_position, SeekPosition::Beginning);
let end = StreamingConsumerConfig::new(1).with_start_position(SeekPosition::End);
assert_eq!(end.start_position, SeekPosition::End);
let offset =
StreamingConsumerConfig::new(1).with_start_position(SeekPosition::Offset(12345));
assert_eq!(offset.start_position, SeekPosition::Offset(12345));
}
// `Beginning` and `Offset(0)` are distinct variants even though both map to
// offset 0 when a consumer is constructed.
#[test]
fn test_seek_position_equality() {
assert_eq!(SeekPosition::Beginning, SeekPosition::Beginning);
assert_eq!(SeekPosition::End, SeekPosition::End);
assert_eq!(SeekPosition::Offset(100), SeekPosition::Offset(100));
assert_ne!(SeekPosition::Beginning, SeekPosition::End);
assert_ne!(SeekPosition::Offset(100), SeekPosition::Offset(200));
assert_ne!(SeekPosition::Beginning, SeekPosition::Offset(0));
}
// Despite the name this exercises `Copy`: `pos` is still usable after being
// assigned to `cloned`.
#[test]
fn test_seek_position_clone() {
let pos = SeekPosition::Offset(42);
let cloned = pos;
assert_eq!(pos, cloned);
}
}