1use std::sync::Arc;
9use std::time::Duration;
10
11use crate::client::LanceClient;
12use crate::connection::ReconnectingClient;
13use crate::error::{ClientError, Result};
14use crate::offset::OffsetStore;
15
/// Where a consumer starts (or resumes) reading within a topic.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SeekPosition {
    /// The first offset of the topic (offset `0`).
    Beginning,
    /// The tail of the topic as reported by the server.
    End,
    /// An explicit byte/record offset chosen by the caller.
    Offset(u64),
}
26
27#[derive(Debug, Clone)]
29pub struct ConsumerConfig {
30 pub topic_id: u32,
32 pub max_fetch_bytes: u32,
34 pub start_position: SeekPosition,
36}
37
38impl Default for ConsumerConfig {
39 fn default() -> Self {
40 Self {
41 topic_id: 0,
42 max_fetch_bytes: 64 * 1024, start_position: SeekPosition::Beginning,
44 }
45 }
46}
47
48impl ConsumerConfig {
49 pub fn new(topic_id: u32) -> Self {
51 Self {
52 topic_id,
53 ..Default::default()
54 }
55 }
56
57 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
59 self.max_fetch_bytes = bytes;
60 self
61 }
62
63 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
65 self.start_position = position;
66 self
67 }
68}
69
70#[derive(Debug, Clone)]
72pub struct PollResult {
73 pub data: bytes::Bytes,
75 pub current_offset: u64,
77 pub record_count: u32,
79 pub end_of_stream: bool,
81}
82
83impl PollResult {
84 pub fn is_empty(&self) -> bool {
86 self.data.is_empty()
87 }
88}
89
90pub struct Consumer {
112 client: ReconnectingClient,
113 config: ConsumerConfig,
114 current_offset: u64,
115 cached_end_offset: Option<u64>,
117 offset_store: Option<Arc<dyn OffsetStore>>,
119 consumer_id: u64,
121}
122
123impl Consumer {
124 pub async fn connect(addr: &str, config: ConsumerConfig) -> Result<Self> {
129 let rc = ReconnectingClient::connect(addr)
130 .await?
131 .with_unlimited_retries()
132 .with_base_delay(Duration::from_millis(500))
133 .with_max_delay(Duration::from_secs(30));
134
135 Ok(Self::from_reconnecting_client(rc, config, 0))
136 }
137
138 pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self {
142 Self::with_consumer_id(client, addr, config, 0)
143 }
144
145 pub fn with_consumer_id(
147 client: LanceClient,
148 addr: &str,
149 config: ConsumerConfig,
150 consumer_id: u64,
151 ) -> Self {
152 let rc = ReconnectingClient::from_existing(client, addr);
153 Self::from_reconnecting_client(rc, config, consumer_id)
154 }
155
156 fn from_reconnecting_client(
158 client: ReconnectingClient,
159 config: ConsumerConfig,
160 consumer_id: u64,
161 ) -> Self {
162 let initial_offset = match config.start_position {
163 SeekPosition::Beginning => 0,
164 SeekPosition::Offset(offset) => offset,
165 SeekPosition::End => u64::MAX, };
167
168 Self {
169 client,
170 config,
171 current_offset: initial_offset,
172 cached_end_offset: None,
173 offset_store: None,
174 consumer_id,
175 }
176 }
177
178 pub fn with_offset_store(
196 client: LanceClient,
197 addr: &str,
198 config: ConsumerConfig,
199 consumer_id: u64,
200 offset_store: Arc<dyn OffsetStore>,
201 ) -> Result<Self> {
202 let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
204
205 let initial_offset = if let Some(offset) = stored_offset {
206 offset
208 } else {
209 match config.start_position {
211 SeekPosition::Beginning => 0,
212 SeekPosition::Offset(offset) => offset,
213 SeekPosition::End => u64::MAX, }
215 };
216
217 let rc = ReconnectingClient::from_existing(client, addr);
218 Ok(Self {
219 client: rc,
220 config,
221 current_offset: initial_offset,
222 cached_end_offset: None,
223 offset_store: Some(offset_store),
224 consumer_id,
225 })
226 }
227
228 pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self {
230 Self::new(client, addr, ConsumerConfig::new(topic_id))
231 }
232
233 pub fn from_offset(client: LanceClient, addr: &str, topic_id: u32, offset: u64) -> Self {
235 let config =
236 ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
237 Self::new(client, addr, config)
238 }
239
240 pub fn current_offset(&self) -> u64 {
242 self.current_offset
243 }
244
245 pub fn topic_id(&self) -> u32 {
247 self.config.topic_id
248 }
249
250 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
271 match position {
272 SeekPosition::Beginning => {
273 self.current_offset = 0;
274 Ok(0)
275 },
276 SeekPosition::Offset(offset) => {
277 self.current_offset = offset;
278 Ok(offset)
279 },
280 SeekPosition::End => {
281 let end_offset = self.discover_end_offset().await?;
282 self.current_offset = end_offset;
283 Ok(end_offset)
284 },
285 }
286 }
287
288 pub async fn rewind(&mut self) -> Result<()> {
292 self.seek(SeekPosition::Beginning).await?;
293 Ok(())
294 }
295
296 pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
300 self.seek(SeekPosition::Offset(offset)).await?;
301 Ok(())
302 }
303
304 pub async fn seek_to_end(&mut self) -> Result<u64> {
308 self.seek(SeekPosition::End).await
309 }
310
311 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
320 if self.current_offset == u64::MAX {
322 let end_offset = self.discover_end_offset().await?;
323 self.current_offset = end_offset;
324 }
325
326 let fetch_result = self.fetch_with_retry().await?;
327
328 let end_of_stream =
329 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
330
331 let result = PollResult {
332 data: fetch_result.data,
333 current_offset: fetch_result.next_offset,
334 record_count: fetch_result.record_count,
335 end_of_stream,
336 };
337
338 self.current_offset = fetch_result.next_offset;
340 self.cached_end_offset = Some(fetch_result.next_offset);
341
342 if result.is_empty() {
343 Ok(None)
344 } else {
345 Ok(Some(result))
346 }
347 }
348
349 pub async fn poll_blocking(&mut self) -> Result<PollResult> {
354 if self.current_offset == u64::MAX {
356 let end_offset = self.discover_end_offset().await?;
357 self.current_offset = end_offset;
358 }
359
360 let fetch_result = self.fetch_with_retry().await?;
361
362 let end_of_stream =
363 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
364
365 let result = PollResult {
366 data: fetch_result.data,
367 current_offset: fetch_result.next_offset,
368 record_count: fetch_result.record_count,
369 end_of_stream,
370 };
371
372 if !result.is_empty() {
374 self.current_offset = fetch_result.next_offset;
375 }
376 self.cached_end_offset = Some(fetch_result.next_offset);
377
378 Ok(result)
379 }
380
381 async fn fetch_with_retry(&mut self) -> Result<crate::client::FetchResult> {
394 const MAX_RETRIES: u32 = 30;
395 const CATCHING_UP_BACKOFF: Duration = Duration::from_secs(5);
396 const STALE_ALERT_THRESHOLD: u32 = 3;
399 let mut attempt = 0u32;
400 let mut backoff = Duration::from_millis(500);
401 const MAX_BACKOFF: Duration = Duration::from_secs(30);
402
403 let mut last_server_offset: Option<u64> = None;
404 let mut stale_count: u32 = 0;
405
406 loop {
407 let result = match self.client.client().await {
408 Ok(c) => {
409 c.fetch(
410 self.config.topic_id,
411 self.current_offset,
412 self.config.max_fetch_bytes,
413 )
414 .await
415 },
416 Err(e) => Err(e),
417 };
418
419 match &result {
420 Ok(_) => return result,
421 Err(ClientError::ServerCatchingUp { server_offset }) => {
424 attempt += 1;
425
426 if last_server_offset == Some(*server_offset) {
428 stale_count += 1;
429 } else {
430 stale_count = 1;
431 last_server_offset = Some(*server_offset);
432 }
433
434 if stale_count == STALE_ALERT_THRESHOLD {
435 tracing::warn!(
436 topic_id = self.config.topic_id,
437 requested_offset = self.current_offset,
438 server_offset,
439 "Server offset stagnant while catching up; preserving consumer offset to avoid duplicate replay"
440 );
441
442 self.client.mark_failed();
445
446 return result;
449 }
450
451 if attempt >= MAX_RETRIES {
452 return result;
453 }
454 tracing::info!(
455 topic_id = self.config.topic_id,
456 requested_offset = self.current_offset,
457 server_offset,
458 attempt,
459 "Server catching up, backing off {}s",
460 CATCHING_UP_BACKOFF.as_secs()
461 );
462 tokio::time::sleep(CATCHING_UP_BACKOFF).await;
463 },
464 Err(e) if e.is_retryable() && attempt < MAX_RETRIES => {
465 attempt += 1;
466 self.client.mark_failed();
467 tokio::time::sleep(backoff).await;
468 backoff = (backoff * 2).min(MAX_BACKOFF);
469 },
470 _ => return result,
471 }
472 }
473 }
474
475 async fn discover_end_offset(&mut self) -> Result<u64> {
477 if let Some(end) = self.cached_end_offset {
479 return Ok(end);
480 }
481
482 let c = self.client.client().await?;
485 let fetch_result = c
486 .fetch(
487 self.config.topic_id,
488 u64::MAX,
489 1, )
491 .await?;
492
493 let end_offset = fetch_result.next_offset;
496 self.cached_end_offset = Some(end_offset);
497 Ok(end_offset)
498 }
499
500 pub async fn commit(&mut self) -> Result<()> {
512 if let Some(ref store) = self.offset_store {
513 store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
514 }
515 Ok(())
516 }
517
518 pub fn consumer_id(&self) -> u64 {
520 self.consumer_id
521 }
522
523 pub fn has_offset_store(&self) -> bool {
525 self.offset_store.is_some()
526 }
527
528 pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient {
530 &mut self.client
531 }
532}
533
534impl std::fmt::Debug for Consumer {
535 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
536 f.debug_struct("Consumer")
537 .field("topic_id", &self.config.topic_id)
538 .field("current_offset", &self.current_offset)
539 .field("max_fetch_bytes", &self.config.max_fetch_bytes)
540 .finish()
541 }
542}
543
544#[derive(Debug, Clone)]
546pub struct StreamingConsumerConfig {
547 pub topic_id: u32,
549 pub max_batch_bytes: u32,
551 pub start_position: SeekPosition,
553 pub consumer_group: Option<String>,
555 pub auto_commit_interval_ms: u64,
557}
558
559impl Default for StreamingConsumerConfig {
560 fn default() -> Self {
561 Self {
562 topic_id: 0,
563 max_batch_bytes: 64 * 1024,
564 start_position: SeekPosition::Beginning,
565 consumer_group: None,
566 auto_commit_interval_ms: 5000, }
568 }
569}
570
571impl StreamingConsumerConfig {
572 pub fn new(topic_id: u32) -> Self {
574 Self {
575 topic_id,
576 ..Default::default()
577 }
578 }
579
580 pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
582 self.max_batch_bytes = bytes;
583 self
584 }
585
586 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
588 self.start_position = position;
589 self
590 }
591
592 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
594 self.consumer_group = Some(group.into());
595 self
596 }
597
598 pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
600 self.auto_commit_interval_ms = interval_ms;
601 self
602 }
603}
604
605pub struct StreamingConsumer {
639 client: LanceClient,
640 config: StreamingConsumerConfig,
641 consumer_id: u64,
642 current_offset: u64,
643 committed_offset: u64,
644 is_subscribed: bool,
645 last_commit_time: std::time::Instant,
646}
647
648impl StreamingConsumer {
649 pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
651 let consumer_id = Self::generate_consumer_id();
652 let initial_offset = match config.start_position {
653 SeekPosition::Beginning => 0,
654 SeekPosition::Offset(offset) => offset,
655 SeekPosition::End => u64::MAX,
656 };
657
658 Self {
659 client,
660 config,
661 consumer_id,
662 current_offset: initial_offset,
663 committed_offset: 0,
664 is_subscribed: false,
665 last_commit_time: std::time::Instant::now(),
666 }
667 }
668
669 fn generate_consumer_id() -> u64 {
671 use std::time::{SystemTime, UNIX_EPOCH};
672 let timestamp = SystemTime::now()
673 .duration_since(UNIX_EPOCH)
674 .map(|d| d.as_nanos() as u64)
675 .unwrap_or(0);
676 let thread_id = std::thread::current().id();
678 timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
679 }
680
681 pub async fn start(&mut self) -> Result<()> {
686 if self.is_subscribed {
687 return Ok(());
688 }
689
690 let result = self
691 .client
692 .subscribe(
693 self.config.topic_id,
694 self.current_offset,
695 self.config.max_batch_bytes,
696 self.consumer_id,
697 )
698 .await?;
699
700 self.current_offset = result.start_offset;
701 self.is_subscribed = true;
702 self.last_commit_time = std::time::Instant::now();
703
704 Ok(())
705 }
706
707 pub async fn stop(&mut self) -> Result<()> {
712 if !self.is_subscribed {
713 return Ok(());
714 }
715
716 if self.current_offset > self.committed_offset {
718 let _ = self.commit().await;
719 }
720
721 self.client
722 .unsubscribe(self.config.topic_id, self.consumer_id)
723 .await?;
724 self.is_subscribed = false;
725
726 Ok(())
727 }
728
729 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
734 if !self.is_subscribed {
735 return Err(ClientError::ProtocolError(
736 "Consumer not subscribed - call start() first".to_string(),
737 ));
738 }
739
740 self.maybe_auto_commit().await?;
742
743 let fetch_result = self
744 .client
745 .fetch(
746 self.config.topic_id,
747 self.current_offset,
748 self.config.max_batch_bytes,
749 )
750 .await?;
751
752 let end_of_stream =
753 fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
754
755 let result = PollResult {
756 data: fetch_result.data,
757 current_offset: fetch_result.next_offset,
758 record_count: fetch_result.record_count,
759 end_of_stream,
760 };
761
762 self.current_offset = fetch_result.next_offset;
764
765 if result.is_empty() {
766 Ok(None)
767 } else {
768 Ok(Some(result))
769 }
770 }
771
772 pub async fn commit(&mut self) -> Result<()> {
777 if self.current_offset <= self.committed_offset {
778 return Ok(());
779 }
780
781 let result = self
782 .client
783 .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
784 .await?;
785
786 self.committed_offset = result.committed_offset;
787 self.last_commit_time = std::time::Instant::now();
788
789 Ok(())
790 }
791
792 async fn maybe_auto_commit(&mut self) -> Result<()> {
794 if self.config.auto_commit_interval_ms == 0 {
795 return Ok(()); }
797
798 let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
799 if elapsed >= self.config.auto_commit_interval_ms {
800 self.commit().await?;
801 }
802
803 Ok(())
804 }
805
806 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
811 let was_subscribed = self.is_subscribed;
812
813 if was_subscribed {
814 self.stop().await?;
815 }
816
817 let new_offset = match position {
818 SeekPosition::Beginning => 0,
819 SeekPosition::Offset(offset) => offset,
820 SeekPosition::End => u64::MAX,
821 };
822
823 self.current_offset = new_offset;
824
825 if was_subscribed {
826 self.start().await?;
827 }
828
829 Ok(self.current_offset)
830 }
831
832 pub fn current_offset(&self) -> u64 {
834 self.current_offset
835 }
836
837 pub fn committed_offset(&self) -> u64 {
839 self.committed_offset
840 }
841
842 pub fn consumer_id(&self) -> u64 {
844 self.consumer_id
845 }
846
847 pub fn is_subscribed(&self) -> bool {
849 self.is_subscribed
850 }
851
852 pub fn client(&self) -> &LanceClient {
854 &self.client
855 }
856
857 pub async fn into_client(mut self) -> Result<LanceClient> {
859 self.stop().await?;
860 Ok(self.client)
861 }
862}
863
864impl std::fmt::Debug for StreamingConsumer {
865 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
866 f.debug_struct("StreamingConsumer")
867 .field("topic_id", &self.config.topic_id)
868 .field("consumer_id", &self.consumer_id)
869 .field("current_offset", &self.current_offset)
870 .field("committed_offset", &self.committed_offset)
871 .field("is_subscribed", &self.is_subscribed)
872 .finish()
873 }
874}
875
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_consumer_config_default() {
        let config = ConsumerConfig::default();
        let observed = (config.topic_id, config.max_fetch_bytes, config.start_position);
        assert_eq!(observed, (0, 64 * 1024, SeekPosition::Beginning));
    }

    #[test]
    fn test_consumer_config_builder() {
        let config = ConsumerConfig::new(42)
            .with_max_fetch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(1000));

        let observed = (config.topic_id, config.max_fetch_bytes, config.start_position);
        assert_eq!(observed, (42, 128 * 1024, SeekPosition::Offset(1000)));
    }

    /// Builds a `PollResult` around a static payload for the emptiness checks.
    fn poll_result(
        payload: &'static [u8],
        current_offset: u64,
        record_count: u32,
        end_of_stream: bool,
    ) -> PollResult {
        PollResult {
            data: bytes::Bytes::from_static(payload),
            current_offset,
            record_count,
            end_of_stream,
        }
    }

    #[test]
    fn test_poll_result_is_empty() {
        assert!(poll_result(&[], 0, 0, true).is_empty());
        assert!(!poll_result(&[1, 2, 3], 3, 1, false).is_empty());
    }

    #[test]
    fn test_streaming_consumer_config_default() {
        let config = StreamingConsumerConfig::default();

        assert_eq!(
            (config.topic_id, config.max_batch_bytes, config.start_position),
            (0, 64 * 1024, SeekPosition::Beginning)
        );
        assert_eq!(config.consumer_group, None);
        assert_eq!(config.auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let config = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(5000))
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10000);

        assert_eq!(
            (config.topic_id, config.max_batch_bytes, config.start_position),
            (42, 128 * 1024, SeekPosition::Offset(5000))
        );
        assert_eq!(config.consumer_group.as_deref(), Some("my-group"));
        assert_eq!(config.auto_commit_interval_ms, 10000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        let interval = StreamingConsumerConfig::new(1)
            .with_auto_commit_interval(0)
            .auto_commit_interval_ms;
        assert_eq!(interval, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        let positions = [
            SeekPosition::Beginning,
            SeekPosition::End,
            SeekPosition::Offset(12345),
        ];
        for position in positions {
            let config = StreamingConsumerConfig::new(1).with_start_position(position);
            assert_eq!(config.start_position, position);
        }
    }

    #[test]
    fn test_seek_position_equality() {
        let equal_pairs = [
            (SeekPosition::Beginning, SeekPosition::Beginning),
            (SeekPosition::End, SeekPosition::End),
            (SeekPosition::Offset(100), SeekPosition::Offset(100)),
        ];
        for (left, right) in equal_pairs {
            assert_eq!(left, right);
        }

        let distinct_pairs = [
            (SeekPosition::Beginning, SeekPosition::End),
            (SeekPosition::Offset(100), SeekPosition::Offset(200)),
            (SeekPosition::Beginning, SeekPosition::Offset(0)),
        ];
        for (left, right) in distinct_pairs {
            assert_ne!(left, right);
        }
    }

    #[test]
    fn test_seek_position_clone() {
        let original = SeekPosition::Offset(42);
        let copied = original;
        // `original` remains usable after the move because `SeekPosition` is `Copy`.
        assert_eq!(original, copied);
    }
}