streamweave_offset/
offset.rs

1//! Offset tracking for resumable pipeline processing.
2//!
3//! This module provides abstractions for tracking processing offsets,
4//! enabling pipelines to resume from where they left off after restarts.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt::{self, Display};
10use std::fs;
11use std::io::{self};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, RwLock};
14
15/// Represents a processing offset.
16///
17/// Offsets can be sequence numbers, timestamps, or custom values.
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
19pub enum Offset {
20  /// A sequence number offset.
21  Sequence(u64),
22  /// A timestamp-based offset.
23  Timestamp(DateTime<Utc>),
24  /// A custom string offset.
25  Custom(String),
26  /// Represents the beginning of a stream.
27  #[default]
28  Earliest,
29  /// Represents the end of a stream (latest).
30  Latest,
31}
32
33impl Display for Offset {
34  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35    match self {
36      Offset::Sequence(n) => write!(f, "seq:{}", n),
37      Offset::Timestamp(ts) => write!(f, "ts:{}", ts.to_rfc3339()),
38      Offset::Custom(s) => write!(f, "custom:{}", s),
39      Offset::Earliest => write!(f, "earliest"),
40      Offset::Latest => write!(f, "latest"),
41    }
42  }
43}
44
45impl Offset {
46  /// Creates a sequence offset.
47  pub fn sequence(n: u64) -> Self {
48    Offset::Sequence(n)
49  }
50
51  /// Creates a timestamp offset.
52  pub fn timestamp(ts: DateTime<Utc>) -> Self {
53    Offset::Timestamp(ts)
54  }
55
56  /// Creates a custom string offset.
57  pub fn custom(s: impl Into<String>) -> Self {
58    Offset::Custom(s.into())
59  }
60
61  /// Increments a sequence offset by one.
62  /// Returns None for non-sequence offsets.
63  pub fn increment(&self) -> Option<Self> {
64    match self {
65      Offset::Sequence(n) => Some(Offset::Sequence(n + 1)),
66      _ => None,
67    }
68  }
69
70  /// Returns true if this is the earliest offset.
71  pub fn is_earliest(&self) -> bool {
72    matches!(self, Offset::Earliest)
73  }
74
75  /// Returns true if this is the latest offset.
76  pub fn is_latest(&self) -> bool {
77    matches!(self, Offset::Latest)
78  }
79}
80
81/// Policy for resetting offsets when no committed offset is found.
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
83pub enum OffsetResetPolicy {
84  /// Start from the earliest available offset.
85  #[default]
86  Earliest,
87  /// Start from the latest available offset.
88  Latest,
89  /// Fail if no offset is found.
90  None,
91}
92
93/// Strategy for committing offsets.
94#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
95pub enum CommitStrategy {
96  /// Automatically commit after each item is processed.
97  #[default]
98  Auto,
99  /// Commit periodically based on count.
100  Periodic(usize),
101  /// Only commit when explicitly requested.
102  Manual,
103}
104
/// Errors produced by offset stores and the offset tracker.
#[derive(Debug)]
pub enum OffsetError {
  /// An underlying filesystem operation failed.
  IoError(io::Error),
  /// Offsets could not be serialized or deserialized; carries the message.
  SerializationError(String),
  /// No offset exists for the named source.
  SourceNotFound(String),
  /// A lock could not be acquired; carries the message.
  LockError(String),
  /// An offset value was malformed; carries the message.
  InvalidOffset(String),
}

impl Display for OffsetError {
  /// Writes a short category label followed by the variant's detail.
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    match self {
      OffsetError::IoError(e) => write!(f, "IO error: {}", e),
      OffsetError::SerializationError(s) => write!(f, "Serialization error: {}", s),
      OffsetError::SourceNotFound(s) => write!(f, "Source not found: {}", s),
      OffsetError::LockError(s) => write!(f, "Lock error: {}", s),
      OffsetError::InvalidOffset(s) => write!(f, "Invalid offset: {}", s),
    }
  }
}

