1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt::{self, Display};
10use std::fs;
11use std::io::{self};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, RwLock};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
19pub enum Offset {
20 Sequence(u64),
22 Timestamp(DateTime<Utc>),
24 Custom(String),
26 #[default]
28 Earliest,
29 Latest,
31}
32
33impl Display for Offset {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 match self {
36 Offset::Sequence(n) => write!(f, "seq:{}", n),
37 Offset::Timestamp(ts) => write!(f, "ts:{}", ts.to_rfc3339()),
38 Offset::Custom(s) => write!(f, "custom:{}", s),
39 Offset::Earliest => write!(f, "earliest"),
40 Offset::Latest => write!(f, "latest"),
41 }
42 }
43}
44
45impl Offset {
46 pub fn sequence(n: u64) -> Self {
48 Offset::Sequence(n)
49 }
50
51 pub fn timestamp(ts: DateTime<Utc>) -> Self {
53 Offset::Timestamp(ts)
54 }
55
56 pub fn custom(s: impl Into<String>) -> Self {
58 Offset::Custom(s.into())
59 }
60
61 pub fn increment(&self) -> Option<Self> {
64 match self {
65 Offset::Sequence(n) => Some(Offset::Sequence(n + 1)),
66 _ => None,
67 }
68 }
69
70 pub fn is_earliest(&self) -> bool {
72 matches!(self, Offset::Earliest)
73 }
74
75 pub fn is_latest(&self) -> bool {
77 matches!(self, Offset::Latest)
78 }
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
83pub enum OffsetResetPolicy {
84 #[default]
86 Earliest,
87 Latest,
89 None,
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
95pub enum CommitStrategy {
96 #[default]
98 Auto,
99 Periodic(usize),
101 Manual,
103}
104
/// Errors produced by offset stores and the offset tracker.
#[derive(Debug)]
pub enum OffsetError {
    /// A filesystem operation failed.
    IoError(io::Error),
    /// Offsets could not be encoded to, or decoded from, their stored form.
    SerializationError(String),
    /// No committed offset exists for the named source and the reset policy
    /// does not supply a fallback.
    SourceNotFound(String),
    /// A lock guarding shared offset state was poisoned by a panicking thread.
    LockError(String),
    /// An offset value was invalid; the message describes why.
    InvalidOffset(String),
}

impl Display for OffsetError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            OffsetError::IoError(e) => write!(f, "IO error: {}", e),
            OffsetError::SerializationError(s) => write!(f, "Serialization error: {}", s),
            OffsetError::SourceNotFound(s) => write!(f, "Source not found: {}", s),
            OffsetError::LockError(s) => write!(f, "Lock error: {}", s),
            OffsetError::InvalidOffset(s) => write!(f, "Invalid offset: {}", s),
        }
    }
}

impl std::error::Error for OffsetError {
    /// Exposes the wrapped `io::Error` so error-chain reporters can walk to it.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            OffsetError::IoError(err) => Some(err),
            OffsetError::SerializationError(_)
            | OffsetError::SourceNotFound(_)
            | OffsetError::LockError(_)
            | OffsetError::InvalidOffset(_) => None,
        }
    }
}

impl From<io::Error> for OffsetError {
    fn from(err: io::Error) -> Self {
        OffsetError::IoError(err)
    }
}

