// lnc_client/consumer.rs

//! Consumer abstraction for reading from LANCE streams
//!
//! Provides a high-level API for consuming records from topics with support for:
//! - Offset tracking
//! - Seek/rewind to specific offsets
//! - Continuous polling

use std::sync::Arc;
use std::time::Duration;

use crate::client::LanceClient;
use crate::connection::ReconnectingClient;
use crate::error::{ClientError, Result};
use crate::offset::OffsetStore;
15
/// Position specifier for seeking within a stream
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekPosition {
    /// Seek to the beginning of the stream (offset 0)
    Beginning,
    /// Seek to the end of the stream (latest data).
    ///
    /// Internally represented by a `u64::MAX` sentinel offset and resolved
    /// lazily (on the first poll or on subscribe).
    End,
    /// Seek to a specific byte offset
    Offset(u64),
}
26
/// Configuration for a consumer
///
/// Build with [`ConsumerConfig::new`] plus the `with_*` builder methods,
/// or rely on [`Default`].
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Topic ID to consume from
    pub topic_id: u32,
    /// Maximum bytes to fetch per poll (default: 64KB)
    pub max_fetch_bytes: u32,
    /// Starting position (default: Beginning)
    pub start_position: SeekPosition,
}
37
38impl Default for ConsumerConfig {
39    fn default() -> Self {
40        Self {
41            topic_id: 0,
42            max_fetch_bytes: 64 * 1024, // 64KB
43            start_position: SeekPosition::Beginning,
44        }
45    }
46}
47
48impl ConsumerConfig {
49    /// Create a new consumer configuration for the specified topic
50    pub fn new(topic_id: u32) -> Self {
51        Self {
52            topic_id,
53            ..Default::default()
54        }
55    }
56
57    /// Set the maximum bytes to fetch per poll operation
58    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
59        self.max_fetch_bytes = bytes;
60        self
61    }
62
63    /// Set the starting position for consumption
64    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
65        self.start_position = position;
66        self
67    }
68}
69
/// Result of a poll operation
#[derive(Debug, Clone)]
pub struct PollResult {
    /// Raw data fetched from the stream (zero-copy Bytes)
    pub data: bytes::Bytes,
    /// Current offset after this fetch
    pub current_offset: u64,
    /// Number of records in this batch (estimate)
    pub record_count: u32,
    /// Whether the end of available data was reached
    pub end_of_stream: bool,
}

