Skip to main content

lnc_client/
standalone.rs

//! Standalone Consumer Mode
//!
//! Provides a simplified, Kafka-like consumer API for independent consumption.
//! In standalone mode, each consumer operates independently, without any
//! coordination with other consumers. This is the simplest consumption pattern.
//!
//! # Example
//!
//! ```rust,no_run
//! use lnc_client::{StandaloneConsumer, StandaloneConfig};
//! use std::path::Path;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Connect and start consuming
//!     let mut consumer = StandaloneConsumer::connect(
//!         "127.0.0.1:1992",
//!         // `new` takes (consumer_id, topic_id); `with_consumer_id` can
//!         // replace the consumer id afterwards.
//!         StandaloneConfig::new("default-consumer", 1)
//!             .with_consumer_id("my-consumer")
//!             .with_offset_dir(Path::new("/var/lib/lance/offsets")),
//!     ).await?;
//!
//!     // Poll for records
//!     loop {
//!         if let Some(records) = consumer.poll().await? {
//!             for record in records.data.chunks(256) {
//!                 // Process record
//!             }
//!             consumer.commit().await?;
//!         }
//!     }
//! }
//! ```
34
35use std::collections::hash_map::DefaultHasher;
36use std::hash::{Hash, Hasher};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use crate::client::{ClientConfig, LanceClient};
42use crate::consumer::{PollResult, SeekPosition, StreamingConsumer, StreamingConsumerConfig};
43use crate::error::Result;
44use crate::offset::{LockFileOffsetStore, MemoryOffsetStore, OffsetStore};
45
46/// Configuration for standalone consumer mode
47#[derive(Debug, Clone)]
48pub struct StandaloneConfig {
49    /// Identifier for this consumer (used for offset storage)
50    pub consumer_id: String,
51    /// Topic ID to consume from
52    pub topic_id: u32,
53    /// Maximum bytes to fetch per poll
54    pub max_fetch_bytes: u32,
55    /// Starting position if no stored offset exists
56    pub start_position: SeekPosition,
57    /// Directory for offset persistence (None = in-memory only)
58    pub offset_dir: Option<PathBuf>,
59    /// Auto-commit interval (None = manual commit only)
60    pub auto_commit_interval: Option<Duration>,
61    /// Connection timeout
62    pub connect_timeout: Duration,
63    /// Poll timeout for blocking operations
64    pub poll_timeout: Duration,
65}
66
67impl StandaloneConfig {
68    /// Create a new standalone config for the given topic
69    pub fn new(consumer_id: impl Into<String>, topic_id: u32) -> Self {
70        Self {
71            consumer_id: consumer_id.into(),
72            topic_id,
73            max_fetch_bytes: 1_048_576, // 1MB default
74            start_position: SeekPosition::Beginning,
75            offset_dir: None,
76            auto_commit_interval: Some(Duration::from_secs(5)),
77            connect_timeout: Duration::from_secs(30),
78            poll_timeout: Duration::from_millis(100),
79        }
80    }
81
82    /// Set the consumer ID for offset tracking
83    pub fn with_consumer_id(mut self, id: impl Into<String>) -> Self {
84        self.consumer_id = id.into();
85        self
86    }
87
88    /// Set the maximum bytes to fetch per poll
89    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
90        self.max_fetch_bytes = bytes;
91        self
92    }
93
94    /// Set the starting position when no stored offset exists
95    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
96        self.start_position = position;
97        self
98    }
99
100    /// Set the directory for offset persistence
101    /// If not set, offsets are stored in memory only (lost on restart)
102    pub fn with_offset_dir(mut self, dir: &Path) -> Self {
103        self.offset_dir = Some(dir.to_path_buf());
104        self
105    }
106
107    /// Set auto-commit interval (None to disable auto-commit)
108    pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
109        self.auto_commit_interval = interval;
110        self
111    }
112
113    /// Disable auto-commit (manual commit only)
114    pub fn with_manual_commit(mut self) -> Self {
115        self.auto_commit_interval = None;
116        self
117    }
118
119    /// Set connection timeout
120    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
121        self.connect_timeout = timeout;
122        self
123    }
124
125    /// Set poll timeout
126    pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
127        self.poll_timeout = timeout;
128        self
129    }
130}
131
132/// Standalone consumer for independent consumption
133///
134/// This consumer operates independently without coordination with other consumers.
135/// It manages its own offset tracking and persistence.
136///
137/// # Kafka Equivalence
138///
139/// This is similar to a Kafka consumer with `enable.auto.commit=true` and no
140/// consumer group coordination. Each StandaloneConsumer instance tracks its
141/// own progress independently.
142pub struct StandaloneConsumer {
143    inner: StreamingConsumer,
144    config: StandaloneConfig,
145    offset_store: Arc<dyn OffsetStore>,
146    last_commit_time: Instant,
147    pending_offset: u64,
148    committed_offset: u64,
149}
150
151impl StandaloneConsumer {
152    /// Connect to a LANCE server and start consuming
153    ///
154    /// This will:
155    /// 1. Establish connection to the server
156    /// 2. Load any previously committed offset from storage
157    /// 3. Subscribe to the topic starting from the stored offset (or start_position)
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if:
162    /// - Connection to the server fails
163    /// - Offset store cannot be initialized
164    /// - Subscription fails
165    pub async fn connect(addr: &str, config: StandaloneConfig) -> Result<Self> {
166        let mut client_config = ClientConfig::new(addr);
167        client_config.connect_timeout = config.connect_timeout;
168
169        let client = LanceClient::connect(client_config).await?;
170        Self::from_client(client, config).await
171    }
172
173    /// Create a standalone consumer from an existing client connection
174    pub async fn from_client(client: LanceClient, config: StandaloneConfig) -> Result<Self> {
175        // Generate numeric consumer ID from string name
176        let numeric_consumer_id = Self::hash_consumer_id(&config.consumer_id);
177
178        // Initialize offset store
179        let offset_store: Arc<dyn OffsetStore> = if let Some(ref dir) = config.offset_dir {
180            Arc::new(LockFileOffsetStore::open(dir, &config.consumer_id)?)
181        } else {
182            Arc::new(MemoryOffsetStore::new())
183        };
184
185        // Load stored offset
186        let stored_offset = offset_store
187            .load(config.topic_id, numeric_consumer_id)
188            .ok()
189            .flatten();
190
191        // Determine starting position
192        let start_position = if let Some(offset) = stored_offset {
193            SeekPosition::Offset(offset)
194        } else {
195            config.start_position
196        };
197
198        // Create streaming consumer config
199        let streaming_config = StreamingConsumerConfig::new(config.topic_id)
200            .with_max_batch_bytes(config.max_fetch_bytes)
201            .with_start_position(start_position)
202            .with_auto_commit_interval(0); // We handle auto-commit ourselves
203
204        let mut inner = StreamingConsumer::new(client, streaming_config);
205
206        // Start the subscription
207        inner.start().await?;
208
209        let current_offset = inner.current_offset();
210
211        Ok(Self {
212            inner,
213            config,
214            offset_store,
215            last_commit_time: Instant::now(),
216            pending_offset: current_offset,
217            committed_offset: stored_offset.unwrap_or(0),
218        })
219    }
220
221    /// Poll for new records
222    ///
223    /// Returns `Ok(Some(PollResult))` if records are available,
224    /// `Ok(None)` if no records are ready (non-blocking behavior).
225    ///
226    /// This method may trigger auto-commit if configured.
227    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
228        // Check for auto-commit
229        self.maybe_auto_commit().await?;
230
231        // Poll the inner consumer
232        let result = self.inner.poll().await?;
233
234        if let Some(ref poll_result) = result {
235            self.pending_offset = poll_result.current_offset;
236        }
237
238        Ok(result)
239    }
240
241    /// Poll for records, blocking until data is available or timeout
242    ///
243    /// This is a convenience method that retries polling until records
244    /// are available or the configured poll_timeout is reached.
245    pub async fn poll_timeout(&mut self, timeout: Duration) -> Result<Option<PollResult>> {
246        let deadline = Instant::now() + timeout;
247
248        while Instant::now() < deadline {
249            if let Some(result) = self.poll().await? {
250                return Ok(Some(result));
251            }
252            // Small sleep to avoid busy-waiting
253            tokio::time::sleep(Duration::from_millis(10)).await;
254        }
255
256        Ok(None)
257    }
258
259    /// Commit the current offset
260    ///
261    /// This persists the current consumption progress to the offset store.
262    /// If the consumer crashes and restarts, it will resume from this offset.
263    pub async fn commit(&mut self) -> Result<()> {
264        self.commit_offset(self.pending_offset).await
265    }
266
267    /// Commit a specific offset
268    ///
269    /// Use this for fine-grained control over exactly which offset is committed.
270    pub async fn commit_offset(&mut self, offset: u64) -> Result<()> {
271        // Generate numeric consumer ID
272        let numeric_consumer_id = Self::hash_consumer_id(&self.config.consumer_id);
273
274        // Persist to offset store
275        self.offset_store
276            .save(self.config.topic_id, numeric_consumer_id, offset)?;
277
278        self.committed_offset = offset;
279        self.last_commit_time = Instant::now();
280
281        // Also commit to server for visibility (optional, for monitoring)
282        let _ = self.inner.commit().await;
283
284        Ok(())
285    }
286
287    /// Seek to a specific position
288    ///
289    /// This changes the consumption position. The next poll will start
290    /// from the new position.
291    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
292        let offset = self.inner.seek(position).await?;
293        self.pending_offset = offset;
294        Ok(offset)
295    }
296
297    /// Get the current consumption offset (may not be committed yet)
298    pub fn current_offset(&self) -> u64 {
299        self.pending_offset
300    }
301
302    /// Get the last committed offset
303    pub fn committed_offset(&self) -> u64 {
304        self.committed_offset
305    }
306
307    /// Get the consumer ID
308    pub fn consumer_id(&self) -> &str {
309        &self.config.consumer_id
310    }
311
312    /// Get the topic ID being consumed
313    pub fn topic_id(&self) -> u32 {
314        self.config.topic_id
315    }
316
317    /// Check if the consumer is actively subscribed
318    pub fn is_subscribed(&self) -> bool {
319        self.inner.is_subscribed()
320    }
321
322    /// Get a reference to the underlying client
323    pub fn client(&self) -> &LanceClient {
324        self.inner.client()
325    }
326
327    /// Stop the consumer and release resources
328    ///
329    /// This will:
330    /// 1. Commit any pending offset (if auto-commit is enabled)
331    /// 2. Unsubscribe from the topic
332    /// 3. Close the connection
333    pub async fn close(mut self) -> Result<LanceClient> {
334        // Final commit if we have pending changes
335        if self.pending_offset > self.committed_offset {
336            let _ = self.commit().await;
337        }
338
339        self.inner.into_client().await
340    }
341
342    /// Hash a string consumer ID to a numeric ID
343    fn hash_consumer_id(consumer_id: &str) -> u64 {
344        let mut hasher = DefaultHasher::new();
345        consumer_id.hash(&mut hasher);
346        hasher.finish()
347    }
348
349    /// Check and perform auto-commit if interval has elapsed
350    async fn maybe_auto_commit(&mut self) -> Result<()> {
351        if let Some(interval) = self.config.auto_commit_interval {
352            if self.last_commit_time.elapsed() >= interval {
353                if self.pending_offset > self.committed_offset {
354                    self.commit().await?;
355                } else {
356                    // Update time even if no commit needed
357                    self.last_commit_time = Instant::now();
358                }
359            }
360        }
361        Ok(())
362    }
363}
364
365impl std::fmt::Debug for StandaloneConsumer {
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        f.debug_struct("StandaloneConsumer")
368            .field("consumer_id", &self.config.consumer_id)
369            .field("topic_id", &self.config.topic_id)
370            .field("pending_offset", &self.pending_offset)
371            .field("committed_offset", &self.committed_offset)
372            .field("is_subscribed", &self.inner.is_subscribed())
373            .finish()
374    }
375}
376
377/// Builder for creating multiple standalone consumers with shared configuration
378pub struct StandaloneConsumerBuilder {
379    addr: String,
380    base_config: StandaloneConfig,
381}
382
383impl StandaloneConsumerBuilder {
384    /// Create a new builder
385    pub fn new(addr: impl Into<String>, consumer_id: impl Into<String>) -> Self {
386        Self {
387            addr: addr.into(),
388            base_config: StandaloneConfig::new(consumer_id, 0),
389        }
390    }
391
392    /// Set the offset storage directory
393    pub fn with_offset_dir(mut self, dir: &Path) -> Self {
394        self.base_config = self.base_config.with_offset_dir(dir);
395        self
396    }
397
398    /// Set the max fetch bytes
399    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
400        self.base_config = self.base_config.with_max_fetch_bytes(bytes);
401        self
402    }
403
404    /// Set the start position for new consumers
405    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
406        self.base_config = self.base_config.with_start_position(position);
407        self
408    }
409
410    /// Set auto-commit interval
411    pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
412        self.base_config = self.base_config.with_auto_commit_interval(interval);
413        self
414    }
415
416    /// Build a consumer for a specific topic
417    pub async fn build_for_topic(&self, topic_id: u32) -> Result<StandaloneConsumer> {
418        let mut config = self.base_config.clone();
419        config.topic_id = topic_id;
420        StandaloneConsumer::connect(&self.addr, config).await
421    }
422}
423
#[cfg(test)]
// Kept from the original module so tests may use `unwrap` under the crate's clippy settings.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A config created with `new` carries the given identity and the defaults.
    #[test]
    fn test_standalone_config_defaults() {
        let StandaloneConfig {
            consumer_id,
            topic_id,
            max_fetch_bytes,
            offset_dir,
            auto_commit_interval,
            ..
        } = StandaloneConfig::new("test-consumer", 1);

        assert_eq!(consumer_id, "test-consumer");
        assert_eq!(topic_id, 1);
        assert_eq!(max_fetch_bytes, 1_048_576);
        assert!(offset_dir.is_none());
        assert!(auto_commit_interval.is_some());
    }

    /// Chained `with_*` calls override the corresponding defaults.
    #[test]
    fn test_standalone_config_builder() {
        let base = StandaloneConfig::new("test", 1);
        let sized = base.with_max_fetch_bytes(512 * 1024);
        let persisted = sized.with_offset_dir(Path::new("/tmp/offsets"));
        let manual = persisted.with_manual_commit();
        let tuned = manual.with_start_position(SeekPosition::End);

        assert_eq!(tuned.max_fetch_bytes, 512 * 1024);
        assert!(tuned.offset_dir.is_some());
        assert!(tuned.auto_commit_interval.is_none());
    }

    /// An explicit auto-commit interval is stored as given.
    #[test]
    fn test_standalone_config_with_auto_commit() {
        let ten_seconds = Duration::from_secs(10);
        let tuned = StandaloneConfig::new("test", 1).with_auto_commit_interval(Some(ten_seconds));

        assert_eq!(tuned.auto_commit_interval, Some(Duration::from_secs(10)));
    }
}