impl std::error::Error for OffsetError {
  /// Exposes the wrapped [`io::Error`] as the cause of an `IoError`, so code
  /// that walks the `source()` chain can reach it. The other variants only
  /// carry a message and therefore have no underlying cause.
  fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
    match self {
      OffsetError::IoError(err) => Some(err),
      OffsetError::SerializationError(_)
      | OffsetError::SourceNotFound(_)
      | OffsetError::LockError(_)
      | OffsetError::InvalidOffset(_) => None,
    }
  }
}

impl From<io::Error> for OffsetError {
  /// Wraps an I/O error so that `?` can be used on filesystem calls.
  fn from(err: io::Error) -> Self {
    OffsetError::IoError(err)
  }
}
139
140/// Result type for offset operations.
141pub type OffsetResult<T> = Result<T, OffsetError>;
142
143/// Trait for offset storage backends.
144///
145/// Implementations of this trait handle persisting and retrieving offsets.
146pub trait OffsetStore: Send + Sync + std::fmt::Debug {
147  /// Get the committed offset for a source.
148  fn get(&self, source: &str) -> OffsetResult<Option<Offset>>;
149
150  /// Commit an offset for a source.
151  fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()>;
152
153  /// Get all committed offsets.
154  fn get_all(&self) -> OffsetResult<HashMap<String, Offset>>;
155
156  /// Clear the offset for a source.
157  fn clear(&self, source: &str) -> OffsetResult<()>;
158
159  /// Clear all offsets.
160  fn clear_all(&self) -> OffsetResult<()>;
161}
162
163/// In-memory offset store.
164///
165/// This is useful for testing or scenarios where persistence across restarts
166/// is not required.
167#[derive(Debug, Clone, Default)]
168pub struct InMemoryOffsetStore {
169  offsets: Arc<RwLock<HashMap<String, Offset>>>,
170}
171
172impl InMemoryOffsetStore {
173  /// Creates a new in-memory offset store.
174  pub fn new() -> Self {
175    Self::default()
176  }
177
178  /// Creates an in-memory offset store with initial offsets.
179  pub fn with_offsets(offsets: HashMap<String, Offset>) -> Self {
180    Self {
181      offsets: Arc::new(RwLock::new(offsets)),
182    }
183  }
184}
185
186impl OffsetStore for InMemoryOffsetStore {
187  fn get(&self, source: &str) -> OffsetResult<Option<Offset>> {
188    let offsets = self
189      .offsets
190      .read()
191      .map_err(|e| OffsetError::LockError(e.to_string()))?;
192    Ok(offsets.get(source).cloned())
193  }
194
195  fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()> {
196    let mut offsets = self
197      .offsets
198      .write()
199      .map_err(|e| OffsetError::LockError(e.to_string()))?;
200    offsets.insert(source.to_string(), offset);
201    Ok(())
202  }
203
204  fn get_all(&self) -> OffsetResult<HashMap<String, Offset>> {
205    let offsets = self
206      .offsets
207      .read()
208      .map_err(|e| OffsetError::LockError(e.to_string()))?;
209    Ok(offsets.clone())
210  }
211
212  fn clear(&self, source: &str) -> OffsetResult<()> {
213    let mut offsets = self
214      .offsets
215      .write()
216      .map_err(|e| OffsetError::LockError(e.to_string()))?;
217    offsets.remove(source);
218    Ok(())
219  }
220
221  fn clear_all(&self) -> OffsetResult<()> {
222    let mut offsets = self
223      .offsets
224      .write()
225      .map_err(|e| OffsetError::LockError(e.to_string()))?;
226    offsets.clear();
227    Ok(())
228  }
229}
230
231/// File-based offset store.
232///
233/// Persists offsets to a JSON file on disk.
234///
235/// File-based offset store implementation.
236#[derive(Debug, Clone)]
237pub struct FileOffsetStore {
238  path: PathBuf,
239  cache: Arc<RwLock<HashMap<String, Offset>>>,
240}
241
242impl FileOffsetStore {
243  /// Creates a new file-based offset store.
244  pub fn new<P: AsRef<Path>>(path: P) -> OffsetResult<Self> {
245    let path = path.as_ref().to_path_buf();
246
247    // Load existing offsets if the file exists
248    let cache = if path.exists() {
249      let data = fs::read_to_string(&path)?;
250      if data.is_empty() {
251        HashMap::new()
252      } else {
253        serde_json::from_str(&data).map_err(|e| OffsetError::SerializationError(e.to_string()))?
254      }
255    } else {
256      HashMap::new()
257    };
258
259    Ok(Self {
260      path,
261      cache: Arc::new(RwLock::new(cache)),
262    })
263  }
264
265  /// Persists the current offsets to disk.
266  fn persist(&self, offsets: &HashMap<String, Offset>) -> OffsetResult<()> {
267    // Ensure parent directory exists
268    if let Some(parent) = self.path.parent() {
269      fs::create_dir_all(parent)?;
270    }
271
272    let data = serde_json::to_string_pretty(offsets)
273      .map_err(|e| OffsetError::SerializationError(e.to_string()))?;
274    fs::write(&self.path, data)?;
275    Ok(())
276  }
277
278  /// Returns the path to the offset file.
279  pub fn path(&self) -> &Path {
280    &self.path
281  }
282}
283
284impl OffsetStore for FileOffsetStore {
285  fn get(&self, source: &str) -> OffsetResult<Option<Offset>> {
286    let cache = self
287      .cache
288      .read()
289      .map_err(|e| OffsetError::LockError(e.to_string()))?;
290    Ok(cache.get(source).cloned())
291  }
292
293  fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()> {
294    let mut cache = self
295      .cache
296      .write()
297      .map_err(|e| OffsetError::LockError(e.to_string()))?;
298    cache.insert(source.to_string(), offset);
299    self.persist(&cache)?;
300    Ok(())
301  }
302
303  fn get_all(&self) -> OffsetResult<HashMap<String, Offset>> {
304    let cache = self
305      .cache
306      .read()
307      .map_err(|e| OffsetError::LockError(e.to_string()))?;
308    Ok(cache.clone())
309  }
310
311  fn clear(&self, source: &str) -> OffsetResult<()> {
312    let mut cache = self
313      .cache
314      .write()
315      .map_err(|e| OffsetError::LockError(e.to_string()))?;
316    cache.remove(source);
317    self.persist(&cache)?;
318    Ok(())
319  }
320
321  fn clear_all(&self) -> OffsetResult<()> {
322    let mut cache = self
323      .cache
324      .write()
325      .map_err(|e| OffsetError::LockError(e.to_string()))?;
326    cache.clear();
327    self.persist(&cache)?;
328    Ok(())
329  }
330}
331
332/// Tracks processing offsets with configurable commit strategies.
333///
334/// The `OffsetTracker` wraps an `OffsetStore` and provides convenient
335/// methods for tracking and committing offsets based on the configured
336/// strategy.
337#[derive(Debug)]
338pub struct OffsetTracker {
339  store: Box<dyn OffsetStore>,
340  strategy: CommitStrategy,
341  reset_policy: OffsetResetPolicy,
342  pending: Arc<RwLock<HashMap<String, (Offset, usize)>>>,
343}
344
345impl OffsetTracker {
346  /// Creates a new offset tracker with the given store and default settings.
347  pub fn new(store: Box<dyn OffsetStore>) -> Self {
348    Self {
349      store,
350      strategy: CommitStrategy::default(),
351      reset_policy: OffsetResetPolicy::default(),
352      pending: Arc::new(RwLock::new(HashMap::new())),
353    }
354  }
355
356  /// Creates a new offset tracker with the specified commit strategy.
357  pub fn with_strategy(store: Box<dyn OffsetStore>, strategy: CommitStrategy) -> Self {
358    Self {
359      store,
360      strategy,
361      reset_policy: OffsetResetPolicy::default(),
362      pending: Arc::new(RwLock::new(HashMap::new())),
363    }
364  }
365
366  /// Sets the offset reset policy.
367  pub fn with_reset_policy(mut self, policy: OffsetResetPolicy) -> Self {
368    self.reset_policy = policy;
369    self
370  }
371
372  /// Gets the current committed offset for a source, applying the reset policy
373  /// if no offset is found.
374  pub fn get_offset(&self, source: &str) -> OffsetResult<Offset> {
375    match self.store.get(source)? {
376      Some(offset) => Ok(offset),
377      None => match self.reset_policy {
378        OffsetResetPolicy::Earliest => Ok(Offset::Earliest),
379        OffsetResetPolicy::Latest => Ok(Offset::Latest),
380        OffsetResetPolicy::None => Err(OffsetError::SourceNotFound(source.to_string())),
381      },
382    }
383  }
384
385  /// Records that an offset has been processed.
386  ///
387  /// Based on the commit strategy, this may immediately commit the offset
388  /// or hold it for later batch commit.
389  pub fn record(&self, source: &str, offset: Offset) -> OffsetResult<()> {
390    match self.strategy {
391      CommitStrategy::Auto => {
392        self.store.commit(source, offset)?;
393      }
394      CommitStrategy::Periodic(interval) => {
395        let mut pending = self
396          .pending
397          .write()
398          .map_err(|e| OffsetError::LockError(e.to_string()))?;
399
400        let entry = pending
401          .entry(source.to_string())
402          .or_insert((offset.clone(), 0));
403        entry.0 = offset;
404        entry.1 += 1;
405
406        if entry.1 >= interval {
407          let offset_to_commit = entry.0.clone();
408          entry.1 = 0;
409          drop(pending); // Release lock before committing
410          self.store.commit(source, offset_to_commit)?;
411        }
412      }
413      CommitStrategy::Manual => {
414        let mut pending = self
415          .pending
416          .write()
417          .map_err(|e| OffsetError::LockError(e.to_string()))?;
418        let entry = pending
419          .entry(source.to_string())
420          .or_insert((offset.clone(), 0));
421        entry.0 = offset;
422        entry.1 += 1;
423      }
424    }
425    Ok(())
426  }
427
428  /// Commits the pending offset for a specific source.
429  ///
430  /// This is useful for manual commit strategy or when forcing a commit.
431  pub fn commit(&self, source: &str) -> OffsetResult<()> {
432    let pending_offset = {
433      let pending = self
434        .pending
435        .read()
436        .map_err(|e| OffsetError::LockError(e.to_string()))?;
437      pending.get(source).map(|(o, _)| o.clone())
438    };
439
440    if let Some(offset) = pending_offset {
441      self.store.commit(source, offset)?;
442      let mut pending = self
443        .pending
444        .write()
445        .map_err(|e| OffsetError::LockError(e.to_string()))?;
446      if let Some(entry) = pending.get_mut(source) {
447        entry.1 = 0;
448      }
449    }
450    Ok(())
451  }
452
453  /// Commits all pending offsets.
454  pub fn commit_all(&self) -> OffsetResult<()> {
455    let sources: Vec<String> = {
456      let pending = self
457        .pending
458        .read()
459        .map_err(|e| OffsetError::LockError(e.to_string()))?;
460      pending.keys().cloned().collect()
461    };
462
463    for source in sources {
464      self.commit(&source)?;
465    }
466    Ok(())
467  }
468
469  /// Resets the offset for a source to the specified value.
470  pub fn reset(&self, source: &str, offset: Offset) -> OffsetResult<()> {
471    self.store.commit(source, offset)
472  }
473
474  /// Clears the offset for a source.
475  pub fn clear(&self, source: &str) -> OffsetResult<()> {
476    self.store.clear(source)?;
477    let mut pending = self
478      .pending
479      .write()
480      .map_err(|e| OffsetError::LockError(e.to_string()))?;
481    pending.remove(source);
482    Ok(())
483  }
484
485  /// Returns the current commit strategy.
486  pub fn strategy(&self) -> CommitStrategy {
487    self.strategy
488  }
489
490  /// Returns the current reset policy.
491  pub fn reset_policy(&self) -> OffsetResetPolicy {
492    self.reset_policy
493  }
494
495  /// Gets all committed offsets.
496  pub fn get_all_committed(&self) -> OffsetResult<HashMap<String, Offset>> {
497    self.store.get_all()
498  }
499
500  /// Gets all pending offsets (not yet committed).
501  pub fn get_all_pending(&self) -> OffsetResult<HashMap<String, Offset>> {
502    let pending = self
503      .pending
504      .read()
505      .map_err(|e| OffsetError::LockError(e.to_string()))?;
506    Ok(
507      pending
508        .iter()
509        .map(|(k, (o, _))| (k.clone(), o.clone()))
510        .collect(),
511    )
512  }
513}