impl PollResult {
    /// Returns true if no data was returned in this batch
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
}
89
/// A consumer for reading records from a LANCE topic stream.
///
/// The consumer automatically reconnects on transient failures (connection
/// drops, timeouts, server errors) with exponential backoff and DNS
/// re-resolution. The current offset is preserved across reconnections
/// so no data is lost.
///
/// # Example
///
/// ```text
/// let config = ConsumerConfig::new(topic_id);
/// let mut consumer = Consumer::connect("lance.example.com:1992", config).await?;
///
/// // Poll for records — auto-reconnects on failure
/// while let Some(result) = consumer.poll().await? {
///     process_data(&result.data);
///     if result.end_of_stream {
///         break;
///     }
/// }
/// ```
pub struct Consumer {
    client: ReconnectingClient,
    config: ConsumerConfig,
    /// Next read position; `u64::MAX` is a sentinel meaning
    /// `SeekPosition::End` has not been resolved yet.
    current_offset: u64,
    /// Cached end offset for SeekPosition::End
    cached_end_offset: Option<u64>,
    /// Optional offset store for client-side offset persistence
    offset_store: Option<Arc<dyn OffsetStore>>,
    /// Consumer ID for offset store operations
    consumer_id: u64,
}
122
123impl Consumer {
124    /// Connect to a LANCE server and create a consumer with auto-reconnect.
125    ///
126    /// The address can be either an IP:port or hostname:port. DNS is
127    /// re-resolved on each reconnect for load-balanced endpoints.
128    pub async fn connect(addr: &str, config: ConsumerConfig) -> Result<Self> {
129        let rc = ReconnectingClient::connect(addr)
130            .await?
131            .with_unlimited_retries()
132            .with_base_delay(Duration::from_millis(500))
133            .with_max_delay(Duration::from_secs(30));
134
135        Ok(Self::from_reconnecting_client(rc, config, 0))
136    }
137
138    /// Create a new consumer with the given client and configuration.
139    ///
140    /// The `addr` is stored for DNS re-resolution on reconnect.
141    pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self {
142        Self::with_consumer_id(client, addr, config, 0)
143    }
144
145    /// Create a new consumer with a specific consumer ID
146    pub fn with_consumer_id(
147        client: LanceClient,
148        addr: &str,
149        config: ConsumerConfig,
150        consumer_id: u64,
151    ) -> Self {
152        let rc = ReconnectingClient::from_existing(client, addr);
153        Self::from_reconnecting_client(rc, config, consumer_id)
154    }
155
156    /// Internal constructor from a ReconnectingClient
157    fn from_reconnecting_client(
158        client: ReconnectingClient,
159        config: ConsumerConfig,
160        consumer_id: u64,
161    ) -> Self {
162        let initial_offset = match config.start_position {
163            SeekPosition::Beginning => 0,
164            SeekPosition::Offset(offset) => offset,
165            SeekPosition::End => u64::MAX, // Will be resolved on first poll
166        };
167
168        Self {
169            client,
170            config,
171            current_offset: initial_offset,
172            cached_end_offset: None,
173            offset_store: None,
174            consumer_id,
175        }
176    }
177
178    /// Create a consumer with an offset store for client-side offset persistence
179    ///
180    /// The consumer will automatically load its starting offset from the store
181    /// if one exists, otherwise it uses the configured start position.
182    ///
183    /// # Arguments
184    /// * `client` - The LANCE client connection
185    /// * `addr` - Server address for reconnection
186    /// * `config` - Consumer configuration
187    /// * `consumer_id` - Unique identifier for this consumer instance
188    /// * `offset_store` - The offset store for persistence
189    ///
190    /// # Example
191    /// ```ignore
192    /// let store = Arc::new(LockFileOffsetStore::open(path, "my-consumer")?);
193    /// let consumer = Consumer::with_offset_store(client, "lance:1992", config, 12345, store)?;
194    /// ```
195    pub fn with_offset_store(
196        client: LanceClient,
197        addr: &str,
198        config: ConsumerConfig,
199        consumer_id: u64,
200        offset_store: Arc<dyn OffsetStore>,
201    ) -> Result<Self> {
202        // Try to load existing offset from store
203        let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
204
205        let initial_offset = if let Some(offset) = stored_offset {
206            // Resume from stored offset
207            offset
208        } else {
209            // No stored offset, use configured start position
210            match config.start_position {
211                SeekPosition::Beginning => 0,
212                SeekPosition::Offset(offset) => offset,
213                SeekPosition::End => u64::MAX, // Will be resolved on first poll
214            }
215        };
216
217        let rc = ReconnectingClient::from_existing(client, addr);
218        Ok(Self {
219            client: rc,
220            config,
221            current_offset: initial_offset,
222            cached_end_offset: None,
223            offset_store: Some(offset_store),
224            consumer_id,
225        })
226    }
227
228    /// Create a consumer starting from the beginning of the stream
229    pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self {
230        Self::new(client, addr, ConsumerConfig::new(topic_id))
231    }
232
233    /// Create a consumer starting from a specific offset
234    pub fn from_offset(client: LanceClient, addr: &str, topic_id: u32, offset: u64) -> Self {
235        let config =
236            ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
237        Self::new(client, addr, config)
238    }
239
    /// Get the current offset position.
    ///
    /// Note: may be the `u64::MAX` sentinel if the consumer was created with
    /// `SeekPosition::End` and has not polled yet.
    pub fn current_offset(&self) -> u64 {
        self.current_offset
    }

    /// Get the topic ID being consumed
    pub fn topic_id(&self) -> u32 {
        self.config.topic_id
    }
249
250    /// Seek to a specific position in the stream
251    ///
252    /// This allows rewinding to replay historical data or fast-forwarding
253    /// to skip ahead.
254    ///
255    /// # Arguments
256    /// * `position` - The position to seek to
257    ///
258    /// # Examples
259    ///
260    /// ```text
261    /// // Rewind to the beginning
262    /// consumer.seek(SeekPosition::Beginning).await?;
263    ///
264    /// // Seek to a specific offset
265    /// consumer.seek(SeekPosition::Offset(1000)).await?;
266    ///
267    /// // Seek to the end (latest data)
268    /// consumer.seek(SeekPosition::End).await?;
269    /// ```
270    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
271        match position {
272            SeekPosition::Beginning => {
273                self.current_offset = 0;
274                Ok(0)
275            },
276            SeekPosition::Offset(offset) => {
277                self.current_offset = offset;
278                Ok(offset)
279            },
280            SeekPosition::End => {
281                let end_offset = self.discover_end_offset().await?;
282                self.current_offset = end_offset;
283                Ok(end_offset)
284            },
285        }
286    }
287
288    /// Rewind to the beginning of the stream (offset 0)
289    ///
290    /// Convenience method equivalent to `seek(SeekPosition::Beginning)`
291    pub async fn rewind(&mut self) -> Result<()> {
292        self.seek(SeekPosition::Beginning).await?;
293        Ok(())
294    }
295
296    /// Seek to a specific byte offset in the stream
297    ///
298    /// Convenience method equivalent to `seek(SeekPosition::Offset(offset))`
299    pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
300        self.seek(SeekPosition::Offset(offset)).await?;
301        Ok(())
302    }
303
304    /// Seek to the end of the stream (latest available data)
305    ///
306    /// Convenience method equivalent to `seek(SeekPosition::End)`
307    pub async fn seek_to_end(&mut self) -> Result<u64> {
308        self.seek(SeekPosition::End).await
309    }
310
311    /// Poll for the next batch of records
312    ///
313    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
314    /// there was no new data available (end of stream reached).
315    ///
316    /// The consumer's offset is automatically advanced after each successful
317    /// poll. On transient errors, the consumer auto-reconnects and retries
318    /// from the same offset.
319    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
320        // Handle SeekPosition::End that hasn't been resolved yet
321        if self.current_offset == u64::MAX {
322            let end_offset = self.discover_end_offset().await?;
323            self.current_offset = end_offset;
324        }
325
326        let fetch_result = self.fetch_with_retry().await?;
327
328        let end_of_stream =
329            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
330
331        let result = PollResult {
332            data: fetch_result.data,
333            current_offset: fetch_result.next_offset,
334            record_count: fetch_result.record_count,
335            end_of_stream,
336        };
337
338        // Advance offset
339        self.current_offset = fetch_result.next_offset;
340        self.cached_end_offset = Some(fetch_result.next_offset);
341
342        if result.is_empty() {
343            Ok(None)
344        } else {
345            Ok(Some(result))
346        }
347    }
348
349    /// Poll for records, blocking until data is available or timeout
350    ///
351    /// Unlike `poll()`, this will return empty results instead of None,
352    /// allowing the caller to continue waiting.
353    pub async fn poll_blocking(&mut self) -> Result<PollResult> {
354        // Handle SeekPosition::End that hasn't been resolved yet
355        if self.current_offset == u64::MAX {
356            let end_offset = self.discover_end_offset().await?;
357            self.current_offset = end_offset;
358        }
359
360        let fetch_result = self.fetch_with_retry().await?;
361
362        let end_of_stream =
363            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
364
365        let result = PollResult {
366            data: fetch_result.data,
367            current_offset: fetch_result.next_offset,
368            record_count: fetch_result.record_count,
369            end_of_stream,
370        };
371
372        // Advance offset only if we got data
373        if !result.is_empty() {
374            self.current_offset = fetch_result.next_offset;
375        }
376        self.cached_end_offset = Some(fetch_result.next_offset);
377
378        Ok(result)
379    }
380
    /// Fetch with automatic retry on transient errors.
    /// Reconnects and retries from the same offset, preserving position.
    ///
    /// On `ServerCatchingUp`, backs off for 5 seconds without marking the
    /// connection as failed — the server is healthy, just behind on
    /// replication. This lets the follower catch up before the next attempt.
    ///
    /// If the server offset does not advance after several attempts the data
    /// is considered permanently lost (e.g. segment truncation after crash
    /// recovery).  In that case the consumer resets its offset to the
    /// server's position so it can resume reading new data.
    async fn fetch_with_retry(&mut self) -> Result<crate::client::FetchResult> {
        const MAX_RETRIES: u32 = 30;
        const CATCHING_UP_BACKOFF: Duration = Duration::from_secs(5);
        /// After this many consecutive CATCHING_UP responses with an
        /// unchanged server_offset we conclude the gap is permanent.
        const STALE_RESET_THRESHOLD: u32 = 3;
        let mut attempt = 0u32;
        // Exponential backoff for retryable errors (doubles up to MAX_BACKOFF).
        let mut backoff = Duration::from_millis(500);
        const MAX_BACKOFF: Duration = Duration::from_secs(30);

        // State for detecting a server that is stuck (not replicating forward).
        let mut last_server_offset: Option<u64> = None;
        let mut stale_count: u32 = 0;

        loop {
            // Acquiring the client may itself fail (reconnect in progress);
            // fold that into the same error-handling path as the fetch.
            let result = match self.client.client().await {
                Ok(c) => {
                    c.fetch(
                        self.config.topic_id,
                        self.current_offset,
                        self.config.max_fetch_bytes,
                    )
                    .await
                },
                Err(e) => Err(e),
            };

            match &result {
                Ok(_) => return result,
                // Server is healthy but behind on replication — fixed 5s backoff,
                // do NOT mark connection as failed (no reconnect needed).
                Err(ClientError::ServerCatchingUp { server_offset }) => {
                    attempt += 1;

                    // Track whether the server is making progress.  If the
                    // server_offset hasn't changed for STALE_RESET_THRESHOLD
                    // consecutive attempts the data between server_offset and
                    // our current_offset is permanently gone (crash-recovery
                    // truncation).  Reset to the server's position to unblock.
                    if last_server_offset == Some(*server_offset) {
                        stale_count += 1;
                    } else {
                        stale_count = 1;
                        last_server_offset = Some(*server_offset);
                    }

                    if stale_count >= STALE_RESET_THRESHOLD {
                        tracing::warn!(
                            topic_id = self.config.topic_id,
                            old_offset = self.current_offset,
                            new_offset = *server_offset,
                            lost_bytes = self.current_offset.saturating_sub(*server_offset),
                            "Data permanently unreachable — resetting consumer offset to server position"
                        );
                        self.current_offset = *server_offset;
                        // Reset retry state so the next fetch starts clean
                        stale_count = 0;
                        last_server_offset = None;
                        attempt = 0;
                        continue;
                    }

                    if attempt >= MAX_RETRIES {
                        return result;
                    }
                    tracing::info!(
                        topic_id = self.config.topic_id,
                        requested_offset = self.current_offset,
                        server_offset,
                        attempt,
                        "Server catching up, backing off {}s",
                        CATCHING_UP_BACKOFF.as_secs()
                    );
                    tokio::time::sleep(CATCHING_UP_BACKOFF).await;
                },
                // Transient failure: mark the connection bad so the next
                // client() call reconnects, then retry with exponential backoff.
                Err(e) if e.is_retryable() && attempt < MAX_RETRIES => {
                    attempt += 1;
                    self.client.mark_failed();
                    tokio::time::sleep(backoff).await;
                    backoff = (backoff * 2).min(MAX_BACKOFF);
                },
                // Non-retryable error, or retries exhausted: surface it.
                _ => return result,
            }
        }
    }
