Skip to main content

lnc_client/
consumer.rs

1//! Consumer abstraction for reading from LANCE streams
2//!
3//! Provides a high-level API for consuming records from topics with support for:
4//! - Offset tracking
5//! - Seek/rewind to specific offsets
6//! - Continuous polling
7
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::client::LanceClient;
12use crate::connection::ReconnectingClient;
13use crate::error::{ClientError, Result};
14use crate::offset::OffsetStore;
15
/// Position specifier for seeking within a stream
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekPosition {
    /// Seek to the beginning of the stream (offset 0)
    Beginning,
    /// Seek to the end of the stream (latest data)
    ///
    /// Resolved lazily: consumers represent an unresolved `End` internally
    /// as the sentinel `u64::MAX` until the server reports the real end offset.
    End,
    /// Seek to a specific byte offset
    Offset(u64),
}
26
/// Configuration for a consumer
///
/// Built via [`ConsumerConfig::new`] plus the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Topic ID to consume from
    pub topic_id: u32,
    /// Maximum bytes to fetch per poll (default: 64KB)
    pub max_fetch_bytes: u32,
    /// Starting position (default: Beginning)
    pub start_position: SeekPosition,
}
37
38impl Default for ConsumerConfig {
39    fn default() -> Self {
40        Self {
41            topic_id: 0,
42            max_fetch_bytes: 64 * 1024, // 64KB
43            start_position: SeekPosition::Beginning,
44        }
45    }
46}
47
48impl ConsumerConfig {
49    /// Create a new consumer configuration for the specified topic
50    pub fn new(topic_id: u32) -> Self {
51        Self {
52            topic_id,
53            ..Default::default()
54        }
55    }
56
57    /// Set the maximum bytes to fetch per poll operation
58    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
59        self.max_fetch_bytes = bytes;
60        self
61    }
62
63    /// Set the starting position for consumption
64    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
65        self.start_position = position;
66        self
67    }
68}
69
/// Result of a poll operation
#[derive(Debug, Clone)]
pub struct PollResult {
    /// Raw data fetched from the stream (zero-copy Bytes)
    pub data: bytes::Bytes,
    /// Current offset after this fetch (the server's `next_offset`)
    pub current_offset: u64,
    /// Number of records in this batch (estimate)
    pub record_count: u32,
    /// Whether the end of available data was reached
    pub end_of_stream: bool,
}
82
83impl PollResult {
84    /// Returns true if no data was returned
85    pub fn is_empty(&self) -> bool {
86        self.data.is_empty()
87    }
88}
89
/// A consumer for reading records from a LANCE topic stream.
///
/// The consumer automatically reconnects on transient failures (connection
/// drops, timeouts, server errors) with exponential backoff and DNS
/// re-resolution. The current offset is preserved across reconnections
/// so no data is lost.
///
/// # Example
///
/// ```text
/// let config = ConsumerConfig::new(topic_id);
/// let mut consumer = Consumer::connect("lance.example.com:1992", config).await?;
///
/// // Receive batches — auto-reconnects on failure
/// while let Some(result) = consumer.next_batch().await? {
///     process_data(&result.data);
///     if result.end_of_stream {
///         break;
///     }
/// }
/// ```
pub struct Consumer {
    /// Auto-reconnecting transport to the LANCE server.
    client: ReconnectingClient,
    /// Immutable consumption settings (topic, fetch budget, start position).
    config: ConsumerConfig,
    /// Next read position; `u64::MAX` is the unresolved-`End` sentinel.
    current_offset: u64,
    /// Cached end offset for SeekPosition::End
    cached_end_offset: Option<u64>,
    /// Optional offset store for client-side offset persistence
    offset_store: Option<Arc<dyn OffsetStore>>,
    /// Consumer ID for offset store operations
    consumer_id: u64,
}
122
123impl Consumer {
124    /// Connect to a LANCE server and create a consumer with auto-reconnect.
125    ///
126    /// The address can be either an IP:port or hostname:port. DNS is
127    /// re-resolved on each reconnect for load-balanced endpoints.
128    pub async fn connect(addr: &str, config: ConsumerConfig) -> Result<Self> {
129        let rc = ReconnectingClient::connect(addr)
130            .await?
131            .with_unlimited_retries()
132            .with_base_delay(Duration::from_millis(500))
133            .with_max_delay(Duration::from_secs(30));
134
135        Ok(Self::from_reconnecting_client(rc, config, 0))
136    }
137
    /// Create a new consumer with the given client and configuration.
    ///
    /// The `addr` is stored for DNS re-resolution on reconnect.
    /// Uses consumer ID 0; see [`Consumer::with_consumer_id`] when an
    /// explicit ID is needed for offset-store operations.
    pub fn new(client: LanceClient, addr: &str, config: ConsumerConfig) -> Self {
        Self::with_consumer_id(client, addr, config, 0)
    }
144
145    /// Create a new consumer with a specific consumer ID
146    pub fn with_consumer_id(
147        client: LanceClient,
148        addr: &str,
149        config: ConsumerConfig,
150        consumer_id: u64,
151    ) -> Self {
152        let rc = ReconnectingClient::from_existing(client, addr);
153        Self::from_reconnecting_client(rc, config, consumer_id)
154    }
155
156    /// Internal constructor from a ReconnectingClient
157    fn from_reconnecting_client(
158        client: ReconnectingClient,
159        config: ConsumerConfig,
160        consumer_id: u64,
161    ) -> Self {
162        let initial_offset = match config.start_position {
163            SeekPosition::Beginning => 0,
164            SeekPosition::Offset(offset) => offset,
165            SeekPosition::End => u64::MAX, // Will be resolved on first poll
166        };
167
168        Self {
169            client,
170            config,
171            current_offset: initial_offset,
172            cached_end_offset: None,
173            offset_store: None,
174            consumer_id,
175        }
176    }
177
178    /// Create a consumer with an offset store for client-side offset persistence
179    ///
180    /// The consumer will automatically load its starting offset from the store
181    /// if one exists, otherwise it uses the configured start position.
182    ///
183    /// # Arguments
184    /// * `client` - The LANCE client connection
185    /// * `addr` - Server address for reconnection
186    /// * `config` - Consumer configuration
187    /// * `consumer_id` - Unique identifier for this consumer instance
188    /// * `offset_store` - The offset store for persistence
189    ///
190    /// # Example
191    /// ```ignore
192    /// let store = Arc::new(LockFileOffsetStore::open(path, "my-consumer")?);
193    /// let consumer = Consumer::with_offset_store(client, "lance:1992", config, 12345, store)?;
194    /// ```
195    pub fn with_offset_store(
196        client: LanceClient,
197        addr: &str,
198        config: ConsumerConfig,
199        consumer_id: u64,
200        offset_store: Arc<dyn OffsetStore>,
201    ) -> Result<Self> {
202        // Try to load existing offset from store
203        let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
204
205        let initial_offset = if let Some(offset) = stored_offset {
206            // Resume from stored offset
207            offset
208        } else {
209            // No stored offset, use configured start position
210            match config.start_position {
211                SeekPosition::Beginning => 0,
212                SeekPosition::Offset(offset) => offset,
213                SeekPosition::End => u64::MAX, // Will be resolved on first poll
214            }
215        };
216
217        let rc = ReconnectingClient::from_existing(client, addr);
218        Ok(Self {
219            client: rc,
220            config,
221            current_offset: initial_offset,
222            cached_end_offset: None,
223            offset_store: Some(offset_store),
224            consumer_id,
225        })
226    }
227
    /// Create a consumer starting from the beginning of the stream
    ///
    /// Shorthand for [`Consumer::new`] with a default `ConsumerConfig`,
    /// whose start position is `SeekPosition::Beginning`.
    pub fn from_beginning(client: LanceClient, addr: &str, topic_id: u32) -> Self {
        Self::new(client, addr, ConsumerConfig::new(topic_id))
    }
232
233    /// Create a consumer starting from a specific offset
234    pub fn from_offset(client: LanceClient, addr: &str, topic_id: u32, offset: u64) -> Self {
235        let config =
236            ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
237        Self::new(client, addr, config)
238    }
239
    /// Get the current offset position
    ///
    /// Note: before the first poll, a consumer configured with
    /// `SeekPosition::End` reports the unresolved sentinel `u64::MAX`.
    pub fn current_offset(&self) -> u64 {
        self.current_offset
    }

    /// Get the topic ID being consumed
    pub fn topic_id(&self) -> u32 {
        self.config.topic_id
    }
249
250    /// Seek to a specific position in the stream
251    ///
252    /// This allows rewinding to replay historical data or fast-forwarding
253    /// to skip ahead.
254    ///
255    /// # Arguments
256    /// * `position` - The position to seek to
257    ///
258    /// # Examples
259    ///
260    /// ```text
261    /// // Rewind to the beginning
262    /// consumer.seek(SeekPosition::Beginning).await?;
263    ///
264    /// // Seek to a specific offset
265    /// consumer.seek(SeekPosition::Offset(1000)).await?;
266    ///
267    /// // Seek to the end (latest data)
268    /// consumer.seek(SeekPosition::End).await?;
269    /// ```
270    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
271        match position {
272            SeekPosition::Beginning => {
273                self.current_offset = 0;
274                Ok(0)
275            },
276            SeekPosition::Offset(offset) => {
277                self.current_offset = offset;
278                Ok(offset)
279            },
280            SeekPosition::End => {
281                let end_offset = self.discover_end_offset().await?;
282                self.current_offset = end_offset;
283                Ok(end_offset)
284            },
285        }
286    }
287
288    /// Rewind to the beginning of the stream (offset 0)
289    ///
290    /// Convenience method equivalent to `seek(SeekPosition::Beginning)`
291    pub async fn rewind(&mut self) -> Result<()> {
292        self.seek(SeekPosition::Beginning).await?;
293        Ok(())
294    }
295
296    /// Seek to a specific byte offset in the stream
297    ///
298    /// Convenience method equivalent to `seek(SeekPosition::Offset(offset))`
299    pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
300        self.seek(SeekPosition::Offset(offset)).await?;
301        Ok(())
302    }
303
    /// Seek to the end of the stream (latest available data)
    ///
    /// Convenience method equivalent to `seek(SeekPosition::End)`
    ///
    /// Returns the resolved end offset.
    pub async fn seek_to_end(&mut self) -> Result<u64> {
        self.seek(SeekPosition::End).await
    }
