Skip to main content

lnc_client/
standalone.rs

//! Standalone Consumer Mode
//!
//! Provides a simplified, Kafka-like consumer API for independent consumption.
//! In standalone mode, each consumer operates independently, without any
//! coordination with other consumers. This is the simplest consumption pattern.
//!
//! # Example
//!
//! ```rust,no_run
//! use lnc_client::{StandaloneConsumer, StandaloneConfig};
//! use std::path::Path;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Connect and start consuming
//!     let mut consumer = StandaloneConsumer::connect(
//!         "127.0.0.1:1992",
//!         // `new` takes (consumer ID, numeric topic ID); the ID can be replaced afterwards
//!         StandaloneConfig::new("default-consumer", 1)
//!             .with_consumer_id("my-consumer")
//!             .with_offset_dir(Path::new("/var/lib/lance/offsets")),
//!     ).await?;
//!
//!     // Receive records
//!     loop {
//!         if let Some(records) = consumer.next_batch().await? {
//!             for record in records.data.chunks(256) {
//!                 // Process record
//!             }
//!             consumer.commit().await?;
//!         }
//!     }
//! }
//! ```
34
35use std::collections::hash_map::DefaultHasher;
36use std::hash::{Hash, Hasher};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use crate::client::{ClientConfig, LanceClient};
42use crate::consumer::{PollResult, SeekPosition, StreamingConsumer, StreamingConsumerConfig};
43use crate::error::Result;
44use crate::offset::{LockFileOffsetStore, MemoryOffsetStore, OffsetStore};
45
46/// Configuration for standalone consumer mode
47#[derive(Debug, Clone)]
48pub struct StandaloneConfig {
49    /// Identifier for this consumer (used for offset storage)
50    pub consumer_id: String,
51    /// Topic ID to consume from
52    pub topic_id: u32,
53    /// Maximum bytes to fetch per poll
54    pub max_fetch_bytes: u32,
55    /// Starting position if no stored offset exists
56    pub start_position: SeekPosition,
57    /// Directory for offset persistence (None = in-memory only)
58    pub offset_dir: Option<PathBuf>,
59    /// Auto-commit interval (None = manual commit only)
60    pub auto_commit_interval: Option<Duration>,
61    /// Connection timeout
62    pub connect_timeout: Duration,
63    /// Poll timeout for blocking operations
64    pub poll_timeout: Duration,
65}
66
67impl StandaloneConfig {
68    /// Create a new standalone config for the given topic
69    pub fn new(consumer_id: impl Into<String>, topic_id: u32) -> Self {
70        Self {
71            consumer_id: consumer_id.into(),
72            topic_id,
73            max_fetch_bytes: 1_048_576, // 1MB default
74            start_position: SeekPosition::Beginning,
75            offset_dir: None,
76            auto_commit_interval: Some(Duration::from_secs(5)),
77            connect_timeout: Duration::from_secs(30),
78            poll_timeout: Duration::from_millis(100),
79        }
80    }
81
82    /// Set the consumer ID for offset tracking
83    pub fn with_consumer_id(mut self, id: impl Into<String>) -> Self {
84        self.consumer_id = id.into();
85        self
86    }
87
88    /// Set the maximum bytes to fetch per poll
89    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
90        self.max_fetch_bytes = bytes;
91        self
92    }
93
94    /// Set the starting position when no stored offset exists
95    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
96        self.start_position = position;
97        self
98    }
99
100    /// Set the directory for offset persistence
101    /// If not set, offsets are stored in memory only (lost on restart)
102    pub fn with_offset_dir(mut self, dir: &Path) -> Self {
103        self.offset_dir = Some(dir.to_path_buf());
104        self
105    }
106
107    /// Set auto-commit interval (None to disable auto-commit)
108    pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
109        self.auto_commit_interval = interval;
110        self
111    }
112
113    /// Disable auto-commit (manual commit only)
114    pub fn with_manual_commit(mut self) -> Self {
115        self.auto_commit_interval = None;
116        self
117    }
118
119    /// Set connection timeout
120    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
121        self.connect_timeout = timeout;
122        self
123    }
124
125    /// Set poll timeout
126    pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
127        self.poll_timeout = timeout;
128        self
129    }
130}
131
132/// Standalone consumer for independent consumption
133///
134/// This consumer operates independently without coordination with other consumers.
135/// It manages its own offset tracking and persistence.
136///
137/// # Kafka Equivalence
138///
139/// This is similar to a Kafka consumer with `enable.auto.commit=true` and no
140/// consumer group coordination. Each StandaloneConsumer instance tracks its
141/// own progress independently.
142pub struct StandaloneConsumer {
143    inner: StreamingConsumer,
144    config: StandaloneConfig,
145    offset_store: Arc<dyn OffsetStore>,
146    last_commit_time: Instant,
147    pending_offset: u64,
148    committed_offset: u64,
149}
150
151impl StandaloneConsumer {
152    /// Connect to a LANCE server and start consuming
153    ///
154    /// This will:
155    /// 1. Establish connection to the server
156    /// 2. Load any previously committed offset from storage
157    /// 3. Subscribe to the topic starting from the stored offset (or start_position)
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if:
162    /// - Connection to the server fails
163    /// - Offset store cannot be initialized
164    /// - Subscription fails
165    pub async fn connect(addr: &str, config: StandaloneConfig) -> Result<Self> {
166        let mut client_config = ClientConfig::new(addr);
167        client_config.connect_timeout = config.connect_timeout;
168
169        let client = LanceClient::connect(client_config).await?;
170        Self::from_client(client, config).await
171    }
172
173    /// Create a standalone consumer from an existing client connection
174    pub async fn from_client(client: LanceClient, config: StandaloneConfig) -> Result<Self> {
175        // Generate numeric consumer ID from string name
176        let numeric_consumer_id = Self::hash_consumer_id(&config.consumer_id);
177
178        // Initialize offset store
179        let offset_store: Arc<dyn OffsetStore> = if let Some(ref dir) = config.offset_dir {
180            Arc::new(LockFileOffsetStore::open(dir, &config.consumer_id)?)
181        } else {
182            Arc::new(MemoryOffsetStore::new())
183        };
184
185        // Load stored offset
186        let stored_offset = offset_store
187            .load(config.topic_id, numeric_consumer_id)
188            .ok()
189            .flatten();
190
191        // Determine starting position
192        let start_position = if let Some(offset) = stored_offset {
193            SeekPosition::Offset(offset)
194        } else {
195            config.start_position
196        };
197
198        // Create streaming consumer config
199        let streaming_config = StreamingConsumerConfig::new(config.topic_id)
200            .with_max_batch_bytes(config.max_fetch_bytes)
201            .with_start_position(start_position)
202            .with_auto_commit_interval(0); // We handle auto-commit ourselves
203
204        let mut inner = StreamingConsumer::new(client, streaming_config);
205
206        // Start the subscription
207        inner.start().await?;
208
209        let current_offset = inner.current_offset();
210
211        Ok(Self {
212            inner,
213            config,
214            offset_store,
215            last_commit_time: Instant::now(),
216            pending_offset: current_offset,
217            committed_offset: stored_offset.unwrap_or(0),
218        })
219    }
220
221    /// Receive the next batch for an active subscription.
222    ///
223    /// Returns `Ok(Some(PollResult))` if records are available,
224    /// `Ok(None)` if no records are ready (non-blocking behavior).
225    ///
226    /// This method may trigger auto-commit if configured.
227    pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
228        // Check for auto-commit
229        self.maybe_auto_commit().await?;
230
231        // Pull from the inner streaming consumer
232        let result = self.inner.next_batch().await?;
233
234        if let Some(ref poll_result) = result {
235            self.pending_offset = poll_result.current_offset;
236        }
237
238        Ok(result)
239    }
240
241    /// Primary consume interface alias.
242    #[inline]
243    pub async fn consume(&mut self) -> Result<Option<PollResult>> {
244        self.next_batch().await
245    }
246
247    /// Compatibility wrapper for callers still using polling terminology.
248    #[inline]
249    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
250        self.next_batch().await
251    }
252
253    /// Poll for records, blocking until data is available or timeout
254    ///
255    /// This is a convenience method that retries polling until records
256    /// are available or the configured poll_timeout is reached.
257    pub async fn poll_timeout(&mut self, timeout: Duration) -> Result<Option<PollResult>> {
258        let deadline = Instant::now() + timeout;
259
260        while Instant::now() < deadline {
261            if let Some(result) = self.next_batch().await? {
262                return Ok(Some(result));
263            }
264            // Small sleep to avoid busy-waiting
265            tokio::time::sleep(Duration::from_millis(10)).await;
266        }
267
268        Ok(None)
269    }
270
271    /// Commit the current offset
272    ///
273    /// This persists the current consumption progress to the offset store.
274    /// If the consumer crashes and restarts, it will resume from this offset.
275    pub async fn commit(&mut self) -> Result<()> {
276        self.commit_offset(self.pending_offset).await
277    }
278
279    /// Commit a specific offset
280    ///
281    /// Use this for fine-grained control over exactly which offset is committed.
282    pub async fn commit_offset(&mut self, offset: u64) -> Result<()> {
283        // Generate numeric consumer ID
284        let numeric_consumer_id = Self::hash_consumer_id(&self.config.consumer_id);
285
286        // Persist to offset store
287        self.offset_store
288            .save(self.config.topic_id, numeric_consumer_id, offset)?;
289
290        self.committed_offset = offset;
291        self.last_commit_time = Instant::now();
292
293        // Also commit to server for visibility (optional, for monitoring)
294        let _ = self.inner.commit().await;
295
296        Ok(())
297    }
298
299    /// Seek to a specific position
300    ///
301    /// This changes the consumption position. The next poll will start
302    /// from the new position.
303    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
304        let offset = self.inner.seek(position).await?;
305        self.pending_offset = offset;
306        Ok(offset)
307    }
308
309    /// Get the current consumption offset (may not be committed yet)
310    pub fn current_offset(&self) -> u64 {
311        self.pending_offset
312    }
313
314    /// Get the last committed offset
315    pub fn committed_offset(&self) -> u64 {
316        self.committed_offset
317    }
318
319    /// Get the consumer ID
320    pub fn consumer_id(&self) -> &str {
321        &self.config.consumer_id
322    }
323
324    /// Get the topic ID being consumed
325    pub fn topic_id(&self) -> u32 {
326        self.config.topic_id
327    }
328
329    /// Check if the consumer is actively subscribed
330    pub fn is_subscribed(&self) -> bool {
331        self.inner.is_subscribed()
332    }
333
334    /// Get a reference to the underlying client
335    pub fn client(&self) -> &LanceClient {
336        self.inner.client()
337    }
338
339    /// Stop the consumer and release resources
340    ///
341    /// This will:
342    /// 1. Commit any pending offset (if auto-commit is enabled)
343    /// 2. Unsubscribe from the topic
344    /// 3. Close the connection
345    pub async fn close(mut self) -> Result<LanceClient> {
346        // Final commit if we have pending changes
347        if self.pending_offset > self.committed_offset {
348            let _ = self.commit().await;
349        }
350
351        self.inner.into_client().await
352    }
353
354    /// Hash a string consumer ID to a numeric ID
355    fn hash_consumer_id(consumer_id: &str) -> u64 {
356        let mut hasher = DefaultHasher::new();
357        consumer_id.hash(&mut hasher);
358        hasher.finish()
359    }
360
361    /// Check and perform auto-commit if interval has elapsed
362    async fn maybe_auto_commit(&mut self) -> Result<()> {
363        if let Some(interval) = self.config.auto_commit_interval {
364            if self.last_commit_time.elapsed() >= interval {
365                if self.pending_offset > self.committed_offset {
366                    self.commit().await?;
367                } else {
368                    // Update time even if no commit needed
369                    self.last_commit_time = Instant::now();
370                }
371            }
372        }
373        Ok(())
374    }
375}
376
377impl std::fmt::Debug for StandaloneConsumer {
378    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379        f.debug_struct("StandaloneConsumer")
380            .field("consumer_id", &self.config.consumer_id)
381            .field("topic_id", &self.config.topic_id)
382            .field("pending_offset", &self.pending_offset)
383            .field("committed_offset", &self.committed_offset)
384            .field("is_subscribed", &self.inner.is_subscribed())
385            .finish()
386    }
387}
388
389/// Builder for creating multiple standalone consumers with shared configuration
390pub struct StandaloneConsumerBuilder {
391    addr: String,
392    base_config: StandaloneConfig,
393}
394
395impl StandaloneConsumerBuilder {
396    /// Create a new builder
397    pub fn new(addr: impl Into<String>, consumer_id: impl Into<String>) -> Self {
398        Self {
399            addr: addr.into(),
400            base_config: StandaloneConfig::new(consumer_id, 0),
401        }
402    }
403
404    /// Set the offset storage directory
405    pub fn with_offset_dir(mut self, dir: &Path) -> Self {
406        self.base_config = self.base_config.with_offset_dir(dir);
407        self
408    }
409
410    /// Set the max fetch bytes
411    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
412        self.base_config = self.base_config.with_max_fetch_bytes(bytes);
413        self
414    }
415
416    /// Set the start position for new consumers
417    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
418        self.base_config = self.base_config.with_start_position(position);
419        self
420    }
421
422    /// Set auto-commit interval
423    pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
424        self.base_config = self.base_config.with_auto_commit_interval(interval);
425        self
426    }
427
428    /// Build a consumer for a specific topic
429    pub async fn build_for_topic(&self, topic_id: u32) -> Result<StandaloneConsumer> {
430        let mut config = self.base_config.clone();
431        config.topic_id = topic_id;
432        StandaloneConsumer::connect(&self.addr, config).await
433    }
434}
435
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Every default set by `StandaloneConfig::new` is pinned here.
    #[test]
    fn test_standalone_config_defaults() {
        let config = StandaloneConfig::new("test-consumer", 1);

        assert_eq!(config.consumer_id, "test-consumer");
        assert_eq!(config.topic_id, 1);
        assert_eq!(config.max_fetch_bytes, 1_048_576);
        assert!(matches!(config.start_position, SeekPosition::Beginning));
        assert!(config.offset_dir.is_none());
        assert_eq!(config.auto_commit_interval, Some(Duration::from_secs(5)));
        assert_eq!(config.connect_timeout, Duration::from_secs(30));
        assert_eq!(config.poll_timeout, Duration::from_millis(100));
    }

    /// Chained setters each take effect, including the start position.
    #[test]
    fn test_standalone_config_builder() {
        let config = StandaloneConfig::new("test", 1)
            .with_max_fetch_bytes(512 * 1024)
            .with_offset_dir(Path::new("/tmp/offsets"))
            .with_manual_commit()
            .with_start_position(SeekPosition::End);

        assert_eq!(config.max_fetch_bytes, 512 * 1024);
        assert_eq!(config.offset_dir.as_deref(), Some(Path::new("/tmp/offsets")));
        assert!(config.auto_commit_interval.is_none());
        assert!(matches!(config.start_position, SeekPosition::End));
    }

    #[test]
    fn test_standalone_config_with_auto_commit() {
        let config = StandaloneConfig::new("test", 1)
            .with_auto_commit_interval(Some(Duration::from_secs(10)));

        assert_eq!(config.auto_commit_interval, Some(Duration::from_secs(10)));
    }

    /// Consumer ID and both timeouts can be overridden after construction.
    #[test]
    fn test_standalone_config_id_and_timeouts() {
        let config = StandaloneConfig::new("first", 7)
            .with_consumer_id("second")
            .with_connect_timeout(Duration::from_secs(3))
            .with_poll_timeout(Duration::from_millis(250));

        assert_eq!(config.consumer_id, "second");
        assert_eq!(config.topic_id, 7);
        assert_eq!(config.connect_timeout, Duration::from_secs(3));
        assert_eq!(config.poll_timeout, Duration::from_millis(250));
    }
}