1use std::sync::Arc;
9
10use crate::client::LanceClient;
11use crate::error::{ClientError, Result};
12use crate::offset::OffsetStore;
13
/// Where a consumer should start (or resume) reading within a topic.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SeekPosition {
    /// The start of the topic (consumers map this to offset `0`).
    Beginning,
    /// The current end of the topic; resolving it may require a server round-trip.
    End,
    /// An explicit offset.
    Offset(u64),
}
24
25#[derive(Debug, Clone)]
27pub struct ConsumerConfig {
28 pub topic_id: u32,
30 pub max_fetch_bytes: u32,
32 pub start_position: SeekPosition,
34}
35
36impl Default for ConsumerConfig {
37 fn default() -> Self {
38 Self {
39 topic_id: 0,
40 max_fetch_bytes: 64 * 1024, start_position: SeekPosition::Beginning,
42 }
43 }
44}
45
46impl ConsumerConfig {
47 pub fn new(topic_id: u32) -> Self {
48 Self {
49 topic_id,
50 ..Default::default()
51 }
52 }
53
54 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
55 self.max_fetch_bytes = bytes;
56 self
57 }
58
59 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
60 self.start_position = position;
61 self
62 }
63}
64
65#[derive(Debug, Clone)]
67pub struct PollResult {
68 pub data: bytes::Bytes,
70 pub current_offset: u64,
72 pub record_count: u32,
74 pub end_of_stream: bool,
76}
77
78impl PollResult {
79 pub fn is_empty(&self) -> bool {
81 self.data.is_empty()
82 }
83}
84
/// Pull-based consumer that reads one topic through explicit `fetch` calls.
///
/// The read cursor is kept in `current_offset`. It can optionally be persisted to an
/// `OffsetStore` via `commit`.
pub struct Consumer {
    /// Connection used for every fetch.
    client: LanceClient,
    /// Topic, per-fetch byte limit and start position.
    config: ConsumerConfig,
    /// Next offset to fetch. `u64::MAX` is a sentinel meaning "resolve the end of
    /// the stream before the next fetch" (set for `SeekPosition::End`).
    current_offset: u64,
    /// Offset consulted by end-of-stream discovery before asking the server.
    /// NOTE(review): `poll`/`poll_blocking` store the position just read here,
    /// which is not necessarily the end of the stream.
    cached_end_offset: Option<u64>,
    /// Optional persistence for committed offsets.
    offset_store: Option<Arc<dyn OffsetStore>>,
    /// Identifier used together with the topic id as the offset-store key.
    consumer_id: u64,
}
121
122impl Consumer {
123 pub fn new(client: LanceClient, config: ConsumerConfig) -> Self {
125 Self::with_consumer_id(client, config, 0)
126 }
127
128 pub fn with_consumer_id(client: LanceClient, config: ConsumerConfig, consumer_id: u64) -> Self {
130 let initial_offset = match config.start_position {
131 SeekPosition::Beginning => 0,
132 SeekPosition::Offset(offset) => offset,
133 SeekPosition::End => u64::MAX, };
135
136 Self {
137 client,
138 config,
139 current_offset: initial_offset,
140 cached_end_offset: None,
141 offset_store: None,
142 consumer_id,
143 }
144 }
145
146 pub fn with_offset_store(
163 client: LanceClient,
164 config: ConsumerConfig,
165 consumer_id: u64,
166 offset_store: Arc<dyn OffsetStore>,
167 ) -> Result<Self> {
168 let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
170
171 let initial_offset = if let Some(offset) = stored_offset {
172 offset
174 } else {
175 match config.start_position {
177 SeekPosition::Beginning => 0,
178 SeekPosition::Offset(offset) => offset,
179 SeekPosition::End => u64::MAX, }
181 };
182
183 Ok(Self {
184 client,
185 config,
186 current_offset: initial_offset,
187 cached_end_offset: None,
188 offset_store: Some(offset_store),
189 consumer_id,
190 })
191 }
192
193 pub fn from_beginning(client: LanceClient, topic_id: u32) -> Self {
195 Self::new(client, ConsumerConfig::new(topic_id))
196 }
197
198 pub fn from_offset(client: LanceClient, topic_id: u32, offset: u64) -> Self {
200 let config =
201 ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
202 Self::new(client, config)
203 }
204
205 pub fn current_offset(&self) -> u64 {
207 self.current_offset
208 }
209
210 pub fn topic_id(&self) -> u32 {
212 self.config.topic_id
213 }
214
215 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
236 match position {
237 SeekPosition::Beginning => {
238 self.current_offset = 0;
239 Ok(0)
240 },
241 SeekPosition::Offset(offset) => {
242 self.current_offset = offset;
243 Ok(offset)
244 },
245 SeekPosition::End => {
246 let end_offset = self.discover_end_offset().await?;
248 self.current_offset = end_offset;
249 Ok(end_offset)
250 },
251 }
252 }
253
254 pub async fn rewind(&mut self) -> Result<()> {
258 self.seek(SeekPosition::Beginning).await?;
259 Ok(())
260 }
261
262 pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
266 self.seek(SeekPosition::Offset(offset)).await?;
267 Ok(())
268 }
269
270 pub async fn seek_to_end(&mut self) -> Result<u64> {
274 self.seek(SeekPosition::End).await
275 }
276
277 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
284 if self.current_offset == u64::MAX {
286 let end_offset = self.discover_end_offset().await?;
287 self.current_offset = end_offset;
288 }
289
290 let fetch_result = self
291 .client
292 .fetch(
293 self.config.topic_id,
294 self.current_offset,
295 self.config.max_fetch_bytes,
296 )
297 .await?;
298
299 let end_of_stream =
300 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
301
302 let result = PollResult {
303 data: fetch_result.data,
304 current_offset: fetch_result.next_offset,
305 record_count: fetch_result.record_count,
306 end_of_stream,
307 };
308
309 self.current_offset = fetch_result.next_offset;
311 self.cached_end_offset = Some(fetch_result.next_offset);
312
313 if result.is_empty() {
314 Ok(None)
315 } else {
316 Ok(Some(result))
317 }
318 }
319
320 pub async fn poll_blocking(&mut self) -> Result<PollResult> {
325 if self.current_offset == u64::MAX {
327 let end_offset = self.discover_end_offset().await?;
328 self.current_offset = end_offset;
329 }
330
331 let fetch_result = self
332 .client
333 .fetch(
334 self.config.topic_id,
335 self.current_offset,
336 self.config.max_fetch_bytes,
337 )
338 .await?;
339
340 let end_of_stream =
341 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
342
343 let result = PollResult {
344 data: fetch_result.data,
345 current_offset: fetch_result.next_offset,
346 record_count: fetch_result.record_count,
347 end_of_stream,
348 };
349
350 if !result.is_empty() {
352 self.current_offset = fetch_result.next_offset;
353 }
354 self.cached_end_offset = Some(fetch_result.next_offset);
355
356 Ok(result)
357 }
358
359 async fn discover_end_offset(&mut self) -> Result<u64> {
361 if let Some(end) = self.cached_end_offset {
363 return Ok(end);
364 }
365
366 let fetch_result = self
368 .client
369 .fetch(
370 self.config.topic_id,
371 0,
372 1, )
374 .await?;
375
376 let end_offset = fetch_result.next_offset;
379 self.cached_end_offset = Some(end_offset);
380 Ok(end_offset)
381 }
382
383 pub async fn commit(&mut self) -> Result<()> {
395 if let Some(ref store) = self.offset_store {
396 store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
397 }
398 Ok(())
399 }
400
401 pub fn consumer_id(&self) -> u64 {
403 self.consumer_id
404 }
405
406 pub fn has_offset_store(&self) -> bool {
408 self.offset_store.is_some()
409 }
410
411 pub fn client(&self) -> &LanceClient {
413 &self.client
414 }
415
416 pub fn client_mut(&mut self) -> &mut LanceClient {
418 &mut self.client
419 }
420
421 pub fn into_client(self) -> LanceClient {
423 self.client
424 }
425}
426
427impl std::fmt::Debug for Consumer {
428 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429 f.debug_struct("Consumer")
430 .field("topic_id", &self.config.topic_id)
431 .field("current_offset", &self.current_offset)
432 .field("max_fetch_bytes", &self.config.max_fetch_bytes)
433 .finish()
434 }
435}
436
437#[derive(Debug, Clone)]
439pub struct StreamingConsumerConfig {
440 pub topic_id: u32,
442 pub max_batch_bytes: u32,
444 pub start_position: SeekPosition,
446 pub consumer_group: Option<String>,
448 pub auto_commit_interval_ms: u64,
450}
451
452impl Default for StreamingConsumerConfig {
453 fn default() -> Self {
454 Self {
455 topic_id: 0,
456 max_batch_bytes: 64 * 1024,
457 start_position: SeekPosition::Beginning,
458 consumer_group: None,
459 auto_commit_interval_ms: 5000, }
461 }
462}
463
464impl StreamingConsumerConfig {
465 pub fn new(topic_id: u32) -> Self {
466 Self {
467 topic_id,
468 ..Default::default()
469 }
470 }
471
472 pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
473 self.max_batch_bytes = bytes;
474 self
475 }
476
477 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
478 self.start_position = position;
479 self
480 }
481
482 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
483 self.consumer_group = Some(group.into());
484 self
485 }
486
487 pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
488 self.auto_commit_interval_ms = interval_ms;
489 self
490 }
491}
492
/// Consumer that registers a server-side subscription (`subscribe`/`unsubscribe`)
/// and records its progress on the server through `commit_offset`.
pub struct StreamingConsumer {
    /// Connection used for subscribe, fetch, commit and unsubscribe calls.
    client: LanceClient,
    /// Topic, batch size, start position and auto-commit settings.
    config: StreamingConsumerConfig,
    /// Identifier sent with subscribe, commit and unsubscribe requests.
    consumer_id: u64,
    /// Next offset to fetch. Holds `u64::MAX` for `SeekPosition::End` until
    /// `start()` replaces it with the server-reported `start_offset`.
    current_offset: u64,
    /// Offset last acknowledged by the server's `commit_offset` response.
    committed_offset: u64,
    /// Set by `start()` and cleared by `stop()`.
    is_subscribed: bool,
    /// Time of the last commit (or subscription start); drives auto-commit.
    last_commit_time: std::time::Instant,
}
535
536impl StreamingConsumer {
537 pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
539 let consumer_id = Self::generate_consumer_id();
540 let initial_offset = match config.start_position {
541 SeekPosition::Beginning => 0,
542 SeekPosition::Offset(offset) => offset,
543 SeekPosition::End => u64::MAX,
544 };
545
546 Self {
547 client,
548 config,
549 consumer_id,
550 current_offset: initial_offset,
551 committed_offset: 0,
552 is_subscribed: false,
553 last_commit_time: std::time::Instant::now(),
554 }
555 }
556
557 fn generate_consumer_id() -> u64 {
559 use std::time::{SystemTime, UNIX_EPOCH};
560 let timestamp = SystemTime::now()
561 .duration_since(UNIX_EPOCH)
562 .map(|d| d.as_nanos() as u64)
563 .unwrap_or(0);
564 let thread_id = std::thread::current().id();
566 timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
567 }
568
569 pub async fn start(&mut self) -> Result<()> {
574 if self.is_subscribed {
575 return Ok(());
576 }
577
578 let result = self
579 .client
580 .subscribe(
581 self.config.topic_id,
582 self.current_offset,
583 self.config.max_batch_bytes,
584 self.consumer_id,
585 )
586 .await?;
587
588 self.current_offset = result.start_offset;
589 self.is_subscribed = true;
590 self.last_commit_time = std::time::Instant::now();
591
592 Ok(())
593 }
594
595 pub async fn stop(&mut self) -> Result<()> {
600 if !self.is_subscribed {
601 return Ok(());
602 }
603
604 if self.current_offset > self.committed_offset {
606 let _ = self.commit().await;
607 }
608
609 self.client
610 .unsubscribe(self.config.topic_id, self.consumer_id)
611 .await?;
612 self.is_subscribed = false;
613
614 Ok(())
615 }
616
617 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
622 if !self.is_subscribed {
623 return Err(ClientError::ProtocolError(
624 "Consumer not subscribed - call start() first".to_string(),
625 ));
626 }
627
628 self.maybe_auto_commit().await?;
630
631 let fetch_result = self
632 .client
633 .fetch(
634 self.config.topic_id,
635 self.current_offset,
636 self.config.max_batch_bytes,
637 )
638 .await?;
639
640 let end_of_stream =
641 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
642
643 let result = PollResult {
644 data: fetch_result.data,
645 current_offset: fetch_result.next_offset,
646 record_count: fetch_result.record_count,
647 end_of_stream,
648 };
649
650 self.current_offset = fetch_result.next_offset;
652
653 if result.is_empty() {
654 Ok(None)
655 } else {
656 Ok(Some(result))
657 }
658 }
659
660 pub async fn commit(&mut self) -> Result<()> {
665 if self.current_offset <= self.committed_offset {
666 return Ok(());
667 }
668
669 let result = self
670 .client
671 .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
672 .await?;
673
674 self.committed_offset = result.committed_offset;
675 self.last_commit_time = std::time::Instant::now();
676
677 Ok(())
678 }
679
680 async fn maybe_auto_commit(&mut self) -> Result<()> {
682 if self.config.auto_commit_interval_ms == 0 {
683 return Ok(()); }
685
686 let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
687 if elapsed >= self.config.auto_commit_interval_ms {
688 self.commit().await?;
689 }
690
691 Ok(())
692 }
693
694 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
699 let was_subscribed = self.is_subscribed;
700
701 if was_subscribed {
702 self.stop().await?;
703 }
704
705 let new_offset = match position {
706 SeekPosition::Beginning => 0,
707 SeekPosition::Offset(offset) => offset,
708 SeekPosition::End => u64::MAX,
709 };
710
711 self.current_offset = new_offset;
712
713 if was_subscribed {
714 self.start().await?;
715 }
716
717 Ok(self.current_offset)
718 }
719
720 pub fn current_offset(&self) -> u64 {
722 self.current_offset
723 }
724
725 pub fn committed_offset(&self) -> u64 {
727 self.committed_offset
728 }
729
730 pub fn consumer_id(&self) -> u64 {
732 self.consumer_id
733 }
734
735 pub fn is_subscribed(&self) -> bool {
737 self.is_subscribed
738 }
739
740 pub fn client(&self) -> &LanceClient {
742 &self.client
743 }
744
745 pub async fn into_client(mut self) -> Result<LanceClient> {
747 self.stop().await?;
748 Ok(self.client)
749 }
750}
751
752impl std::fmt::Debug for StreamingConsumer {
753 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
754 f.debug_struct("StreamingConsumer")
755 .field("topic_id", &self.config.topic_id)
756 .field("consumer_id", &self.consumer_id)
757 .field("current_offset", &self.current_offset)
758 .field("committed_offset", &self.committed_offset)
759 .field("is_subscribed", &self.is_subscribed)
760 .finish()
761 }
762}
763
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// One kibibyte, used to express byte limits readably.
    const KIB: u32 = 1024;

    #[test]
    fn test_consumer_config_default() {
        let defaults = ConsumerConfig::default();
        assert_eq!(
            (defaults.topic_id, defaults.max_fetch_bytes, defaults.start_position),
            (0, 64 * KIB, SeekPosition::Beginning)
        );
    }

    #[test]
    fn test_consumer_config_builder() {
        let built = ConsumerConfig::new(42)
            .with_max_fetch_bytes(128 * KIB)
            .with_start_position(SeekPosition::Offset(1000));

        assert_eq!(built.topic_id, 42);
        assert_eq!(built.max_fetch_bytes, 128 * KIB);
        assert_eq!(built.start_position, SeekPosition::Offset(1000));
    }

    #[test]
    fn test_poll_result_is_empty() {
        let make = |data: bytes::Bytes,
                    current_offset: u64,
                    record_count: u32,
                    end_of_stream: bool| PollResult {
            data,
            current_offset,
            record_count,
            end_of_stream,
        };

        assert!(make(bytes::Bytes::new(), 0, 0, true).is_empty());
        assert!(!make(bytes::Bytes::from_static(&[1, 2, 3]), 3, 1, false).is_empty());
    }

    #[test]
    fn test_streaming_consumer_config_default() {
        let defaults = StreamingConsumerConfig::default();
        assert_eq!(defaults.topic_id, 0);
        assert_eq!(defaults.max_batch_bytes, 64 * KIB);
        assert_eq!(defaults.start_position, SeekPosition::Beginning);
        assert_eq!(defaults.consumer_group, None);
        assert_eq!(defaults.auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let built = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(128 * KIB)
            .with_start_position(SeekPosition::Offset(5000))
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10000);

        assert_eq!(built.topic_id, 42);
        assert_eq!(built.max_batch_bytes, 128 * KIB);
        assert_eq!(built.start_position, SeekPosition::Offset(5000));
        assert_eq!(built.consumer_group.as_deref(), Some("my-group"));
        assert_eq!(built.auto_commit_interval_ms, 10000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        let disabled = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);
        assert_eq!(disabled.auto_commit_interval_ms, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        let positions = [
            SeekPosition::Beginning,
            SeekPosition::End,
            SeekPosition::Offset(12345),
        ];
        for position in positions {
            let config = StreamingConsumerConfig::new(1).with_start_position(position);
            assert_eq!(config.start_position, position);
        }
    }

    #[test]
    fn test_seek_position_equality() {
        let equal_pairs = [
            (SeekPosition::Beginning, SeekPosition::Beginning),
            (SeekPosition::End, SeekPosition::End),
            (SeekPosition::Offset(100), SeekPosition::Offset(100)),
        ];
        for (left, right) in equal_pairs {
            assert_eq!(left, right);
        }

        let unequal_pairs = [
            (SeekPosition::Beginning, SeekPosition::End),
            (SeekPosition::Offset(100), SeekPosition::Offset(200)),
            (SeekPosition::Beginning, SeekPosition::Offset(0)),
        ];
        for (left, right) in unequal_pairs {
            assert_ne!(left, right);
        }
    }

    #[test]
    fn test_seek_position_clone() {
        let original = SeekPosition::Offset(42);
        // `SeekPosition` is `Copy`, so plain assignment duplicates it.
        let copied = original;
        assert_eq!(original, copied);
    }
}