310
311    /// Receive the next batch of records.
312    ///
313    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
314    /// there was no new data available (end of stream reached).
315    ///
316    /// The consumer's offset is automatically advanced after each successful
317    /// fetch. On transient errors, the consumer auto-reconnects and retries
318    /// from the same offset.
319    pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
320        // Handle SeekPosition::End that hasn't been resolved yet
321        if self.current_offset == u64::MAX {
322            let end_offset = self.discover_end_offset().await?;
323            self.current_offset = end_offset;
324        }
325
326        let fetch_result = self.fetch_with_retry().await?;
327
328        let end_of_stream =
329            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
330
331        let result = PollResult {
332            data: fetch_result.data,
333            current_offset: fetch_result.next_offset,
334            record_count: fetch_result.record_count,
335            end_of_stream,
336        };
337
338        // Advance offset only when data is returned.
339        //
340        // During leader churn/catch-up windows, servers may temporarily reply with
341        // empty batches while advertising a later next_offset. Blindly advancing on
342        // empty responses can skip unread records and create apparent gaps.
343        if !result.is_empty() {
344            self.current_offset = fetch_result.next_offset;
345        }
346        self.cached_end_offset = Some(fetch_result.next_offset);
347
348        if result.is_empty() {
349            Ok(None)
350        } else {
351            Ok(Some(result))
352        }
353    }
354
    /// Primary consume interface alias.
    ///
    /// Identical to [`Consumer::next_batch`].
    #[inline]
    pub async fn consume(&mut self) -> Result<Option<PollResult>> {
        self.next_batch().await
    }

    /// Compatibility wrapper for callers still using polling terminology.
    ///
    /// Identical to [`Consumer::next_batch`].
    #[inline]
    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
        self.next_batch().await
    }
