1use std::collections::hash_map::DefaultHasher;
36use std::hash::{Hash, Hasher};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use crate::client::{ClientConfig, LanceClient};
42use crate::consumer::{PollResult, SeekPosition, StreamingConsumer, StreamingConsumerConfig};
43use crate::error::Result;
44use crate::offset::{LockFileOffsetStore, MemoryOffsetStore, OffsetStore};
45
46#[derive(Debug, Clone)]
48pub struct StandaloneConfig {
49 pub consumer_id: String,
51 pub topic_id: u32,
53 pub max_fetch_bytes: u32,
55 pub start_position: SeekPosition,
57 pub offset_dir: Option<PathBuf>,
59 pub auto_commit_interval: Option<Duration>,
61 pub connect_timeout: Duration,
63 pub poll_timeout: Duration,
65}
66
67impl StandaloneConfig {
68 pub fn new(consumer_id: impl Into<String>, topic_id: u32) -> Self {
70 Self {
71 consumer_id: consumer_id.into(),
72 topic_id,
73 max_fetch_bytes: 1_048_576, start_position: SeekPosition::Beginning,
75 offset_dir: None,
76 auto_commit_interval: Some(Duration::from_secs(5)),
77 connect_timeout: Duration::from_secs(30),
78 poll_timeout: Duration::from_millis(100),
79 }
80 }
81
82 pub fn with_consumer_id(mut self, id: impl Into<String>) -> Self {
84 self.consumer_id = id.into();
85 self
86 }
87
88 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
90 self.max_fetch_bytes = bytes;
91 self
92 }
93
94 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
96 self.start_position = position;
97 self
98 }
99
100 pub fn with_offset_dir(mut self, dir: &Path) -> Self {
103 self.offset_dir = Some(dir.to_path_buf());
104 self
105 }
106
107 pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
109 self.auto_commit_interval = interval;
110 self
111 }
112
113 pub fn with_manual_commit(mut self) -> Self {
115 self.auto_commit_interval = None;
116 self
117 }
118
119 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
121 self.connect_timeout = timeout;
122 self
123 }
124
125 pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
127 self.poll_timeout = timeout;
128 self
129 }
130}
131
132pub struct StandaloneConsumer {
143 inner: StreamingConsumer,
144 config: StandaloneConfig,
145 offset_store: Arc<dyn OffsetStore>,
146 last_commit_time: Instant,
147 pending_offset: u64,
148 committed_offset: u64,
149}
150
151impl StandaloneConsumer {
152 pub async fn connect(addr: &str, config: StandaloneConfig) -> Result<Self> {
166 let mut client_config = ClientConfig::new(addr);
167 client_config.connect_timeout = config.connect_timeout;
168
169 let client = LanceClient::connect(client_config).await?;
170 Self::from_client(client, config).await
171 }
172
173 pub async fn from_client(client: LanceClient, config: StandaloneConfig) -> Result<Self> {
175 let numeric_consumer_id = Self::hash_consumer_id(&config.consumer_id);
177
178 let offset_store: Arc<dyn OffsetStore> = if let Some(ref dir) = config.offset_dir {
180 Arc::new(LockFileOffsetStore::open(dir, &config.consumer_id)?)
181 } else {
182 Arc::new(MemoryOffsetStore::new())
183 };
184
185 let stored_offset = offset_store
187 .load(config.topic_id, numeric_consumer_id)
188 .ok()
189 .flatten();
190
191 let start_position = if let Some(offset) = stored_offset {
193 SeekPosition::Offset(offset)
194 } else {
195 config.start_position
196 };
197
198 let streaming_config = StreamingConsumerConfig::new(config.topic_id)
200 .with_max_batch_bytes(config.max_fetch_bytes)
201 .with_start_position(start_position)
202 .with_auto_commit_interval(0); let mut inner = StreamingConsumer::new(client, streaming_config);
205
206 inner.start().await?;
208
209 let current_offset = inner.current_offset();
210
211 Ok(Self {
212 inner,
213 config,
214 offset_store,
215 last_commit_time: Instant::now(),
216 pending_offset: current_offset,
217 committed_offset: stored_offset.unwrap_or(0),
218 })
219 }
220
221 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
228 self.maybe_auto_commit().await?;
230
231 let result = self.inner.poll().await?;
233
234 if let Some(ref poll_result) = result {
235 self.pending_offset = poll_result.current_offset;
236 }
237
238 Ok(result)
239 }
240
241 pub async fn poll_timeout(&mut self, timeout: Duration) -> Result<Option<PollResult>> {
246 let deadline = Instant::now() + timeout;
247
248 while Instant::now() < deadline {
249 if let Some(result) = self.poll().await? {
250 return Ok(Some(result));
251 }
252 tokio::time::sleep(Duration::from_millis(10)).await;
254 }
255
256 Ok(None)
257 }
258
259 pub async fn commit(&mut self) -> Result<()> {
264 self.commit_offset(self.pending_offset).await
265 }
266
267 pub async fn commit_offset(&mut self, offset: u64) -> Result<()> {
271 let numeric_consumer_id = Self::hash_consumer_id(&self.config.consumer_id);
273
274 self.offset_store
276 .save(self.config.topic_id, numeric_consumer_id, offset)?;
277
278 self.committed_offset = offset;
279 self.last_commit_time = Instant::now();
280
281 let _ = self.inner.commit().await;
283
284 Ok(())
285 }
286
287 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
292 let offset = self.inner.seek(position).await?;
293 self.pending_offset = offset;
294 Ok(offset)
295 }
296
297 pub fn current_offset(&self) -> u64 {
299 self.pending_offset
300 }
301
302 pub fn committed_offset(&self) -> u64 {
304 self.committed_offset
305 }
306
307 pub fn consumer_id(&self) -> &str {
309 &self.config.consumer_id
310 }
311
312 pub fn topic_id(&self) -> u32 {
314 self.config.topic_id
315 }
316
317 pub fn is_subscribed(&self) -> bool {
319 self.inner.is_subscribed()
320 }
321
322 pub fn client(&self) -> &LanceClient {
324 self.inner.client()
325 }
326
327 pub async fn close(mut self) -> Result<LanceClient> {
334 if self.pending_offset > self.committed_offset {
336 let _ = self.commit().await;
337 }
338
339 self.inner.into_client().await
340 }
341
342 fn hash_consumer_id(consumer_id: &str) -> u64 {
344 let mut hasher = DefaultHasher::new();
345 consumer_id.hash(&mut hasher);
346 hasher.finish()
347 }
348
349 async fn maybe_auto_commit(&mut self) -> Result<()> {
351 if let Some(interval) = self.config.auto_commit_interval {
352 if self.last_commit_time.elapsed() >= interval {
353 if self.pending_offset > self.committed_offset {
354 self.commit().await?;
355 } else {
356 self.last_commit_time = Instant::now();
358 }
359 }
360 }
361 Ok(())
362 }
363}
364
365impl std::fmt::Debug for StandaloneConsumer {
366 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367 f.debug_struct("StandaloneConsumer")
368 .field("consumer_id", &self.config.consumer_id)
369 .field("topic_id", &self.config.topic_id)
370 .field("pending_offset", &self.pending_offset)
371 .field("committed_offset", &self.committed_offset)
372 .field("is_subscribed", &self.inner.is_subscribed())
373 .finish()
374 }
375}
376
377pub struct StandaloneConsumerBuilder {
379 addr: String,
380 base_config: StandaloneConfig,
381}
382
383impl StandaloneConsumerBuilder {
384 pub fn new(addr: impl Into<String>, consumer_id: impl Into<String>) -> Self {
386 Self {
387 addr: addr.into(),
388 base_config: StandaloneConfig::new(consumer_id, 0),
389 }
390 }
391
392 pub fn with_offset_dir(mut self, dir: &Path) -> Self {
394 self.base_config = self.base_config.with_offset_dir(dir);
395 self
396 }
397
398 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
400 self.base_config = self.base_config.with_max_fetch_bytes(bytes);
401 self
402 }
403
404 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
406 self.base_config = self.base_config.with_start_position(position);
407 self
408 }
409
410 pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
412 self.base_config = self.base_config.with_auto_commit_interval(interval);
413 self
414 }
415
416 pub async fn build_for_topic(&self, topic_id: u32) -> Result<StandaloneConsumer> {
418 let mut config = self.base_config.clone();
419 config.topic_id = topic_id;
420 StandaloneConsumer::connect(&self.addr, config).await
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default set by `StandaloneConfig::new` is pinned here.
    #[test]
    fn test_standalone_config_defaults() {
        let config = StandaloneConfig::new("test-consumer", 1);

        assert_eq!(config.consumer_id, "test-consumer");
        assert_eq!(config.topic_id, 1);
        assert_eq!(config.max_fetch_bytes, 1_048_576);
        assert!(config.offset_dir.is_none());
        assert_eq!(config.auto_commit_interval, Some(Duration::from_secs(5)));
        assert_eq!(config.connect_timeout, Duration::from_secs(30));
        assert_eq!(config.poll_timeout, Duration::from_millis(100));
    }

    #[test]
    fn test_standalone_config_builder() {
        let config = StandaloneConfig::new("test", 1)
            .with_max_fetch_bytes(512 * 1024)
            .with_offset_dir(Path::new("/tmp/offsets"))
            .with_manual_commit()
            .with_start_position(SeekPosition::End);

        assert_eq!(config.max_fetch_bytes, 512 * 1024);
        assert_eq!(config.offset_dir.as_deref(), Some(Path::new("/tmp/offsets")));
        assert!(config.auto_commit_interval.is_none());
    }

    #[test]
    fn test_standalone_config_with_auto_commit() {
        let config = StandaloneConfig::new("test", 1)
            .with_auto_commit_interval(Some(Duration::from_secs(10)));

        assert_eq!(config.auto_commit_interval, Some(Duration::from_secs(10)));
    }

    #[test]
    fn test_standalone_config_overrides() {
        let config = StandaloneConfig::new("original", 7)
            .with_consumer_id("renamed")
            .with_connect_timeout(Duration::from_secs(2))
            .with_poll_timeout(Duration::from_millis(25))
            .with_auto_commit_interval(None);

        assert_eq!(config.consumer_id, "renamed");
        assert_eq!(config.topic_id, 7);
        assert_eq!(config.connect_timeout, Duration::from_secs(2));
        assert_eq!(config.poll_timeout, Duration::from_millis(25));
        assert!(config.auto_commit_interval.is_none());
    }

    /// Offsets are keyed by this hash, so it must at least be stable within a
    /// build and distinguish different ids.
    #[test]
    fn test_hash_consumer_id_is_deterministic() {
        let a1 = StandaloneConsumer::hash_consumer_id("consumer-a");
        let a2 = StandaloneConsumer::hash_consumer_id("consumer-a");
        let b = StandaloneConsumer::hash_consumer_id("consumer-b");

        assert_eq!(a1, a2);
        assert_ne!(a1, b);
    }
}