streamweave_offset/
offset.rs

1//! Offset tracking for resumable pipeline processing.
2//!
3//! This module provides abstractions for tracking processing offsets,
4//! enabling pipelines to resume from where they left off after restarts.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt::{self, Display};
10use std::fs;
11use std::io::{self};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, RwLock};
14
15/// Represents a processing offset.
16///
17/// Offsets can be sequence numbers, timestamps, or custom values.
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
19pub enum Offset {
20  /// A sequence number offset.
21  Sequence(u64),
22  /// A timestamp-based offset.
23  Timestamp(DateTime<Utc>),
24  /// A custom string offset.
25  Custom(String),
26  /// Represents the beginning of a stream.
27  #[default]
28  Earliest,
29  /// Represents the end of a stream (latest).
30  Latest,
31}
32
33impl Display for Offset {
34  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35    match self {
36      Offset::Sequence(n) => write!(f, "seq:{}", n),
37      Offset::Timestamp(ts) => write!(f, "ts:{}", ts.to_rfc3339()),
38      Offset::Custom(s) => write!(f, "custom:{}", s),
39      Offset::Earliest => write!(f, "earliest"),
40      Offset::Latest => write!(f, "latest"),
41    }
42  }
43}
44
45impl Offset {
46  /// Creates a sequence offset.
47  pub fn sequence(n: u64) -> Self {
48    Offset::Sequence(n)
49  }
50
51  /// Creates a timestamp offset.
52  pub fn timestamp(ts: DateTime<Utc>) -> Self {
53    Offset::Timestamp(ts)
54  }
55
56  /// Creates a custom string offset.
57  pub fn custom(s: impl Into<String>) -> Self {
58    Offset::Custom(s.into())
59  }
60
61  /// Increments a sequence offset by one.
62  /// Returns None for non-sequence offsets.
63  pub fn increment(&self) -> Option<Self> {
64    match self {
65      Offset::Sequence(n) => Some(Offset::Sequence(n + 1)),
66      _ => None,
67    }
68  }
69
70  /// Returns true if this is the earliest offset.
71  pub fn is_earliest(&self) -> bool {
72    matches!(self, Offset::Earliest)
73  }
74
75  /// Returns true if this is the latest offset.
76  pub fn is_latest(&self) -> bool {
77    matches!(self, Offset::Latest)
78  }
79}
80
81/// Policy for resetting offsets when no committed offset is found.
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
83pub enum OffsetResetPolicy {
84  /// Start from the earliest available offset.
85  #[default]
86  Earliest,
87  /// Start from the latest available offset.
88  Latest,
89  /// Fail if no offset is found.
90  None,
91}
92
93/// Strategy for committing offsets.
94#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
95pub enum CommitStrategy {
96  /// Automatically commit after each item is processed.
97  #[default]
98  Auto,
99  /// Commit periodically based on count.
100  Periodic(usize),
101  /// Only commit when explicitly requested.
102  Manual,
103}
104
105/// Error type for offset operations.
106#[derive(Debug)]
107pub enum OffsetError {
108  /// IO error during persistence.
109  IoError(io::Error),
110  /// Serialization/deserialization error.
111  SerializationError(String),
112  /// Source not found.
113  SourceNotFound(String),
114  /// Lock acquisition failed.
115  LockError(String),
116  /// Invalid offset format.
117  InvalidOffset(String),
118}
119
120impl Display for OffsetError {
121  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122    match self {
123      OffsetError::IoError(e) => write!(f, "IO error: {}", e),
124      OffsetError::SerializationError(s) => write!(f, "Serialization error: {}", s),
125      OffsetError::SourceNotFound(s) => write!(f, "Source not found: {}", s),
126      OffsetError::LockError(s) => write!(f, "Lock error: {}", s),
127      OffsetError::InvalidOffset(s) => write!(f, "Invalid offset: {}", s),
128    }
129  }
130}
131
132impl std::error::Error for OffsetError {}
133
134impl From<io::Error> for OffsetError {
135  fn from(err: io::Error) -> Self {
136    OffsetError::IoError(err)
137  }
138}
139
140/// Result type for offset operations.
141pub type OffsetResult<T> = Result<T, OffsetError>;
142
143/// Trait for offset storage backends.
144///
145/// Implementations of this trait handle persisting and retrieving offsets.
146pub trait OffsetStore: Send + Sync + std::fmt::Debug {
147  /// Get the committed offset for a source.
148  fn get(&self, source: &str) -> OffsetResult<Option<Offset>>;
149
150  /// Commit an offset for a source.
151  fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()>;
152
153  /// Get all committed offsets.
154  fn get_all(&self) -> OffsetResult<HashMap<String, Offset>>;
155
156  /// Clear the offset for a source.
157  fn clear(&self, source: &str) -> OffsetResult<()>;
158
159  /// Clear all offsets.
160  fn clear_all(&self) -> OffsetResult<()>;
161}
162
163/// In-memory offset store.
164///
165/// This is useful for testing or scenarios where persistence across restarts
166/// is not required.
167#[derive(Debug, Clone, Default)]
168pub struct InMemoryOffsetStore {
169  offsets: Arc<RwLock<HashMap<String, Offset>>>,
170}
171
172impl InMemoryOffsetStore {
173  /// Creates a new in-memory offset store.
174  pub fn new() -> Self {
175    Self::default()
176  }
177
178  /// Creates an in-memory offset store with initial offsets.
179  pub fn with_offsets(offsets: HashMap<String, Offset>) -> Self {
180    Self {
181      offsets: Arc::new(RwLock::new(offsets)),
182    }
183  }
184}
185
186impl OffsetStore for InMemoryOffsetStore {
187  fn get(&self, source: &str) -> OffsetResult<Option<Offset>> {
188    let offsets = self
189      .offsets
190      .read()
191      .map_err(|e| OffsetError::LockError(e.to_string()))?;
192    Ok(offsets.get(source).cloned())
193  }
194
195  fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()> {
196    let mut offsets = self
197      .offsets
198      .write()
199      .map_err(|e| OffsetError::LockError(e.to_string()))?;
200    offsets.insert(source.to_string(), offset);
201    Ok(())
202  }
203
204  fn get_all(&self) -> OffsetResult<HashMap<String, Offset>> {
205    let offsets = self
206      .offsets
207      .read()
208      .map_err(|e| OffsetError::LockError(e.to_string()))?;
209    Ok(offsets.clone())
210  }
211
212  fn clear(&self, source: &str) -> OffsetResult<()> {
213    let mut offsets = self
214      .offsets
215      .write()
216      .map_err(|e| OffsetError::LockError(e.to_string()))?;
217    offsets.remove(source);
218    Ok(())
219  }
220
221  fn clear_all(&self) -> OffsetResult<()> {
222    let mut offsets = self
223      .offsets
224      .write()
225      .map_err(|e| OffsetError::LockError(e.to_string()))?;
226    offsets.clear();
227    Ok(())
228  }
229}
230
231/// File-based offset store.
232///
233/// Persists offsets to a JSON file on disk.
234///
235/// File-based offset store implementation.
236#[derive(Debug, Clone)]
237pub struct FileOffsetStore {
238  path: PathBuf,
239  cache: Arc<RwLock<HashMap<String, Offset>>>,
240}
241
242impl FileOffsetStore {
243  /// Creates a new file-based offset store.
244  pub fn new<P: AsRef<Path>>(path: P) -> OffsetResult<Self> {
245    let path = path.as_ref().to_path_buf();
246
247    // Load existing offsets if the file exists
248    let cache = if path.exists() {
249      let data = fs::read_to_string(&path)?;
250      if data.is_empty() {
251        HashMap::new()
252      } else {
253        serde_json::from_str(&data).map_err(|e| OffsetError::SerializationError(e.to_string()))?
254      }
255    } else {
256      HashMap::new()
257    };
258
259    Ok(Self {
260      path,
261      cache: Arc::new(RwLock::new(cache)),
262    })
263  }
264
265  /// Persists the current offsets to disk.
266  fn persist(&self, offsets: &HashMap<String, Offset>) -> OffsetResult<()> {
267    // Ensure parent directory exists
268    if let Some(parent) = self.path.parent() {
269      fs::create_dir_all(parent)?;
270    }
271
272    let data = serde_json::to_string_pretty(offsets)
273      .map_err(|e| OffsetError::SerializationError(e.to_string()))?;
274    fs::write(&self.path, data)?;
275    Ok(())
276  }
277
278  /// Returns the path to the offset file.
279  pub fn path(&self) -> &Path {
280    &self.path
281  }
282}
283
284impl OffsetStore for FileOffsetStore {
285  fn get(&self, source: &str) -> OffsetResult<Option<Offset>> {
286    let cache = self
287      .cache
288      .read()
289      .map_err(|e| OffsetError::LockError(e.to_string()))?;
290    Ok(cache.get(source).cloned())
291  }
292
293  fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()> {
294    let mut cache = self
295      .cache
296      .write()
297      .map_err(|e| OffsetError::LockError(e.to_string()))?;
298    cache.insert(source.to_string(), offset);
299    self.persist(&cache)?;
300    Ok(())
301  }
302
303  fn get_all(&self) -> OffsetResult<HashMap<String, Offset>> {
304    let cache = self
305      .cache
306      .read()
307      .map_err(|e| OffsetError::LockError(e.to_string()))?;
308    Ok(cache.clone())
309  }
310
311  fn clear(&self, source: &str) -> OffsetResult<()> {
312    let mut cache = self
313      .cache
314      .write()
315      .map_err(|e| OffsetError::LockError(e.to_string()))?;
316    cache.remove(source);
317    self.persist(&cache)?;
318    Ok(())
319  }
320
321  fn clear_all(&self) -> OffsetResult<()> {
322    let mut cache = self
323      .cache
324      .write()
325      .map_err(|e| OffsetError::LockError(e.to_string()))?;
326    cache.clear();
327    self.persist(&cache)?;
328    Ok(())
329  }
330}
331
332/// Tracks processing offsets with configurable commit strategies.
333///
334/// The `OffsetTracker` wraps an `OffsetStore` and provides convenient
335/// methods for tracking and committing offsets based on the configured
336/// strategy.
337#[derive(Debug)]
338pub struct OffsetTracker {
339  store: Box<dyn OffsetStore>,
340  strategy: CommitStrategy,
341  reset_policy: OffsetResetPolicy,
342  pending: Arc<RwLock<HashMap<String, (Offset, usize)>>>,
343}
344
345impl OffsetTracker {
346  /// Creates a new offset tracker with the given store and default settings.
347  pub fn new(store: Box<dyn OffsetStore>) -> Self {
348    Self {
349      store,
350      strategy: CommitStrategy::default(),
351      reset_policy: OffsetResetPolicy::default(),
352      pending: Arc::new(RwLock::new(HashMap::new())),
353    }
354  }
355
356  /// Creates a new offset tracker with the specified commit strategy.
357  pub fn with_strategy(store: Box<dyn OffsetStore>, strategy: CommitStrategy) -> Self {
358    Self {
359      store,
360      strategy,
361      reset_policy: OffsetResetPolicy::default(),
362      pending: Arc::new(RwLock::new(HashMap::new())),
363    }
364  }
365
366  /// Sets the offset reset policy.
367  pub fn with_reset_policy(mut self, policy: OffsetResetPolicy) -> Self {
368    self.reset_policy = policy;
369    self
370  }
371
372  /// Gets the current committed offset for a source, applying the reset policy
373  /// if no offset is found.
374  pub fn get_offset(&self, source: &str) -> OffsetResult<Offset> {
375    match self.store.get(source)? {
376      Some(offset) => Ok(offset),
377      None => match self.reset_policy {
378        OffsetResetPolicy::Earliest => Ok(Offset::Earliest),
379        OffsetResetPolicy::Latest => Ok(Offset::Latest),
380        OffsetResetPolicy::None => Err(OffsetError::SourceNotFound(source.to_string())),
381      },
382    }
383  }
384
385  /// Records that an offset has been processed.
386  ///
387  /// Based on the commit strategy, this may immediately commit the offset
388  /// or hold it for later batch commit.
389  pub fn record(&self, source: &str, offset: Offset) -> OffsetResult<()> {
390    match self.strategy {
391      CommitStrategy::Auto => {
392        self.store.commit(source, offset)?;
393      }
394      CommitStrategy::Periodic(interval) => {
395        let mut pending = self
396          .pending
397          .write()
398          .map_err(|e| OffsetError::LockError(e.to_string()))?;
399
400        let entry = pending
401          .entry(source.to_string())
402          .or_insert((offset.clone(), 0));
403        entry.0 = offset;
404        entry.1 += 1;
405
406        if entry.1 >= interval {
407          let offset_to_commit = entry.0.clone();
408          entry.1 = 0;
409          drop(pending); // Release lock before committing
410          self.store.commit(source, offset_to_commit)?;
411        }
412      }
413      CommitStrategy::Manual => {
414        let mut pending = self
415          .pending
416          .write()
417          .map_err(|e| OffsetError::LockError(e.to_string()))?;
418        let entry = pending
419          .entry(source.to_string())
420          .or_insert((offset.clone(), 0));
421        entry.0 = offset;
422        entry.1 += 1;
423      }
424    }
425    Ok(())
426  }
427
428  /// Commits the pending offset for a specific source.
429  ///
430  /// This is useful for manual commit strategy or when forcing a commit.
431  pub fn commit(&self, source: &str) -> OffsetResult<()> {
432    let pending_offset = {
433      let pending = self
434        .pending
435        .read()
436        .map_err(|e| OffsetError::LockError(e.to_string()))?;
437      pending.get(source).map(|(o, _)| o.clone())
438    };
439
440    if let Some(offset) = pending_offset {
441      self.store.commit(source, offset)?;
442      let mut pending = self
443        .pending
444        .write()
445        .map_err(|e| OffsetError::LockError(e.to_string()))?;
446      if let Some(entry) = pending.get_mut(source) {
447        entry.1 = 0;
448      }
449    }
450    Ok(())
451  }
452
453  /// Commits all pending offsets.
454  pub fn commit_all(&self) -> OffsetResult<()> {
455    let sources: Vec<String> = {
456      let pending = self
457        .pending
458        .read()
459        .map_err(|e| OffsetError::LockError(e.to_string()))?;
460      pending.keys().cloned().collect()
461    };
462
463    for source in sources {
464      self.commit(&source)?;
465    }
466    Ok(())
467  }
468
469  /// Resets the offset for a source to the specified value.
470  pub fn reset(&self, source: &str, offset: Offset) -> OffsetResult<()> {
471    self.store.commit(source, offset)
472  }
473
474  /// Clears the offset for a source.
475  pub fn clear(&self, source: &str) -> OffsetResult<()> {
476    self.store.clear(source)?;
477    let mut pending = self
478      .pending
479      .write()
480      .map_err(|e| OffsetError::LockError(e.to_string()))?;
481    pending.remove(source);
482    Ok(())
483  }
484
485  /// Returns the current commit strategy.
486  pub fn strategy(&self) -> CommitStrategy {
487    self.strategy
488  }
489
490  /// Returns the current reset policy.
491  pub fn reset_policy(&self) -> OffsetResetPolicy {
492    self.reset_policy
493  }
494
495  /// Gets all committed offsets.
496  pub fn get_all_committed(&self) -> OffsetResult<HashMap<String, Offset>> {
497    self.store.get_all()
498  }
499
500  /// Gets all pending offsets (not yet committed).
501  pub fn get_all_pending(&self) -> OffsetResult<HashMap<String, Offset>> {
502    let pending = self
503      .pending
504      .read()
505      .map_err(|e| OffsetError::LockError(e.to_string()))?;
506    Ok(
507      pending
508        .iter()
509        .map(|(k, (o, _))| (k.clone(), o.clone()))
510        .collect(),
511    )
512  }
513}
514
515#[cfg(test)]
516mod tests {
517  use super::*;
518  use tempfile::tempdir;
519
520  // Offset tests
521  #[test]
522  fn test_offset_display() {
523    assert_eq!(Offset::Sequence(42).to_string(), "seq:42");
524    assert_eq!(Offset::Earliest.to_string(), "earliest");
525    assert_eq!(Offset::Latest.to_string(), "latest");
526    assert_eq!(Offset::Custom("foo".to_string()).to_string(), "custom:foo");
527  }
528
529  #[test]
530  fn test_offset_increment() {
531    assert_eq!(Offset::Sequence(0).increment(), Some(Offset::Sequence(1)));
532    assert_eq!(Offset::Sequence(42).increment(), Some(Offset::Sequence(43)));
533    assert_eq!(Offset::Earliest.increment(), None);
534    assert_eq!(Offset::Latest.increment(), None);
535    assert_eq!(Offset::Custom("x".to_string()).increment(), None);
536  }
537
538  #[test]
539  fn test_offset_is_earliest_latest() {
540    assert!(Offset::Earliest.is_earliest());
541    assert!(!Offset::Latest.is_earliest());
542    assert!(Offset::Latest.is_latest());
543    assert!(!Offset::Earliest.is_latest());
544  }
545
546  #[test]
547  fn test_offset_default() {
548    assert_eq!(Offset::default(), Offset::Earliest);
549  }
550
551  // InMemoryOffsetStore tests
552  #[test]
553  fn test_in_memory_store_basic() {
554    let store = InMemoryOffsetStore::new();
555
556    assert!(store.get("source1").unwrap().is_none());
557
558    store.commit("source1", Offset::Sequence(10)).unwrap();
559    assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(10)));
560
561    store.commit("source1", Offset::Sequence(20)).unwrap();
562    assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(20)));
563  }
564
565  #[test]
566  fn test_in_memory_store_multiple_sources() {
567    let store = InMemoryOffsetStore::new();
568
569    store.commit("source1", Offset::Sequence(10)).unwrap();
570    store.commit("source2", Offset::Sequence(20)).unwrap();
571
572    assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(10)));
573    assert_eq!(store.get("source2").unwrap(), Some(Offset::Sequence(20)));
574
575    let all = store.get_all().unwrap();
576    assert_eq!(all.len(), 2);
577  }
578
579  #[test]
580  fn test_in_memory_store_clear() {
581    let store = InMemoryOffsetStore::new();
582
583    store.commit("source1", Offset::Sequence(10)).unwrap();
584    store.commit("source2", Offset::Sequence(20)).unwrap();
585
586    store.clear("source1").unwrap();
587    assert!(store.get("source1").unwrap().is_none());
588    assert_eq!(store.get("source2").unwrap(), Some(Offset::Sequence(20)));
589
590    store.clear_all().unwrap();
591    assert!(store.get_all().unwrap().is_empty());
592  }
593
594  #[test]
595  fn test_in_memory_store_with_initial_offsets() {
596    let mut initial = HashMap::new();
597    initial.insert("source1".to_string(), Offset::Sequence(100));
598
599    let store = InMemoryOffsetStore::with_offsets(initial);
600    assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(100)));
601  }
602
603  // FileOffsetStore tests (native only)
604  #[test]
605  fn test_file_store_basic() {
606    let dir = tempdir().unwrap();
607    let path = dir.path().join("offsets.json");
608
609    let store = FileOffsetStore::new(&path).unwrap();
610
611    assert!(store.get("source1").unwrap().is_none());
612
613    store.commit("source1", Offset::Sequence(10)).unwrap();
614    assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(10)));
615
616    // Verify persistence
617    let store2 = FileOffsetStore::new(&path).unwrap();
618    assert_eq!(store2.get("source1").unwrap(), Some(Offset::Sequence(10)));
619  }
620
621  #[test]
622  fn test_file_store_clear() {
623    let dir = tempdir().unwrap();
624    let path = dir.path().join("offsets.json");
625
626    let store = FileOffsetStore::new(&path).unwrap();
627
628    store.commit("source1", Offset::Sequence(10)).unwrap();
629    store.commit("source2", Offset::Sequence(20)).unwrap();
630
631    store.clear("source1").unwrap();
632
633    // Verify persistence of clear
634    let store2 = FileOffsetStore::new(&path).unwrap();
635    assert!(store2.get("source1").unwrap().is_none());
636    assert_eq!(store2.get("source2").unwrap(), Some(Offset::Sequence(20)));
637  }
638
639  #[test]
640  fn test_file_store_creates_parent_dirs() {
641    let dir = tempdir().unwrap();
642    let path = dir.path().join("nested/dir/offsets.json");
643
644    let store = FileOffsetStore::new(&path).unwrap();
645    store.commit("source1", Offset::Sequence(10)).unwrap();
646
647    assert!(path.exists());
648  }
649
650  // OffsetTracker tests
651  #[test]
652  fn test_tracker_auto_commit() {
653    let store = Box::new(InMemoryOffsetStore::new());
654    let tracker = OffsetTracker::new(store);
655
656    tracker.record("source1", Offset::Sequence(10)).unwrap();
657
658    // With auto commit, offset should be immediately committed
659    assert_eq!(
660      tracker.get_all_committed().unwrap().get("source1"),
661      Some(&Offset::Sequence(10))
662    );
663  }
664
665  #[test]
666  fn test_tracker_periodic_commit() {
667    let store = Box::new(InMemoryOffsetStore::new());
668    let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Periodic(3));
669
670    tracker.record("source1", Offset::Sequence(1)).unwrap();
671    tracker.record("source1", Offset::Sequence(2)).unwrap();
672
673    // Not committed yet
674    assert!(tracker.get_all_committed().unwrap().is_empty());
675
676    tracker.record("source1", Offset::Sequence(3)).unwrap();
677
678    // Now committed (after 3rd record)
679    assert_eq!(
680      tracker.get_all_committed().unwrap().get("source1"),
681      Some(&Offset::Sequence(3))
682    );
683  }
684
685  #[test]
686  fn test_tracker_manual_commit() {
687    let store = Box::new(InMemoryOffsetStore::new());
688    let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Manual);
689
690    tracker.record("source1", Offset::Sequence(10)).unwrap();
691
692    // Not committed yet
693    assert!(tracker.get_all_committed().unwrap().is_empty());
694
695    // Pending should have the offset
696    assert_eq!(
697      tracker.get_all_pending().unwrap().get("source1"),
698      Some(&Offset::Sequence(10))
699    );
700
701    tracker.commit("source1").unwrap();
702
703    // Now committed
704    assert_eq!(
705      tracker.get_all_committed().unwrap().get("source1"),
706      Some(&Offset::Sequence(10))
707    );
708  }
709
710  #[test]
711  fn test_tracker_commit_all() {
712    let store = Box::new(InMemoryOffsetStore::new());
713    let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Manual);
714
715    tracker.record("source1", Offset::Sequence(10)).unwrap();
716    tracker.record("source2", Offset::Sequence(20)).unwrap();
717
718    tracker.commit_all().unwrap();
719
720    let committed = tracker.get_all_committed().unwrap();
721    assert_eq!(committed.get("source1"), Some(&Offset::Sequence(10)));
722    assert_eq!(committed.get("source2"), Some(&Offset::Sequence(20)));
723  }
724
725  #[test]
726  fn test_tracker_reset_policy_earliest() {
727    let store = Box::new(InMemoryOffsetStore::new());
728    let tracker = OffsetTracker::new(store).with_reset_policy(OffsetResetPolicy::Earliest);
729
730    assert_eq!(tracker.get_offset("unknown").unwrap(), Offset::Earliest);
731  }
732
733  #[test]
734  fn test_tracker_reset_policy_latest() {
735    let store = Box::new(InMemoryOffsetStore::new());
736    let tracker = OffsetTracker::new(store).with_reset_policy(OffsetResetPolicy::Latest);
737
738    assert_eq!(tracker.get_offset("unknown").unwrap(), Offset::Latest);
739  }
740
741  #[test]
742  fn test_tracker_reset_policy_none() {
743    let store = Box::new(InMemoryOffsetStore::new());
744    let tracker = OffsetTracker::new(store).with_reset_policy(OffsetResetPolicy::None);
745
746    let result = tracker.get_offset("unknown");
747    assert!(matches!(result, Err(OffsetError::SourceNotFound(_))));
748  }
749
750  #[test]
751  fn test_tracker_reset_offset() {
752    let store = Box::new(InMemoryOffsetStore::new());
753    let tracker = OffsetTracker::new(store);
754
755    tracker.record("source1", Offset::Sequence(100)).unwrap();
756    tracker.reset("source1", Offset::Sequence(0)).unwrap();
757
758    assert_eq!(tracker.get_offset("source1").unwrap(), Offset::Sequence(0));
759  }
760
761  #[test]
762  fn test_tracker_clear() {
763    let store = Box::new(InMemoryOffsetStore::new());
764    let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Manual);
765
766    tracker.record("source1", Offset::Sequence(100)).unwrap();
767    tracker.commit("source1").unwrap();
768    tracker.record("source1", Offset::Sequence(200)).unwrap();
769
770    tracker.clear("source1").unwrap();
771
772    // Both committed and pending should be cleared
773    assert!(tracker.get_all_committed().unwrap().is_empty());
774    assert!(tracker.get_all_pending().unwrap().is_empty());
775  }
776
777  #[test]
778  fn test_offset_error_display() {
779    let io_err = OffsetError::IoError(io::Error::new(io::ErrorKind::NotFound, "file not found"));
780    assert!(io_err.to_string().contains("IO error"));
781
782    let ser_err = OffsetError::SerializationError("bad json".to_string());
783    assert!(ser_err.to_string().contains("Serialization error"));
784
785    let not_found = OffsetError::SourceNotFound("src".to_string());
786    assert!(not_found.to_string().contains("Source not found"));
787
788    let lock_err = OffsetError::LockError("poisoned".to_string());
789    assert!(lock_err.to_string().contains("Lock error"));
790
791    let invalid = OffsetError::InvalidOffset("bad format".to_string());
792    assert!(invalid.to_string().contains("Invalid offset"));
793  }
794
795  #[test]
796  fn test_tracker_strategy_getters() {
797    let store = Box::new(InMemoryOffsetStore::new());
798    let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Periodic(5))
799      .with_reset_policy(OffsetResetPolicy::Latest);
800
801    assert_eq!(tracker.strategy(), CommitStrategy::Periodic(5));
802    assert_eq!(tracker.reset_policy(), OffsetResetPolicy::Latest);
803  }
804
805  #[test]
806  fn test_offset_constructors() {
807    let seq = Offset::sequence(42);
808    assert_eq!(seq, Offset::Sequence(42));
809
810    let ts = Utc::now();
811    let ts_offset = Offset::timestamp(ts);
812    assert!(matches!(ts_offset, Offset::Timestamp(_)));
813
814    let custom = Offset::custom("my-offset");
815    assert_eq!(custom, Offset::Custom("my-offset".to_string()));
816  }
817
818  #[test]
819  fn test_file_store_path() {
820    let dir = tempdir().unwrap();
821    let path = dir.path().join("offsets.json");
822
823    let store = FileOffsetStore::new(&path).unwrap();
824    assert_eq!(store.path(), path);
825  }
826
827  #[test]
828  fn test_in_memory_store_clone() {
829    let store = InMemoryOffsetStore::new();
830    store.commit("source1", Offset::Sequence(10)).unwrap();
831
832    let cloned = store.clone();
833
834    // Both should see the same data (shared state)
835    assert_eq!(cloned.get("source1").unwrap(), Some(Offset::Sequence(10)));
836
837    // Update through one should be visible through the other
838    cloned.commit("source1", Offset::Sequence(20)).unwrap();
839    assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(20)));
840  }
841}