366
367    /// Poll for records, blocking until data is available or timeout
368    ///
369    /// Unlike `poll()`, this will return empty results instead of None,
370    /// allowing the caller to continue waiting.
371    pub async fn poll_blocking(&mut self) -> Result<PollResult> {
372        // Handle SeekPosition::End that hasn't been resolved yet
373        if self.current_offset == u64::MAX {
374            let end_offset = self.discover_end_offset().await?;
375            self.current_offset = end_offset;
376        }
377
378        let fetch_result = self.fetch_with_retry().await?;
379
380        let end_of_stream =
381            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
382
383        let result = PollResult {
384            data: fetch_result.data,
385            current_offset: fetch_result.next_offset,
386            record_count: fetch_result.record_count,
387            end_of_stream,
388        };
389
390        // Advance offset only if we got data
391        if !result.is_empty() {
392            self.current_offset = fetch_result.next_offset;
393        }
394        self.cached_end_offset = Some(fetch_result.next_offset);
395
396        Ok(result)
397    }
398
    /// Fetch with automatic retry on transient errors.
    /// Reconnects and retries from the same offset, preserving position.
    ///
    /// On `ServerCatchingUp`, backs off for 5 seconds. If the server offset
    /// remains stagnant across repeated attempts, the client forces a
    /// reconnect so reads can re-route to a healthier/leader path while still
    /// preserving the requested offset.
    ///
    /// If the server offset does not advance after several attempts we emit a
    /// warning but preserve the client-requested offset. This keeps read
    /// semantics monotonic and avoids implicit offset rewinds that can produce
    /// duplicate replay.
    async fn fetch_with_retry(&mut self) -> Result<crate::client::FetchResult> {
        const MAX_RETRIES: u32 = 30;
        const CATCHING_UP_BACKOFF: Duration = Duration::from_secs(5);
        /// Alert after this many consecutive CATCHING_UP responses with an
        /// unchanged server_offset.
        const STALE_ALERT_THRESHOLD: u32 = 3;
        let mut attempt = 0u32;
        let mut backoff = Duration::from_millis(500);
        const MAX_BACKOFF: Duration = Duration::from_secs(30);

        // Progress tracking across ServerCatchingUp responses.
        let mut last_server_offset: Option<u64> = None;
        let mut stale_count: u32 = 0;

        // One fetch attempt per iteration; the match below classifies the
        // outcome as success, catch-up backoff, transient retry, or fatal.
        loop {
            let result = match self.client.client().await {
                Ok(c) => {
                    c.fetch(
                        self.config.topic_id,
                        self.current_offset,
                        self.config.max_fetch_bytes,
                    )
                    .await
                },
                Err(e) => Err(e),
            };

            match &result {
                Ok(_) => return result,
                // Server is healthy but behind on replication — fixed 5s backoff,
                // do NOT mark connection as failed (no reconnect needed).
                Err(ClientError::ServerCatchingUp { server_offset }) => {
                    attempt += 1;

                    // Track whether the server is making progress.
                    if last_server_offset == Some(*server_offset) {
                        stale_count += 1;
                    } else {
                        stale_count = 1;
                        last_server_offset = Some(*server_offset);
                    }

                    if stale_count == STALE_ALERT_THRESHOLD {
                        tracing::warn!(
                            topic_id = self.config.topic_id,
                            requested_offset = self.current_offset,
                            server_offset,
                            "Server offset stagnant while catching up; preserving consumer offset to avoid duplicate replay"
                        );

                        // Proactively reconnect after repeated stagnant catch-up
                        // responses so we can re-resolve leader routing.
                        self.client.mark_failed();

                        // Surface to caller quickly instead of sleeping through
                        // an extended catch-up loop on a stale route.
                        return result;
                    }

                    if attempt >= MAX_RETRIES {
                        return result;
                    }
                    tracing::info!(
                        topic_id = self.config.topic_id,
                        requested_offset = self.current_offset,
                        server_offset,
                        attempt,
                        "Server catching up, backing off {}s",
                        CATCHING_UP_BACKOFF.as_secs()
                    );
                    tokio::time::sleep(CATCHING_UP_BACKOFF).await;
                },
                // Generic transient failure: flag the connection so the next
                // client() call reconnects, then back off exponentially.
                Err(e) if e.is_retryable() && attempt < MAX_RETRIES => {
                    attempt += 1;
                    self.client.mark_failed();
                    tokio::time::sleep(backoff).await;
                    backoff = (backoff * 2).min(MAX_BACKOFF);
                },
                // Non-retryable error, or the retry budget is exhausted.
                _ => return result,
            }
        }
    }
