use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::client::{ClientConfig, LanceClient};
use crate::consumer::{PollResult, SeekPosition, StreamingConsumer, StreamingConsumerConfig};
use crate::error::{Result, validate_topic_name};
use crate::offset::{LockFileOffsetStore, MemoryOffsetStore, OffsetStore};
/// Configuration for a [`StandaloneConsumer`].
///
/// Built with one of the constructors (`new`, `with_topic_name`, `new_with_id`) and then
/// refined with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct StandaloneConfig {
    /// Consumer identity. It is hashed to a numeric id that keys the offset store, and it is
    /// passed to `LockFileOffsetStore::open` when `offset_dir` is set.
    pub consumer_id: String,
    /// Numeric topic id. `0` means "resolve from `topic_name`" inside `StandaloneConsumer::connect`.
    pub topic_id: u32,
    /// Topic name. It is validated and passed to `create_topic` when `topic_id` is `0`.
    pub topic_name: Option<String>,
    /// Forwarded as the inner streaming consumer's maximum batch size in bytes.
    pub max_fetch_bytes: u32,
    /// Where to start reading when the offset store has no saved offset for this consumer.
    pub start_position: SeekPosition,
    /// When set, offsets are persisted through a `LockFileOffsetStore` in this directory;
    /// otherwise an in-memory store is used.
    pub offset_dir: Option<PathBuf>,
    /// Minimum time between automatic commits; `None` means offsets are only committed manually.
    pub auto_commit_interval: Option<Duration>,
    /// Timeout applied to the initial broker connection.
    pub connect_timeout: Duration,
    /// NOTE(review): not read anywhere in this file — confirm whether it is used elsewhere.
    pub poll_timeout: Duration,
}
impl StandaloneConfig {
    /// Default upper bound on bytes per fetch (1 MiB).
    const DEFAULT_MAX_FETCH_BYTES: u32 = 1_048_576;
    /// Default minimum interval between automatic offset commits.
    const DEFAULT_AUTO_COMMIT_INTERVAL: Duration = Duration::from_secs(5);
    /// Default timeout for establishing the broker connection.
    const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
    /// Default value of the `poll_timeout` field.
    const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(100);

    /// Single source of truth for the defaults shared by every public constructor.
    fn from_parts(consumer_id: String, topic_id: u32, topic_name: Option<String>) -> Self {
        Self {
            consumer_id,
            topic_id,
            topic_name,
            max_fetch_bytes: Self::DEFAULT_MAX_FETCH_BYTES,
            start_position: SeekPosition::Beginning,
            offset_dir: None,
            auto_commit_interval: Some(Self::DEFAULT_AUTO_COMMIT_INTERVAL),
            connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT,
            poll_timeout: Self::DEFAULT_POLL_TIMEOUT,
        }
    }

    /// Creates a config for a topic identified by its numeric id, with default settings.
    pub fn new(consumer_id: impl Into<String>, topic_id: u32) -> Self {
        Self::from_parts(consumer_id.into(), topic_id, None)
    }

    /// Creates a config for a topic identified by name.
    ///
    /// `topic_id` is left at `0`, so `StandaloneConsumer::connect` resolves the id from the name.
    pub fn with_topic_name(consumer_id: impl Into<String>, topic_name: impl Into<String>) -> Self {
        Self::from_parts(consumer_id.into(), 0, Some(topic_name.into()))
    }

    /// Creates a config carrying both a topic name and an already-known topic id.
    pub fn new_with_id(
        consumer_id: impl Into<String>,
        topic_name: impl Into<String>,
        topic_id: u32,
    ) -> Self {
        Self::from_parts(consumer_id.into(), topic_id, Some(topic_name.into()))
    }

    /// Replaces the consumer id.
    pub fn with_consumer_id(mut self, id: impl Into<String>) -> Self {
        self.consumer_id = id.into();
        self
    }