/// Result type used throughout the offset APIs.
pub type OffsetResult<T> = Result<T, OffsetError>;
142
143pub trait OffsetStore: Send + Sync + std::fmt::Debug {
147 fn get(&self, source: &str) -> OffsetResult<Option<Offset>>;
149
150 fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()>;
152
153 fn get_all(&self) -> OffsetResult<HashMap<String, Offset>>;
155
156 fn clear(&self, source: &str) -> OffsetResult<()>;
158
159 fn clear_all(&self) -> OffsetResult<()>;
161}
162
163#[derive(Debug, Clone, Default)]
168pub struct InMemoryOffsetStore {
169 offsets: Arc<RwLock<HashMap<String, Offset>>>,
170}
171
172impl InMemoryOffsetStore {
173 pub fn new() -> Self {
175 Self::default()
176 }
177
178 pub fn with_offsets(offsets: HashMap<String, Offset>) -> Self {
180 Self {
181 offsets: Arc::new(RwLock::new(offsets)),
182 }
183 }
184}
185
186impl OffsetStore for InMemoryOffsetStore {
187 fn get(&self, source: &str) -> OffsetResult<Option<Offset>> {
188 let offsets = self
189 .offsets
190 .read()
191 .map_err(|e| OffsetError::LockError(e.to_string()))?;
192 Ok(offsets.get(source).cloned())
193 }
194
195 fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()> {
196 let mut offsets = self
197 .offsets
198 .write()
199 .map_err(|e| OffsetError::LockError(e.to_string()))?;
200 offsets.insert(source.to_string(), offset);
201 Ok(())
202 }
203
204 fn get_all(&self) -> OffsetResult<HashMap<String, Offset>> {
205 let offsets = self
206 .offsets
207 .read()
208 .map_err(|e| OffsetError::LockError(e.to_string()))?;
209 Ok(offsets.clone())
210 }
211
212 fn clear(&self, source: &str) -> OffsetResult<()> {
213 let mut offsets = self
214 .offsets
215 .write()
216 .map_err(|e| OffsetError::LockError(e.to_string()))?;
217 offsets.remove(source);
218 Ok(())
219 }
220
221 fn clear_all(&self) -> OffsetResult<()> {
222 let mut offsets = self
223 .offsets
224 .write()
225 .map_err(|e| OffsetError::LockError(e.to_string()))?;
226 offsets.clear();
227 Ok(())
228 }
229}
230
231#[derive(Debug, Clone)]
237pub struct FileOffsetStore {
238 path: PathBuf,
239 cache: Arc<RwLock<HashMap<String, Offset>>>,
240}
241
242impl FileOffsetStore {
243 pub fn new<P: AsRef<Path>>(path: P) -> OffsetResult<Self> {
245 let path = path.as_ref().to_path_buf();
246
247 let cache = if path.exists() {
249 let data = fs::read_to_string(&path)?;
250 if data.is_empty() {
251 HashMap::new()
252 } else {
253 serde_json::from_str(&data).map_err(|e| OffsetError::SerializationError(e.to_string()))?
254 }
255 } else {
256 HashMap::new()
257 };
258
259 Ok(Self {
260 path,
261 cache: Arc::new(RwLock::new(cache)),
262 })
263 }
264
265 fn persist(&self, offsets: &HashMap<String, Offset>) -> OffsetResult<()> {
267 if let Some(parent) = self.path.parent() {
269 fs::create_dir_all(parent)?;
270 }
271
272 let data = serde_json::to_string_pretty(offsets)
273 .map_err(|e| OffsetError::SerializationError(e.to_string()))?;
274 fs::write(&self.path, data)?;
275 Ok(())
276 }
277
278 pub fn path(&self) -> &Path {
280 &self.path
281 }
282}
283
284impl OffsetStore for FileOffsetStore {
285 fn get(&self, source: &str) -> OffsetResult<Option<Offset>> {
286 let cache = self
287 .cache
288 .read()
289 .map_err(|e| OffsetError::LockError(e.to_string()))?;
290 Ok(cache.get(source).cloned())
291 }
292
293 fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()> {
294 let mut cache = self
295 .cache
296 .write()
297 .map_err(|e| OffsetError::LockError(e.to_string()))?;
298 cache.insert(source.to_string(), offset);
299 self.persist(&cache)?;
300 Ok(())
301 }
302
303 fn get_all(&self) -> OffsetResult<HashMap<String, Offset>> {
304 let cache = self
305 .cache
306 .read()
307 .map_err(|e| OffsetError::LockError(e.to_string()))?;
308 Ok(cache.clone())
309 }
310
311 fn clear(&self, source: &str) -> OffsetResult<()> {
312 let mut cache = self
313 .cache
314 .write()
315 .map_err(|e| OffsetError::LockError(e.to_string()))?;
316 cache.remove(source);
317 self.persist(&cache)?;
318 Ok(())
319 }
320
321 fn clear_all(&self) -> OffsetResult<()> {
322 let mut cache = self
323 .cache
324 .write()
325 .map_err(|e| OffsetError::LockError(e.to_string()))?;
326 cache.clear();
327 self.persist(&cache)?;
328 Ok(())
329 }
330}
331
332#[derive(Debug)]
338pub struct OffsetTracker {
339 store: Box<dyn OffsetStore>,
340 strategy: CommitStrategy,
341 reset_policy: OffsetResetPolicy,
342 pending: Arc<RwLock<HashMap<String, (Offset, usize)>>>,
343}
344
345impl OffsetTracker {
346 pub fn new(store: Box<dyn OffsetStore>) -> Self {
348 Self {
349 store,
350 strategy: CommitStrategy::default(),
351 reset_policy: OffsetResetPolicy::default(),
352 pending: Arc::new(RwLock::new(HashMap::new())),
353 }
354 }
355
356 pub fn with_strategy(store: Box<dyn OffsetStore>, strategy: CommitStrategy) -> Self {
358 Self {
359 store,
360 strategy,
361 reset_policy: OffsetResetPolicy::default(),
362 pending: Arc::new(RwLock::new(HashMap::new())),
363 }
364 }
365
366 pub fn with_reset_policy(mut self, policy: OffsetResetPolicy) -> Self {
368 self.reset_policy = policy;
369 self
370 }
371
372 pub fn get_offset(&self, source: &str) -> OffsetResult<Offset> {
375 match self.store.get(source)? {
376 Some(offset) => Ok(offset),
377 None => match self.reset_policy {
378 OffsetResetPolicy::Earliest => Ok(Offset::Earliest),
379 OffsetResetPolicy::Latest => Ok(Offset::Latest),
380 OffsetResetPolicy::None => Err(OffsetError::SourceNotFound(source.to_string())),
381 },
382 }
383 }
384
385 pub fn record(&self, source: &str, offset: Offset) -> OffsetResult<()> {
390 match self.strategy {
391 CommitStrategy::Auto => {
392 self.store.commit(source, offset)?;
393 }
394 CommitStrategy::Periodic(interval) => {
395 let mut pending = self
396 .pending
397 .write()
398 .map_err(|e| OffsetError::LockError(e.to_string()))?;
399
400 let entry = pending
401 .entry(source.to_string())
402 .or_insert((offset.clone(), 0));
403 entry.0 = offset;
404 entry.1 += 1;
405
406 if entry.1 >= interval {
407 let offset_to_commit = entry.0.clone();
408 entry.1 = 0;
409 drop(pending); self.store.commit(source, offset_to_commit)?;
411 }
412 }
413 CommitStrategy::Manual => {
414 let mut pending = self
415 .pending
416 .write()
417 .map_err(|e| OffsetError::LockError(e.to_string()))?;
418 let entry = pending
419 .entry(source.to_string())
420 .or_insert((offset.clone(), 0));
421 entry.0 = offset;
422 entry.1 += 1;
423 }
424 }
425 Ok(())
426 }
427
428 pub fn commit(&self, source: &str) -> OffsetResult<()> {
432 let pending_offset = {
433 let pending = self
434 .pending
435 .read()
436 .map_err(|e| OffsetError::LockError(e.to_string()))?;
437 pending.get(source).map(|(o, _)| o.clone())
438 };
439
440 if let Some(offset) = pending_offset {
441 self.store.commit(source, offset)?;
442 let mut pending = self
443 .pending
444 .write()
445 .map_err(|e| OffsetError::LockError(e.to_string()))?;
446 if let Some(entry) = pending.get_mut(source) {
447 entry.1 = 0;
448 }
449 }
450 Ok(())
451 }
452
453 pub fn commit_all(&self) -> OffsetResult<()> {
455 let sources: Vec<String> = {
456 let pending = self
457 .pending
458 .read()
459 .map_err(|e| OffsetError::LockError(e.to_string()))?;
460 pending.keys().cloned().collect()
461 };
462
463 for source in sources {
464 self.commit(&source)?;
465 }
466 Ok(())
467 }
468
469 pub fn reset(&self, source: &str, offset: Offset) -> OffsetResult<()> {
471 self.store.commit(source, offset)
472 }
473
474 pub fn clear(&self, source: &str) -> OffsetResult<()> {
476 self.store.clear(source)?;
477 let mut pending = self
478 .pending
479 .write()
480 .map_err(|e| OffsetError::LockError(e.to_string()))?;
481 pending.remove(source);
482 Ok(())
483 }
484
485 pub fn strategy(&self) -> CommitStrategy {
487 self.strategy
488 }
489
490 pub fn reset_policy(&self) -> OffsetResetPolicy {
492 self.reset_policy
493 }
494
495 pub fn get_all_committed(&self) -> OffsetResult<HashMap<String, Offset>> {
497 self.store.get_all()
498 }
499
500 pub fn get_all_pending(&self) -> OffsetResult<HashMap<String, Offset>> {
502 let pending = self
503 .pending
504 .read()
505 .map_err(|e| OffsetError::LockError(e.to_string()))?;
506 Ok(
507 pending
508 .iter()
509 .map(|(k, (o, _))| (k.clone(), o.clone()))
510 .collect(),
511 )
512 }
513}