492
493    /// Discover the current end offset by fetching from a very high offset
494    async fn discover_end_offset(&mut self) -> Result<u64> {
495        // Use cached value if available
496        if let Some(end) = self.cached_end_offset {
497            return Ok(end);
498        }
499
500        // Fetch from a very high offset so the server reports current end
501        // offset without returning partial record bytes from the beginning.
502        let c = self.client.client().await?;
503        let fetch_result = c
504            .fetch(
505                self.config.topic_id,
506                u64::MAX,
507                1, // Minimal fetch just to get stream info
508            )
509            .await?;
510
511        // The next_offset tells us where data ends
512        // For empty streams, next_offset will be 0
513        let end_offset = fetch_result.next_offset;
514        self.cached_end_offset = Some(end_offset);
515        Ok(end_offset)
516    }
517
518    /// Commit the current offset to the offset store
519    ///
520    /// If an offset store is configured, this persists the current offset
521    /// so the consumer can resume from this position on restart.
522    ///
523    /// # Example
524    /// ```ignore
525    /// let records = consumer.next_batch().await?;
526    /// process(records);
527    /// consumer.commit().await?;  // Persist offset for crash recovery
528    /// ```
529    pub async fn commit(&mut self) -> Result<()> {
530        if let Some(ref store) = self.offset_store {
531            store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
532        }
533        Ok(())
534    }
535
    /// Get the consumer ID
    ///
    /// Used (together with the topic ID) as the key for offset-store
    /// load/save operations.
    pub fn consumer_id(&self) -> u64 {
        self.consumer_id
    }

    /// Check if this consumer has an offset store configured
    pub fn has_offset_store(&self) -> bool {
        self.offset_store.is_some()
    }

    /// Get mutable access to the underlying reconnecting client
    pub fn reconnecting_client(&mut self) -> &mut ReconnectingClient {
        &mut self.client
    }