    /// Sets the maximum number of bytes requested per fetch.
    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
        self.max_fetch_bytes = bytes;
        self
    }

    /// Sets where consumption starts when no stored offset exists.
    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
        self.start_position = position;
        self
    }

    /// Persists offsets in `dir`.
    ///
    /// Accepts anything path-like (`&Path`, `&PathBuf`, `PathBuf`, `&str`, ...).
    pub fn with_offset_dir(mut self, dir: impl AsRef<Path>) -> Self {
        self.offset_dir = Some(dir.as_ref().to_path_buf());
        self
    }

    /// Sets the auto-commit interval; `None` disables auto-commit.
    pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
        self.auto_commit_interval = interval;
        self
    }

    /// Disables auto-commit, so offsets are only saved by explicit commits (or on close).
    pub fn with_manual_commit(mut self) -> Self {
        self.auto_commit_interval = None;
        self
    }

    /// Sets the broker connection timeout.
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Sets the `poll_timeout` field.
    pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
        self.poll_timeout = timeout;
        self
    }
}
/// A single consumer that tracks and persists its own offsets on top of a `StreamingConsumer`.
pub struct StandaloneConsumer {
    /// Underlying streaming consumer that performs the fetches.
    inner: StreamingConsumer,
    /// Configuration this consumer was built with (with `topic_id` resolved).
    config: StandaloneConfig,
    /// Where committed offsets are saved (lock-file backed or in-memory).
    offset_store: Arc<dyn OffsetStore>,
    /// When the last commit happened (or the auto-commit timer was last reset).
    last_commit_time: Instant,
    /// Offset reached by the most recent batch or seek; not yet necessarily committed.
    pending_offset: u64,
    /// Offset most recently saved to `offset_store` (or loaded from it at startup).
    committed_offset: u64,
}
impl StandaloneConsumer {
    /// Upper bound on the pause between empty polls in [`StandaloneConsumer::poll_timeout`].
    const EMPTY_POLL_BACKOFF: Duration = Duration::from_millis(10);

    /// Connects to the broker at `addr` and starts consuming according to `config`.
    ///
    /// The client's connect timeout comes from `config.connect_timeout`. If `config.topic_id`
    /// is `0` and `config.topic_name` is set, the name is validated and `create_topic` is
    /// called to obtain the topic id before consumption starts.
    ///
    /// # Errors
    /// Returns an error if:
    /// - the connection fails;
    /// - the topic name is invalid;
    /// - topic creation fails;
    /// - [`StandaloneConsumer::from_client`] fails.
    pub async fn connect(addr: &str, mut config: StandaloneConfig) -> Result<Self> {
        let mut client_config = ClientConfig::new(addr);
        client_config.connect_timeout = config.connect_timeout;
        let mut client = LanceClient::connect(client_config).await?;
        if config.topic_id == 0 {
            // Borrow the name instead of cloning it; `topic_id` is a disjoint field, so it
            // can still be assigned below.
            if let Some(name) = config.topic_name.as_ref() {
                validate_topic_name(name)?;
                let topic_info = client.create_topic(name).await?;
                config.topic_id = topic_info.id;
            }
        }
        Self::from_client(client, config).await
    }

    /// Builds a consumer on an already-connected `client` and starts the inner stream.
    ///
    /// Offsets go to a `LockFileOffsetStore` in `config.offset_dir` when it is set, and to a
    /// `MemoryOffsetStore` otherwise. A stored offset for (topic, consumer) takes precedence
    /// over `config.start_position`.
    ///
    /// # Errors
    /// Fails if the lock-file offset store cannot be opened or the inner consumer fails to start.
    pub async fn from_client(client: LanceClient, config: StandaloneConfig) -> Result<Self> {
        let numeric_consumer_id = Self::hash_consumer_id(&config.consumer_id);
        let offset_store: Arc<dyn OffsetStore> = match &config.offset_dir {
            Some(dir) => Arc::new(LockFileOffsetStore::open(dir, &config.consumer_id)?),
            None => Arc::new(MemoryOffsetStore::new()),
        };
        // Best-effort: a failed load is treated exactly like "no stored offset", so the
        // consumer falls back to `config.start_position` instead of failing to start.
        let stored_offset = offset_store
            .load(config.topic_id, numeric_consumer_id)
            .ok()
            .flatten();
        let start_position = match stored_offset {
            Some(offset) => SeekPosition::Offset(offset),
            None => config.start_position,
        };
        // This wrapper drives commits itself (see `maybe_auto_commit`), so the inner
        // consumer is configured with an auto-commit interval of 0 — presumably "disabled";
        // confirm against `StreamingConsumerConfig`.
        let streaming_config = StreamingConsumerConfig::new(config.topic_id)
            .with_max_batch_bytes(config.max_fetch_bytes)
            .with_start_position(start_position)
            .with_auto_commit_interval(0);
        let mut inner = StreamingConsumer::new(client, streaming_config);
        inner.start().await?;
        let current_offset = inner.current_offset();
        Ok(Self {
            inner,
            config,
            offset_store,
            last_commit_time: Instant::now(),
            pending_offset: current_offset,
            committed_offset: stored_offset.unwrap_or(0),
        })
    }

