1use std::sync::Arc;
9use std::time::Duration;
10
11use crate::client::LanceClient;
12use crate::connection::ReconnectingClient;
13use crate::error::{ClientError, Result};
14use crate::offset::OffsetStore;
15
/// Where a consumer should start (or resume) reading within a topic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekPosition {
    /// The first offset of the topic (offset `0`).
    Beginning,
    /// The current end of the topic, resolved against the server when needed.
    End,
    /// An explicit offset within the topic.
    Offset(u64),
}
26
27#[derive(Debug, Clone)]
29pub struct ConsumerConfig {
30 pub topic_id: u32,
32 pub max_fetch_bytes: u32,
34 pub start_position: SeekPosition,
36}
37
38impl Default for ConsumerConfig {
39 fn default() -> Self {
40 Self {
41 topic_id: 0,
42 max_fetch_bytes: 64 * 1024, start_position: SeekPosition::Beginning,
44 }
45 }
46}
47
48impl ConsumerConfig {
49 pub fn new(topic_id: u32) -> Self {
51 Self {
52 topic_id,
53 ..Default::default()
54 }
55 }
56
57 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
59 self.max_fetch_bytes = bytes;
60 self
61 }
62
63 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
65 self.start_position = position;
66 self
67 }
68}
69
70#[derive(Debug, Clone)]
72pub struct PollResult {
73 pub data: bytes::Bytes,
75 pub current_offset: u64,
77 pub record_count: u32,
79 pub end_of_stream: bool,
81}
82
83impl PollResult {
84 pub fn is_empty(&self) -> bool {
86 self.data.is_empty()
87 }
88}
89
90pub struct Consumer {
112 client: ReconnectingClient,
113 config: ConsumerConfig,
114 current_offset: u64,
115 cached_end_offset: Option<u64>,
117 offset_store: Option<Arc<dyn OffsetStore>>,
119 consumer_id: u64,
121}
122
123impl Consumer {
124 pub async fn connect(addr: &str, config: ConsumerConfig) -> Result<Self> {
129 let rc = ReconnectingClient::connect(addr)
130 .await?
131 .with_unlimited_retries()
132 .with_base_delay(Duration::from_millis(500))
133 .with_max_delay(Duration::from_secs(30));
134
135 Ok(Self::from_reconnecting_client(rc, config, 0))
136 }
137
138 pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self {
142 Self::with_consumer_id(client, addr, config, 0)
143 }
144
145 pub fn with_consumer_id(
147 client: LanceClient,
148 addr: &str,
149 config: ConsumerConfig,
150 consumer_id: u64,
151 ) -> Self {
152 let rc = ReconnectingClient::from_existing(client, addr);
153 Self::from_reconnecting_client(rc, config, consumer_id)
154 }
155
156 fn from_reconnecting_client(
158 client: ReconnectingClient,
159 config: ConsumerConfig,
160 consumer_id: u64,
161 ) -> Self {
162 let initial_offset = match config.start_position {
163 SeekPosition::Beginning => 0,
164 SeekPosition::Offset(offset) => offset,
165 SeekPosition::End => u64::MAX, };
167
168 Self {
169 client,
170 config,
171 current_offset: initial_offset,
172 cached_end_offset: None,
173 offset_store: None,
174 consumer_id,
175 }
176 }
177
178 pub fn with_offset_store(
196 client: LanceClient,
197 addr: &str,
198 config: ConsumerConfig,
199 consumer_id: u64,
200 offset_store: Arc<dyn OffsetStore>,
201 ) -> Result<Self> {
202 let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
204
205 let initial_offset = if let Some(offset) = stored_offset {
206 offset
208 } else {
209 match config.start_position {
211 SeekPosition::Beginning => 0,
212 SeekPosition::Offset(offset) => offset,
213 SeekPosition::End => u64::MAX, }
215 };
216
217 let rc = ReconnectingClient::from_existing(client, addr);
218 Ok(Self {
219 client: rc,
220 config,
221 current_offset: initial_offset,
222 cached_end_offset: None,
223 offset_store: Some(offset_store),
224 consumer_id,
225 })
226 }
227
228 pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self {
230 Self::new(client, addr, ConsumerConfig::new(topic_id))
231 }
232
233 pub fn from_offset(client: LanceClient, addr: &str, topic_id: u32, offset: u64) -> Self {
235 let config =
236 ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
237 Self::new(client, addr, config)
238 }
239
240 pub fn current_offset(&self) -> u64 {
242 self.current_offset
243 }
244
245 pub fn topic_id(&self) -> u32 {
247 self.config.topic_id
248 }
249
250 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
271 match position {
272 SeekPosition::Beginning => {
273 self.current_offset = 0;
274 Ok(0)
275 },
276 SeekPosition::Offset(offset) => {
277 self.current_offset = offset;
278 Ok(offset)
279 },
280 SeekPosition::End => {
281 let end_offset = self.discover_end_offset().await?;
282 self.current_offset = end_offset;
283 Ok(end_offset)
284 },
285 }
286 }
287
288 pub async fn rewind(&mut self) -> Result<()> {
292 self.seek(SeekPosition::Beginning).await?;
293 Ok(())
294 }
295
296 pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
300 self.seek(SeekPosition::Offset(offset)).await?;
301 Ok(())
302 }
303
304 pub async fn seek_to_end(&mut self) -> Result<u64> {
308 self.seek(SeekPosition::End).await
309 }
310
311 pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
320 if self.current_offset == u64::MAX {
322 let end_offset = self.discover_end_offset().await?;
323 self.current_offset = end_offset;
324 }
325
326 let fetch_result = self.fetch_with_retry().await?;
327
328 let end_of_stream =
329 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
330
331 let result = PollResult {
332 data: fetch_result.data,
333 current_offset: fetch_result.next_offset,
334 record_count: fetch_result.record_count,
335 end_of_stream,
336 };
337
338 if !result.is_empty() {
344 self.current_offset = fetch_result.next_offset;
345 }
346 self.cached_end_offset = Some(fetch_result.next_offset);
347
348 if result.is_empty() {
349 Ok(None)
350 } else {
351 Ok(Some(result))
352 }
353 }
354
355 #[inline]
357 pub async fn consume(&mut self) -> Result<Option<PollResult>> {
358 self.next_batch().await
359 }
360
361 #[inline]
363 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
364 self.next_batch().await
365 }
366
367 pub async fn poll_blocking(&mut self) -> Result<PollResult> {
372 if self.current_offset == u64::MAX {
374 let end_offset = self.discover_end_offset().await?;
375 self.current_offset = end_offset;
376 }
377
378 let fetch_result = self.fetch_with_retry().await?;
379
380 let end_of_stream =
381 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
382
383 let result = PollResult {
384 data: fetch_result.data,
385 current_offset: fetch_result.next_offset,
386 record_count: fetch_result.record_count,
387 end_of_stream,
388 };
389
390 if !result.is_empty() {
392 self.current_offset = fetch_result.next_offset;
393 }
394 self.cached_end_offset = Some(fetch_result.next_offset);
395
396 Ok(result)
397 }
398
399 async fn fetch_with_retry(&mut self) -> Result<crate::client::FetchResult> {
412 const MAX_RETRIES: u32 = 30;
413 const CATCHING_UP_BACKOFF: Duration = Duration::from_secs(5);
414 const STALE_ALERT_THRESHOLD: u32 = 3;
417 let mut attempt = 0u32;
418 let mut backoff = Duration::from_millis(500);
419 const MAX_BACKOFF: Duration = Duration::from_secs(30);
420
421 let mut last_server_offset: Option<u64> = None;
422 let mut stale_count: u32 = 0;
423
424 loop {
425 let result = match self.client.client().await {
426 Ok(c) => {
427 c.fetch(
428 self.config.topic_id,
429 self.current_offset,
430 self.config.max_fetch_bytes,
431 )
432 .await
433 },
434 Err(e) => Err(e),
435 };
436
437 match &result {
438 Ok(_) => return result,
439 Err(ClientError::ServerCatchingUp { server_offset }) => {
442 attempt += 1;
443
444 if last_server_offset == Some(*server_offset) {
446 stale_count += 1;
447 } else {
448 stale_count = 1;
449 last_server_offset = Some(*server_offset);
450 }
451
452 if stale_count == STALE_ALERT_THRESHOLD {
453 tracing::warn!(
454 topic_id = self.config.topic_id,
455 requested_offset = self.current_offset,
456 server_offset,
457 "Server offset stagnant while catching up; preserving consumer offset to avoid duplicate replay"
458 );
459
460 self.client.mark_failed();
463
464 return result;
467 }
468
469 if attempt >= MAX_RETRIES {
470 return result;
471 }
472 tracing::info!(
473 topic_id = self.config.topic_id,
474 requested_offset = self.current_offset,
475 server_offset,
476 attempt,
477 "Server catching up, backing off {}s",
478 CATCHING_UP_BACKOFF.as_secs()
479 );
480 tokio::time::sleep(CATCHING_UP_BACKOFF).await;
481 },
482 Err(e) if e.is_retryable() && attempt < MAX_RETRIES => {
483 attempt += 1;
484 self.client.mark_failed();
485 tokio::time::sleep(backoff).await;
486 backoff = (backoff * 2).min(MAX_BACKOFF);
487 },
488 _ => return result,
489 }
490 }
491 }
492
493 async fn discover_end_offset(&mut self) -> Result<u64> {
495 if let Some(end) = self.cached_end_offset {
497 return Ok(end);
498 }
499
500 let c = self.client.client().await?;
503 let fetch_result = c
504 .fetch(
505 self.config.topic_id,
506 u64::MAX,
507 1, )
509 .await?;
510
511 let end_offset = fetch_result.next_offset;
514 self.cached_end_offset = Some(end_offset);
515 Ok(end_offset)
516 }
517
518 pub async fn commit(&mut self) -> Result<()> {
530 if let Some(ref store) = self.offset_store {
531 store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
532 }
533 Ok(())
534 }
535
536 pub fn consumer_id(&self) -> u64 {
538 self.consumer_id
539 }
540
541 pub fn has_offset_store(&self) -> bool {
543 self.offset_store.is_some()
544 }
545
546 pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient {
548 &mut self.client
549 }
550}
551
552impl std::fmt::Debug for Consumer {
553 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
554 f.debug_struct("Consumer")
555 .field("topic_id", &self.config.topic_id)
556 .field("current_offset", &self.current_offset)
557 .field("max_fetch_bytes", &self.config.max_fetch_bytes)
558 .finish()
559 }
560}
561
562#[derive(Debug, Clone)]
564pub struct StreamingConsumerConfig {
565 pub topic_id: u32,
567 pub max_batch_bytes: u32,
569 pub start_position: SeekPosition,
571 pub consumer_group: Option<String>,
573 pub auto_commit_interval_ms: u64,
575}
576
577impl Default for StreamingConsumerConfig {
578 fn default() -> Self {
579 Self {
580 topic_id: 0,
581 max_batch_bytes: 64 * 1024,
582 start_position: SeekPosition::Beginning,
583 consumer_group: None,
584 auto_commit_interval_ms: 5000, }
586 }
587}
588
589impl StreamingConsumerConfig {
590 pub fn new(topic_id: u32) -> Self {
592 Self {
593 topic_id,
594 ..Default::default()
595 }
596 }
597
598 pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
600 self.max_batch_bytes = bytes;
601 self
602 }
603
604 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
606 self.start_position = position;
607 self
608 }
609
610 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
612 self.consumer_group = Some(group.into());
613 self
614 }
615
616 pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
618 self.auto_commit_interval_ms = interval_ms;
619 self
620 }
621}
622
623pub struct StreamingConsumer {
657 client: LanceClient,
658 config: StreamingConsumerConfig,
659 consumer_id: u64,
660 current_offset: u64,
661 committed_offset: u64,
662 is_subscribed: bool,
663 last_commit_time: std::time::Instant,
664}
665
666impl StreamingConsumer {
667 pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
669 let consumer_id = Self::generate_consumer_id();
670 let initial_offset = match config.start_position {
671 SeekPosition::Beginning => 0,
672 SeekPosition::Offset(offset) => offset,
673 SeekPosition::End => u64::MAX,
674 };
675
676 Self {
677 client,
678 config,
679 consumer_id,
680 current_offset: initial_offset,
681 committed_offset: 0,
682 is_subscribed: false,
683 last_commit_time: std::time::Instant::now(),
684 }
685 }
686
687 fn generate_consumer_id() -> u64 {
689 use std::time::{SystemTime, UNIX_EPOCH};
690 let timestamp = SystemTime::now()
691 .duration_since(UNIX_EPOCH)
692 .map(|d| d.as_nanos() as u64)
693 .unwrap_or(0);
694 let thread_id = std::thread::current().id();
696 timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
697 }
698
699 pub async fn start(&mut self) -> Result<()> {
704 if self.is_subscribed {
705 return Ok(());
706 }
707
708 let result = self
709 .client
710 .subscribe(
711 self.config.topic_id,
712 self.current_offset,
713 self.config.max_batch_bytes,
714 self.consumer_id,
715 )
716 .await?;
717
718 self.current_offset = result.start_offset;
719 self.is_subscribed = true;
720 self.last_commit_time = std::time::Instant::now();
721
722 Ok(())
723 }
724
725 pub async fn stop(&mut self) -> Result<()> {
730 if !self.is_subscribed {
731 return Ok(());
732 }
733
734 if self.current_offset > self.committed_offset {
736 let _ = self.commit().await;
737 }
738
739 self.client
740 .unsubscribe(self.config.topic_id, self.consumer_id)
741 .await?;
742 self.is_subscribed = false;
743
744 Ok(())
745 }
746
747 pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
752 if !self.is_subscribed {
753 return Err(ClientError::ProtocolError(
754 "Consumer not subscribed - call start() first".to_string(),
755 ));
756 }
757
758 self.maybe_auto_commit().await?;
760
761 let fetch_result = self
762 .client
763 .fetch(
764 self.config.topic_id,
765 self.current_offset,
766 self.config.max_batch_bytes,
767 )
768 .await?;
769
770 let end_of_stream =
771 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
772
773 let result = PollResult {
774 data: fetch_result.data,
775 current_offset: fetch_result.next_offset,
776 record_count: fetch_result.record_count,
777 end_of_stream,
778 };
779
780 self.current_offset = fetch_result.next_offset;
782
783 if result.is_empty() {
784 Ok(None)
785 } else {
786 Ok(Some(result))
787 }
788 }
789
790 #[inline]
792 pub async fn consume(&mut self) -> Result<Option<PollResult>> {
793 self.next_batch().await
794 }
795
796 #[inline]
798 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
799 self.next_batch().await
800 }
801
802 pub async fn commit(&mut self) -> Result<()> {
807 if self.current_offset <= self.committed_offset {
808 return Ok(());
809 }
810
811 let result = self
812 .client
813 .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
814 .await?;
815
816 self.committed_offset = result.committed_offset;
817 self.last_commit_time = std::time::Instant::now();
818
819 Ok(())
820 }
821
822 async fn maybe_auto_commit(&mut self) -> Result<()> {
824 if self.config.auto_commit_interval_ms == 0 {
825 return Ok(()); }
827
828 let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
829 if elapsed >= self.config.auto_commit_interval_ms {
830 self.commit().await?;
831 }
832
833 Ok(())
834 }
835
836 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
841 let was_subscribed = self.is_subscribed;
842
843 if was_subscribed {
844 self.stop().await?;
845 }
846
847 let new_offset = match position {
848 SeekPosition::Beginning => 0,
849 SeekPosition::Offset(offset) => offset,
850 SeekPosition::End => u64::MAX,
851 };
852
853 self.current_offset = new_offset;
854
855 if was_subscribed {
856 self.start().await?;
857 }
858
859 Ok(self.current_offset)
860 }
861
862 pub fn current_offset(&self) -> u64 {
864 self.current_offset
865 }
866
867 pub fn committed_offset(&self) -> u64 {
869 self.committed_offset
870 }
871
872 pub fn consumer_id(&self) -> u64 {
874 self.consumer_id
875 }
876
877 pub fn is_subscribed(&self) -> bool {
879 self.is_subscribed
880 }
881
882 pub fn client(&self) -> &LanceClient {
884 &self.client
885 }
886
887 pub async fn into_client(mut self) -> Result<LanceClient> {
889 self.stop().await?;
890 Ok(self.client)
891 }
892}
893
894impl std::fmt::Debug for StreamingConsumer {
895 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
896 f.debug_struct("StreamingConsumer")
897 .field("topic_id", &self.config.topic_id)
898 .field("consumer_id", &self.consumer_id)
899 .field("current_offset", &self.current_offset)
900 .field("committed_offset", &self.committed_offset)
901 .field("is_subscribed", &self.is_subscribed)
902 .finish()
903 }
904}
905
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_consumer_config_default() {
        let cfg = ConsumerConfig::default();
        assert_eq!(
            (cfg.topic_id, cfg.max_fetch_bytes, cfg.start_position),
            (0, 64 * 1024, SeekPosition::Beginning)
        );
    }

    #[test]
    fn test_consumer_config_builder() {
        let cfg = ConsumerConfig::new(42)
            .with_max_fetch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(1000));

        assert_eq!(cfg.topic_id, 42);
        assert_eq!(cfg.max_fetch_bytes, 128 * 1024);
        assert_eq!(cfg.start_position, SeekPosition::Offset(1000));
    }

    #[test]
    fn test_poll_result_is_empty() {
        // Builds a batch from static bytes plus the remaining fields.
        let make = |payload: &'static [u8],
                    current_offset: u64,
                    record_count: u32,
                    end_of_stream: bool| PollResult {
            data: bytes::Bytes::from_static(payload),
            current_offset,
            record_count,
            end_of_stream,
        };

        assert!(make(&[], 0, 0, true).is_empty());
        assert!(!make(&[1, 2, 3], 3, 1, false).is_empty());
    }

    #[test]
    fn test_streaming_consumer_config_default() {
        let cfg = StreamingConsumerConfig::default();
        assert_eq!(cfg.topic_id, 0);
        assert_eq!(cfg.max_batch_bytes, 64 * 1024);
        assert_eq!(cfg.start_position, SeekPosition::Beginning);
        assert_eq!(cfg.consumer_group, None);
        assert_eq!(cfg.auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let cfg = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(5000))
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10000);

        assert_eq!(cfg.topic_id, 42);
        assert_eq!(cfg.max_batch_bytes, 128 * 1024);
        assert_eq!(cfg.start_position, SeekPosition::Offset(5000));
        assert_eq!(cfg.consumer_group.as_deref(), Some("my-group"));
        assert_eq!(cfg.auto_commit_interval_ms, 10000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        let cfg = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);
        assert_eq!(cfg.auto_commit_interval_ms, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        let positions = [
            SeekPosition::Beginning,
            SeekPosition::End,
            SeekPosition::Offset(12345),
        ];

        for pos in positions {
            let cfg = StreamingConsumerConfig::new(1).with_start_position(pos);
            assert_eq!(cfg.start_position, pos);
        }
    }

    #[test]
    fn test_seek_position_equality() {
        let same_pairs = [
            (SeekPosition::Beginning, SeekPosition::Beginning),
            (SeekPosition::End, SeekPosition::End),
            (SeekPosition::Offset(100), SeekPosition::Offset(100)),
        ];
        for (a, b) in same_pairs {
            assert_eq!(a, b);
        }

        let different_pairs = [
            (SeekPosition::Beginning, SeekPosition::End),
            (SeekPosition::Offset(100), SeekPosition::Offset(200)),
            (SeekPosition::Beginning, SeekPosition::Offset(0)),
        ];
        for (a, b) in different_pairs {
            assert_ne!(a, b);
        }
    }

    #[test]
    fn test_seek_position_clone() {
        let original = SeekPosition::Offset(42);
        let copy = original;
        assert_eq!(original, copy);
    }
}