550}
551
impl std::fmt::Debug for Consumer {
    // Manual impl: prints only the consumer's logical state; the client
    // handle and offset store are intentionally omitted from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Consumer")
            .field("topic_id", &self.config.topic_id)
            .field("current_offset", &self.current_offset)
            .field("max_fetch_bytes", &self.config.max_fetch_bytes)
            .finish()
    }
}
561
/// Configuration for a streaming consumer
///
/// Built via [`StreamingConsumerConfig::new`] plus the `with_*` builders.
#[derive(Debug, Clone)]
pub struct StreamingConsumerConfig {
    /// Topic ID to consume from
    pub topic_id: u32,
    /// Maximum bytes per batch (default: 64KB)
    pub max_batch_bytes: u32,
    /// Starting position (default: Beginning)
    pub start_position: SeekPosition,
    /// Consumer group ID (for offset tracking)
    pub consumer_group: Option<String>,
    /// Auto-commit interval in milliseconds (0 = manual commit only)
    pub auto_commit_interval_ms: u64,
}
576
577impl Default for StreamingConsumerConfig {
578    fn default() -> Self {
579        Self {
580            topic_id: 0,
581            max_batch_bytes: 64 * 1024,
582            start_position: SeekPosition::Beginning,
583            consumer_group: None,
584            auto_commit_interval_ms: 5000, // 5 seconds
585        }
586    }
587}
588
589impl StreamingConsumerConfig {
590    /// Create a new streaming consumer configuration for the specified topic
591    pub fn new(topic_id: u32) -> Self {
592        Self {
593            topic_id,
594            ..Default::default()
595        }
596    }
597
598    /// Set the maximum bytes per batch
599    pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
600        self.max_batch_bytes = bytes;
601        self
602    }
603
604    /// Set the starting position for consumption
605    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
606        self.start_position = position;
607        self
608    }
609
610    /// Set the consumer group for coordinated consumption
611    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
612        self.consumer_group = Some(group.into());
613        self
614    }
615
616    /// Set the auto-commit interval in milliseconds (0 = disabled)
617    pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
618        self.auto_commit_interval_ms = interval_ms;
619        self
620    }
621}
622
/// A streaming consumer that uses subscribe/unsubscribe signals
///
/// Unlike the poll-based `Consumer`, `StreamingConsumer` explicitly signals
/// to the server when it starts and stops consuming, and reports position
/// updates for consumer group tracking.
///
/// # Lifecycle
///
/// 1. **Subscribe** - Call `start()` to signal the server to begin streaming
/// 2. **Consume** - Call `poll()` to receive batches of data
/// 3. **Commit** - Call `commit()` to checkpoint progress (or use auto-commit)
/// 4. **Unsubscribe** - Call `stop()` or drop the consumer to signal completion
///
/// # Example
///
/// ```text
/// let client = LanceClient::connect_to("127.0.0.1:1992").await?;
/// let config = StreamingConsumerConfig::new(topic_id)
///     .with_consumer_group("my-group");
/// let mut consumer = StreamingConsumer::new(client, config);
///
/// // Start streaming
/// consumer.start().await?;
///
/// // Process records
/// while let Some(result) = consumer.next_batch().await? {
///     process_data(&result.data);
///     consumer.commit().await?; // Checkpoint progress
/// }
///
/// // Stop streaming
/// consumer.stop().await?;
/// ```
pub struct StreamingConsumer {
    /// Underlying connection (no auto-reconnect wrapper, unlike `Consumer`).
    client: LanceClient,
    /// Immutable consumption settings.
    config: StreamingConsumerConfig,
    /// Identifier sent with subscribe/commit/unsubscribe calls.
    consumer_id: u64,
    /// Next read position; `u64::MAX` is the unresolved-`End` sentinel.
    current_offset: u64,
    /// Highest offset acknowledged by the server via `commit_offset`.
    committed_offset: u64,
    /// Whether a Subscribe has been sent without a matching Unsubscribe.
    is_subscribed: bool,
    /// Time of the last successful commit (drives auto-commit).
    last_commit_time: std::time::Instant,
}
665
666impl StreamingConsumer {
667    /// Create a new streaming consumer
668    pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
669        let consumer_id = Self::generate_consumer_id();
670        let initial_offset = match config.start_position {
671            SeekPosition::Beginning => 0,
672            SeekPosition::Offset(offset) => offset,
673            SeekPosition::End => u64::MAX,
674        };
675
676        Self {
677            client,
678            config,
679            consumer_id,
680            current_offset: initial_offset,
681            committed_offset: 0,
682            is_subscribed: false,
683            last_commit_time: std::time::Instant::now(),
684        }
685    }
686
687    /// Generate a unique consumer ID
688    fn generate_consumer_id() -> u64 {
689        use std::time::{SystemTime, UNIX_EPOCH};
690        let timestamp = SystemTime::now()
691            .duration_since(UNIX_EPOCH)
692            .map(|d| d.as_nanos() as u64)
693            .unwrap_or(0);
694        // Mix in thread ID for uniqueness
695        let thread_id = std::thread::current().id();
696        timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
697    }
698
699    /// Start streaming from the topic
700    ///
701    /// Sends a Subscribe signal to the server indicating this consumer
702    /// wants to start receiving data from the configured position.
703    pub async fn start(&mut self) -> Result<()> {
704        if self.is_subscribed {
705            return Ok(());
706        }
707
708        let result = self
709            .client
710            .subscribe(
711                self.config.topic_id,
712                self.current_offset,
713                self.config.max_batch_bytes,
714                self.consumer_id,
715            )
716            .await?;
717
718        self.current_offset = result.start_offset;
719        self.is_subscribed = true;
720        self.last_commit_time = std::time::Instant::now();
721
722        Ok(())
723    }
724
    /// Stop streaming from the topic
    ///
    /// Sends an Unsubscribe signal to the server. This should be called
    /// when the consumer is done processing to free server resources.
    pub async fn stop(&mut self) -> Result<()> {
        // Idempotent: stopping an inactive consumer is a no-op.
        if !self.is_subscribed {
            return Ok(());
        }

        // Commit any uncommitted offset before stopping
        // (best-effort: a failed commit must not block the unsubscribe).
        if self.current_offset > self.committed_offset {
            let _ = self.commit().await;
        }

        self.client
            .unsubscribe(self.config.topic_id, self.consumer_id)
            .await?;
        self.is_subscribed = false;

        Ok(())
    }
