1use std::sync::Arc;
9use std::time::Duration;
10
11use crate::client::LanceClient;
12use crate::connection::ReconnectingClient;
13use crate::error::{ClientError, Result};
14use crate::offset::OffsetStore;
15
/// A position within a topic that a consumer can start from or seek to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SeekPosition {
    /// Offset `0`.
    Beginning,
    /// The end of the topic.
    ///
    /// The consumers in this file represent this as the `u64::MAX` sentinel
    /// until it is resolved to a concrete offset.
    End,
    /// An explicit offset.
    Offset(u64),
}
26
27#[derive(Debug, Clone)]
29pub struct ConsumerConfig {
30 pub topic_id: u32,
32 pub max_fetch_bytes: u32,
34 pub start_position: SeekPosition,
36}
37
38impl Default for ConsumerConfig {
39 fn default() -> Self {
40 Self {
41 topic_id: 0,
42 max_fetch_bytes: 64 * 1024, start_position: SeekPosition::Beginning,
44 }
45 }
46}
47
48impl ConsumerConfig {
49 pub fn new(topic_id: u32) -> Self {
51 Self {
52 topic_id,
53 ..Default::default()
54 }
55 }
56
57 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
59 self.max_fetch_bytes = bytes;
60 self
61 }
62
63 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
65 self.start_position = position;
66 self
67 }
68}
69
70#[derive(Debug, Clone)]
72pub struct PollResult {
73 pub data: bytes::Bytes,
75 pub current_offset: u64,
77 pub record_count: u32,
79 pub end_of_stream: bool,
81}
82
83impl PollResult {
84 pub fn is_empty(&self) -> bool {
86 self.data.is_empty()
87 }
88}
89
90pub struct Consumer {
112 client: ReconnectingClient,
113 config: ConsumerConfig,
114 current_offset: u64,
115 cached_end_offset: Option<u64>,
117 offset_store: Option<Arc<dyn OffsetStore>>,
119 consumer_id: u64,
121}
122
123impl Consumer {
124 pub async fn connect(addr: &str, config: ConsumerConfig) -> Result<Self> {
129 let rc = ReconnectingClient::connect(addr)
130 .await?
131 .with_unlimited_retries()
132 .with_base_delay(Duration::from_millis(500))
133 .with_max_delay(Duration::from_secs(30));
134
135 Ok(Self::from_reconnecting_client(rc, config, 0))
136 }
137
138 pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self {
142 Self::with_consumer_id(client, addr, config, 0)
143 }
144
145 pub fn with_consumer_id(
147 client: LanceClient,
148 addr: &str,
149 config: ConsumerConfig,
150 consumer_id: u64,
151 ) -> Self {
152 let rc = ReconnectingClient::from_existing(client, addr);
153 Self::from_reconnecting_client(rc, config, consumer_id)
154 }
155
156 fn from_reconnecting_client(
158 client: ReconnectingClient,
159 config: ConsumerConfig,
160 consumer_id: u64,
161 ) -> Self {
162 let initial_offset = match config.start_position {
163 SeekPosition::Beginning => 0,
164 SeekPosition::Offset(offset) => offset,
165 SeekPosition::End => u64::MAX, };
167
168 Self {
169 client,
170 config,
171 current_offset: initial_offset,
172 cached_end_offset: None,
173 offset_store: None,
174 consumer_id,
175 }
176 }
177
178 pub fn with_offset_store(
196 client: LanceClient,
197 addr: &str,
198 config: ConsumerConfig,
199 consumer_id: u64,
200 offset_store: Arc<dyn OffsetStore>,
201 ) -> Result<Self> {
202 let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
204
205 let initial_offset = if let Some(offset) = stored_offset {
206 offset
208 } else {
209 match config.start_position {
211 SeekPosition::Beginning => 0,
212 SeekPosition::Offset(offset) => offset,
213 SeekPosition::End => u64::MAX, }
215 };
216
217 let rc = ReconnectingClient::from_existing(client, addr);
218 Ok(Self {
219 client: rc,
220 config,
221 current_offset: initial_offset,
222 cached_end_offset: None,
223 offset_store: Some(offset_store),
224 consumer_id,
225 })
226 }
227
228 pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self {
230 Self::new(client, addr, ConsumerConfig::new(topic_id))
231 }
232
233 pub fn from_offset(client: LanceClient, addr: &str, topic_id: u32, offset: u64) -> Self {
235 let config =
236 ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
237 Self::new(client, addr, config)
238 }
239
240 pub fn current_offset(&self) -> u64 {
242 self.current_offset
243 }
244
245 pub fn topic_id(&self) -> u32 {
247 self.config.topic_id
248 }
249
250 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
271 match position {
272 SeekPosition::Beginning => {
273 self.current_offset = 0;
274 Ok(0)
275 },
276 SeekPosition::Offset(offset) => {
277 self.current_offset = offset;
278 Ok(offset)
279 },
280 SeekPosition::End => {
281 let end_offset = self.discover_end_offset().await?;
282 self.current_offset = end_offset;
283 Ok(end_offset)
284 },
285 }
286 }
287
288 pub async fn rewind(&mut self) -> Result<()> {
292 self.seek(SeekPosition::Beginning).await?;
293 Ok(())
294 }
295
296 pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
300 self.seek(SeekPosition::Offset(offset)).await?;
301 Ok(())
302 }
303
304 pub async fn seek_to_end(&mut self) -> Result<u64> {
308 self.seek(SeekPosition::End).await
309 }
310
311 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
320 if self.current_offset == u64::MAX {
322 let end_offset = self.discover_end_offset().await?;
323 self.current_offset = end_offset;
324 }
325
326 let fetch_result = self.fetch_with_retry().await?;
327
328 let end_of_stream =
329 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
330
331 let result = PollResult {
332 data: fetch_result.data,
333 current_offset: fetch_result.next_offset,
334 record_count: fetch_result.record_count,
335 end_of_stream,
336 };
337
338 self.current_offset = fetch_result.next_offset;
340 self.cached_end_offset = Some(fetch_result.next_offset);
341
342 if result.is_empty() {
343 Ok(None)
344 } else {
345 Ok(Some(result))
346 }
347 }
348
349 pub async fn poll_blocking(&mut self) -> Result<PollResult> {
354 if self.current_offset == u64::MAX {
356 let end_offset = self.discover_end_offset().await?;
357 self.current_offset = end_offset;
358 }
359
360 let fetch_result = self.fetch_with_retry().await?;
361
362 let end_of_stream =
363 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
364
365 let result = PollResult {
366 data: fetch_result.data,
367 current_offset: fetch_result.next_offset,
368 record_count: fetch_result.record_count,
369 end_of_stream,
370 };
371
372 if !result.is_empty() {
374 self.current_offset = fetch_result.next_offset;
375 }
376 self.cached_end_offset = Some(fetch_result.next_offset);
377
378 Ok(result)
379 }
380
    /// Fetches from `current_offset`, retrying recoverable failures.
    ///
    /// Three outcomes are handled per attempt:
    /// * success: returned immediately;
    /// * `ClientError::ServerCatchingUp`: waits a fixed 5 s and retries. If the
    ///   server reports the same `server_offset` `STALE_RESET_THRESHOLD` times
    ///   in a row, `current_offset` is reset to that offset (logged as data
    ///   loss) and all counters restart;
    /// * any other error for which `is_retryable()` is true: marks the
    ///   connection failed and retries with exponential backoff (500 ms
    ///   doubling up to 30 s).
    ///
    /// Both retry paths share the single `attempt` counter, capped at
    /// `MAX_RETRIES`; once exhausted, or on a non-retryable error, the last
    /// result is returned as is.
    async fn fetch_with_retry(&mut self) -> Result<crate::client::FetchResult> {
        // Upper bound on attempts, shared by both retry paths.
        const MAX_RETRIES: u32 = 30;
        // Fixed wait between attempts while the server reports it is catching up.
        const CATCHING_UP_BACKOFF: Duration = Duration::from_secs(5);
        // Number of consecutive identical `server_offset` reports that triggers an offset reset.
        const STALE_RESET_THRESHOLD: u32 = 3;
        let mut attempt = 0u32;
        // Backoff for generic retryable errors; doubled after each such failure.
        let mut backoff = Duration::from_millis(500);
        const MAX_BACKOFF: Duration = Duration::from_secs(30);

        // Stale detection state: the last offset the server reported and how
        // many times in a row it has been seen.
        let mut last_server_offset: Option<u64> = None;
        let mut stale_count: u32 = 0;

        loop {
            // Failing to obtain a client is folded into the same result as a failed fetch.
            let result = match self.client.client().await {
                Ok(c) => {
                    c.fetch(
                        self.config.topic_id,
                        self.current_offset,
                        self.config.max_fetch_bytes,
                    )
                    .await
                },
                Err(e) => Err(e),
            };

            match &result {
                Ok(_) => return result,
                Err(ClientError::ServerCatchingUp { server_offset }) => {
                    attempt += 1;

                    // Count consecutive reports of the same server offset; a
                    // different offset means the server is making progress.
                    if last_server_offset == Some(*server_offset) {
                        stale_count += 1;
                    } else {
                        stale_count = 1;
                        last_server_offset = Some(*server_offset);
                    }

                    if stale_count >= STALE_RESET_THRESHOLD {
                        tracing::warn!(
                            topic_id = self.config.topic_id,
                            old_offset = self.current_offset,
                            new_offset = *server_offset,
                            lost_bytes = self.current_offset.saturating_sub(*server_offset),
                            "Data permanently unreachable — resetting consumer offset to server position"
                        );
                        // Adopt the server's position and start over with fresh
                        // counters; retry immediately, without sleeping.
                        self.current_offset = *server_offset;
                        stale_count = 0;
                        last_server_offset = None;
                        attempt = 0;
                        continue;
                    }

                    if attempt >= MAX_RETRIES {
                        return result;
                    }
                    tracing::info!(
                        topic_id = self.config.topic_id,
                        requested_offset = self.current_offset,
                        server_offset,
                        attempt,
                        "Server catching up, backing off {}s",
                        CATCHING_UP_BACKOFF.as_secs()
                    );
                    tokio::time::sleep(CATCHING_UP_BACKOFF).await;
                },
                Err(e) if e.is_retryable() && attempt < MAX_RETRIES => {
                    attempt += 1;
                    // Flag the connection as failed before the next attempt asks for a client.
                    self.client.mark_failed();
                    tokio::time::sleep(backoff).await;
                    backoff = (backoff * 2).min(MAX_BACKOFF);
                },
                // Non-retryable error, or retry budget exhausted.
                _ => return result,
            }
        }
    }