476
    /// Discover the current end offset by fetching from a very high offset
    ///
    /// NOTE(review): despite the summary above, this actually fetches from
    /// offset 0 with a 1-byte budget and takes the returned `next_offset`
    /// as the end of the stream — confirm the server reports the stream
    /// head (not merely the offset after the first record) for such a
    /// fetch.  Also note the value may come from `cached_end_offset`,
    /// which `poll()` updates to the last-seen `next_offset`.
    async fn discover_end_offset(&mut self) -> Result<u64> {
        // Use cached value if available
        if let Some(end) = self.cached_end_offset {
            return Ok(end);
        }

        // Fetch from offset 0 to discover stream state
        let c = self.client.client().await?;
        let fetch_result = c
            .fetch(
                self.config.topic_id,
                0,
                1, // Minimal fetch just to get stream info
            )
            .await?;

        // The next_offset tells us where data ends
        // For empty streams, next_offset will be 0
        let end_offset = fetch_result.next_offset;
        self.cached_end_offset = Some(end_offset);
        Ok(end_offset)
    }
500
501    /// Commit the current offset to the offset store
502    ///
503    /// If an offset store is configured, this persists the current offset
504    /// so the consumer can resume from this position on restart.
505    ///
506    /// # Example
507    /// ```ignore
508    /// let records = consumer.poll().await?;
509    /// process(records);
510    /// consumer.commit().await?;  // Persist offset for crash recovery
511    /// ```
512    pub async fn commit(&mut self) -> Result<()> {
513        if let Some(ref store) = self.offset_store {
514            store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
515        }
516        Ok(())
517    }
518
519    /// Get the consumer ID
520    pub fn consumer_id(&self) -> u64 {
521        self.consumer_id
522    }
523
524    /// Check if this consumer has an offset store configured
525    pub fn has_offset_store(&self) -> bool {
526        self.offset_store.is_some()
527    }
528
529    /// Get mutable access to the underlying reconnecting client
530    pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient {
531        &mut self.client
532    }
533}
534
535impl std::fmt::Debug for Consumer {
536    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
537        f.debug_struct("Consumer")
538            .field("topic_id", &self.config.topic_id)
539            .field("current_offset", &self.current_offset)
540            .field("max_fetch_bytes", &self.config.max_fetch_bytes)
541            .finish()
542    }
543}
544
/// Configuration for a streaming consumer
#[derive(Debug, Clone)]
pub struct StreamingConsumerConfig {
    /// Topic ID to consume from
    pub topic_id: u32,
    /// Maximum bytes per batch (default: 64KB)
    pub max_batch_bytes: u32,
    /// Starting position (default: Beginning)
    pub start_position: SeekPosition,
    /// Consumer group ID (for offset tracking)
    pub consumer_group: Option<String>,
    /// Auto-commit interval in milliseconds (0 = manual commit only)
    pub auto_commit_interval_ms: u64,
}
559
560impl Default for StreamingConsumerConfig {
561    fn default() -> Self {
562        Self {
563            topic_id: 0,
564            max_batch_bytes: 64 * 1024,
565            start_position: SeekPosition::Beginning,
566            consumer_group: None,
567            auto_commit_interval_ms: 5000, // 5 seconds
568        }
569    }
570}
571
572impl StreamingConsumerConfig {
573    /// Create a new streaming consumer configuration for the specified topic
574    pub fn new(topic_id: u32) -> Self {
575        Self {
576            topic_id,
577            ..Default::default()
578        }
579    }
580
581    /// Set the maximum bytes per batch
582    pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
583        self.max_batch_bytes = bytes;
584        self
585    }
586
587    /// Set the starting position for consumption
588    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
589        self.start_position = position;
590        self
591    }
592
593    /// Set the consumer group for coordinated consumption
594    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
595        self.consumer_group = Some(group.into());
596        self
597    }
598
599    /// Set the auto-commit interval in milliseconds (0 = disabled)
600    pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
601        self.auto_commit_interval_ms = interval_ms;
602        self
603    }
604}
605
/// A streaming consumer that uses subscribe/unsubscribe signals
///
/// Unlike the poll-based `Consumer`, `StreamingConsumer` explicitly signals
/// to the server when it starts and stops consuming, and reports position
/// updates for consumer group tracking.
///
/// # Lifecycle
///
/// 1. **Subscribe** - Call `start()` to signal the server to begin streaming
/// 2. **Consume** - Call `poll()` to receive batches of data
/// 3. **Commit** - Call `commit()` to checkpoint progress (or use auto-commit)
/// 4. **Unsubscribe** - Call `stop()` or drop the consumer to signal completion
///
/// # Example
///
/// ```text
/// let client = LanceClient::connect_to("127.0.0.1:1992").await?;
/// let config = StreamingConsumerConfig::new(topic_id)
///     .with_consumer_group("my-group");
/// let mut consumer = StreamingConsumer::new(client, config);
///
/// // Start streaming
/// consumer.start().await?;
///
/// // Process records
/// while let Some(result) = consumer.poll().await? {
///     process_data(&result.data);
///     consumer.commit().await?; // Checkpoint progress
/// }
///
/// // Stop streaming
/// consumer.stop().await?;
/// ```
pub struct StreamingConsumer {
    client: LanceClient,
    config: StreamingConsumerConfig,
    /// Unique ID generated at construction (see `generate_consumer_id`).
    consumer_id: u64,
    /// Next read position; `u64::MAX` is the unresolved-End sentinel.
    current_offset: u64,
    /// Last offset acknowledged by the server via `commit_offset`.
    committed_offset: u64,
    /// True between a successful `start()` and `stop()`.
    is_subscribed: bool,
    /// Timestamp of the last commit, used for auto-commit scheduling.
    last_commit_time: std::time::Instant,
}
648
649impl StreamingConsumer {
650    /// Create a new streaming consumer
651    pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
652        let consumer_id = Self::generate_consumer_id();
653        let initial_offset = match config.start_position {
654            SeekPosition::Beginning => 0,
655            SeekPosition::Offset(offset) => offset,
656            SeekPosition::End => u64::MAX,
657        };
658
659        Self {
660            client,
661            config,
662            consumer_id,
663            current_offset: initial_offset,
664            committed_offset: 0,
665            is_subscribed: false,
666            last_commit_time: std::time::Instant::now(),
667        }
668    }
669
670    /// Generate a unique consumer ID
671    fn generate_consumer_id() -> u64 {
672        use std::time::{SystemTime, UNIX_EPOCH};
673        let timestamp = SystemTime::now()
674            .duration_since(UNIX_EPOCH)
675            .map(|d| d.as_nanos() as u64)
676            .unwrap_or(0);
677        // Mix in thread ID for uniqueness
678        let thread_id = std::thread::current().id();
679        timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
680    }
681
682    /// Start streaming from the topic
683    ///
684    /// Sends a Subscribe signal to the server indicating this consumer
685    /// wants to start receiving data from the configured position.
686    pub async fn start(&mut self) -> Result<()> {
687        if self.is_subscribed {
688            return Ok(());
689        }
690
691        let result = self
692            .client
693            .subscribe(
694                self.config.topic_id,
695                self.current_offset,
696                self.config.max_batch_bytes,
697                self.consumer_id,
698            )
699            .await?;
700
701        self.current_offset = result.start_offset;
702        self.is_subscribed = true;
703        self.last_commit_time = std::time::Instant::now();
704
705        Ok(())
706    }
707
708    /// Stop streaming from the topic
709    ///
710    /// Sends an Unsubscribe signal to the server. This should be called
711    /// when the consumer is done processing to free server resources.
712    pub async fn stop(&mut self) -> Result<()> {
713        if !self.is_subscribed {
714            return Ok(());
715        }
716
717        // Commit any uncommitted offset before stopping
718        if self.current_offset > self.committed_offset {
719            let _ = self.commit().await;
720        }
721
722        self.client
723            .unsubscribe(self.config.topic_id, self.consumer_id)
724            .await?;
725        self.is_subscribed = false;
726
727        Ok(())
728    }
729
730    /// Poll for the next batch of records
731    ///
732    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
733    /// no new data is available.
734    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
735        if !self.is_subscribed {
736            return Err(ClientError::ProtocolError(
737                "Consumer not subscribed - call start() first".to_string(),
738            ));
739        }
740
741        // Check if auto-commit is due
742        self.maybe_auto_commit().await?;
743
744        let fetch_result = self
745            .client
746            .fetch(
747                self.config.topic_id,
748                self.current_offset,
749                self.config.max_batch_bytes,
750            )
751            .await?;
752
753        let end_of_stream =
754            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
755
756        let result = PollResult {
757            data: fetch_result.data,
758            current_offset: fetch_result.next_offset,
759            record_count: fetch_result.record_count,
760            end_of_stream,
761        };
762
763        // Advance offset
764        self.current_offset = fetch_result.next_offset;
765
766        if result.is_empty() {
767            Ok(None)
768        } else {
769            Ok(Some(result))
770        }
771    }
772
773    /// Commit the current offset to the server
774    ///
775    /// This checkpoints the consumer's position so that if it restarts,
776    /// it can resume from this point.
777    pub async fn commit(&mut self) -> Result<()> {
778        if self.current_offset <= self.committed_offset {
779            return Ok(());
780        }
781
782        let result = self
783            .client
784            .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
785            .await?;
786
787        self.committed_offset = result.committed_offset;
788        self.last_commit_time = std::time::Instant::now();
789
790        Ok(())
791    }
792
793    /// Auto-commit if interval has elapsed
794    async fn maybe_auto_commit(&mut self) -> Result<()> {
795        if self.config.auto_commit_interval_ms == 0 {
796            return Ok(()); // Auto-commit disabled
797        }
798
799        let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
800        if elapsed >= self.config.auto_commit_interval_ms {
801            self.commit().await?;
802        }
803
804        Ok(())
805    }
806
807    /// Seek to a new position
808    ///
809    /// Note: This will commit the current offset and restart the subscription
810    /// at the new position.
811    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
812        let was_subscribed = self.is_subscribed;
813
814        if was_subscribed {
815            self.stop().await?;
816        }
817
818        let new_offset = match position {
819            SeekPosition::Beginning => 0,
820            SeekPosition::Offset(offset) => offset,
821            SeekPosition::End => u64::MAX,
822        };
823
824        self.current_offset = new_offset;
825
826        if was_subscribed {
827            self.start().await?;
828        }
829
830        Ok(self.current_offset)
831    }
832
    /// Get the current offset (the position the next poll reads from).
    pub fn current_offset(&self) -> u64 {
        self.current_offset
    }

    /// Get the last committed offset (as acknowledged by the server).
    pub fn committed_offset(&self) -> u64 {
        self.committed_offset
    }

    /// Get the consumer ID
    pub fn consumer_id(&self) -> u64 {
        self.consumer_id
    }

    /// Check if currently subscribed
    pub fn is_subscribed(&self) -> bool {
        self.is_subscribed
    }

    /// Get access to the underlying client
    pub fn client(&self) -> &LanceClient {
        &self.client
    }

    /// Consume this consumer and return the underlying client.
    ///
    /// Unsubscribes first (which also commits any outstanding progress).
    pub async fn into_client(mut self) -> Result<LanceClient> {
        self.stop().await?;
        Ok(self.client)
    }