746
747    /// Receive the next batch of records for an active subscription.
748    ///
749    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
750    /// no new data is available.
751    pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
752        if !self.is_subscribed {
753            return Err(ClientError::ProtocolError(
754                "Consumer not subscribed - call start() first".to_string(),
755            ));
756        }
757
758        // Check if auto-commit is due
759        self.maybe_auto_commit().await?;
760
761        let fetch_result = self
762            .client
763            .fetch(
764                self.config.topic_id,
765                self.current_offset,
766                self.config.max_batch_bytes,
767            )
768            .await?;
769
770        let end_of_stream =
771            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
772
773        let result = PollResult {
774            data: fetch_result.data,
775            current_offset: fetch_result.next_offset,
776            record_count: fetch_result.record_count,
777            end_of_stream,
778        };
779
780        // Advance offset
781        self.current_offset = fetch_result.next_offset;
782
783        if result.is_empty() {
784            Ok(None)
785        } else {
786            Ok(Some(result))
787        }
788    }
789
    /// Primary consume interface alias.
    ///
    /// Identical to [`StreamingConsumer::next_batch`].
    #[inline]
    pub async fn consume(&mut self) -> Result<Option<PollResult>> {
        self.next_batch().await
    }

    /// Compatibility wrapper for callers still using polling terminology.
    ///
    /// Identical to [`StreamingConsumer::next_batch`].
    #[inline]
    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
        self.next_batch().await
    }
