Skip to main content

lnc_client/
offset.rs

1//! Client-side offset persistence for LANCE consumers
2//!
3//! Provides traits and implementations for storing consumer offsets locally,
4//! enabling stateless broker architecture per the Client-Managed Offset Strategy.
5//!
6//! # Design Philosophy
7//!
8//! LANCE treats the broker as a stateless data pipe. Offset tracking is the
9//! client's responsibility, enabling:
10//! - Exactly-once semantics (store offset with business data atomically)
11//! - Stateless horizontal scaling
12//! - Reduced broker metadata overhead
13//!
14//! # Example
15//!
16//! ```ignore
17//! use lnc_client::offset::{OffsetStore, LockFileOffsetStore};
18//! use std::path::Path;
19//!
20//! let store = LockFileOffsetStore::open(
21//!     Path::new("/var/lib/lance/offsets"),
22//!     "my-consumer",
23//! )?;
24//!
25//! // Load stored offset for a topic
//! let offset = store.load(topic_id, consumer_id)?.unwrap_or(0);
27//!
28//! // After processing, save the new offset
29//! store.save(topic_id, consumer_id, new_offset)?;
30//! ```
31
32use std::collections::HashMap;
33use std::fs::{self, File, OpenOptions};
34use std::io::{Read, Write};
35use std::path::{Path, PathBuf};
36use std::sync::{Arc, RwLock};
37
38use crate::error::{ClientError, Result};
39
40/// Trait for client-side offset persistence
41///
42/// Implementations store consumer offsets locally, allowing consumers to resume
43/// from their last position after restarts without server-side state.
44pub trait OffsetStore: Send + Sync {
45    /// Load the stored offset for a topic and consumer
46    ///
47    /// Returns `None` if no offset has been stored (first run).
48    fn load(&self, topic_id: u32, consumer_id: u64) -> Result<Option<u64>>;
49
50    /// Save the current offset for a topic and consumer
51    ///
52    /// This should be called after successfully processing records.
53    fn save(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()>;
54
55    /// Delete stored offset for a topic and consumer
56    ///
57    /// Use when resetting consumer position or cleaning up.
58    fn delete(&self, topic_id: u32, consumer_id: u64) -> Result<()>;
59
60    /// List all stored offsets
61    ///
62    /// Returns a map of (topic_id, consumer_id) -> offset
63    fn list_all(&self) -> Result<HashMap<(u32, u64), u64>>;
64}
65
/// Offset store that lives entirely in process memory.
///
/// Nothing survives process exit, so this fits tests and setups where offsets
/// are persisted by some other means (for example inside the same database
/// transaction as the processed data). Clones share one underlying map,
/// because the map sits behind an `Arc`.
#[derive(Clone, Debug, Default)]
pub struct MemoryOffsetStore {
    offsets: Arc<RwLock<HashMap<(u32, u64), u64>>>,
}

impl MemoryOffsetStore {
    /// Build an empty in-memory store.
    pub fn new() -> Self {
        // The derived `Default` wraps an empty map in a fresh RwLock and Arc.
        Self::default()
    }
}
83
84impl OffsetStore for MemoryOffsetStore {
85    fn load(&self, topic_id: u32, consumer_id: u64) -> Result<Option<u64>> {
86        let offsets = self.offsets.read().map_err(|e| {
87            ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
88        })?;
89        Ok(offsets.get(&(topic_id, consumer_id)).copied())
90    }
91
92    fn save(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()> {
93        let mut offsets = self.offsets.write().map_err(|e| {
94            ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
95        })?;
96        offsets.insert((topic_id, consumer_id), offset);
97        Ok(())
98    }
99
100    fn delete(&self, topic_id: u32, consumer_id: u64) -> Result<()> {
101        let mut offsets = self.offsets.write().map_err(|e| {
102            ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
103        })?;
104        offsets.remove(&(topic_id, consumer_id));
105        Ok(())
106    }
107
108    fn list_all(&self) -> Result<HashMap<(u32, u64), u64>> {
109        let offsets = self.offsets.read().map_err(|e| {
110            ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
111        })?;
112        Ok(offsets.clone())
113    }
114}
115
116/// File-based offset store with lock file protection
117///
118/// Stores offsets in individual files under a base directory:
119/// ```text
120/// {base_dir}/{consumer_name}/
121/// ├── topic-{topic_id}-consumer-{consumer_id}.offset
122/// └── ...
123/// ```
124///
125/// Uses file locking (flock on Unix, LockFile on Windows) to ensure
126/// safe concurrent access from multiple processes.
127#[derive(Debug)]
128pub struct LockFileOffsetStore {
129    base_dir: PathBuf,
130    consumer_name: String,
131    cache: RwLock<HashMap<(u32, u64), u64>>,
132}
133
134impl LockFileOffsetStore {
135    /// Open or create an offset store at the given directory
136    ///
137    /// # Arguments
138    /// * `base_dir` - Base directory for offset files
139    /// * `consumer_name` - Name for this consumer instance (subdirectory name)
140    ///
141    /// # Example
142    /// ```ignore
143    /// let store = LockFileOffsetStore::open(
144    ///     Path::new("/var/lib/lance/offsets"),
145    ///     "my-service",
146    /// )?;
147    /// ```
148    pub fn open(base_dir: &Path, consumer_name: &str) -> Result<Self> {
149        let dir = base_dir.join(consumer_name);
150        fs::create_dir_all(&dir).map_err(ClientError::IoError)?;
151
152        let store = Self {
153            base_dir: dir,
154            consumer_name: consumer_name.to_string(),
155            cache: RwLock::new(HashMap::new()),
156        };
157
158        // Pre-load existing offsets into cache
159        store.load_all_into_cache()?;
160
161        Ok(store)
162    }
163
164    /// Get the file path for a specific topic/consumer offset
165    fn offset_file_path(&self, topic_id: u32, consumer_id: u64) -> PathBuf {
166        self.base_dir.join(format!(
167            "topic-{}-consumer-{}.offset",
168            topic_id, consumer_id
169        ))
170    }
171
172    /// Get the lock file path for a specific topic/consumer
173    fn lock_file_path(&self, topic_id: u32, consumer_id: u64) -> PathBuf {
174        self.base_dir
175            .join(format!("topic-{}-consumer-{}.lock", topic_id, consumer_id))
176    }
177
178    /// Load all existing offsets into the cache
179    fn load_all_into_cache(&self) -> Result<()> {
180        let mut cache = self.cache.write().map_err(|e| {
181            ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
182        })?;
183
184        let entries = match fs::read_dir(&self.base_dir) {
185            Ok(entries) => entries,
186            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
187            Err(e) => return Err(ClientError::IoError(e)),
188        };
189
190        for entry in entries.flatten() {
191            let path = entry.path();
192            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
193                if name.ends_with(".offset") {
194                    // Parse topic-{id}-consumer-{id}.offset
195                    if let Some((topic_id, consumer_id)) = Self::parse_offset_filename(name) {
196                        if let Ok(offset) = Self::read_offset_file(&path) {
197                            cache.insert((topic_id, consumer_id), offset);
198                        }
199                    }
200                }
201            }
202        }
203
204        Ok(())
205    }
206
207    /// Parse offset filename to extract topic_id and consumer_id
208    fn parse_offset_filename(name: &str) -> Option<(u32, u64)> {
209        // Format: topic-{topic_id}-consumer-{consumer_id}.offset
210        let name = name.strip_suffix(".offset")?;
211        let parts: Vec<&str> = name.split('-').collect();
212        if parts.len() >= 4 && parts[0] == "topic" && parts[2] == "consumer" {
213            let topic_id: u32 = parts[1].parse().ok()?;
214            let consumer_id: u64 = parts[3].parse().ok()?;
215            Some((topic_id, consumer_id))
216        } else {
217            None
218        }
219    }
220
221    /// Read offset from a file
222    fn read_offset_file(path: &Path) -> Result<u64> {
223        let mut file = File::open(path).map_err(ClientError::IoError)?;
224        let mut content = String::new();
225        file.read_to_string(&mut content)
226            .map_err(ClientError::IoError)?;
227        content
228            .trim()
229            .parse()
230            .map_err(|e| ClientError::IoError(std::io::Error::other(e)))
231    }
232
233    /// Write offset to a file atomically
234    fn write_offset_file(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()> {
235        let path = self.offset_file_path(topic_id, consumer_id);
236        let lock_path = self.lock_file_path(topic_id, consumer_id);
237        let temp_path = path.with_extension("offset.tmp");
238
239        // Acquire lock file
240        let lock_file = OpenOptions::new()
241            .write(true)
242            .create(true)
243            .truncate(true)
244            .open(&lock_path)
245            .map_err(ClientError::IoError)?;
246
247        // Platform-specific file locking
248        #[cfg(unix)]
249        {
250            use std::os::unix::io::AsRawFd;
251            let fd = lock_file.as_raw_fd();
252            // SAFETY: fd is valid file descriptor, flock with LOCK_EX is safe
253            unsafe {
254                if libc::flock(fd, libc::LOCK_EX) != 0 {
255                    return Err(ClientError::IoError(std::io::Error::last_os_error()));
256                }
257            }
258        }
259
260        #[cfg(windows)]
261        {
262            use std::os::windows::io::AsRawHandle;
263            use windows_sys::Win32::Foundation::HANDLE;
264            use windows_sys::Win32::Storage::FileSystem::{LOCKFILE_EXCLUSIVE_LOCK, LockFileEx};
265            let handle = lock_file.as_raw_handle() as HANDLE;
266            // SAFETY: handle is valid from as_raw_handle, OVERLAPPED is zeroed correctly
267            unsafe {
268                let mut overlapped =
269                    std::mem::zeroed::<windows_sys::Win32::System::IO::OVERLAPPED>();
270                if LockFileEx(
271                    handle,
272                    LOCKFILE_EXCLUSIVE_LOCK,
273                    0,
274                    u32::MAX,
275                    u32::MAX,
276                    &mut overlapped,
277                ) == 0
278                {
279                    return Err(ClientError::IoError(std::io::Error::last_os_error()));
280                }
281            }
282        }
283
284        // Write to temp file
285        let mut temp_file = File::create(&temp_path).map_err(ClientError::IoError)?;
286        writeln!(temp_file, "{}", offset).map_err(ClientError::IoError)?;
287        temp_file.sync_all().map_err(ClientError::IoError)?;
288        drop(temp_file);
289
290        // Atomic rename
291        fs::rename(&temp_path, &path).map_err(ClientError::IoError)?;
292
293        // Lock is released when lock_file is dropped
294        drop(lock_file);
295
296        Ok(())
297    }
298
299    /// Get the consumer name
300    pub fn consumer_name(&self) -> &str {
301        &self.consumer_name
302    }
303
304    /// Get the base directory
305    pub fn base_dir(&self) -> &Path {
306        &self.base_dir
307    }
308}
309
310impl OffsetStore for LockFileOffsetStore {
311    fn load(&self, topic_id: u32, consumer_id: u64) -> Result<Option<u64>> {
312        // Check cache first
313        {
314            let cache = self.cache.read().map_err(|e| {
315                ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
316            })?;
317            if let Some(&offset) = cache.get(&(topic_id, consumer_id)) {
318                return Ok(Some(offset));
319            }
320        }
321
322        // Try to read from file
323        let path = self.offset_file_path(topic_id, consumer_id);
324        if !path.exists() {
325            return Ok(None);
326        }
327
328        let offset = Self::read_offset_file(&path)?;
329
330        // Update cache
331        {
332            let mut cache = self.cache.write().map_err(|e| {
333                ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
334            })?;
335            cache.insert((topic_id, consumer_id), offset);
336        }
337
338        Ok(Some(offset))
339    }
340
341    fn save(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()> {
342        // Write to file
343        self.write_offset_file(topic_id, consumer_id, offset)?;
344
345        // Update cache
346        {
347            let mut cache = self.cache.write().map_err(|e| {
348                ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
349            })?;
350            cache.insert((topic_id, consumer_id), offset);
351        }
352
353        Ok(())
354    }
355
356    fn delete(&self, topic_id: u32, consumer_id: u64) -> Result<()> {
357        let path = self.offset_file_path(topic_id, consumer_id);
358        let lock_path = self.lock_file_path(topic_id, consumer_id);
359
360        // Remove from cache
361        {
362            let mut cache = self.cache.write().map_err(|e| {
363                ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
364            })?;
365            cache.remove(&(topic_id, consumer_id));
366        }
367
368        // Delete files (ignore if not found)
369        let _ = fs::remove_file(&path);
370        let _ = fs::remove_file(&lock_path);
371
372        Ok(())
373    }
374
375    fn list_all(&self) -> Result<HashMap<(u32, u64), u64>> {
376        let cache = self.cache.read().map_err(|e| {
377            ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
378        })?;
379        Ok(cache.clone())
380    }
381}
382
/// Details of one offset commit, passed by reference to each [`PostCommitHook`].
#[derive(Clone, Debug)]
pub struct CommitInfo {
    /// Topic the committed offset belongs to
    pub topic_id: u32,
    /// Consumer the committed offset belongs to
    pub consumer_id: u64,
    /// Offset that was just committed
    pub offset: u64,
    /// Offset recorded before this commit, when one was known
    pub previous_offset: Option<u64>,
    /// Wall-clock time at which the commit was recorded
    pub timestamp: std::time::SystemTime,
}
397
398/// Trait for post-commit hooks
399///
400/// Implement this trait to receive notifications after offsets are committed.
401/// Use cases include: external offset persistence, metrics, audit logging.
402pub trait PostCommitHook: Send + Sync {
403    /// Called after an offset is successfully committed
404    ///
405    /// This method should not block for extended periods. For expensive
406    /// operations, consider spawning a task or using a channel.
407    fn on_commit(&self, info: &CommitInfo) -> Result<()>;
408}
409
410/// Offset store wrapper that invokes post-commit hooks
411///
412/// Wraps an inner `OffsetStore` and calls registered hooks after each commit.
413pub struct HookedOffsetStore<S: OffsetStore> {
414    inner: S,
415    hooks: Vec<Arc<dyn PostCommitHook>>,
416    previous_offsets: RwLock<HashMap<(u32, u64), u64>>,
417}
418
419impl<S: OffsetStore> HookedOffsetStore<S> {
420    /// Create a new hooked offset store
421    pub fn new(inner: S) -> Self {
422        Self {
423            inner,
424            hooks: Vec::new(),
425            previous_offsets: RwLock::new(HashMap::new()),
426        }
427    }
428
429    /// Add a post-commit hook
430    pub fn add_hook(mut self, hook: Arc<dyn PostCommitHook>) -> Self {
431        self.hooks.push(hook);
432        self
433    }
434
435    /// Add multiple post-commit hooks
436    pub fn with_hooks(mut self, hooks: Vec<Arc<dyn PostCommitHook>>) -> Self {
437        self.hooks.extend(hooks);
438        self
439    }
440
441    /// Get the inner store
442    pub fn inner(&self) -> &S {
443        &self.inner
444    }
445
446    /// Invoke all registered hooks
447    fn invoke_hooks(&self, info: &CommitInfo) -> Result<()> {
448        for hook in &self.hooks {
449            hook.on_commit(info)?;
450        }
451        Ok(())
452    }
453}
454
455impl<S: OffsetStore> OffsetStore for HookedOffsetStore<S> {
456    fn load(&self, topic_id: u32, consumer_id: u64) -> Result<Option<u64>> {
457        let offset = self.inner.load(topic_id, consumer_id)?;
458
459        // Track for previous_offset in commits
460        if let Some(off) = offset {
461            if let Ok(mut prev) = self.previous_offsets.write() {
462                prev.insert((topic_id, consumer_id), off);
463            }
464        }
465
466        Ok(offset)
467    }
468
469    fn save(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()> {
470        // Get previous offset
471        let previous_offset = self
472            .previous_offsets
473            .read()
474            .ok()
475            .and_then(|prev| prev.get(&(topic_id, consumer_id)).copied());
476
477        // Save to inner store
478        self.inner.save(topic_id, consumer_id, offset)?;
479
480        // Update previous offset tracking
481        if let Ok(mut prev) = self.previous_offsets.write() {
482            prev.insert((topic_id, consumer_id), offset);
483        }
484
485        // Invoke hooks
486        let info = CommitInfo {
487            topic_id,
488            consumer_id,
489            offset,
490            previous_offset,
491            timestamp: std::time::SystemTime::now(),
492        };
493        self.invoke_hooks(&info)?;
494
495        Ok(())
496    }
497
498    fn delete(&self, topic_id: u32, consumer_id: u64) -> Result<()> {
499        // Remove from previous tracking
500        if let Ok(mut prev) = self.previous_offsets.write() {
501            prev.remove(&(topic_id, consumer_id));
502        }
503
504        self.inner.delete(topic_id, consumer_id)
505    }
506
507    fn list_all(&self) -> Result<HashMap<(u32, u64), u64>> {
508        self.inner.list_all()
509    }
510}
511
512/// A simple logging hook for debugging
513#[derive(Debug, Default)]
514pub struct LoggingCommitHook;
515
516impl PostCommitHook for LoggingCommitHook {
517    fn on_commit(&self, info: &CommitInfo) -> Result<()> {
518        tracing::debug!(
519            topic_id = info.topic_id,
520            consumer_id = info.consumer_id,
521            offset = info.offset,
522            previous = ?info.previous_offset,
523            "Offset committed"
524        );
525        Ok(())
526    }
527}
528
529/// A hook that collects commit events for testing
530#[derive(Debug, Default)]
531pub struct CollectingCommitHook {
532    commits: RwLock<Vec<CommitInfo>>,
533}
534
535impl CollectingCommitHook {
536    /// Create a new collecting hook
537    pub fn new() -> Self {
538        Self::default()
539    }
540
541    /// Get all collected commits
542    pub fn commits(&self) -> Vec<CommitInfo> {
543        self.commits.read().map(|c| c.clone()).unwrap_or_default()
544    }
545
546    /// Clear collected commits
547    pub fn clear(&self) {
548        if let Ok(mut commits) = self.commits.write() {
549            commits.clear();
550        }
551    }
552}
553
554impl PostCommitHook for CollectingCommitHook {
555    fn on_commit(&self, info: &CommitInfo) -> Result<()> {
556        if let Ok(mut commits) = self.commits.write() {
557            commits.push(info.clone());
558        }
559        Ok(())
560    }
561}
562
// Unit tests for the offset stores and commit hooks in this module.
#[cfg(test)]
// Tests unwrap freely: any unexpected error should fail the test by panicking.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Exercises save/load/update/list_all/delete on the in-memory store.
    #[test]
    fn test_memory_offset_store() {
        let store = MemoryOffsetStore::new();

        // Initially empty
        assert!(store.load(1, 100).unwrap().is_none());

        // Save and load
        store.save(1, 100, 42).unwrap();
        assert_eq!(store.load(1, 100).unwrap(), Some(42));

        // Update
        store.save(1, 100, 100).unwrap();
        assert_eq!(store.load(1, 100).unwrap(), Some(100));

        // Different topic/consumer
        store.save(2, 200, 999).unwrap();
        assert_eq!(store.load(2, 200).unwrap(), Some(999));
        assert_eq!(store.load(1, 100).unwrap(), Some(100));

        // List all
        let all = store.list_all().unwrap();
        assert_eq!(all.len(), 2);
        assert_eq!(all.get(&(1, 100)), Some(&100));
        assert_eq!(all.get(&(2, 200)), Some(&999));

        // Delete
        store.delete(1, 100).unwrap();
        assert!(store.load(1, 100).unwrap().is_none());
        assert_eq!(store.load(2, 200).unwrap(), Some(999));
    }

    /// Exercises the file-backed store in a temp directory, including the
    /// on-disk file contents and removal of the offset file on delete.
    #[test]
    fn test_lock_file_offset_store() {
        let temp_dir = TempDir::new().unwrap();
        let store = LockFileOffsetStore::open(temp_dir.path(), "test-consumer").unwrap();

        // Initially empty
        assert!(store.load(1, 100).unwrap().is_none());

        // Save and load
        store.save(1, 100, 12345).unwrap();
        assert_eq!(store.load(1, 100).unwrap(), Some(12345));

        // Verify file exists
        let file_path = store.offset_file_path(1, 100);
        assert!(file_path.exists());

        // Read file content
        let content = fs::read_to_string(&file_path).unwrap();
        assert_eq!(content.trim(), "12345");

        // Update
        store.save(1, 100, 67890).unwrap();
        assert_eq!(store.load(1, 100).unwrap(), Some(67890));

        // Delete
        store.delete(1, 100).unwrap();
        assert!(store.load(1, 100).unwrap().is_none());
        assert!(!file_path.exists());
    }

    /// A second store opened on the same directory sees offsets saved by the
    /// first (exercises the pre-load in `open`).
    #[test]
    fn test_lock_file_offset_store_persistence() {
        let temp_dir = TempDir::new().unwrap();

        // Create store and save offset
        {
            let store = LockFileOffsetStore::open(temp_dir.path(), "persist-test").unwrap();
            store.save(5, 500, 99999).unwrap();
        }

        // Reopen and verify offset is still there
        {
            let store = LockFileOffsetStore::open(temp_dir.path(), "persist-test").unwrap();
            assert_eq!(store.load(5, 500).unwrap(), Some(99999));
        }
    }

    /// Well-formed names parse to their ids; malformed or non-numeric names do not.
    #[test]
    fn test_parse_offset_filename() {
        assert_eq!(
            LockFileOffsetStore::parse_offset_filename("topic-1-consumer-100.offset"),
            Some((1, 100))
        );
        assert_eq!(
            LockFileOffsetStore::parse_offset_filename("topic-999-consumer-12345.offset"),
            Some((999, 12345))
        );
        assert_eq!(
            LockFileOffsetStore::parse_offset_filename("invalid.offset"),
            None
        );
        assert_eq!(
            LockFileOffsetStore::parse_offset_filename("topic-abc-consumer-100.offset"),
            None
        );
    }

    /// Each save through the hooked store fires the hook, and the second save
    /// for a key reports the first save's offset as `previous_offset`.
    #[test]
    fn test_hooked_offset_store() {
        let inner = MemoryOffsetStore::new();
        let hook = Arc::new(CollectingCommitHook::new());
        let store = HookedOffsetStore::new(inner).add_hook(hook.clone());

        // Save triggers hook
        store.save(1, 100, 42).unwrap();

        let commits = hook.commits();
        assert_eq!(commits.len(), 1);
        assert_eq!(commits[0].topic_id, 1);
        assert_eq!(commits[0].consumer_id, 100);
        assert_eq!(commits[0].offset, 42);
        assert!(commits[0].previous_offset.is_none());

        // Second save has previous offset
        store.save(1, 100, 100).unwrap();

        let commits = hook.commits();
        assert_eq!(commits.len(), 2);
        assert_eq!(commits[1].offset, 100);
        assert_eq!(commits[1].previous_offset, Some(42));

        // Different topic/consumer
        store.save(2, 200, 500).unwrap();
        assert_eq!(hook.commits().len(), 3);

        // Load returns correct values
        assert_eq!(store.load(1, 100).unwrap(), Some(100));
        assert_eq!(store.load(2, 200).unwrap(), Some(500));

        // Clear and verify
        hook.clear();
        assert!(hook.commits().is_empty());
    }

    /// Direct use of the collecting hook: record one commit, then clear.
    #[test]
    fn test_collecting_commit_hook() {
        let hook = CollectingCommitHook::new();

        let info = CommitInfo {
            topic_id: 1,
            consumer_id: 100,
            offset: 42,
            previous_offset: None,
            timestamp: std::time::SystemTime::now(),
        };

        hook.on_commit(&info).unwrap();

        let commits = hook.commits();
        assert_eq!(commits.len(), 1);
        assert_eq!(commits[0].topic_id, 1);

        hook.clear();
        assert!(hook.commits().is_empty());
    }
}