1use std::sync::Arc;
9
10use crate::client::LanceClient;
11use crate::error::{ClientError, Result};
12use crate::offset::OffsetStore;
13
/// Where a consumer should start (or move) its read cursor within a topic.
///
/// Derives `Hash` in addition to `Eq` so positions can be used as keys in
/// hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SeekPosition {
    /// The start of the topic; consumers map this to offset `0`.
    Beginning,
    /// The end of the topic as discovered by the consumer at seek/poll time.
    End,
    /// An explicit offset supplied by the caller.
    Offset(u64),
}
24
25#[derive(Debug, Clone)]
27pub struct ConsumerConfig {
28 pub topic_id: u32,
30 pub max_fetch_bytes: u32,
32 pub start_position: SeekPosition,
34}
35
36impl Default for ConsumerConfig {
37 fn default() -> Self {
38 Self {
39 topic_id: 0,
40 max_fetch_bytes: 64 * 1024, start_position: SeekPosition::Beginning,
42 }
43 }
44}
45
46impl ConsumerConfig {
47 pub fn new(topic_id: u32) -> Self {
49 Self {
50 topic_id,
51 ..Default::default()
52 }
53 }
54
55 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
57 self.max_fetch_bytes = bytes;
58 self
59 }
60
61 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
63 self.start_position = position;
64 self
65 }
66}
67
68#[derive(Debug, Clone)]
70pub struct PollResult {
71 pub data: bytes::Bytes,
73 pub current_offset: u64,
75 pub record_count: u32,
77 pub end_of_stream: bool,
79}
80
81impl PollResult {
82 pub fn is_empty(&self) -> bool {
84 self.data.is_empty()
85 }
86}
87
88pub struct Consumer {
114 client: LanceClient,
115 config: ConsumerConfig,
116 current_offset: u64,
117 cached_end_offset: Option<u64>,
119 offset_store: Option<Arc<dyn OffsetStore>>,
121 consumer_id: u64,
123}
124
125impl Consumer {
126 pub fn new(client: LanceClient, config: ConsumerConfig) -> Self {
128 Self::with_consumer_id(client, config, 0)
129 }
130
131 pub fn with_consumer_id(client: LanceClient, config: ConsumerConfig, consumer_id: u64) -> Self {
133 let initial_offset = match config.start_position {
134 SeekPosition::Beginning => 0,
135 SeekPosition::Offset(offset) => offset,
136 SeekPosition::End => u64::MAX, };
138
139 Self {
140 client,
141 config,
142 current_offset: initial_offset,
143 cached_end_offset: None,
144 offset_store: None,
145 consumer_id,
146 }
147 }
148
149 pub fn with_offset_store(
166 client: LanceClient,
167 config: ConsumerConfig,
168 consumer_id: u64,
169 offset_store: Arc<dyn OffsetStore>,
170 ) -> Result<Self> {
171 let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
173
174 let initial_offset = if let Some(offset) = stored_offset {
175 offset
177 } else {
178 match config.start_position {
180 SeekPosition::Beginning => 0,
181 SeekPosition::Offset(offset) => offset,
182 SeekPosition::End => u64::MAX, }
184 };
185
186 Ok(Self {
187 client,
188 config,
189 current_offset: initial_offset,
190 cached_end_offset: None,
191 offset_store: Some(offset_store),
192 consumer_id,
193 })
194 }
195
196 pub fn from_beginning(client: LanceClient, topic_id: u32) -> Self {
198 Self::new(client, ConsumerConfig::new(topic_id))
199 }
200
201 pub fn from_offset(client: LanceClient, topic_id: u32, offset: u64) -> Self {
203 let config =
204 ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
205 Self::new(client, config)
206 }
207
208 pub fn current_offset(&self) -> u64 {
210 self.current_offset
211 }
212
213 pub fn topic_id(&self) -> u32 {
215 self.config.topic_id
216 }
217
218 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
239 match position {
240 SeekPosition::Beginning => {
241 self.current_offset = 0;
242 Ok(0)
243 },
244 SeekPosition::Offset(offset) => {
245 self.current_offset = offset;
246 Ok(offset)
247 },
248 SeekPosition::End => {
249 let end_offset = self.discover_end_offset().await?;
251 self.current_offset = end_offset;
252 Ok(end_offset)
253 },
254 }
255 }
256
257 pub async fn rewind(&mut self) -> Result<()> {
261 self.seek(SeekPosition::Beginning).await?;
262 Ok(())
263 }
264
265 pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
269 self.seek(SeekPosition::Offset(offset)).await?;
270 Ok(())
271 }
272
273 pub async fn seek_to_end(&mut self) -> Result<u64> {
277 self.seek(SeekPosition::End).await
278 }
279
280 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
287 if self.current_offset == u64::MAX {
289 let end_offset = self.discover_end_offset().await?;
290 self.current_offset = end_offset;
291 }
292
293 let fetch_result = self
294 .client
295 .fetch(
296 self.config.topic_id,
297 self.current_offset,
298 self.config.max_fetch_bytes,
299 )
300 .await?;
301
302 let end_of_stream =
303 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
304
305 let result = PollResult {
306 data: fetch_result.data,
307 current_offset: fetch_result.next_offset,
308 record_count: fetch_result.record_count,
309 end_of_stream,
310 };
311
312 self.current_offset = fetch_result.next_offset;
314 self.cached_end_offset = Some(fetch_result.next_offset);
315
316 if result.is_empty() {
317 Ok(None)
318 } else {
319 Ok(Some(result))
320 }
321 }
322
323 pub async fn poll_blocking(&mut self) -> Result<PollResult> {
328 if self.current_offset == u64::MAX {
330 let end_offset = self.discover_end_offset().await?;
331 self.current_offset = end_offset;
332 }
333
334 let fetch_result = self
335 .client
336 .fetch(
337 self.config.topic_id,
338 self.current_offset,
339 self.config.max_fetch_bytes,
340 )
341 .await?;
342
343 let end_of_stream =
344 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
345
346 let result = PollResult {
347 data: fetch_result.data,
348 current_offset: fetch_result.next_offset,
349 record_count: fetch_result.record_count,
350 end_of_stream,
351 };
352
353 if !result.is_empty() {
355 self.current_offset = fetch_result.next_offset;
356 }
357 self.cached_end_offset = Some(fetch_result.next_offset);
358
359 Ok(result)
360 }
361
362 async fn discover_end_offset(&mut self) -> Result<u64> {
364 if let Some(end) = self.cached_end_offset {
366 return Ok(end);
367 }
368
369 let fetch_result = self
371 .client
372 .fetch(
373 self.config.topic_id,
374 0,
375 1, )
377 .await?;
378
379 let end_offset = fetch_result.next_offset;
382 self.cached_end_offset = Some(end_offset);
383 Ok(end_offset)
384 }
385
386 pub async fn commit(&mut self) -> Result<()> {
398 if let Some(ref store) = self.offset_store {
399 store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
400 }
401 Ok(())
402 }
403
404 pub fn consumer_id(&self) -> u64 {
406 self.consumer_id
407 }
408
409 pub fn has_offset_store(&self) -> bool {
411 self.offset_store.is_some()
412 }
413
414 pub fn client(&self) -> &LanceClient {
416 &self.client
417 }
418
419 pub fn client_mut(&mut self) -> &mut LanceClient {
421 &mut self.client
422 }
423
424 pub fn into_client(self) -> LanceClient {
426 self.client
427 }
428}
429
430impl std::fmt::Debug for Consumer {
431 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432 f.debug_struct("Consumer")
433 .field("topic_id", &self.config.topic_id)
434 .field("current_offset", &self.current_offset)
435 .field("max_fetch_bytes", &self.config.max_fetch_bytes)
436 .finish()
437 }
438}
439
440#[derive(Debug, Clone)]
442pub struct StreamingConsumerConfig {
443 pub topic_id: u32,
445 pub max_batch_bytes: u32,
447 pub start_position: SeekPosition,
449 pub consumer_group: Option<String>,
451 pub auto_commit_interval_ms: u64,
453}
454
455impl Default for StreamingConsumerConfig {
456 fn default() -> Self {
457 Self {
458 topic_id: 0,
459 max_batch_bytes: 64 * 1024,
460 start_position: SeekPosition::Beginning,
461 consumer_group: None,
462 auto_commit_interval_ms: 5000, }
464 }
465}
466
467impl StreamingConsumerConfig {
468 pub fn new(topic_id: u32) -> Self {
470 Self {
471 topic_id,
472 ..Default::default()
473 }
474 }
475
476 pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
478 self.max_batch_bytes = bytes;
479 self
480 }
481
482 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
484 self.start_position = position;
485 self
486 }
487
488 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
490 self.consumer_group = Some(group.into());
491 self
492 }
493
494 pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
496 self.auto_commit_interval_ms = interval_ms;
497 self
498 }
499}
500
501pub struct StreamingConsumer {
535 client: LanceClient,
536 config: StreamingConsumerConfig,
537 consumer_id: u64,
538 current_offset: u64,
539 committed_offset: u64,
540 is_subscribed: bool,
541 last_commit_time: std::time::Instant,
542}
543
544impl StreamingConsumer {
545 pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
547 let consumer_id = Self::generate_consumer_id();
548 let initial_offset = match config.start_position {
549 SeekPosition::Beginning => 0,
550 SeekPosition::Offset(offset) => offset,
551 SeekPosition::End => u64::MAX,
552 };
553
554 Self {
555 client,
556 config,
557 consumer_id,
558 current_offset: initial_offset,
559 committed_offset: 0,
560 is_subscribed: false,
561 last_commit_time: std::time::Instant::now(),
562 }
563 }
564
565 fn generate_consumer_id() -> u64 {
567 use std::time::{SystemTime, UNIX_EPOCH};
568 let timestamp = SystemTime::now()
569 .duration_since(UNIX_EPOCH)
570 .map(|d| d.as_nanos() as u64)
571 .unwrap_or(0);
572 let thread_id = std::thread::current().id();
574 timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
575 }
576
577 pub async fn start(&mut self) -> Result<()> {
582 if self.is_subscribed {
583 return Ok(());
584 }
585
586 let result = self
587 .client
588 .subscribe(
589 self.config.topic_id,
590 self.current_offset,
591 self.config.max_batch_bytes,
592 self.consumer_id,
593 )
594 .await?;
595
596 self.current_offset = result.start_offset;
597 self.is_subscribed = true;
598 self.last_commit_time = std::time::Instant::now();
599
600 Ok(())
601 }
602
603 pub async fn stop(&mut self) -> Result<()> {
608 if !self.is_subscribed {
609 return Ok(());
610 }
611
612 if self.current_offset > self.committed_offset {
614 let _ = self.commit().await;
615 }
616
617 self.client
618 .unsubscribe(self.config.topic_id, self.consumer_id)
619 .await?;
620 self.is_subscribed = false;
621
622 Ok(())
623 }
624
625 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
630 if !self.is_subscribed {
631 return Err(ClientError::ProtocolError(
632 "Consumer not subscribed - call start() first".to_string(),
633 ));
634 }
635
636 self.maybe_auto_commit().await?;
638
639 let fetch_result = self
640 .client
641 .fetch(
642 self.config.topic_id,
643 self.current_offset,
644 self.config.max_batch_bytes,
645 )
646 .await?;
647
648 let end_of_stream =
649 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
650
651 let result = PollResult {
652 data: fetch_result.data,
653 current_offset: fetch_result.next_offset,
654 record_count: fetch_result.record_count,
655 end_of_stream,
656 };
657
658 self.current_offset = fetch_result.next_offset;
660
661 if result.is_empty() {
662 Ok(None)
663 } else {
664 Ok(Some(result))
665 }
666 }
667
668 pub async fn commit(&mut self) -> Result<()> {
673 if self.current_offset <= self.committed_offset {
674 return Ok(());
675 }
676
677 let result = self
678 .client
679 .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
680 .await?;
681
682 self.committed_offset = result.committed_offset;
683 self.last_commit_time = std::time::Instant::now();
684
685 Ok(())
686 }
687
688 async fn maybe_auto_commit(&mut self) -> Result<()> {
690 if self.config.auto_commit_interval_ms == 0 {
691 return Ok(()); }
693
694 let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
695 if elapsed >= self.config.auto_commit_interval_ms {
696 self.commit().await?;
697 }
698
699 Ok(())
700 }
701
702 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
707 let was_subscribed = self.is_subscribed;
708
709 if was_subscribed {
710 self.stop().await?;
711 }
712
713 let new_offset = match position {
714 SeekPosition::Beginning => 0,
715 SeekPosition::Offset(offset) => offset,
716 SeekPosition::End => u64::MAX,
717 };
718
719 self.current_offset = new_offset;
720
721 if was_subscribed {
722 self.start().await?;
723 }
724
725 Ok(self.current_offset)
726 }
727
728 pub fn current_offset(&self) -> u64 {
730 self.current_offset
731 }
732
733 pub fn committed_offset(&self) -> u64 {
735 self.committed_offset
736 }
737
738 pub fn consumer_id(&self) -> u64 {
740 self.consumer_id
741 }
742
743 pub fn is_subscribed(&self) -> bool {
745 self.is_subscribed
746 }
747
748 pub fn client(&self) -> &LanceClient {
750 &self.client
751 }
752
753 pub async fn into_client(mut self) -> Result<LanceClient> {
755 self.stop().await?;
756 Ok(self.client)
757 }
758}
759
760impl std::fmt::Debug for StreamingConsumer {
761 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
762 f.debug_struct("StreamingConsumer")
763 .field("topic_id", &self.config.topic_id)
764 .field("consumer_id", &self.consumer_id)
765 .field("current_offset", &self.current_offset)
766 .field("committed_offset", &self.committed_offset)
767 .field("is_subscribed", &self.is_subscribed)
768 .finish()
769 }
770}
771
#[cfg(test)]
// Allows tests to use `unwrap()` freely; none of the tests below currently do.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_consumer_config_default() {
        let ConsumerConfig {
            topic_id,
            max_fetch_bytes,
            start_position,
        } = ConsumerConfig::default();
        assert_eq!(topic_id, 0);
        assert_eq!(max_fetch_bytes, 64 * 1024);
        assert_eq!(start_position, SeekPosition::Beginning);
    }

    #[test]
    fn test_consumer_config_builder() {
        let built = ConsumerConfig::new(42)
            .with_max_fetch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(1000));

        assert_eq!(built.topic_id, 42);
        assert_eq!(built.max_fetch_bytes, 128 * 1024);
        assert_eq!(built.start_position, SeekPosition::Offset(1000));
    }

    #[test]
    fn test_poll_result_is_empty() {
        // Offset and end-of-stream flag are derived from the payload so the two
        // cases match the original fixtures (0/0/true and 3/1/false).
        let batch = |data: bytes::Bytes, record_count: u32| PollResult {
            current_offset: data.len() as u64,
            end_of_stream: data.is_empty(),
            record_count,
            data,
        };

        assert!(batch(bytes::Bytes::new(), 0).is_empty());
        assert!(!batch(bytes::Bytes::from_static(&[1, 2, 3]), 1).is_empty());
    }

    #[test]
    fn test_streaming_consumer_config_default() {
        let cfg = StreamingConsumerConfig::default();
        assert_eq!(cfg.topic_id, 0);
        assert_eq!(cfg.max_batch_bytes, 64 * 1024);
        assert_eq!(cfg.start_position, SeekPosition::Beginning);
        assert!(cfg.consumer_group.is_none());
        assert_eq!(cfg.auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let cfg = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(5000))
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10000);

        assert_eq!(cfg.topic_id, 42);
        assert_eq!(cfg.max_batch_bytes, 128 * 1024);
        assert_eq!(cfg.start_position, SeekPosition::Offset(5000));
        assert_eq!(cfg.consumer_group.as_deref(), Some("my-group"));
        assert_eq!(cfg.auto_commit_interval_ms, 10000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        let cfg = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);
        assert_eq!(cfg.auto_commit_interval_ms, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        let positions = [
            SeekPosition::Beginning,
            SeekPosition::End,
            SeekPosition::Offset(12345),
        ];
        for position in positions.iter().copied() {
            let cfg = StreamingConsumerConfig::new(1).with_start_position(position);
            assert_eq!(cfg.start_position, position);
        }
    }

    #[test]
    fn test_seek_position_equality() {
        // Reflexive equality for every variant.
        assert_eq!(SeekPosition::Beginning, SeekPosition::Beginning);
        assert_eq!(SeekPosition::End, SeekPosition::End);
        assert_eq!(SeekPosition::Offset(100), SeekPosition::Offset(100));

        // Distinct variants / payloads differ; `Beginning` is not `Offset(0)`.
        assert_ne!(SeekPosition::Beginning, SeekPosition::End);
        assert_ne!(SeekPosition::Offset(100), SeekPosition::Offset(200));
        assert_ne!(SeekPosition::Beginning, SeekPosition::Offset(0));
    }

    #[test]
    fn test_seek_position_clone() {
        let original = SeekPosition::Offset(42);
        let copied = original;
        assert_eq!(original, copied);
    }
}