801
802    /// Commit the current offset to the server
803    ///
804    /// This checkpoints the consumer's position so that if it restarts,
805    /// it can resume from this point.
806    pub async fn commit(&mut self) -> Result<()> {
807        if self.current_offset <= self.committed_offset {
808            return Ok(());
809        }
810
811        let result = self
812            .client
813            .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
814            .await?;
815
816        self.committed_offset = result.committed_offset;
817        self.last_commit_time = std::time::Instant::now();
818
819        Ok(())
820    }
821
822    /// Auto-commit if interval has elapsed
823    async fn maybe_auto_commit(&mut self) -> Result<()> {
824        if self.config.auto_commit_interval_ms == 0 {
825            return Ok(()); // Auto-commit disabled
826        }
827
828        let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
829        if elapsed >= self.config.auto_commit_interval_ms {
830            self.commit().await?;
831        }
832
833        Ok(())
834    }
835
836    /// Seek to a new position
837    ///
838    /// Note: This will commit the current offset and restart the subscription
839    /// at the new position.
840    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
841        let was_subscribed = self.is_subscribed;
842
843        if was_subscribed {
844            self.stop().await?;
845        }
846
847        let new_offset = match position {
848            SeekPosition::Beginning => 0,
849            SeekPosition::Offset(offset) => offset,
850            SeekPosition::End => u64::MAX,
851        };
852
853        self.current_offset = new_offset;
854
855        if was_subscribed {
856            self.start().await?;
857        }
858
859        Ok(self.current_offset)
860    }
861
    /// Get the current offset
    pub fn current_offset(&self) -> u64 {
        self.current_offset
    }

    /// Get the last committed offset
    ///
    /// This is the server-acknowledged checkpoint; it may lag
    /// `current_offset` until the next commit.
    pub fn committed_offset(&self) -> u64 {
        self.committed_offset
    }

    /// Get the consumer ID
    pub fn consumer_id(&self) -> u64 {
        self.consumer_id
    }

    /// Check if currently subscribed
    pub fn is_subscribed(&self) -> bool {
        self.is_subscribed
    }

    /// Get access to the underlying client
    pub fn client(&self) -> &LanceClient {
        &self.client
    }

    /// Consume this consumer and return the underlying client
    ///
    /// Stops the subscription first (which also commits pending progress).
    pub async fn into_client(mut self) -> Result<LanceClient> {
        self.stop().await?;
        Ok(self.client)
    }