476
477 async fn discover_end_offset(&mut self) -> Result<u64> {
479 if let Some(end) = self.cached_end_offset {
481 return Ok(end);
482 }
483
484 let c = self.client.client().await?;
486 let fetch_result = c
487 .fetch(
488 self.config.topic_id,
489 0,
490 1, )
492 .await?;
493
494 let end_offset = fetch_result.next_offset;
497 self.cached_end_offset = Some(end_offset);
498 Ok(end_offset)
499 }
500
501 pub async fn commit(&mut self) -> Result<()> {
513 if let Some(ref store) = self.offset_store {
514 store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
515 }
516 Ok(())
517 }
518
519 pub fn consumer_id(&self) -> u64 {
521 self.consumer_id
522 }
523
524 pub fn has_offset_store(&self) -> bool {
526 self.offset_store.is_some()
527 }
528
529 pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient {
531 &mut self.client
532 }
533}
534
535impl std::fmt::Debug for Consumer {
536 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
537 f.debug_struct("Consumer")
538 .field("topic_id", &self.config.topic_id)
539 .field("current_offset", &self.current_offset)
540 .field("max_fetch_bytes", &self.config.max_fetch_bytes)
541 .finish()
542 }
543}
544
545#[derive(Debug, Clone)]
547pub struct StreamingConsumerConfig {
548 pub topic_id: u32,
550 pub max_batch_bytes: u32,
552 pub start_position: SeekPosition,
554 pub consumer_group: Option<String>,
556 pub auto_commit_interval_ms: u64,
558}
559
560impl Default for StreamingConsumerConfig {
561 fn default() -> Self {
562 Self {
563 topic_id: 0,
564 max_batch_bytes: 64 * 1024,
565 start_position: SeekPosition::Beginning,
566 consumer_group: None,
567 auto_commit_interval_ms: 5000, }
569 }
570}
571
572impl StreamingConsumerConfig {
573 pub fn new(topic_id: u32) -> Self {
575 Self {
576 topic_id,
577 ..Default::default()
578 }
579 }
580
581 pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
583 self.max_batch_bytes = bytes;
584 self
585 }
586
587 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
589 self.start_position = position;
590 self
591 }
592
593 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
595 self.consumer_group = Some(group.into());
596 self
597 }
598
599 pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
601 self.auto_commit_interval_ms = interval_ms;
602 self
603 }
604}
605
606pub struct StreamingConsumer {
640 client: LanceClient,
641 config: StreamingConsumerConfig,
642 consumer_id: u64,
643 current_offset: u64,
644 committed_offset: u64,
645 is_subscribed: bool,
646 last_commit_time: std::time::Instant,
647}
648
649impl StreamingConsumer {
650 pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
652 let consumer_id = Self::generate_consumer_id();
653 let initial_offset = match config.start_position {
654 SeekPosition::Beginning => 0,
655 SeekPosition::Offset(offset) => offset,
656 SeekPosition::End => u64::MAX,
657 };
658
659 Self {
660 client,
661 config,
662 consumer_id,
663 current_offset: initial_offset,
664 committed_offset: 0,
665 is_subscribed: false,
666 last_commit_time: std::time::Instant::now(),
667 }
668 }
669
670 fn generate_consumer_id() -> u64 {
672 use std::time::{SystemTime, UNIX_EPOCH};
673 let timestamp = SystemTime::now()
674 .duration_since(UNIX_EPOCH)
675 .map(|d| d.as_nanos() as u64)
676 .unwrap_or(0);
677 let thread_id = std::thread::current().id();
679 timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
680 }
681
682 pub async fn start(&mut self) -> Result<()> {
687 if self.is_subscribed {
688 return Ok(());
689 }
690
691 let result = self
692 .client
693 .subscribe(
694 self.config.topic_id,
695 self.current_offset,
696 self.config.max_batch_bytes,
697 self.consumer_id,
698 )
699 .await?;
700
701 self.current_offset = result.start_offset;
702 self.is_subscribed = true;
703 self.last_commit_time = std::time::Instant::now();
704
705 Ok(())
706 }
707
708 pub async fn stop(&mut self) -> Result<()> {
713 if !self.is_subscribed {
714 return Ok(());
715 }
716
717 if self.current_offset > self.committed_offset {
719 let _ = self.commit().await;
720 }
721
722 self.client
723 .unsubscribe(self.config.topic_id, self.consumer_id)
724 .await?;
725 self.is_subscribed = false;
726
727 Ok(())
728 }
729
730 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
735 if !self.is_subscribed {
736 return Err(ClientError::ProtocolError(
737 "Consumer not subscribed - call start() first".to_string(),
738 ));
739 }
740
741 self.maybe_auto_commit().await?;
743
744 let fetch_result = self
745 .client
746 .fetch(
747 self.config.topic_id,
748 self.current_offset,
749 self.config.max_batch_bytes,
750 )
751 .await?;
752
753 let end_of_stream =
754 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
755
756 let result = PollResult {
757 data: fetch_result.data,
758 current_offset: fetch_result.next_offset,
759 record_count: fetch_result.record_count,
760 end_of_stream,
761 };
762
763 self.current_offset = fetch_result.next_offset;
765
766 if result.is_empty() {
767 Ok(None)
768 } else {
769 Ok(Some(result))
770 }
771 }
772
773 pub async fn commit(&mut self) -> Result<()> {
778 if self.current_offset <= self.committed_offset {
779 return Ok(());
780 }
781
782 let result = self
783 .client
784 .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
785 .await?;
786
787 self.committed_offset = result.committed_offset;
788 self.last_commit_time = std::time::Instant::now();
789
790 Ok(())
791 }
792
793 async fn maybe_auto_commit(&mut self) -> Result<()> {
795 if self.config.auto_commit_interval_ms == 0 {
796 return Ok(()); }
798
799 let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
800 if elapsed >= self.config.auto_commit_interval_ms {
801 self.commit().await?;
802 }
803
804 Ok(())
805 }
806
807 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
812 let was_subscribed = self.is_subscribed;
813
814 if was_subscribed {
815 self.stop().await?;
816 }
817
818 let new_offset = match position {
819 SeekPosition::Beginning => 0,
820 SeekPosition::Offset(offset) => offset,
821 SeekPosition::End => u64::MAX,
822 };
823
824 self.current_offset = new_offset;
825
826 if was_subscribed {
827 self.start().await?;
828 }
829
830 Ok(self.current_offset)
831 }
832
833 pub fn current_offset(&self) -> u64 {
835 self.current_offset
836 }
837
838 pub fn committed_offset(&self) -> u64 {
840 self.committed_offset
841 }
842
843 pub fn consumer_id(&self) -> u64 {
845 self.consumer_id
846 }
847
848 pub fn is_subscribed(&self) -> bool {
850 self.is_subscribed
851 }
852
853 pub fn client(&self) -> &LanceClient {
855 &self.client
856 }
857
858 pub async fn into_client(mut self) -> Result<LanceClient> {
860 self.stop().await?;
861 Ok(self.client)
862 }
863}
864
865impl std::fmt::Debug for StreamingConsumer {
866 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
867 f.debug_struct("StreamingConsumer")
868 .field("topic_id", &self.config.topic_id)
869 .field("consumer_id", &self.consumer_id)
870 .field("current_offset", &self.current_offset)
871 .field("committed_offset", &self.committed_offset)
872 .field("is_subscribed", &self.is_subscribed)
873 .finish()
874 }
875}
876
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Default fetch/batch size expected from both configuration types.
    const DEFAULT_BYTES: u32 = 64 * 1024;

    // ---- ConsumerConfig -------------------------------------------------

    #[test]
    fn test_consumer_config_default() {
        let ConsumerConfig {
            topic_id,
            max_fetch_bytes,
            start_position,
        } = ConsumerConfig::default();

        assert_eq!(topic_id, 0);
        assert_eq!(max_fetch_bytes, DEFAULT_BYTES);
        assert_eq!(start_position, SeekPosition::Beginning);
    }

    #[test]
    fn test_consumer_config_builder() {
        let start = SeekPosition::Offset(1000);
        let config = ConsumerConfig::new(42)
            .with_max_fetch_bytes(128 * 1024)
            .with_start_position(start);

        assert_eq!(config.topic_id, 42);
        assert_eq!(config.max_fetch_bytes, 128 * 1024);
        assert_eq!(config.start_position, start);
    }

    // ---- PollResult -----------------------------------------------------

    #[test]
    fn test_poll_result_is_empty() {
        let without_payload = PollResult {
            data: bytes::Bytes::new(),
            current_offset: 0,
            record_count: 0,
            end_of_stream: true,
        };
        assert!(without_payload.is_empty());

        let with_payload = PollResult {
            data: bytes::Bytes::from_static(&[1, 2, 3]),
            current_offset: 3,
            record_count: 1,
            end_of_stream: false,
        };
        assert!(!with_payload.is_empty());
    }

    // ---- StreamingConsumerConfig ----------------------------------------

    #[test]
    fn test_streaming_consumer_config_default() {
        let StreamingConsumerConfig {
            topic_id,
            max_batch_bytes,
            start_position,
            consumer_group,
            auto_commit_interval_ms,
        } = StreamingConsumerConfig::default();

        assert_eq!(topic_id, 0);
        assert_eq!(max_batch_bytes, DEFAULT_BYTES);
        assert_eq!(start_position, SeekPosition::Beginning);
        assert!(consumer_group.is_none());
        assert_eq!(auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let start = SeekPosition::Offset(5000);
        let config = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(128 * 1024)
            .with_start_position(start)
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10000);

        assert_eq!(config.topic_id, 42);
        assert_eq!(config.max_batch_bytes, 128 * 1024);
        assert_eq!(config.start_position, start);
        assert_eq!(config.consumer_group, Some("my-group".to_string()));
        assert_eq!(config.auto_commit_interval_ms, 10000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        // An interval of zero is the documented way to turn auto-commit off.
        let disabled = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);

        assert_eq!(disabled.auto_commit_interval_ms, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        let configured =
            |position: SeekPosition| StreamingConsumerConfig::new(1).with_start_position(position);

        let beginning = configured(SeekPosition::Beginning);
        assert_eq!(beginning.start_position, SeekPosition::Beginning);

        let end = configured(SeekPosition::End);
        assert_eq!(end.start_position, SeekPosition::End);

        let offset = configured(SeekPosition::Offset(12345));
        assert_eq!(offset.start_position, SeekPosition::Offset(12345));
    }

    // ---- SeekPosition ---------------------------------------------------

    #[test]
    fn test_seek_position_equality() {
        // Equal variants (and equal payloads) compare equal.
        assert_eq!(SeekPosition::Beginning, SeekPosition::Beginning);
        assert_eq!(SeekPosition::End, SeekPosition::End);
        assert_eq!(SeekPosition::Offset(100), SeekPosition::Offset(100));

        // Different variants or payloads do not; `Offset(0)` is distinct from `Beginning`.
        assert_ne!(SeekPosition::Beginning, SeekPosition::End);
        assert_ne!(SeekPosition::Offset(100), SeekPosition::Offset(200));
        assert_ne!(SeekPosition::Beginning, SeekPosition::Offset(0));
    }

    #[test]
    fn test_seek_position_clone() {
        // `SeekPosition` is `Copy`, so a plain assignment duplicates the value.
        let original = SeekPosition::Offset(42);
        let duplicate = original;
        assert_eq!(original, duplicate);
    }
}