// lnc_client/consumer.rs
//! Consumer abstraction for reading from LANCE streams.
//!
//! Provides a high-level API for consuming records from topics with support for:
//! - Offset tracking
//! - Seek/rewind to specific offsets
//! - Continuous polling

use std::sync::Arc;
use std::time::Duration;

use crate::client::LanceClient;
use crate::connection::ReconnectingClient;
use crate::error::{ClientError, Result};
use crate::offset::OffsetStore;

/// Where in a stream a consumer should position itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekPosition {
    /// Start of the stream (byte offset 0).
    Beginning,
    /// Latest available data in the stream.
    End,
    /// An explicit byte offset.
    Offset(u64),
}

27/// Configuration for a consumer
28#[derive(Debug, Clone)]
29pub struct ConsumerConfig {
30    /// Topic ID to consume from
31    pub topic_id: u32,
32    /// Maximum bytes to fetch per poll (default: 64KB)
33    pub max_fetch_bytes: u32,
34    /// Starting position (default: Beginning)
35    pub start_position: SeekPosition,
36}
37
38impl Default for ConsumerConfig {
39    fn default() -> Self {
40        Self {
41            topic_id: 0,
42            max_fetch_bytes: 64 * 1024, // 64KB
43            start_position: SeekPosition::Beginning,
44        }
45    }
46}
47
48impl ConsumerConfig {
49    /// Create a new consumer configuration for the specified topic
50    pub fn new(topic_id: u32) -> Self {
51        Self {
52            topic_id,
53            ..Default::default()
54        }
55    }
56
57    /// Set the maximum bytes to fetch per poll operation
58    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
59        self.max_fetch_bytes = bytes;
60        self
61    }
62
63    /// Set the starting position for consumption
64    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
65        self.start_position = position;
66        self
67    }
68}
69
70/// Result of a poll operation
71#[derive(Debug, Clone)]
72pub struct PollResult {
73    /// Raw data fetched from the stream (zero-copy Bytes)
74    pub data: bytes::Bytes,
75    /// Current offset after this fetch
76    pub current_offset: u64,
77    /// Number of records in this batch (estimate)
78    pub record_count: u32,
79    /// Whether the end of available data was reached
80    pub end_of_stream: bool,
81}
82
83impl PollResult {
84    /// Returns true if no data was returned
85    pub fn is_empty(&self) -> bool {
86        self.data.is_empty()
87    }
88}
89
90/// A consumer for reading records from a LANCE topic stream.
91///
92/// The consumer automatically reconnects on transient failures (connection
93/// drops, timeouts, server errors) with exponential backoff and DNS
94/// re-resolution. The current offset is preserved across reconnections
95/// so no data is lost.
96///
97/// # Example
98///
99/// ```text
100/// let config = ConsumerConfig::new(topic_id);
101/// let mut consumer = Consumer::connect("lance.example.com:1992", config).await?;
102///
103/// // Poll for records — auto-reconnects on failure
104/// while let Some(result) = consumer.poll().await? {
105///     process_data(&result.data);
106///     if result.end_of_stream {
107///         break;
108///     }
109/// }
110/// ```
111pub struct Consumer {
112    client: ReconnectingClient,
113    config: ConsumerConfig,
114    current_offset: u64,
115    /// Cached end offset for SeekPosition::End
116    cached_end_offset: Option<u64>,
117    /// Optional offset store for client-side offset persistence
118    offset_store: Option<Arc<dyn OffsetStore>>,
119    /// Consumer ID for offset store operations
120    consumer_id: u64,
121}
122
123impl Consumer {
124    /// Connect to a LANCE server and create a consumer with auto-reconnect.
125    ///
126    /// The address can be either an IP:port or hostname:port. DNS is
127    /// re-resolved on each reconnect for load-balanced endpoints.
128    pub async fn connect(addr: &str, config: ConsumerConfig) -> Result<Self> {
129        let rc = ReconnectingClient::connect(addr)
130            .await?
131            .with_unlimited_retries()
132            .with_base_delay(Duration::from_millis(500))
133            .with_max_delay(Duration::from_secs(30));
134
135        Ok(Self::from_reconnecting_client(rc, config, 0))
136    }
137
138    /// Create a new consumer with the given client and configuration.
139    ///
140    /// The `addr` is stored for DNS re-resolution on reconnect.
141    pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self {
142        Self::with_consumer_id(client, addr, config, 0)
143    }
144
145    /// Create a new consumer with a specific consumer ID
146    pub fn with_consumer_id(
147        client: LanceClient,
148        addr: &str,
149        config: ConsumerConfig,
150        consumer_id: u64,
151    ) -> Self {
152        let rc = ReconnectingClient::from_existing(client, addr);
153        Self::from_reconnecting_client(rc, config, consumer_id)
154    }
155
156    /// Internal constructor from a ReconnectingClient
157    fn from_reconnecting_client(
158        client: ReconnectingClient,
159        config: ConsumerConfig,
160        consumer_id: u64,
161    ) -> Self {
162        let initial_offset = match config.start_position {
163            SeekPosition::Beginning => 0,
164            SeekPosition::Offset(offset) => offset,
165            SeekPosition::End => u64::MAX, // Will be resolved on first poll
166        };
167
168        Self {
169            client,
170            config,
171            current_offset: initial_offset,
172            cached_end_offset: None,
173            offset_store: None,
174            consumer_id,
175        }
176    }
177
178    /// Create a consumer with an offset store for client-side offset persistence
179    ///
180    /// The consumer will automatically load its starting offset from the store
181    /// if one exists, otherwise it uses the configured start position.
182    ///
183    /// # Arguments
184    /// * `client` - The LANCE client connection
185    /// * `addr` - Server address for reconnection
186    /// * `config` - Consumer configuration
187    /// * `consumer_id` - Unique identifier for this consumer instance
188    /// * `offset_store` - The offset store for persistence
189    ///
190    /// # Example
191    /// ```ignore
192    /// let store = Arc::new(LockFileOffsetStore::open(path, "my-consumer")?);
193    /// let consumer = Consumer::with_offset_store(client, "lance:1992", config, 12345, store)?;
194    /// ```
195    pub fn with_offset_store(
196        client: LanceClient,
197        addr: &str,
198        config: ConsumerConfig,
199        consumer_id: u64,
200        offset_store: Arc<dyn OffsetStore>,
201    ) -> Result<Self> {
202        // Try to load existing offset from store
203        let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
204
205        let initial_offset = if let Some(offset) = stored_offset {
206            // Resume from stored offset
207            offset
208        } else {
209            // No stored offset, use configured start position
210            match config.start_position {
211                SeekPosition::Beginning => 0,
212                SeekPosition::Offset(offset) => offset,
213                SeekPosition::End => u64::MAX, // Will be resolved on first poll
214            }
215        };
216
217        let rc = ReconnectingClient::from_existing(client, addr);
218        Ok(Self {
219            client: rc,
220            config,
221            current_offset: initial_offset,
222            cached_end_offset: None,
223            offset_store: Some(offset_store),
224            consumer_id,
225        })
226    }
227
228    /// Create a consumer starting from the beginning of the stream
229    pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self {
230        Self::new(client, addr, ConsumerConfig::new(topic_id))
231    }
232
233    /// Create a consumer starting from a specific offset
234    pub fn from_offset(client: LanceClient, addr: &str, topic_id: u32, offset: u64) -> Self {
235        let config =
236            ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
237        Self::new(client, addr, config)
238    }
239
240    /// Get the current offset position
241    pub fn current_offset(&self) -> u64 {
242        self.current_offset
243    }
244
245    /// Get the topic ID being consumed
246    pub fn topic_id(&self) -> u32 {
247        self.config.topic_id
248    }
249
250    /// Seek to a specific position in the stream
251    ///
252    /// This allows rewinding to replay historical data or fast-forwarding
253    /// to skip ahead.
254    ///
255    /// # Arguments
256    /// * `position` - The position to seek to
257    ///
258    /// # Examples
259    ///
260    /// ```text
261    /// // Rewind to the beginning
262    /// consumer.seek(SeekPosition::Beginning).await?;
263    ///
264    /// // Seek to a specific offset
265    /// consumer.seek(SeekPosition::Offset(1000)).await?;
266    ///
267    /// // Seek to the end (latest data)
268    /// consumer.seek(SeekPosition::End).await?;
269    /// ```
270    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
271        match position {
272            SeekPosition::Beginning => {
273                self.current_offset = 0;
274                Ok(0)
275            },
276            SeekPosition::Offset(offset) => {
277                self.current_offset = offset;
278                Ok(offset)
279            },
280            SeekPosition::End => {
281                let end_offset = self.discover_end_offset().await?;
282                self.current_offset = end_offset;
283                Ok(end_offset)
284            },
285        }
286    }
287
288    /// Rewind to the beginning of the stream (offset 0)
289    ///
290    /// Convenience method equivalent to `seek(SeekPosition::Beginning)`
291    pub async fn rewind(&mut self) -> Result<()> {
292        self.seek(SeekPosition::Beginning).await?;
293        Ok(())
294    }
295
296    /// Seek to a specific byte offset in the stream
297    ///
298    /// Convenience method equivalent to `seek(SeekPosition::Offset(offset))`
299    pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
300        self.seek(SeekPosition::Offset(offset)).await?;
301        Ok(())
302    }
303
304    /// Seek to the end of the stream (latest available data)
305    ///
306    /// Convenience method equivalent to `seek(SeekPosition::End)`
307    pub async fn seek_to_end(&mut self) -> Result<u64> {
308        self.seek(SeekPosition::End).await
309    }
310
311    /// Poll for the next batch of records
312    ///
313    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
314    /// there was no new data available (end of stream reached).
315    ///
316    /// The consumer's offset is automatically advanced after each successful
317    /// poll. On transient errors, the consumer auto-reconnects and retries
318    /// from the same offset.
319    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
320        // Handle SeekPosition::End that hasn't been resolved yet
321        if self.current_offset == u64::MAX {
322            let end_offset = self.discover_end_offset().await?;
323            self.current_offset = end_offset;
324        }
325
326        let fetch_result = self.fetch_with_retry().await?;
327
328        let end_of_stream =
329            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
330
331        let result = PollResult {
332            data: fetch_result.data,
333            current_offset: fetch_result.next_offset,
334            record_count: fetch_result.record_count,
335            end_of_stream,
336        };
337
338        // Advance offset
339        self.current_offset = fetch_result.next_offset;
340        self.cached_end_offset = Some(fetch_result.next_offset);
341
342        if result.is_empty() {
343            Ok(None)
344        } else {
345            Ok(Some(result))
346        }
347    }
348
349    /// Poll for records, blocking until data is available or timeout
350    ///
351    /// Unlike `poll()`, this will return empty results instead of None,
352    /// allowing the caller to continue waiting.
353    pub async fn poll_blocking(&mut self) -> Result<PollResult> {
354        // Handle SeekPosition::End that hasn't been resolved yet
355        if self.current_offset == u64::MAX {
356            let end_offset = self.discover_end_offset().await?;
357            self.current_offset = end_offset;
358        }
359
360        let fetch_result = self.fetch_with_retry().await?;
361
362        let end_of_stream =
363            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
364
365        let result = PollResult {
366            data: fetch_result.data,
367            current_offset: fetch_result.next_offset,
368            record_count: fetch_result.record_count,
369            end_of_stream,
370        };
371
372        // Advance offset only if we got data
373        if !result.is_empty() {
374            self.current_offset = fetch_result.next_offset;
375        }
376        self.cached_end_offset = Some(fetch_result.next_offset);
377
378        Ok(result)
379    }
380
381    /// Fetch with automatic retry on transient errors.
382    /// Reconnects and retries from the same offset, preserving position.
383    ///
384    /// On `ServerCatchingUp`, backs off for 5 seconds. If the server offset
385    /// remains stagnant across repeated attempts, the client forces a
386    /// reconnect so reads can re-route to a healthier/leader path while still
387    /// preserving the requested offset.
388    ///
389    /// If the server offset does not advance after several attempts we emit a
390    /// warning but preserve the client-requested offset. This keeps read
391    /// semantics monotonic and avoids implicit offset rewinds that can produce
392    /// duplicate replay.
393    async fn fetch_with_retry(&mut self) -> Result<crate::client::FetchResult> {
394        const MAX_RETRIES: u32 = 30;
395        const CATCHING_UP_BACKOFF: Duration = Duration::from_secs(5);
396        /// Alert after this many consecutive CATCHING_UP responses with an
397        /// unchanged server_offset.
398        const STALE_ALERT_THRESHOLD: u32 = 3;
399        let mut attempt = 0u32;
400        let mut backoff = Duration::from_millis(500);
401        const MAX_BACKOFF: Duration = Duration::from_secs(30);
402
403        let mut last_server_offset: Option<u64> = None;
404        let mut stale_count: u32 = 0;
405
406        loop {
407            let result = match self.client.client().await {
408                Ok(c) => {
409                    c.fetch(
410                        self.config.topic_id,
411                        self.current_offset,
412                        self.config.max_fetch_bytes,
413                    )
414                    .await
415                },
416                Err(e) => Err(e),
417            };
418
419            match &result {
420                Ok(_) => return result,
421                // Server is healthy but behind on replication — fixed 5s backoff,
422                // do NOT mark connection as failed (no reconnect needed).
423                Err(ClientError::ServerCatchingUp { server_offset }) => {
424                    attempt += 1;
425
426                    // Track whether the server is making progress.
427                    if last_server_offset == Some(*server_offset) {
428                        stale_count += 1;
429                    } else {
430                        stale_count = 1;
431                        last_server_offset = Some(*server_offset);
432                    }
433
434                    if stale_count == STALE_ALERT_THRESHOLD {
435                        tracing::warn!(
436                            topic_id = self.config.topic_id,
437                            requested_offset = self.current_offset,
438                            server_offset,
439                            "Server offset stagnant while catching up; preserving consumer offset to avoid duplicate replay"
440                        );
441
442                        // Proactively reconnect after repeated stagnant catch-up
443                        // responses so we can re-resolve leader routing.
444                        self.client.mark_failed();
445
446                        // Surface to caller quickly instead of sleeping through
447                        // an extended catch-up loop on a stale route.
448                        return result;
449                    }
450
451                    if attempt >= MAX_RETRIES {
452                        return result;
453                    }
454                    tracing::info!(
455                        topic_id = self.config.topic_id,
456                        requested_offset = self.current_offset,
457                        server_offset,
458                        attempt,
459                        "Server catching up, backing off {}s",
460                        CATCHING_UP_BACKOFF.as_secs()
461                    );
462                    tokio::time::sleep(CATCHING_UP_BACKOFF).await;
463                },
464                Err(e) if e.is_retryable() && attempt < MAX_RETRIES => {
465                    attempt += 1;
466                    self.client.mark_failed();
467                    tokio::time::sleep(backoff).await;
468                    backoff = (backoff * 2).min(MAX_BACKOFF);
469                },
470                _ => return result,
471            }
472        }
473    }
474
475    /// Discover the current end offset by fetching from a very high offset
476    async fn discover_end_offset(&mut self) -> Result<u64> {
477        // Use cached value if available
478        if let Some(end) = self.cached_end_offset {
479            return Ok(end);
480        }
481
482        // Fetch from a very high offset so the server reports current end
483        // offset without returning partial record bytes from the beginning.
484        let c = self.client.client().await?;
485        let fetch_result = c
486            .fetch(
487                self.config.topic_id,
488                u64::MAX,
489                1, // Minimal fetch just to get stream info
490            )
491            .await?;
492
493        // The next_offset tells us where data ends
494        // For empty streams, next_offset will be 0
495        let end_offset = fetch_result.next_offset;
496        self.cached_end_offset = Some(end_offset);
497        Ok(end_offset)
498    }
499
500    /// Commit the current offset to the offset store
501    ///
502    /// If an offset store is configured, this persists the current offset
503    /// so the consumer can resume from this position on restart.
504    ///
505    /// # Example
506    /// ```ignore
507    /// let records = consumer.poll().await?;
508    /// process(records);
509    /// consumer.commit().await?;  // Persist offset for crash recovery
510    /// ```
511    pub async fn commit(&mut self) -> Result<()> {
512        if let Some(ref store) = self.offset_store {
513            store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
514        }
515        Ok(())
516    }
517
518    /// Get the consumer ID
519    pub fn consumer_id(&self) -> u64 {
520        self.consumer_id
521    }
522
523    /// Check if this consumer has an offset store configured
524    pub fn has_offset_store(&self) -> bool {
525        self.offset_store.is_some()
526    }
527
528    /// Get mutable access to the underlying reconnecting client
529    pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient {
530        &mut self.client
531    }
532}
533
534impl std::fmt::Debug for Consumer {
535    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
536        f.debug_struct("Consumer")
537            .field("topic_id", &self.config.topic_id)
538            .field("current_offset", &self.current_offset)
539            .field("max_fetch_bytes", &self.config.max_fetch_bytes)
540            .finish()
541    }
542}
543
544/// Configuration for a streaming consumer
545#[derive(Debug, Clone)]
546pub struct StreamingConsumerConfig {
547    /// Topic ID to consume from
548    pub topic_id: u32,
549    /// Maximum bytes per batch (default: 64KB)
550    pub max_batch_bytes: u32,
551    /// Starting position (default: Beginning)
552    pub start_position: SeekPosition,
553    /// Consumer group ID (for offset tracking)
554    pub consumer_group: Option<String>,
555    /// Auto-commit interval in milliseconds (0 = manual commit only)
556    pub auto_commit_interval_ms: u64,
557}
558
559impl Default for StreamingConsumerConfig {
560    fn default() -> Self {
561        Self {
562            topic_id: 0,
563            max_batch_bytes: 64 * 1024,
564            start_position: SeekPosition::Beginning,
565            consumer_group: None,
566            auto_commit_interval_ms: 5000, // 5 seconds
567        }
568    }
569}
570
571impl StreamingConsumerConfig {
572    /// Create a new streaming consumer configuration for the specified topic
573    pub fn new(topic_id: u32) -> Self {
574        Self {
575            topic_id,
576            ..Default::default()
577        }
578    }
579
580    /// Set the maximum bytes per batch
581    pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
582        self.max_batch_bytes = bytes;
583        self
584    }
585
586    /// Set the starting position for consumption
587    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
588        self.start_position = position;
589        self
590    }
591
592    /// Set the consumer group for coordinated consumption
593    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
594        self.consumer_group = Some(group.into());
595        self
596    }
597
598    /// Set the auto-commit interval in milliseconds (0 = disabled)
599    pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
600        self.auto_commit_interval_ms = interval_ms;
601        self
602    }
603}
604
605/// A streaming consumer that uses subscribe/unsubscribe signals
606///
607/// Unlike the poll-based `Consumer`, `StreamingConsumer` explicitly signals
608/// to the server when it starts and stops consuming, and reports position
609/// updates for consumer group tracking.
610///
611/// # Lifecycle
612///
613/// 1. **Subscribe** - Call `start()` to signal the server to begin streaming
614/// 2. **Consume** - Call `poll()` to receive batches of data
615/// 3. **Commit** - Call `commit()` to checkpoint progress (or use auto-commit)
616/// 4. **Unsubscribe** - Call `stop()` or drop the consumer to signal completion
617///
618/// # Example
619///
620/// ```text
621/// let client = LanceClient::connect_to("127.0.0.1:1992").await?;
622/// let config = StreamingConsumerConfig::new(topic_id)
623///     .with_consumer_group("my-group");
624/// let mut consumer = StreamingConsumer::new(client, config);
625///
626/// // Start streaming
627/// consumer.start().await?;
628///
629/// // Process records
630/// while let Some(result) = consumer.poll().await? {
631///     process_data(&result.data);
632///     consumer.commit().await?; // Checkpoint progress
633/// }
634///
635/// // Stop streaming
636/// consumer.stop().await?;
637/// ```
638pub struct StreamingConsumer {
639    client: LanceClient,
640    config: StreamingConsumerConfig,
641    consumer_id: u64,
642    current_offset: u64,
643    committed_offset: u64,
644    is_subscribed: bool,
645    last_commit_time: std::time::Instant,
646}
647
648impl StreamingConsumer {
649    /// Create a new streaming consumer
650    pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
651        let consumer_id = Self::generate_consumer_id();
652        let initial_offset = match config.start_position {
653            SeekPosition::Beginning => 0,
654            SeekPosition::Offset(offset) => offset,
655            SeekPosition::End => u64::MAX,
656        };
657
658        Self {
659            client,
660            config,
661            consumer_id,
662            current_offset: initial_offset,
663            committed_offset: 0,
664            is_subscribed: false,
665            last_commit_time: std::time::Instant::now(),
666        }
667    }
668
669    /// Generate a unique consumer ID
670    fn generate_consumer_id() -> u64 {
671        use std::time::{SystemTime, UNIX_EPOCH};
672        let timestamp = SystemTime::now()
673            .duration_since(UNIX_EPOCH)
674            .map(|d| d.as_nanos() as u64)
675            .unwrap_or(0);
676        // Mix in thread ID for uniqueness
677        let thread_id = std::thread::current().id();
678        timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
679    }
680
681    /// Start streaming from the topic
682    ///
683    /// Sends a Subscribe signal to the server indicating this consumer
684    /// wants to start receiving data from the configured position.
685    pub async fn start(&mut self) -> Result<()> {
686        if self.is_subscribed {
687            return Ok(());
688        }
689
690        let result = self
691            .client
692            .subscribe(
693                self.config.topic_id,
694                self.current_offset,
695                self.config.max_batch_bytes,
696                self.consumer_id,
697            )
698            .await?;
699
700        self.current_offset = result.start_offset;
701        self.is_subscribed = true;
702        self.last_commit_time = std::time::Instant::now();
703
704        Ok(())
705    }
706
707    /// Stop streaming from the topic
708    ///
709    /// Sends an Unsubscribe signal to the server. This should be called
710    /// when the consumer is done processing to free server resources.
711    pub async fn stop(&mut self) -> Result<()> {
712        if !self.is_subscribed {
713            return Ok(());
714        }
715
716        // Commit any uncommitted offset before stopping
717        if self.current_offset > self.committed_offset {
718            let _ = self.commit().await;
719        }
720
721        self.client
722            .unsubscribe(self.config.topic_id, self.consumer_id)
723            .await?;
724        self.is_subscribed = false;
725
726        Ok(())
727    }
728
729    /// Poll for the next batch of records
730    ///
731    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
732    /// no new data is available.
733    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
734        if !self.is_subscribed {
735            return Err(ClientError::ProtocolError(
736                "Consumer not subscribed - call start() first".to_string(),
737            ));
738        }
739
740        // Check if auto-commit is due
741        self.maybe_auto_commit().await?;
742
743        let fetch_result = self
744            .client
745            .fetch(
746                self.config.topic_id,
747                self.current_offset,
748                self.config.max_batch_bytes,
749            )
750            .await?;
751
752        let end_of_stream =
753            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
754
755        let result = PollResult {
756            data: fetch_result.data,
757            current_offset: fetch_result.next_offset,
758            record_count: fetch_result.record_count,
759            end_of_stream,
760        };
761
762        // Advance offset
763        self.current_offset = fetch_result.next_offset;
764
765        if result.is_empty() {
766            Ok(None)
767        } else {
768            Ok(Some(result))
769        }
770    }
771
772    /// Commit the current offset to the server
773    ///
774    /// This checkpoints the consumer's position so that if it restarts,
775    /// it can resume from this point.
776    pub async fn commit(&mut self) -> Result<()> {
777        if self.current_offset <= self.committed_offset {
778            return Ok(());
779        }
780
781        let result = self
782            .client
783            .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
784            .await?;
785
786        self.committed_offset = result.committed_offset;
787        self.last_commit_time = std::time::Instant::now();
788
789        Ok(())
790    }
791
792    /// Auto-commit if interval has elapsed
793    async fn maybe_auto_commit(&mut self) -> Result<()> {
794        if self.config.auto_commit_interval_ms == 0 {
795            return Ok(()); // Auto-commit disabled
796        }
797
798        let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
799        if elapsed >= self.config.auto_commit_interval_ms {
800            self.commit().await?;
801        }
802
803        Ok(())
804    }
805
806    /// Seek to a new position
807    ///
808    /// Note: This will commit the current offset and restart the subscription
809    /// at the new position.
810    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
811        let was_subscribed = self.is_subscribed;
812
813        if was_subscribed {
814            self.stop().await?;
815        }
816
817        let new_offset = match position {
818            SeekPosition::Beginning => 0,
819            SeekPosition::Offset(offset) => offset,
820            SeekPosition::End => u64::MAX,
821        };
822
823        self.current_offset = new_offset;
824
825        if was_subscribed {
826            self.start().await?;
827        }
828
829        Ok(self.current_offset)
830    }
831
832    /// Get the current offset
833    pub fn current_offset(&self) -> u64 {
834        self.current_offset
835    }
836
837    /// Get the last committed offset
838    pub fn committed_offset(&self) -> u64 {
839        self.committed_offset
840    }
841
842    /// Get the consumer ID
843    pub fn consumer_id(&self) -> u64 {
844        self.consumer_id
845    }
846
847    /// Check if currently subscribed
848    pub fn is_subscribed(&self) -> bool {
849        self.is_subscribed
850    }
851
852    /// Get access to the underlying client
853    pub fn client(&self) -> &LanceClient {
854        &self.client
855    }
856
857    /// Consume this consumer and return the underlying client
858    pub async fn into_client(mut self) -> Result<LanceClient> {
859        self.stop().await?;
860        Ok(self.client)
861    }
862}
863
864impl std::fmt::Debug for StreamingConsumer {
865    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
866        f.debug_struct("StreamingConsumer")
867            .field("topic_id", &self.config.topic_id)
868            .field("consumer_id", &self.consumer_id)
869            .field("current_offset", &self.current_offset)
870            .field("committed_offset", &self.committed_offset)
871            .field("is_subscribed", &self.is_subscribed)
872            .finish()
873    }
874}
875
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_consumer_config_default() {
        let cfg = ConsumerConfig::default();
        assert_eq!(
            (cfg.topic_id, cfg.max_fetch_bytes, cfg.start_position),
            (0, 64 * 1024, SeekPosition::Beginning)
        );
    }

    #[test]
    fn test_consumer_config_builder() {
        let cfg = ConsumerConfig::new(42)
            .with_max_fetch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(1000));

        assert_eq!(
            (cfg.topic_id, cfg.max_fetch_bytes, cfg.start_position),
            (42, 128 * 1024, SeekPosition::Offset(1000))
        );
    }

    #[test]
    fn test_poll_result_is_empty() {
        let make = |data: bytes::Bytes, offset: u64, count: u32, eos: bool| PollResult {
            data,
            current_offset: offset,
            record_count: count,
            end_of_stream: eos,
        };

        assert!(make(bytes::Bytes::new(), 0, 0, true).is_empty());
        assert!(!make(bytes::Bytes::from_static(&[1, 2, 3]), 3, 1, false).is_empty());
    }

    // =========================================================================
    // StreamingConsumerConfig Tests
    // =========================================================================

    #[test]
    fn test_streaming_consumer_config_default() {
        let cfg = StreamingConsumerConfig::default();
        assert_eq!(cfg.topic_id, 0);
        assert_eq!(cfg.max_batch_bytes, 64 * 1024);
        assert_eq!(cfg.start_position, SeekPosition::Beginning);
        assert!(cfg.consumer_group.is_none());
        assert_eq!(cfg.auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let cfg = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(128 * 1024)
            .with_start_position(SeekPosition::Offset(5000))
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10000);

        assert_eq!(cfg.topic_id, 42);
        assert_eq!(cfg.max_batch_bytes, 128 * 1024);
        assert_eq!(cfg.start_position, SeekPosition::Offset(5000));
        assert_eq!(cfg.consumer_group.as_deref(), Some("my-group"));
        assert_eq!(cfg.auto_commit_interval_ms, 10000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        let cfg = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);
        assert_eq!(cfg.auto_commit_interval_ms, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        // Each variant should round-trip through the builder unchanged.
        for pos in [
            SeekPosition::Beginning,
            SeekPosition::End,
            SeekPosition::Offset(12345),
        ] {
            let cfg = StreamingConsumerConfig::new(1).with_start_position(pos);
            assert_eq!(cfg.start_position, pos);
        }
    }

    // =========================================================================
    // SeekPosition Tests
    // =========================================================================

    #[test]
    fn test_seek_position_equality() {
        assert_eq!(SeekPosition::Beginning, SeekPosition::Beginning);
        assert_eq!(SeekPosition::End, SeekPosition::End);
        assert_eq!(SeekPosition::Offset(100), SeekPosition::Offset(100));

        assert_ne!(SeekPosition::Beginning, SeekPosition::End);
        assert_ne!(SeekPosition::Offset(100), SeekPosition::Offset(200));
        assert_ne!(SeekPosition::Beginning, SeekPosition::Offset(0));
    }

    #[test]
    fn test_seek_position_clone() {
        let original = SeekPosition::Offset(42);
        let duplicate = original; // Copy
        assert_eq!(original, duplicate);
    }
}