892}
893
impl std::fmt::Debug for StreamingConsumer {
    // Manual impl: prints only the consumer's logical state; the underlying
    // client handle is intentionally omitted from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamingConsumer")
            .field("topic_id", &self.config.topic_id)
            .field("consumer_id", &self.consumer_id)
            .field("current_offset", &self.current_offset)
            .field("committed_offset", &self.committed_offset)
            .field("is_subscribed", &self.is_subscribed)
            .finish()
    }
}
905
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_consumer_config_default() {
        let cfg = ConsumerConfig::default();
        assert_eq!(cfg.topic_id, 0);
        assert_eq!(cfg.max_fetch_bytes, 65_536); // 64 KiB
        assert_eq!(cfg.start_position, SeekPosition::Beginning);
    }

    #[test]
    fn test_consumer_config_builder() {
        let cfg = ConsumerConfig::new(42)
            .with_max_fetch_bytes(131_072) // 128 KiB
            .with_start_position(SeekPosition::Offset(1000));

        assert_eq!(cfg.topic_id, 42);
        assert_eq!(cfg.max_fetch_bytes, 131_072);
        assert_eq!(cfg.start_position, SeekPosition::Offset(1000));
    }

    #[test]
    fn test_poll_result_is_empty() {
        // A zero-byte, zero-record, end-of-stream result is empty ...
        let drained = PollResult {
            data: bytes::Bytes::new(),
            current_offset: 0,
            record_count: 0,
            end_of_stream: true,
        };
        // ... while any payload makes it non-empty.
        let populated = PollResult {
            data: bytes::Bytes::from_static(&[1, 2, 3]),
            current_offset: 3,
            record_count: 1,
            end_of_stream: false,
        };

        assert!(drained.is_empty());
        assert!(!populated.is_empty());
    }

    // =========================================================================
    // StreamingConsumerConfig Tests
    // =========================================================================

    #[test]
    fn test_streaming_consumer_config_default() {
        let cfg = StreamingConsumerConfig::default();
        assert_eq!(cfg.topic_id, 0);
        assert_eq!(cfg.max_batch_bytes, 65_536); // 64 KiB
        assert_eq!(cfg.start_position, SeekPosition::Beginning);
        assert!(cfg.consumer_group.is_none());
        assert_eq!(cfg.auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let cfg = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(131_072) // 128 KiB
            .with_start_position(SeekPosition::Offset(5000))
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10_000);

        assert_eq!(cfg.topic_id, 42);
        assert_eq!(cfg.max_batch_bytes, 131_072);
        assert_eq!(cfg.start_position, SeekPosition::Offset(5000));
        assert_eq!(cfg.consumer_group, Some("my-group".to_string()));
        assert_eq!(cfg.auto_commit_interval_ms, 10_000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        // An interval of zero disables auto-commit.
        let cfg = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);
        assert_eq!(cfg.auto_commit_interval_ms, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        // The builder accepts every `SeekPosition` variant verbatim.
        for pos in [
            SeekPosition::Beginning,
            SeekPosition::End,
            SeekPosition::Offset(12345),
        ] {
            let cfg = StreamingConsumerConfig::new(1).with_start_position(pos);
            assert_eq!(cfg.start_position, pos);
        }
    }

    // =========================================================================
    // SeekPosition Tests
    // =========================================================================

    #[test]
    fn test_seek_position_equality() {
        // Reflexive equality per variant.
        assert_eq!(SeekPosition::Beginning, SeekPosition::Beginning);
        assert_eq!(SeekPosition::End, SeekPosition::End);
        assert_eq!(SeekPosition::Offset(100), SeekPosition::Offset(100));

        // Distinct variants — and distinct offsets — never compare equal;
        // in particular `Beginning` is not the same as `Offset(0)`.
        assert_ne!(SeekPosition::Beginning, SeekPosition::End);
        assert_ne!(SeekPosition::Offset(100), SeekPosition::Offset(200));
        assert_ne!(SeekPosition::Beginning, SeekPosition::Offset(0));
    }

    #[test]
    fn test_seek_position_clone() {
        let original = SeekPosition::Offset(42);
        // `SeekPosition` is `Copy`, so plain assignment duplicates the value.
        let duplicate = original;
        assert_eq!(original, duplicate);
    }
}