    /// Fetches the next batch.
    ///
    /// Any auto-commit that is due runs first. When a batch is returned, its
    /// `current_offset` becomes the pending (uncommitted) offset.
    ///
    /// # Errors
    /// Propagates errors from the auto-commit or from the inner fetch.
    pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
        self.maybe_auto_commit().await?;
        let result = self.inner.next_batch().await?;
        if let Some(poll_result) = &result {
            self.pending_offset = poll_result.current_offset;
        }
        Ok(result)
    }

    /// Alias for [`StandaloneConsumer::next_batch`].
    #[inline]
    pub async fn consume(&mut self) -> Result<Option<PollResult>> {
        self.next_batch().await
    }

    /// Alias for [`StandaloneConsumer::next_batch`].
    #[inline]
    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
        self.next_batch().await
    }

    /// Polls repeatedly until a batch arrives or `timeout` elapses.
    ///
    /// Always polls at least once, so a zero `timeout` performs one attempt without waiting.
    /// Between empty polls it sleeps for at most 10 ms, and never past the deadline. An
    /// individual `next_batch` call is not cancelled at the deadline, so the method can
    /// overrun `timeout` by the duration of one fetch.
    ///
    /// # Errors
    /// Propagates the first error returned by [`StandaloneConsumer::next_batch`].
    pub async fn poll_timeout(&mut self, timeout: Duration) -> Result<Option<PollResult>> {
        // `None`: the deadline is not representable as an `Instant` (e.g. `Duration::MAX`),
        // so keep polling until data arrives instead of panicking on overflow.
        let deadline = Instant::now().checked_add(timeout);
        loop {
            if let Some(result) = self.next_batch().await? {
                return Ok(Some(result));
            }
            let pause = match deadline {
                Some(deadline) => {
                    let remaining = deadline.saturating_duration_since(Instant::now());
                    if remaining.is_zero() {
                        return Ok(None);
                    }
                    remaining.min(Self::EMPTY_POLL_BACKOFF)
                }
                None => Self::EMPTY_POLL_BACKOFF,
            };
            tokio::time::sleep(pause).await;
        }
    }

    /// Commits the pending offset (the position after the last batch or seek).
    ///
    /// # Errors
    /// Fails if the offset store cannot save the offset.
    pub async fn commit(&mut self) -> Result<()> {
        self.commit_offset(self.pending_offset).await
    }

    /// Saves `offset` to the offset store and records it as committed.
    ///
    /// The offset store is the authoritative record. The inner consumer's `commit()` is
    /// also invoked, but its result is deliberately ignored (best-effort).
    ///
    /// # Errors
    /// Fails if the offset store cannot save the offset; in that case no state is updated.
    pub async fn commit_offset(&mut self, offset: u64) -> Result<()> {
        let numeric_consumer_id = Self::hash_consumer_id(&self.config.consumer_id);
        self.offset_store
            .save(self.config.topic_id, numeric_consumer_id, offset)?;
        self.committed_offset = offset;
        self.last_commit_time = Instant::now();
        let _ = self.inner.commit().await;
        Ok(())
    }

    /// Repositions the inner consumer.
    ///
    /// The resulting offset becomes the pending offset. It is not committed here.
    ///
    /// NOTE(review): auto-commit and `close` only commit when pending is *ahead of* committed,
    /// so a backwards seek is not persisted until consumption passes the old committed offset.
    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
        let offset = self.inner.seek(position).await?;
        self.pending_offset = offset;
        Ok(offset)
    }

    /// Offset reached by the most recent batch or seek (possibly uncommitted).
    pub fn current_offset(&self) -> u64 {
        self.pending_offset
    }

    /// Offset most recently saved to (or loaded from) the offset store.
    pub fn committed_offset(&self) -> u64 {
        self.committed_offset
    }

    /// The configured consumer id.
    pub fn consumer_id(&self) -> &str {
        &self.config.consumer_id
    }

    /// The (resolved) topic id being consumed.
    pub fn topic_id(&self) -> u32 {
        self.config.topic_id
    }

    /// Whether the inner consumer reports itself as subscribed.
    pub fn is_subscribed(&self) -> bool {
        self.inner.is_subscribed()
    }

    /// Borrows the underlying client.
    pub fn client(&self) -> &LanceClient {
        self.inner.client()
    }

    /// Shuts the consumer down and returns the underlying client.
    ///
    /// If the pending offset is ahead of the committed one, a commit is attempted first.
    /// That commit is best-effort: its error is ignored so the client is still returned.
    pub async fn close(mut self) -> Result<LanceClient> {
        if self.pending_offset > self.committed_offset {
            let _ = self.commit().await;
        }
        self.inner.into_client().await
    }

    /// Derives the numeric consumer id that keys the offset store.
    ///
    /// NOTE(review): std documents `DefaultHasher`'s algorithm as unspecified and not to be
    /// relied upon across Rust releases, yet this value keys *persisted* offsets. A toolchain
    /// that changes it would orphan stored offsets and restart consumers from
    /// `start_position`. Pinning an explicit algorithm (one that reproduces today's values)
    /// would remove the hazard. It is left unchanged so existing offsets stay readable.
    fn hash_consumer_id(consumer_id: &str) -> u64 {
        let mut hasher = DefaultHasher::new();
        consumer_id.hash(&mut hasher);
        hasher.finish()
    }

    /// Commits if auto-commit is enabled and the interval has elapsed.
    ///
    /// With nothing new to commit, it only restarts the interval timer.
    async fn maybe_auto_commit(&mut self) -> Result<()> {
        let interval = match self.config.auto_commit_interval {
            Some(interval) => interval,
            None => return Ok(()),
        };
        if self.last_commit_time.elapsed() < interval {
            return Ok(());
        }
        if self.pending_offset > self.committed_offset {
            self.commit().await?;
        } else {
            self.last_commit_time = Instant::now();
        }
        Ok(())
    }
}
impl std::fmt::Debug for StandaloneConsumer {
    // Summarises identity and offset progress. The inner consumer, offset store and
    // timers are omitted; subscription state is reported via `is_subscribed()`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cfg = &self.config;
        let subscribed = self.inner.is_subscribed();
        let mut view = f.debug_struct("StandaloneConsumer");
        view.field("consumer_id", &cfg.consumer_id);
        view.field("topic_id", &cfg.topic_id);
        view.field("pending_offset", &self.pending_offset);
        view.field("committed_offset", &self.committed_offset);
        view.field("is_subscribed", &subscribed);
        view.finish()
    }
}
/// Reusable factory for [`StandaloneConsumer`]s that share a broker address and base settings.
pub struct StandaloneConsumerBuilder {
    /// Broker address passed to `StandaloneConsumer::connect`.
    addr: String,
    /// Template configuration; cloned and specialised per topic on each build.
    base_config: StandaloneConfig,
}
impl StandaloneConsumerBuilder {
    /// Creates a builder for the broker at `addr`, consuming as `consumer_id`.
    ///
    /// The base configuration is `StandaloneConfig::new(consumer_id, 0)`; the topic is
    /// supplied later by `build_for_topic` or `build_for_topic_name`.
    pub fn new(addr: impl Into<String>, consumer_id: impl Into<String>) -> Self {
        Self {
            addr: addr.into(),
            base_config: StandaloneConfig::new(consumer_id, 0),
        }
    }

