Skip to main content

lnc_client/
consumer.rs

1//! Consumer abstraction for reading from LANCE streams
2//!
3//! Provides a high-level API for consuming records from topics with support for:
4//! - Offset tracking
5//! - Seek/rewind to specific offsets
6//! - Continuous polling
7
8use std::sync::Arc;
9
10use crate::client::LanceClient;
11use crate::error::{ClientError, Result};
12use crate::offset::OffsetStore;
13
14/// Position specifier for seeking within a stream
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum SeekPosition {
17    /// Seek to the beginning of the stream (offset 0)
18    Beginning,
19    /// Seek to the end of the stream (latest data)
20    End,
21    /// Seek to a specific byte offset
22    Offset(u64),
23}
24
25/// Configuration for a consumer
26#[derive(Debug, Clone)]
27pub struct ConsumerConfig {
28    /// Topic ID to consume from
29    pub topic_id: u32,
30    /// Maximum bytes to fetch per poll (default: 64KB)
31    pub max_fetch_bytes: u32,
32    /// Starting position (default: Beginning)
33    pub start_position: SeekPosition,
34}
35
36impl Default for ConsumerConfig {
37    fn default() -> Self {
38        Self {
39            topic_id: 0,
40            max_fetch_bytes: 64 * 1024, // 64KB
41            start_position: SeekPosition::Beginning,
42        }
43    }
44}
45
46impl ConsumerConfig {
47    /// Create a new consumer configuration for the specified topic
48    pub fn new(topic_id: u32) -> Self {
49        Self {
50            topic_id,
51            ..Default::default()
52        }
53    }
54
55    /// Set the maximum bytes to fetch per poll operation
56    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
57        self.max_fetch_bytes = bytes;
58        self
59    }
60
61    /// Set the starting position for consumption
62    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
63        self.start_position = position;
64        self
65    }
66}
67
68/// Result of a poll operation
69#[derive(Debug, Clone)]
70pub struct PollResult {
71    /// Raw data fetched from the stream (zero-copy Bytes)
72    pub data: bytes::Bytes,
73    /// Current offset after this fetch
74    pub current_offset: u64,
75    /// Number of records in this batch (estimate)
76    pub record_count: u32,
77    /// Whether the end of available data was reached
78    pub end_of_stream: bool,
79}
80
81impl PollResult {
82    /// Returns true if no data was returned
83    pub fn is_empty(&self) -> bool {
84        self.data.is_empty()
85    }
86}
87
88/// A consumer for reading records from a LANCE topic stream
89///
90/// The consumer tracks its current position and provides methods to:
91/// - Poll for new records
92/// - Seek to specific offsets (rewind/fast-forward)
93/// - Query current position
94///
95/// # Example
96///
97/// ```text
98/// let client = LanceClient::connect_to("127.0.0.1:1992").await?;
99/// let config = ConsumerConfig::new(topic_id);
100/// let mut consumer = Consumer::new(client, config);
101///
102/// // Rewind to beginning
103/// consumer.seek(SeekPosition::Beginning).await?;
104///
105/// // Poll for records
106/// while let Some(result) = consumer.poll().await? {
107///     process_data(&result.data);
108///     if result.end_of_stream {
109///         break;
110///     }
111/// }
112/// ```
113pub struct Consumer {
114    client: LanceClient,
115    config: ConsumerConfig,
116    current_offset: u64,
117    /// Cached end offset for SeekPosition::End
118    cached_end_offset: Option<u64>,
119    /// Optional offset store for client-side offset persistence
120    offset_store: Option<Arc<dyn OffsetStore>>,
121    /// Consumer ID for offset store operations
122    consumer_id: u64,
123}
124
125impl Consumer {
126    /// Create a new consumer with the given client and configuration
127    pub fn new(client: LanceClient, config: ConsumerConfig) -> Self {
128        Self::with_consumer_id(client, config, 0)
129    }
130
131    /// Create a new consumer with a specific consumer ID
132    pub fn with_consumer_id(client: LanceClient, config: ConsumerConfig, consumer_id: u64) -> Self {
133        let initial_offset = match config.start_position {
134            SeekPosition::Beginning => 0,
135            SeekPosition::Offset(offset) => offset,
136            SeekPosition::End => u64::MAX, // Will be resolved on first poll
137        };
138
139        Self {
140            client,
141            config,
142            current_offset: initial_offset,
143            cached_end_offset: None,
144            offset_store: None,
145            consumer_id,
146        }
147    }
148
149    /// Create a consumer with an offset store for client-side offset persistence
150    ///
151    /// The consumer will automatically load its starting offset from the store
152    /// if one exists, otherwise it uses the configured start position.
153    ///
154    /// # Arguments
155    /// * `client` - The LANCE client connection
156    /// * `config` - Consumer configuration
157    /// * `consumer_id` - Unique identifier for this consumer instance
158    /// * `offset_store` - The offset store for persistence
159    ///
160    /// # Example
161    /// ```ignore
162    /// let store = Arc::new(LockFileOffsetStore::open(path, "my-consumer")?);
163    /// let consumer = Consumer::with_offset_store(client, config, 12345, store)?;
164    /// ```
165    pub fn with_offset_store(
166        client: LanceClient,
167        config: ConsumerConfig,
168        consumer_id: u64,
169        offset_store: Arc<dyn OffsetStore>,
170    ) -> Result<Self> {
171        // Try to load existing offset from store
172        let stored_offset = offset_store.load(config.topic_id, consumer_id)?;
173
174        let initial_offset = if let Some(offset) = stored_offset {
175            // Resume from stored offset
176            offset
177        } else {
178            // No stored offset, use configured start position
179            match config.start_position {
180                SeekPosition::Beginning => 0,
181                SeekPosition::Offset(offset) => offset,
182                SeekPosition::End => u64::MAX, // Will be resolved on first poll
183            }
184        };
185
186        Ok(Self {
187            client,
188            config,
189            current_offset: initial_offset,
190            cached_end_offset: None,
191            offset_store: Some(offset_store),
192            consumer_id,
193        })
194    }
195
196    /// Create a consumer starting from the beginning of the stream
197    pub fn from_beginning(client: LanceClient, topic_id: u32) -> Self {
198        Self::new(client, ConsumerConfig::new(topic_id))
199    }
200
201    /// Create a consumer starting from a specific offset
202    pub fn from_offset(client: LanceClient, topic_id: u32, offset: u64) -> Self {
203        let config =
204            ConsumerConfig::new(topic_id).with_start_position(SeekPosition::Offset(offset));
205        Self::new(client, config)
206    }
207
208    /// Get the current offset position
209    pub fn current_offset(&self) -> u64 {
210        self.current_offset
211    }
212
213    /// Get the topic ID being consumed
214    pub fn topic_id(&self) -> u32 {
215        self.config.topic_id
216    }
217
218    /// Seek to a specific position in the stream
219    ///
220    /// This allows rewinding to replay historical data or fast-forwarding
221    /// to skip ahead.
222    ///
223    /// # Arguments
224    /// * `position` - The position to seek to
225    ///
226    /// # Examples
227    ///
228    /// ```text
229    /// // Rewind to the beginning
230    /// consumer.seek(SeekPosition::Beginning).await?;
231    ///
232    /// // Seek to a specific offset
233    /// consumer.seek(SeekPosition::Offset(1000)).await?;
234    ///
235    /// // Seek to the end (latest data)
236    /// consumer.seek(SeekPosition::End).await?;
237    /// ```
238    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
239        match position {
240            SeekPosition::Beginning => {
241                self.current_offset = 0;
242                Ok(0)
243            },
244            SeekPosition::Offset(offset) => {
245                self.current_offset = offset;
246                Ok(offset)
247            },
248            SeekPosition::End => {
249                // Fetch with offset u64::MAX to discover end
250                let end_offset = self.discover_end_offset().await?;
251                self.current_offset = end_offset;
252                Ok(end_offset)
253            },
254        }
255    }
256
257    /// Rewind to the beginning of the stream (offset 0)
258    ///
259    /// Convenience method equivalent to `seek(SeekPosition::Beginning)`
260    pub async fn rewind(&mut self) -> Result<()> {
261        self.seek(SeekPosition::Beginning).await?;
262        Ok(())
263    }
264
265    /// Seek to a specific byte offset in the stream
266    ///
267    /// Convenience method equivalent to `seek(SeekPosition::Offset(offset))`
268    pub async fn seek_to_offset(&mut self, offset: u64) -> Result<()> {
269        self.seek(SeekPosition::Offset(offset)).await?;
270        Ok(())
271    }
272
273    /// Seek to the end of the stream (latest available data)
274    ///
275    /// Convenience method equivalent to `seek(SeekPosition::End)`
276    pub async fn seek_to_end(&mut self) -> Result<u64> {
277        self.seek(SeekPosition::End).await
278    }
279
280    /// Poll for the next batch of records
281    ///
282    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
283    /// there was no new data available (end of stream reached).
284    ///
285    /// The consumer's offset is automatically advanced after each successful poll.
286    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
287        // Handle SeekPosition::End that hasn't been resolved yet
288        if self.current_offset == u64::MAX {
289            let end_offset = self.discover_end_offset().await?;
290            self.current_offset = end_offset;
291        }
292
293        let fetch_result = self
294            .client
295            .fetch(
296                self.config.topic_id,
297                self.current_offset,
298                self.config.max_fetch_bytes,
299            )
300            .await?;
301
302        let end_of_stream =
303            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
304
305        let result = PollResult {
306            data: fetch_result.data,
307            current_offset: fetch_result.next_offset,
308            record_count: fetch_result.record_count,
309            end_of_stream,
310        };
311
312        // Advance offset
313        self.current_offset = fetch_result.next_offset;
314        self.cached_end_offset = Some(fetch_result.next_offset);
315
316        if result.is_empty() {
317            Ok(None)
318        } else {
319            Ok(Some(result))
320        }
321    }
322
323    /// Poll for records, blocking until data is available or timeout
324    ///
325    /// Unlike `poll()`, this will return empty results instead of None,
326    /// allowing the caller to continue waiting.
327    pub async fn poll_blocking(&mut self) -> Result<PollResult> {
328        // Handle SeekPosition::End that hasn't been resolved yet
329        if self.current_offset == u64::MAX {
330            let end_offset = self.discover_end_offset().await?;
331            self.current_offset = end_offset;
332        }
333
334        let fetch_result = self
335            .client
336            .fetch(
337                self.config.topic_id,
338                self.current_offset,
339                self.config.max_fetch_bytes,
340            )
341            .await?;
342
343        let end_of_stream =
344            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
345
346        let result = PollResult {
347            data: fetch_result.data,
348            current_offset: fetch_result.next_offset,
349            record_count: fetch_result.record_count,
350            end_of_stream,
351        };
352
353        // Advance offset only if we got data
354        if !result.is_empty() {
355            self.current_offset = fetch_result.next_offset;
356        }
357        self.cached_end_offset = Some(fetch_result.next_offset);
358
359        Ok(result)
360    }
361
362    /// Discover the current end offset by fetching from a very high offset
363    async fn discover_end_offset(&mut self) -> Result<u64> {
364        // Use cached value if available
365        if let Some(end) = self.cached_end_offset {
366            return Ok(end);
367        }
368
369        // Fetch from offset 0 to discover stream state
370        let fetch_result = self
371            .client
372            .fetch(
373                self.config.topic_id,
374                0,
375                1, // Minimal fetch just to get stream info
376            )
377            .await?;
378
379        // The next_offset tells us where data ends
380        // For empty streams, next_offset will be 0
381        let end_offset = fetch_result.next_offset;
382        self.cached_end_offset = Some(end_offset);
383        Ok(end_offset)
384    }
385
386    /// Commit the current offset to the offset store
387    ///
388    /// If an offset store is configured, this persists the current offset
389    /// so the consumer can resume from this position on restart.
390    ///
391    /// # Example
392    /// ```ignore
393    /// let records = consumer.poll().await?;
394    /// process(records);
395    /// consumer.commit().await?;  // Persist offset for crash recovery
396    /// ```
397    pub async fn commit(&mut self) -> Result<()> {
398        if let Some(ref store) = self.offset_store {
399            store.save(self.config.topic_id, self.consumer_id, self.current_offset)?;
400        }
401        Ok(())
402    }
403
404    /// Get the consumer ID
405    pub fn consumer_id(&self) -> u64 {
406        self.consumer_id
407    }
408
409    /// Check if this consumer has an offset store configured
410    pub fn has_offset_store(&self) -> bool {
411        self.offset_store.is_some()
412    }
413
414    /// Get access to the underlying client
415    pub fn client(&self) -> &LanceClient {
416        &self.client
417    }
418
419    /// Get mutable access to the underlying client
420    pub fn client_mut(&mut self) -> &mut LanceClient {
421        &mut self.client
422    }
423
424    /// Consume this consumer and return the underlying client
425    pub fn into_client(self) -> LanceClient {
426        self.client
427    }
428}
429
430impl std::fmt::Debug for Consumer {
431    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432        f.debug_struct("Consumer")
433            .field("topic_id", &self.config.topic_id)
434            .field("current_offset", &self.current_offset)
435            .field("max_fetch_bytes", &self.config.max_fetch_bytes)
436            .finish()
437    }
438}
439
440/// Configuration for a streaming consumer
441#[derive(Debug, Clone)]
442pub struct StreamingConsumerConfig {
443    /// Topic ID to consume from
444    pub topic_id: u32,
445    /// Maximum bytes per batch (default: 64KB)
446    pub max_batch_bytes: u32,
447    /// Starting position (default: Beginning)
448    pub start_position: SeekPosition,
449    /// Consumer group ID (for offset tracking)
450    pub consumer_group: Option<String>,
451    /// Auto-commit interval in milliseconds (0 = manual commit only)
452    pub auto_commit_interval_ms: u64,
453}
454
455impl Default for StreamingConsumerConfig {
456    fn default() -> Self {
457        Self {
458            topic_id: 0,
459            max_batch_bytes: 64 * 1024,
460            start_position: SeekPosition::Beginning,
461            consumer_group: None,
462            auto_commit_interval_ms: 5000, // 5 seconds
463        }
464    }
465}
466
467impl StreamingConsumerConfig {
468    /// Create a new streaming consumer configuration for the specified topic
469    pub fn new(topic_id: u32) -> Self {
470        Self {
471            topic_id,
472            ..Default::default()
473        }
474    }
475
476    /// Set the maximum bytes per batch
477    pub fn with_max_batch_bytes(mut self, bytes: u32) -> Self {
478        self.max_batch_bytes = bytes;
479        self
480    }
481
482    /// Set the starting position for consumption
483    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
484        self.start_position = position;
485        self
486    }
487
488    /// Set the consumer group for coordinated consumption
489    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
490        self.consumer_group = Some(group.into());
491        self
492    }
493
494    /// Set the auto-commit interval in milliseconds (0 = disabled)
495    pub fn with_auto_commit_interval(mut self, interval_ms: u64) -> Self {
496        self.auto_commit_interval_ms = interval_ms;
497        self
498    }
499}
500
501/// A streaming consumer that uses subscribe/unsubscribe signals
502///
503/// Unlike the poll-based `Consumer`, `StreamingConsumer` explicitly signals
504/// to the server when it starts and stops consuming, and reports position
505/// updates for consumer group tracking.
506///
507/// # Lifecycle
508///
509/// 1. **Subscribe** - Call `start()` to signal the server to begin streaming
510/// 2. **Consume** - Call `poll()` to receive batches of data
511/// 3. **Commit** - Call `commit()` to checkpoint progress (or use auto-commit)
512/// 4. **Unsubscribe** - Call `stop()` or drop the consumer to signal completion
513///
514/// # Example
515///
516/// ```text
517/// let client = LanceClient::connect_to("127.0.0.1:1992").await?;
518/// let config = StreamingConsumerConfig::new(topic_id)
519///     .with_consumer_group("my-group");
520/// let mut consumer = StreamingConsumer::new(client, config);
521///
522/// // Start streaming
523/// consumer.start().await?;
524///
525/// // Process records
526/// while let Some(result) = consumer.poll().await? {
527///     process_data(&result.data);
528///     consumer.commit().await?; // Checkpoint progress
529/// }
530///
531/// // Stop streaming
532/// consumer.stop().await?;
533/// ```
534pub struct StreamingConsumer {
535    client: LanceClient,
536    config: StreamingConsumerConfig,
537    consumer_id: u64,
538    current_offset: u64,
539    committed_offset: u64,
540    is_subscribed: bool,
541    last_commit_time: std::time::Instant,
542}
543
544impl StreamingConsumer {
545    /// Create a new streaming consumer
546    pub fn new(client: LanceClient, config: StreamingConsumerConfig) -> Self {
547        let consumer_id = Self::generate_consumer_id();
548        let initial_offset = match config.start_position {
549            SeekPosition::Beginning => 0,
550            SeekPosition::Offset(offset) => offset,
551            SeekPosition::End => u64::MAX,
552        };
553
554        Self {
555            client,
556            config,
557            consumer_id,
558            current_offset: initial_offset,
559            committed_offset: 0,
560            is_subscribed: false,
561            last_commit_time: std::time::Instant::now(),
562        }
563    }
564
565    /// Generate a unique consumer ID
566    fn generate_consumer_id() -> u64 {
567        use std::time::{SystemTime, UNIX_EPOCH};
568        let timestamp = SystemTime::now()
569            .duration_since(UNIX_EPOCH)
570            .map(|d| d.as_nanos() as u64)
571            .unwrap_or(0);
572        // Mix in thread ID for uniqueness
573        let thread_id = std::thread::current().id();
574        timestamp ^ (format!("{:?}", thread_id).len() as u64).wrapping_mul(0x517cc1b727220a95)
575    }
576
577    /// Start streaming from the topic
578    ///
579    /// Sends a Subscribe signal to the server indicating this consumer
580    /// wants to start receiving data from the configured position.
581    pub async fn start(&mut self) -> Result<()> {
582        if self.is_subscribed {
583            return Ok(());
584        }
585
586        let result = self
587            .client
588            .subscribe(
589                self.config.topic_id,
590                self.current_offset,
591                self.config.max_batch_bytes,
592                self.consumer_id,
593            )
594            .await?;
595
596        self.current_offset = result.start_offset;
597        self.is_subscribed = true;
598        self.last_commit_time = std::time::Instant::now();
599
600        Ok(())
601    }
602
603    /// Stop streaming from the topic
604    ///
605    /// Sends an Unsubscribe signal to the server. This should be called
606    /// when the consumer is done processing to free server resources.
607    pub async fn stop(&mut self) -> Result<()> {
608        if !self.is_subscribed {
609            return Ok(());
610        }
611
612        // Commit any uncommitted offset before stopping
613        if self.current_offset > self.committed_offset {
614            let _ = self.commit().await;
615        }
616
617        self.client
618            .unsubscribe(self.config.topic_id, self.consumer_id)
619            .await?;
620        self.is_subscribed = false;
621
622        Ok(())
623    }
624
625    /// Poll for the next batch of records
626    ///
627    /// Returns `Ok(Some(result))` if data was fetched, or `Ok(None)` if
628    /// no new data is available.
629    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
630        if !self.is_subscribed {
631            return Err(ClientError::ProtocolError(
632                "Consumer not subscribed - call start() first".to_string(),
633            ));
634        }
635
636        // Check if auto-commit is due
637        self.maybe_auto_commit().await?;
638
639        let fetch_result = self
640            .client
641            .fetch(
642                self.config.topic_id,
643                self.current_offset,
644                self.config.max_batch_bytes,
645            )
646            .await?;
647
648        let end_of_stream =
649            fetch_result.data.is_empty() || fetch_result.next_offset == self.current_offset;
650
651        let result = PollResult {
652            data: fetch_result.data,
653            current_offset: fetch_result.next_offset,
654            record_count: fetch_result.record_count,
655            end_of_stream,
656        };
657
658        // Advance offset
659        self.current_offset = fetch_result.next_offset;
660
661        if result.is_empty() {
662            Ok(None)
663        } else {
664            Ok(Some(result))
665        }
666    }
667
668    /// Commit the current offset to the server
669    ///
670    /// This checkpoints the consumer's position so that if it restarts,
671    /// it can resume from this point.
672    pub async fn commit(&mut self) -> Result<()> {
673        if self.current_offset <= self.committed_offset {
674            return Ok(());
675        }
676
677        let result = self
678            .client
679            .commit_offset(self.config.topic_id, self.consumer_id, self.current_offset)
680            .await?;
681
682        self.committed_offset = result.committed_offset;
683        self.last_commit_time = std::time::Instant::now();
684
685        Ok(())
686    }
687
688    /// Auto-commit if interval has elapsed
689    async fn maybe_auto_commit(&mut self) -> Result<()> {
690        if self.config.auto_commit_interval_ms == 0 {
691            return Ok(()); // Auto-commit disabled
692        }
693
694        let elapsed = self.last_commit_time.elapsed().as_millis() as u64;
695        if elapsed >= self.config.auto_commit_interval_ms {
696            self.commit().await?;
697        }
698
699        Ok(())
700    }
701
702    /// Seek to a new position
703    ///
704    /// Note: This will commit the current offset and restart the subscription
705    /// at the new position.
706    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
707        let was_subscribed = self.is_subscribed;
708
709        if was_subscribed {
710            self.stop().await?;
711        }
712
713        let new_offset = match position {
714            SeekPosition::Beginning => 0,
715            SeekPosition::Offset(offset) => offset,
716            SeekPosition::End => u64::MAX,
717        };
718
719        self.current_offset = new_offset;
720
721        if was_subscribed {
722            self.start().await?;
723        }
724
725        Ok(self.current_offset)
726    }
727
728    /// Get the current offset
729    pub fn current_offset(&self) -> u64 {
730        self.current_offset
731    }
732
733    /// Get the last committed offset
734    pub fn committed_offset(&self) -> u64 {
735        self.committed_offset
736    }
737
738    /// Get the consumer ID
739    pub fn consumer_id(&self) -> u64 {
740        self.consumer_id
741    }
742
743    /// Check if currently subscribed
744    pub fn is_subscribed(&self) -> bool {
745        self.is_subscribed
746    }
747
748    /// Get access to the underlying client
749    pub fn client(&self) -> &LanceClient {
750        &self.client
751    }
752
753    /// Consume this consumer and return the underlying client
754    pub async fn into_client(mut self) -> Result<LanceClient> {
755        self.stop().await?;
756        Ok(self.client)
757    }
758}
759
760impl std::fmt::Debug for StreamingConsumer {
761    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
762        f.debug_struct("StreamingConsumer")
763            .field("topic_id", &self.config.topic_id)
764            .field("consumer_id", &self.consumer_id)
765            .field("current_offset", &self.current_offset)
766            .field("committed_offset", &self.committed_offset)
767            .field("is_subscribed", &self.is_subscribed)
768            .finish()
769    }
770}
771
772#[cfg(test)]
773#[allow(clippy::unwrap_used)]
774mod tests {
775    use super::*;
776
777    #[test]
778    fn test_consumer_config_default() {
779        let config = ConsumerConfig::default();
780        assert_eq!(config.topic_id, 0);
781        assert_eq!(config.max_fetch_bytes, 64 * 1024);
782        assert_eq!(config.start_position, SeekPosition::Beginning);
783    }
784
785    #[test]
786    fn test_consumer_config_builder() {
787        let config = ConsumerConfig::new(42)
788            .with_max_fetch_bytes(128 * 1024)
789            .with_start_position(SeekPosition::Offset(1000));
790
791        assert_eq!(config.topic_id, 42);
792        assert_eq!(config.max_fetch_bytes, 128 * 1024);
793        assert_eq!(config.start_position, SeekPosition::Offset(1000));
794    }
795
796    #[test]
797    fn test_poll_result_is_empty() {
798        let empty = PollResult {
799            data: bytes::Bytes::new(),
800            current_offset: 0,
801            record_count: 0,
802            end_of_stream: true,
803        };
804        assert!(empty.is_empty());
805
806        let non_empty = PollResult {
807            data: bytes::Bytes::from_static(&[1, 2, 3]),
808            current_offset: 3,
809            record_count: 1,
810            end_of_stream: false,
811        };
812        assert!(!non_empty.is_empty());
813    }
814
815    // =========================================================================
816    // StreamingConsumerConfig Tests
817    // =========================================================================
818
819    #[test]
820    fn test_streaming_consumer_config_default() {
821        let config = StreamingConsumerConfig::default();
822        assert_eq!(config.topic_id, 0);
823        assert_eq!(config.max_batch_bytes, 64 * 1024);
824        assert_eq!(config.start_position, SeekPosition::Beginning);
825        assert!(config.consumer_group.is_none());
826        assert_eq!(config.auto_commit_interval_ms, 5000);
827    }
828
829    #[test]
830    fn test_streaming_consumer_config_builder() {
831        let config = StreamingConsumerConfig::new(42)
832            .with_max_batch_bytes(128 * 1024)
833            .with_start_position(SeekPosition::Offset(5000))
834            .with_consumer_group("my-group")
835            .with_auto_commit_interval(10000);
836
837        assert_eq!(config.topic_id, 42);
838        assert_eq!(config.max_batch_bytes, 128 * 1024);
839        assert_eq!(config.start_position, SeekPosition::Offset(5000));
840        assert_eq!(config.consumer_group, Some("my-group".to_string()));
841        assert_eq!(config.auto_commit_interval_ms, 10000);
842    }
843
844    #[test]
845    fn test_streaming_consumer_config_disable_auto_commit() {
846        let config = StreamingConsumerConfig::new(1).with_auto_commit_interval(0);
847
848        assert_eq!(config.auto_commit_interval_ms, 0);
849    }
850
851    #[test]
852    fn test_streaming_consumer_config_seek_positions() {
853        let beginning =
854            StreamingConsumerConfig::new(1).with_start_position(SeekPosition::Beginning);
855        assert_eq!(beginning.start_position, SeekPosition::Beginning);
856
857        let end = StreamingConsumerConfig::new(1).with_start_position(SeekPosition::End);
858        assert_eq!(end.start_position, SeekPosition::End);
859
860        let offset =
861            StreamingConsumerConfig::new(1).with_start_position(SeekPosition::Offset(12345));
862        assert_eq!(offset.start_position, SeekPosition::Offset(12345));
863    }
864
865    // =========================================================================
866    // SeekPosition Tests
867    // =========================================================================
868
869    #[test]
870    fn test_seek_position_equality() {
871        assert_eq!(SeekPosition::Beginning, SeekPosition::Beginning);
872        assert_eq!(SeekPosition::End, SeekPosition::End);
873        assert_eq!(SeekPosition::Offset(100), SeekPosition::Offset(100));
874
875        assert_ne!(SeekPosition::Beginning, SeekPosition::End);
876        assert_ne!(SeekPosition::Offset(100), SeekPosition::Offset(200));
877        assert_ne!(SeekPosition::Beginning, SeekPosition::Offset(0));
878    }
879
880    #[test]
881    fn test_seek_position_clone() {
882        let pos = SeekPosition::Offset(42);
883        let cloned = pos;
884        assert_eq!(pos, cloned);
885    }
886}