863}
864
865impl std::fmt::Debug for StreamingConsumer {
866    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
867        f.debug_struct("StreamingConsumer")
868            .field("topic_id", &self.config.topic_id)
869            .field("consumer_id", &self.consumer_id)
870            .field("current_offset", &self.current_offset)
871            .field("committed_offset", &self.committed_offset)
872            .field("is_subscribed", &self.is_subscribed)
873            .finish()
874    }
875}
876
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    // =========================================================================
    // ConsumerConfig / PollResult tests
    // =========================================================================

    #[test]
    fn test_consumer_config_default() {
        let config = ConsumerConfig::default();
        assert_eq!(config.topic_id, 0);
        assert_eq!(config.max_fetch_bytes, 64 * 1024);
        assert_eq!(config.start_position, SeekPosition::Beginning);
    }

    #[test]
    fn test_consumer_config_builder() {
        let config = ConsumerConfig::new(42)
            .with_max_fetch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(1000));

        assert_eq!(config.topic_id, 42);
        assert_eq!(config.max_fetch_bytes, 128 * 1024);
        assert_eq!(config.start_position, SeekPosition::Offset(1000));
    }

    #[test]
    fn test_poll_result_is_empty() {
        let empty = PollResult {
            data: bytes::Bytes::new(),
            current_offset: 0,
            record_count: 0,
            end_of_stream: true,
        };
        assert!(empty.is_empty());

        let non_empty = PollResult {
            data: bytes::Bytes::from_static(&[1, 2, 3]),
            current_offset: 3,
            record_count: 1,
            end_of_stream: false,
        };
        assert!(!non_empty.is_empty());
    }

    // =========================================================================
    // StreamingConsumerConfig Tests
    // =========================================================================

    #[test]
    fn test_streaming_consumer_config_default() {
        let config = StreamingConsumerConfig::default();
        assert_eq!(config.topic_id, 0);
        assert_eq!(config.max_batch_bytes, 64 * 1024);
        assert_eq!(config.start_position, SeekPosition::Beginning);
        assert!(config.consumer_group.is_none());
        assert_eq!(config.auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let config = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(5000))
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10000);

        assert_eq!(config.topic_id, 42);
        assert_eq!(config.max_batch_bytes, 128 * 1024);
        assert_eq!(config.start_position, SeekPosition::Offset(5000));
        assert_eq!(config.consumer_group, Some("my-group".to_string()));
        assert_eq!(config.auto_commit_interval_ms, 10000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        // Interval 0 means "manual commit only".
        let config = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);

        assert_eq!(config.auto_commit_interval_ms, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        let beginning =
            StreamingConsumerConfig::new(1).with_start_position(SeekPosition::Beginning);
        assert_eq!(beginning.start_position, SeekPosition::Beginning);

        let end = StreamingConsumerConfig::new(1).with_start_position(SeekPosition::End);
        assert_eq!(end.start_position, SeekPosition::End);

        let offset =
            StreamingConsumerConfig::new(1).with_start_position(SeekPosition::Offset(12345));
        assert_eq!(offset.start_position, SeekPosition::Offset(12345));
    }

    // =========================================================================
    // SeekPosition Tests
    // =========================================================================

    #[test]
    fn test_seek_position_equality() {
        assert_eq!(SeekPosition::Beginning, SeekPosition::Beginning);
        assert_eq!(SeekPosition::End, SeekPosition::End);
        assert_eq!(SeekPosition::Offset(100), SeekPosition::Offset(100));

        assert_ne!(SeekPosition::Beginning, SeekPosition::End);
        assert_ne!(SeekPosition::Offset(100), SeekPosition::Offset(200));
        // Beginning and Offset(0) are distinct variants even though both
        // denote offset 0.
        assert_ne!(SeekPosition::Beginning, SeekPosition::Offset(0));
    }

    #[test]
    fn test_seek_position_clone() {
        // SeekPosition is Copy, so plain assignment duplicates the value.
        let pos = SeekPosition::Offset(42);
        let cloned = pos;
        assert_eq!(pos, cloned);
    }
}
991}