    /// Persists offsets in `dir` (accepts `&Path`, `&PathBuf`, `PathBuf`, `&str`, ...).
    pub fn with_offset_dir(mut self, dir: impl AsRef<Path>) -> Self {
        self.base_config = self.base_config.with_offset_dir(dir.as_ref());
        self
    }

    /// Sets the maximum number of bytes requested per fetch.
    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
        self.base_config = self.base_config.with_max_fetch_bytes(bytes);
        self
    }

    /// Sets where consumption starts when no stored offset exists.
    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
        self.base_config = self.base_config.with_start_position(position);
        self
    }

    /// Sets the auto-commit interval; `None` disables auto-commit.
    pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
        self.base_config = self.base_config.with_auto_commit_interval(interval);
        self
    }

    /// Disables auto-commit for every consumer this builder produces.
    pub fn with_manual_commit(mut self) -> Self {
        self.base_config = self.base_config.with_manual_commit();
        self
    }

    /// Sets the broker connection timeout used by every consumer this builder produces.
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.base_config = self.base_config.with_connect_timeout(timeout);
        self
    }

    /// Connects a consumer for the topic with numeric id `topic_id`.
    ///
    /// # Errors
    /// Propagates any error from `StandaloneConsumer::connect`.
    pub async fn build_for_topic(&self, topic_id: u32) -> Result<StandaloneConsumer> {
        let mut config = self.base_config.clone();
        config.topic_id = topic_id;
        StandaloneConsumer::connect(&self.addr, config).await
    }

    /// Connects a consumer for the topic named `topic_name`.
    ///
    /// The id is resolved during connect. The name is validated here first, so an invalid
    /// name fails before any network connection is attempted.
    ///
    /// # Errors
    /// Returns the validation error for an invalid name, or any error from
    /// `StandaloneConsumer::connect`.
    pub async fn build_for_topic_name(
        &self,
        topic_name: impl Into<String>,
    ) -> Result<StandaloneConsumer> {
        let name = topic_name.into();
        validate_topic_name(&name)?;
        let mut config = self.base_config.clone();
        config.topic_id = 0;
        config.topic_name = Some(name);
        StandaloneConsumer::connect(&self.addr, config).await
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::error::validate_topic_name;

    #[test]
    fn test_standalone_config_defaults() {
        let cfg = StandaloneConfig::new("test-consumer", 1);
        assert_eq!(cfg.consumer_id, "test-consumer");
        assert_eq!(cfg.topic_id, 1);
        assert_eq!(cfg.max_fetch_bytes, 1_048_576);
        assert!(matches!(cfg.offset_dir, None));
        assert!(matches!(cfg.auto_commit_interval, Some(_)));
        assert!(matches!(cfg.topic_name, None));
    }

    #[test]
    fn test_standalone_config_builder() {
        let fetch_limit = 512 * 1024;
        let cfg = StandaloneConfig::new("test", 1)
            .with_max_fetch_bytes(fetch_limit)
            .with_offset_dir(Path::new("/tmp/offsets"))
            .with_manual_commit()
            .with_start_position(SeekPosition::End);
        assert_eq!(cfg.max_fetch_bytes, fetch_limit);
        assert!(matches!(cfg.offset_dir, Some(_)));
        assert!(matches!(cfg.auto_commit_interval, None));
    }

    #[test]
    fn test_standalone_config_with_auto_commit() {
        let ten_seconds = Duration::from_secs(10);
        let cfg = StandaloneConfig::new("test", 1).with_auto_commit_interval(Some(ten_seconds));
        assert_eq!(cfg.auto_commit_interval, Some(ten_seconds));
    }

    #[test]
    fn test_with_topic_name_sets_name_and_zero_id() {
        let cfg = StandaloneConfig::with_topic_name("my-consumer", "rithmic-actions");
        assert_eq!(cfg.consumer_id, "my-consumer");
        assert_eq!(cfg.topic_name.as_deref(), Some("rithmic-actions"));
        assert_eq!(cfg.topic_id, 0);
    }

    #[test]
    fn test_new_with_id_sets_both_name_and_id() {
        let cfg = StandaloneConfig::new_with_id("my-consumer", "rithmic-actions", 42);
        assert_eq!(cfg.consumer_id, "my-consumer");
        assert_eq!(cfg.topic_name.as_deref(), Some("rithmic-actions"));
        assert_eq!(cfg.topic_id, 42);
    }

    #[test]
    fn test_with_topic_name_builder_chain() {
        let fetch_limit = 256 * 1024;
        let cfg = StandaloneConfig::with_topic_name("consumer-1", "data-stream")
            .with_max_fetch_bytes(fetch_limit)
            .with_manual_commit()
            .with_start_position(SeekPosition::End);
        assert_eq!(cfg.topic_name.as_deref(), Some("data-stream"));
        assert_eq!(cfg.topic_id, 0);
        assert_eq!(cfg.max_fetch_bytes, fetch_limit);
        assert!(matches!(cfg.auto_commit_interval, None));
    }

    #[test]
    fn test_validate_topic_name_accepts_valid_names() {
        let accepted = [
            "rithmic-actions",
            "data",
            "topic-123",
            "MyTopic",
            "ABC",
            "a-b-c-1-2-3",
        ];
        for candidate in accepted.iter() {
            let verdict = validate_topic_name(candidate);
            assert!(verdict.is_ok(), "Expected {:?} to be valid", candidate);
        }
    }

    #[test]
    fn test_validate_topic_name_rejects_empty() {
        let verdict = validate_topic_name("");
        let rejected = matches!(verdict, Err(crate::ClientError::InvalidTopicName(_)));
        assert!(rejected, "Empty name should be invalid");
    }

    #[test]
    fn test_validate_topic_name_rejects_spaces() {
        let verdict = validate_topic_name("bad topic");
        let rejected = matches!(verdict, Err(crate::ClientError::InvalidTopicName(_)));
        assert!(rejected, "Name with space should be invalid");
    }

    #[test]
    fn test_validate_topic_name_rejects_special_chars() {
        let refused = [
            "topic!",
            "topic/sub",
            "topic.name",
            "topic_name",
            "topic@v2",
        ];
        for candidate in refused.iter() {
            let rejected = matches!(
                validate_topic_name(candidate),
                Err(crate::ClientError::InvalidTopicName(_))
            );
            assert!(rejected, "Expected {:?} to be invalid", candidate);
        }
    }

    #[test]
    fn test_validate_topic_name_rejects_unicode() {
        let verdict = validate_topic_name("tópico");
        let rejected = matches!(verdict, Err(crate::ClientError::InvalidTopicName(_)));
        assert!(rejected, "Name with unicode should be invalid");
    }
}