Skip to main content

lnc_client/
consumer.rs

1//! Consumer abstraction for reading from LANCE streams
2//!
3//! Provides a high-level API for consuming records from topics with support for:
4//! - Offset tracking
5//! - Seek/rewind to specific offsets
6//! - Continuous polling
7
8use std::sync::Arc;
9
10use crate::client::LanceClient;
11use crate::error::{ClientError, Result};
12use crate::offset::OffsetStore;
13
/// Position specifier for seeking within a stream.
///
/// Used both as the configured starting position of a consumer and as the
/// argument to the consumers' `seek` methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekPosition {
    /// Seek to the beginning of the stream (offset 0)
    Beginning,
    /// Seek to the end of the stream (latest data).
    ///
    /// The consumers in this file store an unresolved `End` as the sentinel
    /// offset `u64::MAX` until an actual offset has been obtained.
    End,
    /// Seek to a specific byte offset
    Offset(u64),
}
24
/// Configuration for a poll-based [`Consumer`].
///
/// Build one with [`ConsumerConfig::new`] and refine it with the `with_*`
/// builder methods, or start from `ConsumerConfig::default()`.
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Topic ID to consume from
    pub topic_id: u32,
    /// Maximum bytes to fetch per poll (default: 64KB)
    pub max_fetch_bytes: u32,
    /// Starting position (default: Beginning)
    pub start_position: SeekPosition,
}
35
36impl Default for ConsumerConfig {
37    fn default() -> Self {
38        Self {
39            topic_id: 0,
40            max_fetch_bytes: 64 * 1024, // 64KB
41            start_position: SeekPosition::Beginning,
42        }
43    }
44}
45
46impl ConsumerConfig {
47    pub fn new(topic_id: u32) -> Self {
48        Self {
49            topic_id,
50            ..Default::default()
51        }
52    }
53
54    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
55        self.max_fetch_bytes = bytes;
56        self
57    }
58
59    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
60        self.start_position = position;
61        self
62    }
63}
64
/// Result of a poll operation.
///
/// Produced by the `poll` methods of the consumers in this file from the
/// result of a single client fetch.
#[derive(Debug, Clone)]
pub struct PollResult {
    /// Raw data fetched from the stream (zero-copy Bytes)
    pub data: bytes::Bytes,
    /// Current offset after this fetch (the fetch's `next_offset`)
    pub current_offset: u64,
    /// Number of records in this batch (estimate)
    pub record_count: u32,
    /// Whether the end of available data was reached: set when the fetch
    /// returned no data or did not move past the offset that was requested
    pub end_of_stream: bool,
}
77
78impl PollResult {
79    /// Returns true if no data was returned
80    pub fn is_empty(&self) -> bool {
81        self.data.is_empty()
82    }
83}
84
/// A consumer for reading records from a LANCE topic stream
///
/// The consumer tracks its current position and provides methods to:
/// - Poll for new records
/// - Seek to specific offsets (rewind/fast-forward)
/// - Query current position
/// - Optionally persist its position through an [`OffsetStore`]
///
/// # Example
///
/// ```text
/// let client = LanceClient::connect_to("127.0.0.1:1992").await?;
/// let config = ConsumerConfig::new(topic_id);
/// let mut consumer = Consumer::new(client, config);
///
/// // Rewind to beginning
/// consumer.seek(SeekPosition::Beginning).await?;
///
/// // Poll for records
/// while let Some(result) = consumer.poll().await? {
///     process_data(&result.data);
///     if result.end_of_stream {
///         break;
///     }
/// }
/// ```
pub struct Consumer {
    /// Client used for every fetch issued by this consumer
    client: LanceClient,
    /// Topic, fetch size and starting position this consumer was built with
    config: ConsumerConfig,
    /// Offset the next fetch starts from; `u64::MAX` marks an unresolved
    /// `SeekPosition::End`
    current_offset: u64,
    /// Cached end offset for SeekPosition::End.
    ///
    /// Written by `discover_end_offset` and also overwritten by `poll` /
    /// `poll_blocking` with the `next_offset` of their latest fetch.
    cached_end_offset: Option<u64>,
    /// Optional offset store for client-side offset persistence
    offset_store: Option<Arc<dyn OffsetStore>>,
    /// Consumer ID for offset store operations
    consumer_id: u64,
}
121
122impl Consumer {
123    /// Create a new consumer with the given client and configuration
124    pub fn new(client: LanceClient, config: ConsumerConfig) -> Self {
125        Self::with_consumer_id(client, config, 0)
126    }
127
128    /// Create a new consumer with a specific consumer ID
129    pub fn with_consumer_id(client: LanceClient, config: ConsumerConfig, consumer_id: u64) -> Self {
130        let initial_offset = match config.start_position {
131            SeekPosition::Beginning => 0,
132            SeekPosition::Offset(offset) => offset,
133            SeekPosition::End => u64::MAX, // Will be resolved on first poll
134        };
135
136        Self {
137            client,
138            config,
139            current_offset: initial_offset,
140            cached_end_offset: None,
141            offset_store: None,
142            consumer_id,
143        }
144    }
145
146    /// Create a consumer with an offset store for client-side offset persistence
147    ///
148    /// The consumer will automatically load its starting offset from the store
149    /// if one exists, otherwise it uses the configured start position.
150    ///
151    /// # Arguments
152    /// * `client` - The LANCE client connection
153    /// * `config` - Consumer configuration
154    /// * `consumer_id` - Unique identifier for this consumer instance
155    /// * `offset_store` - The offset store for persistence
156    ///
157    /// # Example
158    /// ```ignore
159    /// let store = Arc::new(LockFileOffsetStore::open(path, "my-consumer")?);
160    /// let consumer = Consumer::with_offset_store(client, config, 12345, store)?;
161    /// ```
162    pub fn with_offset_store(
163        client: LanceClient,
164        config: ConsumerConfig,
165        consumer_id: u64,
166        offset_store: Arc<dyn OffsetStore>,
167    ) -> Result<Self> {
168        // Try to load existing offset from store
169        let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
170
171        let initial_offset = if let Some(offset) = stored_offset {
172            // Resume from stored offset
173            offset
174        } else {
175            // No stored offset, use configured start position
176            match config.start_position {
177                SeekPosition::Beginning => 0,
178                SeekPosition::Offset(offset) => offset,
179                SeekPosition::End => u64::MAX, // Will be resolved on first poll
180            }
181        };
182
183        Ok(Self {
184            client,
185            config,
186            current_offset: initial_offset,
187            cached_end_offset: None,
188            offset_store: Some(offset_store),
189            consumer_id,
190        })
191    }
192
193    /// Create a consumer starting from the beginning of the stream
194    pub fn from_beginning(client: LanceClient, topic_id: u32) -> Self {
195        Self::new(client, ConsumerConfig::new(topic_id))
196    }
197
198    /// Create a consumer starting from a specific offset
199    pub fn from_offset(client: LanceClient, topic_id: u32, offset: u64) -> Self {
200        let config =
201            ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
202        Self::new(client, config)
203    }
204
205    /// Get the current offset position
206    pub fn current_offset(&self) -> u64 {
207        self.current_offset
208    }
209
210    /// Get the topic ID being consumed
211    pub fn topic_id(&self) -> u32 {
212        self.config.topic_id
213    }
214
215    /// Seek to a specific position in the stream
216    ///
217    /// This allows rewinding to replay historical data or fast-forwarding
218    /// to skip ahead.
219    ///
220    /// # Arguments
221    /// * `position` - The position to seek to
222    ///
223    /// # Examples
224    ///
225    /// ```text
226    /// // Rewind to the beginning
227    /// consumer.seek(SeekPosition::Beginning).await?;
228    ///
229    /// // Seek to a specific offset
230    /// consumer.seek(SeekPosition::Offset(1000)).await?;
231    ///
232    /// // Seek to the end (latest data)
233    /// consumer.seek(SeekPosition::End).await?;
234    /// ```
235    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
236        match position {
237            SeekPosition::Beginning => {
238                self.current_offset = 0;
239                Ok(0)
240            },
241            SeekPosition::Offset(offset) => {
242                self.current_offset = offset;
243                Ok(offset)
244            },
245            SeekPosition::End => {
246                // Fetch with offset u64::MAX to discover end
247                let end_offset = self.discover_end_offset().await?;
248                self.current_offset = end_offset;
249                Ok(end_offset)
250            },
251        }
252    }
253
254    /// Rewind to the beginning of the stream (offset 0)
255    ///
256    /// Convenience method equivalent to `seek(SeekPosition::Beginning)`
257    pub async fn rewind(&mut self) -> Result<()> {
258        self.seek(SeekPosition::Beginning).await?;
259        Ok(())
260    }
261
262    /// Seek to a specific byte offset in the stream
263    ///
264    /// Convenience method equivalent to `seek(SeekPosition::Offset(offset))`
265    pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
266        self.seek(SeekPosition::Offset(offset)).await?;
267        Ok(())
268    }
269
270    /// Seek to the end of the stream (latest available data)
271    ///
272    /// Convenience method equivalent to `seek(SeekPosition::End)`
273    pub async fn seek_to_end(&mut self) -> Result<u64> {
274        self.seek(SeekPosition::End).await
275    }
276
277    /// Poll for the next batch of records
278    ///
279    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
280    /// there was no new data available (end of stream reached).
281    ///
282    /// The consumer's offset is automatically advanced after each successful poll.
283    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
284        // Handle SeekPosition::End that hasn't been resolved yet
285        if self.current_offset == u64::MAX {
286            let end_offset = self.discover_end_offset().await?;
287            self.current_offset = end_offset;
288        }
289
290        let fetch_result = self
291            .client
292            .fetch(
293                self.config.topic_id,
294                self.current_offset,
295                self.config.max_fetch_bytes,
296            )
297            .await?;
298
299        let end_of_stream =
300            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
301
302        let result = PollResult {
303            data: fetch_result.data,
304            current_offset: fetch_result.next_offset,
305            record_count: fetch_result.record_count,
306            end_of_stream,
307        };
308
309        // Advance offset
310        self.current_offset = fetch_result.next_offset;
311        self.cached_end_offset = Some(fetch_result.next_offset);
312
313        if result.is_empty() {
314            Ok(None)
315        } else {
316            Ok(Some(result))
317        }
318    }
319
320    /// Poll for records, blocking until data is available or timeout
321    ///
322    /// Unlike `poll()`, this will return empty results instead of None,
323    /// allowing the caller to continue waiting.
324    pub async fn poll_blocking(&mut self) -> Result<PollResult> {
325        // Handle SeekPosition::End that hasn't been resolved yet
326        if self.current_offset == u64::MAX {
327            let end_offset = self.discover_end_offset().await?;
328            self.current_offset = end_offset;
329        }
330
331        let fetch_result = self
332            .client
333            .fetch(
334                self.config.topic_id,
335                self.current_offset,
336                self.config.max_fetch_bytes,
337            )
338            .await?;
339
340        let end_of_stream =
341            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
342
343        let result = PollResult {
344            data: fetch_result.data,
345            current_offset: fetch_result.next_offset,
346            record_count: fetch_result.record_count,
347            end_of_stream,
348        };
349
350        // Advance offset only if we got data
351        if !result.is_empty() {
352            self.current_offset = fetch_result.next_offset;
353        }
354        self.cached_end_offset = Some(fetch_result.next_offset);
355
356        Ok(result)
357    }
358
359    /// Discover the current end offset by fetching from a very high offset
360    async fn discover_end_offset(&mut self) -> Result<u64> {
361        // Use cached value if available
362        if let Some(end) = self.cached_end_offset {
363            return Ok(end);
364        }
365
366        // Fetch from offset 0 to discover stream state
367        let fetch_result = self
368            .client
369            .fetch(
370                self.config.topic_id,
371                0,
372                1, // Minimal fetch just to get stream info
373            )
374            .await?;
375
376        // The next_offset tells us where data ends
377        // For empty streams, next_offset will be 0
378        let end_offset = fetch_result.next_offset;
379        self.cached_end_offset = Some(end_offset);
380        Ok(end_offset)
381    }
382
383    /// Commit the current offset to the offset store
384    ///
385    /// If an offset store is configured, this persists the current offset
386    /// so the consumer can resume from this position on restart.
387    ///
388    /// # Example
389    /// ```ignore
390    /// let records = consumer.poll().await?;
391    /// process(records);
392    /// consumer.commit().await?;  // Persist offset for crash recovery
393    /// ```
394    pub async fn commit(&mut self) -> Result<()> {
395        if let Some(ref store) = self.offset_store {
396            store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
397        }
398        Ok(())
399    }
400
401    /// Get the consumer ID
402    pub fn consumer_id(&self) -> u64 {
403        self.consumer_id
404    }
405
406    /// Check if this consumer has an offset store configured
407    pub fn has_offset_store(&self) -> bool {
408        self.offset_store.is_some()
409    }
410
411    /// Get access to the underlying client
412    pub fn client(&self) -> &LanceClient {
413        &self.client
414    }
415
416    /// Get mutable access to the underlying client
417    pub fn client_mut(&mut self) -> &mut LanceClient {
418        &mut self.client
419    }
420
421    /// Consume this consumer and return the underlying client
422    pub fn into_client(self) -> LanceClient {
423        self.client
424    }
425}
426
427impl std::fmt::Debug for Consumer {
428    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429        f.debug_struct("Consumer")
430            .field("topic_id", &self.config.topic_id)
431            .field("current_offset", &self.current_offset)
432            .field("max_fetch_bytes", &self.config.max_fetch_bytes)
433            .finish()
434    }
435}
436
/// Configuration for a [`StreamingConsumer`].
///
/// Build one with [`StreamingConsumerConfig::new`] and refine it with the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct StreamingConsumerConfig {
    /// Topic ID to consume from
    pub topic_id: u32,
    /// Maximum bytes per batch (default: 64KB)
    pub max_batch_bytes: u32,
    /// Starting position (default: Beginning)
    pub start_position: SeekPosition,
    /// Consumer group ID (for offset tracking).
    ///
    /// NOTE(review): nothing in this file reads this field; subscribe and
    /// commit calls are keyed by the numeric consumer ID — confirm where the
    /// group is meant to be used.
    pub consumer_group: Option<String>,
    /// Auto-commit interval in milliseconds (0 = manual commit only)
    pub auto_commit_interval_ms: u64,
}
451
452impl Default for StreamingConsumerConfig {
453    fn default() -> Self {
454        Self {
455            topic_id: 0,
456            max_batch_bytes: 64 * 1024,
457            start_position: SeekPosition::Beginning,
458            consumer_group: None,
459            auto_commit_interval_ms: 5000, // 5 seconds
460        }
461    }
462}
463
464impl StreamingConsumerConfig {
465    pub fn new(topic_id: u32) -> Self {
466        Self {
467            topic_id,
468            ..Default::default()
469        }
470    }
471
472    pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
473        self.max_batch_bytes = bytes;
474        self
475    }
476
477    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
478        self.start_position = position;
479        self
480    }
481
482    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
483        self.consumer_group = Some(group.into());
484        self
485    }
486
487    pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
488        self.auto_commit_interval_ms = interval_ms;
489        self
490    }
491}
492
/// A streaming consumer that uses subscribe/unsubscribe signals
///
/// Unlike the poll-based `Consumer`, `StreamingConsumer` explicitly signals
/// to the server when it starts and stops consuming, and reports position
/// updates for consumer group tracking.
///
/// # Lifecycle
///
/// 1. **Subscribe** - Call `start()` to signal the server to begin streaming
/// 2. **Consume** - Call `poll()` to receive batches of data
/// 3. **Commit** - Call `commit()` to checkpoint progress (or use auto-commit)
/// 4. **Unsubscribe** - Call `stop()` to signal completion
///
/// NOTE(review): no `Drop` implementation is visible in this file, so simply
/// dropping the consumer does not appear to send an unsubscribe — confirm and
/// call `stop()` (or `into_client()`) explicitly.
///
/// # Example
///
/// ```text
/// let client = LanceClient::connect_to("127.0.0.1:1992").await?;
/// let config = StreamingConsumerConfig::new(topic_id)
///     .with_consumer_group("my-group");
/// let mut consumer = StreamingConsumer::new(client, config);
///
/// // Start streaming
/// consumer.start().await?;
///
/// // Process records
/// while let Some(result) = consumer.poll().await? {
///     process_data(&result.data);
///     consumer.commit().await?; // Checkpoint progress
/// }
///
/// // Stop streaming
/// consumer.stop().await?;
/// ```
pub struct StreamingConsumer {
    /// Client used for subscribe, fetch, commit and unsubscribe calls
    client: LanceClient,
    /// Settings this consumer was built with
    config: StreamingConsumerConfig,
    /// Identifier generated in `new` and sent with subscribe/commit/unsubscribe
    consumer_id: u64,
    /// Offset the next fetch starts from
    current_offset: u64,
    /// Offset most recently acknowledged by a commit (0 until the first one)
    committed_offset: u64,
    /// Whether `start()` has succeeded without a later `stop()`
    is_subscribed: bool,
    /// Time of the last successful commit or subscribe; drives auto-commit
    last_commit_time: std::time::Instant,
}
535
536impl StreamingConsumer {
537    /// Create a new streaming consumer
538    pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
539        let consumer_id = Self::generate_consumer_id();
540        let initial_offset = match config.start_position {
541            SeekPosition::Beginning => 0,
542            SeekPosition::Offset(offset) => offset,
543            SeekPosition::End => u64::MAX,
544        };
545
546        Self {
547            client,
548            config,
549            consumer_id,
550            current_offset: initial_offset,
551            committed_offset: 0,
552            is_subscribed: false,
553            last_commit_time: std::time::Instant::now(),
554        }
555    }
556
557    /// Generate a unique consumer ID
558    fn generate_consumer_id() -> u64 {
559        use std::time::{SystemTime, UNIX_EPOCH};
560        let timestamp = SystemTime::now()
561            .duration_since(UNIX_EPOCH)
562            .map(|d| d.as_nanos() as u64)
563            .unwrap_or(0);
564        // Mix in thread ID for uniqueness
565        let thread_id = std::thread::current().id();
566        timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
567    }
568
569    /// Start streaming from the topic
570    ///
571    /// Sends a Subscribe signal to the server indicating this consumer
572    /// wants to start receiving data from the configured position.
573    pub async fn start(&mut self) -> Result<()> {
574        if self.is_subscribed {
575            return Ok(());
576        }
577
578        let result = self
579            .client
580            .subscribe(
581                self.config.topic_id,
582                self.current_offset,
583                self.config.max_batch_bytes,
584                self.consumer_id,
585            )
586            .await?;
587
588        self.current_offset = result.start_offset;
589        self.is_subscribed = true;
590        self.last_commit_time = std::time::Instant::now();
591
592        Ok(())
593    }
594
595    /// Stop streaming from the topic
596    ///
597    /// Sends an Unsubscribe signal to the server. This should be called
598    /// when the consumer is done processing to free server resources.
599    pub async fn stop(&mut self) -> Result<()> {
600        if !self.is_subscribed {
601            return Ok(());
602        }
603
604        // Commit any uncommitted offset before stopping
605        if self.current_offset > self.committed_offset {
606            let _ = self.commit().await;
607        }
608
609        self.client
610            .unsubscribe(self.config.topic_id, self.consumer_id)
611            .await?;
612        self.is_subscribed = false;
613
614        Ok(())
615    }
616
617    /// Poll for the next batch of records
618    ///
619    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
620    /// no new data is available.
621    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
622        if !self.is_subscribed {
623            return Err(ClientError::ProtocolError(
624                "Consumer not subscribed - call start() first".to_string(),
625            ));
626        }
627
628        // Check if auto-commit is due
629        self.maybe_auto_commit().await?;
630
631        let fetch_result = self
632            .client
633            .fetch(
634                self.config.topic_id,
635                self.current_offset,
636                self.config.max_batch_bytes,
637            )
638            .await?;
639
640        let end_of_stream =
641            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
642
643        let result = PollResult {
644            data: fetch_result.data,
645            current_offset: fetch_result.next_offset,
646            record_count: fetch_result.record_count,
647            end_of_stream,
648        };
649
650        // Advance offset
651        self.current_offset = fetch_result.next_offset;
652
653        if result.is_empty() {
654            Ok(None)
655        } else {
656            Ok(Some(result))
657        }
658    }
659
660    /// Commit the current offset to the server
661    ///
662    /// This checkpoints the consumer's position so that if it restarts,
663    /// it can resume from this point.
664    pub async fn commit(&mut self) -> Result<()> {
665        if self.current_offset <= self.committed_offset {
666            return Ok(());
667        }
668
669        let result = self
670            .client
671            .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
672            .await?;
673
674        self.committed_offset = result.committed_offset;
675        self.last_commit_time = std::time::Instant::now();
676
677        Ok(())
678    }
679
680    /// Auto-commit if interval has elapsed
681    async fn maybe_auto_commit(&mut self) -> Result<()> {
682        if self.config.auto_commit_interval_ms == 0 {
683            return Ok(()); // Auto-commit disabled
684        }
685
686        let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
687        if elapsed >= self.config.auto_commit_interval_ms {
688            self.commit().await?;
689        }
690
691        Ok(())
692    }
693
694    /// Seek to a new position
695    ///
696    /// Note: This will commit the current offset and restart the subscription
697    /// at the new position.
698    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
699        let was_subscribed = self.is_subscribed;
700
701        if was_subscribed {
702            self.stop().await?;
703        }
704
705        let new_offset = match position {
706            SeekPosition::Beginning => 0,
707            SeekPosition::Offset(offset) => offset,
708            SeekPosition::End => u64::MAX,
709        };
710
711        self.current_offset = new_offset;
712
713        if was_subscribed {
714            self.start().await?;
715        }
716
717        Ok(self.current_offset)
718    }
719
720    /// Get the current offset
721    pub fn current_offset(&self) -> u64 {
722        self.current_offset
723    }
724
725    /// Get the last committed offset
726    pub fn committed_offset(&self) -> u64 {
727        self.committed_offset
728    }
729
730    /// Get the consumer ID
731    pub fn consumer_id(&self) -> u64 {
732        self.consumer_id
733    }
734
735    /// Check if currently subscribed
736    pub fn is_subscribed(&self) -> bool {
737        self.is_subscribed
738    }
739
740    /// Get access to the underlying client
741    pub fn client(&self) -> &LanceClient {
742        &self.client
743    }
744
745    /// Consume this consumer and return the underlying client
746    pub async fn into_client(mut self) -> Result<LanceClient> {
747        self.stop().await?;
748        Ok(self.client)
749    }
750}
751
752impl std::fmt::Debug for StreamingConsumer {
753    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
754        f.debug_struct("StreamingConsumer")
755            .field("topic_id", &self.config.topic_id)
756            .field("consumer_id", &self.consumer_id)
757            .field("current_offset", &self.current_offset)
758            .field("committed_offset", &self.committed_offset)
759            .field("is_subscribed", &self.is_subscribed)
760            .finish()
761    }
762}
763
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    //! Unit tests for the configuration types, `PollResult` and
    //! `SeekPosition`. Nothing here needs a live client.

    use super::*;

    /// 64KB, the default fetch/batch size.
    const DEFAULT_BYTES: u32 = 64 * 1024;
    /// 128KB, used to check that the size setters take effect.
    const DOUBLED_BYTES: u32 = 128 * 1024;

    // =========================================================================
    // ConsumerConfig / PollResult Tests
    // =========================================================================

    #[test]
    fn test_consumer_config_default() {
        let defaults = ConsumerConfig::default();

        assert_eq!(defaults.topic_id, 0);
        assert_eq!(defaults.max_fetch_bytes, DEFAULT_BYTES);
        assert_eq!(defaults.start_position, SeekPosition::Beginning);
    }

    #[test]
    fn test_consumer_config_builder() {
        let start = SeekPosition::Offset(1000);
        let built = ConsumerConfig::new(42)
            .with_max_fetch_bytes(DOUBLED_BYTES)
            .with_start_position(start);

        assert_eq!(built.topic_id, 42);
        assert_eq!(built.max_fetch_bytes, DOUBLED_BYTES);
        assert_eq!(built.start_position, start);
    }

    #[test]
    fn test_poll_result_is_empty() {
        let without_data = PollResult {
            data: bytes::Bytes::new(),
            current_offset: 0,
            record_count: 0,
            end_of_stream: true,
        };
        let with_data = PollResult {
            data: bytes::Bytes::from_static(&[1, 2, 3]),
            current_offset: 3,
            record_count: 1,
            end_of_stream: false,
        };

        assert!(without_data.is_empty());
        assert!(!with_data.is_empty());
    }

    // =========================================================================
    // StreamingConsumerConfig Tests
    // =========================================================================

    #[test]
    fn test_streaming_consumer_config_default() {
        let defaults = StreamingConsumerConfig::default();

        assert_eq!(defaults.topic_id, 0);
        assert_eq!(defaults.max_batch_bytes, DEFAULT_BYTES);
        assert_eq!(defaults.start_position, SeekPosition::Beginning);
        assert!(defaults.consumer_group.is_none());
        assert_eq!(defaults.auto_commit_interval_ms, 5000);
    }

    #[test]
    fn test_streaming_consumer_config_builder() {
        let start = SeekPosition::Offset(5000);
        let built = StreamingConsumerConfig::new(42)
            .with_max_batch_bytes(DOUBLED_BYTES)
            .with_start_position(start)
            .with_consumer_group("my-group")
            .with_auto_commit_interval(10000);

        assert_eq!(built.topic_id, 42);
        assert_eq!(built.max_batch_bytes, DOUBLED_BYTES);
        assert_eq!(built.start_position, start);
        assert_eq!(built.consumer_group.as_deref(), Some("my-group"));
        assert_eq!(built.auto_commit_interval_ms, 10000);
    }

    #[test]
    fn test_streaming_consumer_config_disable_auto_commit() {
        // An interval of 0 means manual commits only.
        let manual = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);

        assert_eq!(manual.auto_commit_interval_ms, 0);
    }

    #[test]
    fn test_streaming_consumer_config_seek_positions() {
        // Each variant must survive the round trip through the builder.
        let variants = [
            SeekPosition::Beginning,
            SeekPosition::End,
            SeekPosition::Offset(12345),
        ];

        for expected in variants {
            let built = StreamingConsumerConfig::new(1).with_start_position(expected);
            assert_eq!(built.start_position, expected);
        }
    }

    // =========================================================================
    // SeekPosition Tests
    // =========================================================================

    #[test]
    fn test_seek_position_equality() {
        // Identical variants (and payloads) compare equal...
        assert_eq!(SeekPosition::Beginning, SeekPosition::Beginning);
        assert_eq!(SeekPosition::End, SeekPosition::End);
        assert_eq!(SeekPosition::Offset(100), SeekPosition::Offset(100));

        // ...while different variants or payloads do not, even when
        // `Offset(0)` denotes the same place as `Beginning`.
        assert_ne!(SeekPosition::Beginning, SeekPosition::End);
        assert_ne!(SeekPosition::Offset(100), SeekPosition::Offset(200));
        assert_ne!(SeekPosition::Beginning, SeekPosition::Offset(0));
    }

    #[test]
    fn test_seek_position_clone() {
        // `SeekPosition` is `Copy`, so the original stays usable after the
        // assignment.
        let original = SeekPosition::Offset(42);
        let duplicate = original;

        assert_eq!(original, duplicate);
    }
}