1use std::collections::hash_map::DefaultHasher;
36use std::hash::{Hash, Hasher};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use crate::client::{ClientConfig, LanceClient};
42use crate::consumer::{PollResult, SeekPosition, StreamingConsumer, StreamingConsumerConfig};
43use crate::error::Result;
44use crate::offset::{LockFileOffsetStore, MemoryOffsetStore, OffsetStore};
45
46#[derive(Debug, Clone)]
48pub struct StandaloneConfig {
49 pub consumer_id: String,
51 pub topic_id: u32,
53 pub max_fetch_bytes: u32,
55 pub start_position: SeekPosition,
57 pub offset_dir: Option<PathBuf>,
59 pub auto_commit_interval: Option<Duration>,
61 pub connect_timeout: Duration,
63 pub poll_timeout: Duration,
65}
66
67impl StandaloneConfig {
68 pub fn new(consumer_id: impl Into<String>, topic_id: u32) -> Self {
70 Self {
71 consumer_id: consumer_id.into(),
72 topic_id,
73 max_fetch_bytes: 1_048_576, start_position: SeekPosition::Beginning,
75 offset_dir: None,
76 auto_commit_interval: Some(Duration::from_secs(5)),
77 connect_timeout: Duration::from_secs(30),
78 poll_timeout: Duration::from_millis(100),
79 }
80 }
81
82 pub fn with_consumer_id(mut self, id: impl Into<String>) -> Self {
84 self.consumer_id = id.into();
85 self
86 }
87
88 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
90 self.max_fetch_bytes = bytes;
91 self
92 }
93
94 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
96 self.start_position = position;
97 self
98 }
99
100 pub fn with_offset_dir(mut self, dir: &Path) -> Self {
103 self.offset_dir = Some(dir.to_path_buf());
104 self
105 }
106
107 pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
109 self.auto_commit_interval = interval;
110 self
111 }
112
113 pub fn with_manual_commit(mut self) -> Self {
115 self.auto_commit_interval = None;
116 self
117 }
118
119 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
121 self.connect_timeout = timeout;
122 self
123 }
124
125 pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
127 self.poll_timeout = timeout;
128 self
129 }
130}
131
132pub struct StandaloneConsumer {
143 inner: StreamingConsumer,
144 config: StandaloneConfig,
145 offset_store: Arc<dyn OffsetStore>,
146 last_commit_time: Instant,
147 pending_offset: u64,
148 committed_offset: u64,
149}
150
151impl StandaloneConsumer {
152 pub async fn connect(addr: &str, config: StandaloneConfig) -> Result<Self> {
166 let mut client_config = ClientConfig::new(addr);
167 client_config.connect_timeout = config.connect_timeout;
168
169 let client = LanceClient::connect(client_config).await?;
170 Self::from_client(client, config).await
171 }
172
173 pub async fn from_client(client: LanceClient, config: StandaloneConfig) -> Result<Self> {
175 let numeric_consumer_id = Self::hash_consumer_id(&config.consumer_id);
177
178 let offset_store: Arc<dyn OffsetStore> = if let Some(ref dir) = config.offset_dir {
180 Arc::new(LockFileOffsetStore::open(dir, &config.consumer_id)?)
181 } else {
182 Arc::new(MemoryOffsetStore::new())
183 };
184
185 let stored_offset = offset_store
187 .load(config.topic_id, numeric_consumer_id)
188 .ok()
189 .flatten();
190
191 let start_position = if let Some(offset) = stored_offset {
193 SeekPosition::Offset(offset)
194 } else {
195 config.start_position
196 };
197
198 let streaming_config = StreamingConsumerConfig::new(config.topic_id)
200 .with_max_batch_bytes(config.max_fetch_bytes)
201 .with_start_position(start_position)
202 .with_auto_commit_interval(0); let mut inner = StreamingConsumer::new(client, streaming_config);
205
206 inner.start().await?;
208
209 let current_offset = inner.current_offset();
210
211 Ok(Self {
212 inner,
213 config,
214 offset_store,
215 last_commit_time: Instant::now(),
216 pending_offset: current_offset,
217 committed_offset: stored_offset.unwrap_or(0),
218 })
219 }
220
221 pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
228 self.maybe_auto_commit().await?;
230
231 let result = self.inner.next_batch().await?;
233
234 if let Some(ref poll_result) = result {
235 self.pending_offset = poll_result.current_offset;
236 }
237
238 Ok(result)
239 }
240
241 #[inline]
243 pub async fn consume(&mut self) -> Result<Option<PollResult>> {
244 self.next_batch().await
245 }
246
247 #[inline]
249 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
250 self.next_batch().await
251 }
252
253 pub async fn poll_timeout(&mut self, timeout: Duration) -> Result<Option<PollResult>> {
258 let deadline = Instant::now() + timeout;
259
260 while Instant::now() < deadline {
261 if let Some(result) = self.next_batch().await? {
262 return Ok(Some(result));
263 }
264 tokio::time::sleep(Duration::from_millis(10)).await;
266 }
267
268 Ok(None)
269 }
270
271 pub async fn commit(&mut self) -> Result<()> {
276 self.commit_offset(self.pending_offset).await
277 }
278
279 pub async fn commit_offset(&mut self, offset: u64) -> Result<()> {
283 let numeric_consumer_id = Self::hash_consumer_id(&self.config.consumer_id);
285
286 self.offset_store
288 .save(self.config.topic_id, numeric_consumer_id, offset)?;
289
290 self.committed_offset = offset;
291 self.last_commit_time = Instant::now();
292
293 let _ = self.inner.commit().await;
295
296 Ok(())
297 }
298
299 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
304 let offset = self.inner.seek(position).await?;
305 self.pending_offset = offset;
306 Ok(offset)
307 }
308
309 pub fn current_offset(&self) -> u64 {
311 self.pending_offset
312 }
313
314 pub fn committed_offset(&self) -> u64 {
316 self.committed_offset
317 }
318
319 pub fn consumer_id(&self) -> &str {
321 &self.config.consumer_id
322 }
323
324 pub fn topic_id(&self) -> u32 {
326 self.config.topic_id
327 }
328
329 pub fn is_subscribed(&self) -> bool {
331 self.inner.is_subscribed()
332 }
333
334 pub fn client(&self) -> &LanceClient {
336 self.inner.client()
337 }
338
339 pub async fn close(mut self) -> Result<LanceClient> {
346 if self.pending_offset > self.committed_offset {
348 let _ = self.commit().await;
349 }
350
351 self.inner.into_client().await
352 }
353
354 fn hash_consumer_id(consumer_id: &str) -> u64 {
356 let mut hasher = DefaultHasher::new();
357 consumer_id.hash(&mut hasher);
358 hasher.finish()
359 }
360
361 async fn maybe_auto_commit(&mut self) -> Result<()> {
363 if let Some(interval) = self.config.auto_commit_interval {
364 if self.last_commit_time.elapsed() >= interval {
365 if self.pending_offset > self.committed_offset {
366 self.commit().await?;
367 } else {
368 self.last_commit_time = Instant::now();
370 }
371 }
372 }
373 Ok(())
374 }
375}
376
377impl std::fmt::Debug for StandaloneConsumer {
378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379 f.debug_struct("StandaloneConsumer")
380 .field("consumer_id", &self.config.consumer_id)
381 .field("topic_id", &self.config.topic_id)
382 .field("pending_offset", &self.pending_offset)
383 .field("committed_offset", &self.committed_offset)
384 .field("is_subscribed", &self.inner.is_subscribed())
385 .finish()
386 }
387}
388
389pub struct StandaloneConsumerBuilder {
391 addr: String,
392 base_config: StandaloneConfig,
393}
394
395impl StandaloneConsumerBuilder {
396 pub fn new(addr: impl Into<String>, consumer_id: impl Into<String>) -> Self {
398 Self {
399 addr: addr.into(),
400 base_config: StandaloneConfig::new(consumer_id, 0),
401 }
402 }
403
404 pub fn with_offset_dir(mut self, dir: &Path) -> Self {
406 self.base_config = self.base_config.with_offset_dir(dir);
407 self
408 }
409
410 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
412 self.base_config = self.base_config.with_max_fetch_bytes(bytes);
413 self
414 }
415
416 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
418 self.base_config = self.base_config.with_start_position(position);
419 self
420 }
421
422 pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
424 self.base_config = self.base_config.with_auto_commit_interval(interval);
425 self
426 }
427
428 pub async fn build_for_topic(&self, topic_id: u32) -> Result<StandaloneConsumer> {
430 let mut config = self.base_config.clone();
431 config.topic_id = topic_id;
432 StandaloneConsumer::connect(&self.addr, config).await
433 }
434}
435
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_standalone_config_defaults() {
        let config = StandaloneConfig::new("test-consumer", 1);

        assert_eq!(config.consumer_id, "test-consumer");
        assert_eq!(config.topic_id, 1);
        assert_eq!(config.max_fetch_bytes, 1_048_576);
        assert!(matches!(config.start_position, SeekPosition::Beginning));
        assert!(config.offset_dir.is_none());
        assert_eq!(config.auto_commit_interval, Some(Duration::from_secs(5)));
        assert_eq!(config.connect_timeout, Duration::from_secs(30));
        assert_eq!(config.poll_timeout, Duration::from_millis(100));
    }

    #[test]
    fn test_standalone_config_builder() {
        let config = StandaloneConfig::new("test", 1)
            .with_max_fetch_bytes(512 * 1024)
            .with_offset_dir(Path::new("/tmp/offsets"))
            .with_manual_commit()
            .with_start_position(SeekPosition::End);

        assert_eq!(config.max_fetch_bytes, 512 * 1024);
        assert_eq!(config.offset_dir.as_deref(), Some(Path::new("/tmp/offsets")));
        assert!(config.auto_commit_interval.is_none());
        assert!(matches!(config.start_position, SeekPosition::End));
    }

    #[test]
    fn test_standalone_config_with_auto_commit() {
        let config = StandaloneConfig::new("test", 1)
            .with_auto_commit_interval(Some(Duration::from_secs(10)));

        assert_eq!(config.auto_commit_interval, Some(Duration::from_secs(10)));
    }

    #[test]
    fn test_standalone_config_identity_and_timeouts() {
        let config = StandaloneConfig::new("old", 7)
            .with_consumer_id("new")
            .with_connect_timeout(Duration::from_secs(3))
            .with_poll_timeout(Duration::from_millis(25));

        assert_eq!(config.consumer_id, "new");
        assert_eq!(config.topic_id, 7);
        assert_eq!(config.connect_timeout, Duration::from_secs(3));
        assert_eq!(config.poll_timeout, Duration::from_millis(25));
    }

    #[test]
    fn test_hash_consumer_id_matches_legacy_default_hasher() {
        // Stored offsets are keyed by this value, so it must equal what earlier builds
        // produced via `DefaultHasher::new()`. The inputs cover empty, short, and
        // 8-byte-boundary lengths (the hashed message is the bytes plus one 0xFF).
        for id in ["", "a", "1234567", "exactly8", "test-consumer", "an-id-longer-than-16-bytes"] {
            let mut legacy = DefaultHasher::new();
            id.hash(&mut legacy);
            assert_eq!(StandaloneConsumer::hash_consumer_id(id), legacy.finish(), "id = {id:?}");
        }
    }
}