1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt::{self, Display};
10use std::fs;
11use std::io::{self};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, RwLock};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
19pub enum Offset {
20 Sequence(u64),
22 Timestamp(DateTime<Utc>),
24 Custom(String),
26 #[default]
28 Earliest,
29 Latest,
31}
32
33impl Display for Offset {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 match self {
36 Offset::Sequence(n) => write!(f, "seq:{}", n),
37 Offset::Timestamp(ts) => write!(f, "ts:{}", ts.to_rfc3339()),
38 Offset::Custom(s) => write!(f, "custom:{}", s),
39 Offset::Earliest => write!(f, "earliest"),
40 Offset::Latest => write!(f, "latest"),
41 }
42 }
43}
44
45impl Offset {
46 pub fn sequence(n: u64) -> Self {
48 Offset::Sequence(n)
49 }
50
51 pub fn timestamp(ts: DateTime<Utc>) -> Self {
53 Offset::Timestamp(ts)
54 }
55
56 pub fn custom(s: impl Into<String>) -> Self {
58 Offset::Custom(s.into())
59 }
60
61 pub fn increment(&self) -> Option<Self> {
64 match self {
65 Offset::Sequence(n) => Some(Offset::Sequence(n + 1)),
66 _ => None,
67 }
68 }
69
70 pub fn is_earliest(&self) -> bool {
72 matches!(self, Offset::Earliest)
73 }
74
75 pub fn is_latest(&self) -> bool {
77 matches!(self, Offset::Latest)
78 }
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
83pub enum OffsetResetPolicy {
84 #[default]
86 Earliest,
87 Latest,
89 None,
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
95pub enum CommitStrategy {
96 #[default]
98 Auto,
99 Periodic(usize),
101 Manual,
103}
104
105#[derive(Debug)]
107pub enum OffsetError {
108 IoError(io::Error),
110 SerializationError(String),
112 SourceNotFound(String),
114 LockError(String),
116 InvalidOffset(String),
118}
119
120impl Display for OffsetError {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 match self {
123 OffsetError::IoError(e) => write!(f, "IO error: {}", e),
124 OffsetError::SerializationError(s) => write!(f, "Serialization error: {}", s),
125 OffsetError::SourceNotFound(s) => write!(f, "Source not found: {}", s),
126 OffsetError::LockError(s) => write!(f, "Lock error: {}", s),
127 OffsetError::InvalidOffset(s) => write!(f, "Invalid offset: {}", s),
128 }
129 }
130}
131
132impl std::error::Error for OffsetError {}
133
134impl From<io::Error> for OffsetError {
135 fn from(err: io::Error) -> Self {
136 OffsetError::IoError(err)
137 }
138}
139
140pub type OffsetResult<T> = Result<T, OffsetError>;
142
143pub trait OffsetStore: Send + Sync + std::fmt::Debug {
147 fn get(&self, source: &str) -> OffsetResult<Option<Offset>>;
149
150 fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()>;
152
153 fn get_all(&self) -> OffsetResult<HashMap<String, Offset>>;
155
156 fn clear(&self, source: &str) -> OffsetResult<()>;
158
159 fn clear_all(&self) -> OffsetResult<()>;
161}
162
163#[derive(Debug, Clone, Default)]
168pub struct InMemoryOffsetStore {
169 offsets: Arc<RwLock<HashMap<String, Offset>>>,
170}
171
172impl InMemoryOffsetStore {
173 pub fn new() -> Self {
175 Self::default()
176 }
177
178 pub fn with_offsets(offsets: HashMap<String, Offset>) -> Self {
180 Self {
181 offsets: Arc::new(RwLock::new(offsets)),
182 }
183 }
184}
185
186impl OffsetStore for InMemoryOffsetStore {
187 fn get(&self, source: &str) -> OffsetResult<Option<Offset>> {
188 let offsets = self
189 .offsets
190 .read()
191 .map_err(|e| OffsetError::LockError(e.to_string()))?;
192 Ok(offsets.get(source).cloned())
193 }
194
195 fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()> {
196 let mut offsets = self
197 .offsets
198 .write()
199 .map_err(|e| OffsetError::LockError(e.to_string()))?;
200 offsets.insert(source.to_string(), offset);
201 Ok(())
202 }
203
204 fn get_all(&self) -> OffsetResult<HashMap<String, Offset>> {
205 let offsets = self
206 .offsets
207 .read()
208 .map_err(|e| OffsetError::LockError(e.to_string()))?;
209 Ok(offsets.clone())
210 }
211
212 fn clear(&self, source: &str) -> OffsetResult<()> {
213 let mut offsets = self
214 .offsets
215 .write()
216 .map_err(|e| OffsetError::LockError(e.to_string()))?;
217 offsets.remove(source);
218 Ok(())
219 }
220
221 fn clear_all(&self) -> OffsetResult<()> {
222 let mut offsets = self
223 .offsets
224 .write()
225 .map_err(|e| OffsetError::LockError(e.to_string()))?;
226 offsets.clear();
227 Ok(())
228 }
229}
230
231#[derive(Debug, Clone)]
237pub struct FileOffsetStore {
238 path: PathBuf,
239 cache: Arc<RwLock<HashMap<String, Offset>>>,
240}
241
242impl FileOffsetStore {
243 pub fn new<P: AsRef<Path>>(path: P) -> OffsetResult<Self> {
245 let path = path.as_ref().to_path_buf();
246
247 let cache = if path.exists() {
249 let data = fs::read_to_string(&path)?;
250 if data.is_empty() {
251 HashMap::new()
252 } else {
253 serde_json::from_str(&data).map_err(|e| OffsetError::SerializationError(e.to_string()))?
254 }
255 } else {
256 HashMap::new()
257 };
258
259 Ok(Self {
260 path,
261 cache: Arc::new(RwLock::new(cache)),
262 })
263 }
264
265 fn persist(&self, offsets: &HashMap<String, Offset>) -> OffsetResult<()> {
267 if let Some(parent) = self.path.parent() {
269 fs::create_dir_all(parent)?;
270 }
271
272 let data = serde_json::to_string_pretty(offsets)
273 .map_err(|e| OffsetError::SerializationError(e.to_string()))?;
274 fs::write(&self.path, data)?;
275 Ok(())
276 }
277
278 pub fn path(&self) -> &Path {
280 &self.path
281 }
282}
283
284impl OffsetStore for FileOffsetStore {
285 fn get(&self, source: &str) -> OffsetResult<Option<Offset>> {
286 let cache = self
287 .cache
288 .read()
289 .map_err(|e| OffsetError::LockError(e.to_string()))?;
290 Ok(cache.get(source).cloned())
291 }
292
293 fn commit(&self, source: &str, offset: Offset) -> OffsetResult<()> {
294 let mut cache = self
295 .cache
296 .write()
297 .map_err(|e| OffsetError::LockError(e.to_string()))?;
298 cache.insert(source.to_string(), offset);
299 self.persist(&cache)?;
300 Ok(())
301 }
302
303 fn get_all(&self) -> OffsetResult<HashMap<String, Offset>> {
304 let cache = self
305 .cache
306 .read()
307 .map_err(|e| OffsetError::LockError(e.to_string()))?;
308 Ok(cache.clone())
309 }
310
311 fn clear(&self, source: &str) -> OffsetResult<()> {
312 let mut cache = self
313 .cache
314 .write()
315 .map_err(|e| OffsetError::LockError(e.to_string()))?;
316 cache.remove(source);
317 self.persist(&cache)?;
318 Ok(())
319 }
320
321 fn clear_all(&self) -> OffsetResult<()> {
322 let mut cache = self
323 .cache
324 .write()
325 .map_err(|e| OffsetError::LockError(e.to_string()))?;
326 cache.clear();
327 self.persist(&cache)?;
328 Ok(())
329 }
330}
331
332#[derive(Debug)]
338pub struct OffsetTracker {
339 store: Box<dyn OffsetStore>,
340 strategy: CommitStrategy,
341 reset_policy: OffsetResetPolicy,
342 pending: Arc<RwLock<HashMap<String, (Offset, usize)>>>,
343}
344
345impl OffsetTracker {
346 pub fn new(store: Box<dyn OffsetStore>) -> Self {
348 Self {
349 store,
350 strategy: CommitStrategy::default(),
351 reset_policy: OffsetResetPolicy::default(),
352 pending: Arc::new(RwLock::new(HashMap::new())),
353 }
354 }
355
356 pub fn with_strategy(store: Box<dyn OffsetStore>, strategy: CommitStrategy) -> Self {
358 Self {
359 store,
360 strategy,
361 reset_policy: OffsetResetPolicy::default(),
362 pending: Arc::new(RwLock::new(HashMap::new())),
363 }
364 }
365
366 pub fn with_reset_policy(mut self, policy: OffsetResetPolicy) -> Self {
368 self.reset_policy = policy;
369 self
370 }
371
372 pub fn get_offset(&self, source: &str) -> OffsetResult<Offset> {
375 match self.store.get(source)? {
376 Some(offset) => Ok(offset),
377 None => match self.reset_policy {
378 OffsetResetPolicy::Earliest => Ok(Offset::Earliest),
379 OffsetResetPolicy::Latest => Ok(Offset::Latest),
380 OffsetResetPolicy::None => Err(OffsetError::SourceNotFound(source.to_string())),
381 },
382 }
383 }
384
385 pub fn record(&self, source: &str, offset: Offset) -> OffsetResult<()> {
390 match self.strategy {
391 CommitStrategy::Auto => {
392 self.store.commit(source, offset)?;
393 }
394 CommitStrategy::Periodic(interval) => {
395 let mut pending = self
396 .pending
397 .write()
398 .map_err(|e| OffsetError::LockError(e.to_string()))?;
399
400 let entry = pending
401 .entry(source.to_string())
402 .or_insert((offset.clone(), 0));
403 entry.0 = offset;
404 entry.1 += 1;
405
406 if entry.1 >= interval {
407 let offset_to_commit = entry.0.clone();
408 entry.1 = 0;
409 drop(pending); self.store.commit(source, offset_to_commit)?;
411 }
412 }
413 CommitStrategy::Manual => {
414 let mut pending = self
415 .pending
416 .write()
417 .map_err(|e| OffsetError::LockError(e.to_string()))?;
418 let entry = pending
419 .entry(source.to_string())
420 .or_insert((offset.clone(), 0));
421 entry.0 = offset;
422 entry.1 += 1;
423 }
424 }
425 Ok(())
426 }
427
428 pub fn commit(&self, source: &str) -> OffsetResult<()> {
432 let pending_offset = {
433 let pending = self
434 .pending
435 .read()
436 .map_err(|e| OffsetError::LockError(e.to_string()))?;
437 pending.get(source).map(|(o, _)| o.clone())
438 };
439
440 if let Some(offset) = pending_offset {
441 self.store.commit(source, offset)?;
442 let mut pending = self
443 .pending
444 .write()
445 .map_err(|e| OffsetError::LockError(e.to_string()))?;
446 if let Some(entry) = pending.get_mut(source) {
447 entry.1 = 0;
448 }
449 }
450 Ok(())
451 }
452
453 pub fn commit_all(&self) -> OffsetResult<()> {
455 let sources: Vec<String> = {
456 let pending = self
457 .pending
458 .read()
459 .map_err(|e| OffsetError::LockError(e.to_string()))?;
460 pending.keys().cloned().collect()
461 };
462
463 for source in sources {
464 self.commit(&source)?;
465 }
466 Ok(())
467 }
468
469 pub fn reset(&self, source: &str, offset: Offset) -> OffsetResult<()> {
471 self.store.commit(source, offset)
472 }
473
474 pub fn clear(&self, source: &str) -> OffsetResult<()> {
476 self.store.clear(source)?;
477 let mut pending = self
478 .pending
479 .write()
480 .map_err(|e| OffsetError::LockError(e.to_string()))?;
481 pending.remove(source);
482 Ok(())
483 }
484
485 pub fn strategy(&self) -> CommitStrategy {
487 self.strategy
488 }
489
490 pub fn reset_policy(&self) -> OffsetResetPolicy {
492 self.reset_policy
493 }
494
495 pub fn get_all_committed(&self) -> OffsetResult<HashMap<String, Offset>> {
497 self.store.get_all()
498 }
499
500 pub fn get_all_pending(&self) -> OffsetResult<HashMap<String, Offset>> {
502 let pending = self
503 .pending
504 .read()
505 .map_err(|e| OffsetError::LockError(e.to_string()))?;
506 Ok(
507 pending
508 .iter()
509 .map(|(k, (o, _))| (k.clone(), o.clone()))
510 .collect(),
511 )
512 }
513}
514
515#[cfg(test)]
516mod tests {
517 use super::*;
518 use tempfile::tempdir;
519
520 #[test]
522 fn test_offset_display() {
523 assert_eq!(Offset::Sequence(42).to_string(), "seq:42");
524 assert_eq!(Offset::Earliest.to_string(), "earliest");
525 assert_eq!(Offset::Latest.to_string(), "latest");
526 assert_eq!(Offset::Custom("foo".to_string()).to_string(), "custom:foo");
527 }
528
529 #[test]
530 fn test_offset_increment() {
531 assert_eq!(Offset::Sequence(0).increment(), Some(Offset::Sequence(1)));
532 assert_eq!(Offset::Sequence(42).increment(), Some(Offset::Sequence(43)));
533 assert_eq!(Offset::Earliest.increment(), None);
534 assert_eq!(Offset::Latest.increment(), None);
535 assert_eq!(Offset::Custom("x".to_string()).increment(), None);
536 }
537
538 #[test]
539 fn test_offset_is_earliest_latest() {
540 assert!(Offset::Earliest.is_earliest());
541 assert!(!Offset::Latest.is_earliest());
542 assert!(Offset::Latest.is_latest());
543 assert!(!Offset::Earliest.is_latest());
544 }
545
546 #[test]
547 fn test_offset_default() {
548 assert_eq!(Offset::default(), Offset::Earliest);
549 }
550
551 #[test]
553 fn test_in_memory_store_basic() {
554 let store = InMemoryOffsetStore::new();
555
556 assert!(store.get("source1").unwrap().is_none());
557
558 store.commit("source1", Offset::Sequence(10)).unwrap();
559 assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(10)));
560
561 store.commit("source1", Offset::Sequence(20)).unwrap();
562 assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(20)));
563 }
564
565 #[test]
566 fn test_in_memory_store_multiple_sources() {
567 let store = InMemoryOffsetStore::new();
568
569 store.commit("source1", Offset::Sequence(10)).unwrap();
570 store.commit("source2", Offset::Sequence(20)).unwrap();
571
572 assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(10)));
573 assert_eq!(store.get("source2").unwrap(), Some(Offset::Sequence(20)));
574
575 let all = store.get_all().unwrap();
576 assert_eq!(all.len(), 2);
577 }
578
579 #[test]
580 fn test_in_memory_store_clear() {
581 let store = InMemoryOffsetStore::new();
582
583 store.commit("source1", Offset::Sequence(10)).unwrap();
584 store.commit("source2", Offset::Sequence(20)).unwrap();
585
586 store.clear("source1").unwrap();
587 assert!(store.get("source1").unwrap().is_none());
588 assert_eq!(store.get("source2").unwrap(), Some(Offset::Sequence(20)));
589
590 store.clear_all().unwrap();
591 assert!(store.get_all().unwrap().is_empty());
592 }
593
594 #[test]
595 fn test_in_memory_store_with_initial_offsets() {
596 let mut initial = HashMap::new();
597 initial.insert("source1".to_string(), Offset::Sequence(100));
598
599 let store = InMemoryOffsetStore::with_offsets(initial);
600 assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(100)));
601 }
602
603 #[test]
605 fn test_file_store_basic() {
606 let dir = tempdir().unwrap();
607 let path = dir.path().join("offsets.json");
608
609 let store = FileOffsetStore::new(&path).unwrap();
610
611 assert!(store.get("source1").unwrap().is_none());
612
613 store.commit("source1", Offset::Sequence(10)).unwrap();
614 assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(10)));
615
616 let store2 = FileOffsetStore::new(&path).unwrap();
618 assert_eq!(store2.get("source1").unwrap(), Some(Offset::Sequence(10)));
619 }
620
621 #[test]
622 fn test_file_store_clear() {
623 let dir = tempdir().unwrap();
624 let path = dir.path().join("offsets.json");
625
626 let store = FileOffsetStore::new(&path).unwrap();
627
628 store.commit("source1", Offset::Sequence(10)).unwrap();
629 store.commit("source2", Offset::Sequence(20)).unwrap();
630
631 store.clear("source1").unwrap();
632
633 let store2 = FileOffsetStore::new(&path).unwrap();
635 assert!(store2.get("source1").unwrap().is_none());
636 assert_eq!(store2.get("source2").unwrap(), Some(Offset::Sequence(20)));
637 }
638
639 #[test]
640 fn test_file_store_creates_parent_dirs() {
641 let dir = tempdir().unwrap();
642 let path = dir.path().join("nested/dir/offsets.json");
643
644 let store = FileOffsetStore::new(&path).unwrap();
645 store.commit("source1", Offset::Sequence(10)).unwrap();
646
647 assert!(path.exists());
648 }
649
650 #[test]
652 fn test_tracker_auto_commit() {
653 let store = Box::new(InMemoryOffsetStore::new());
654 let tracker = OffsetTracker::new(store);
655
656 tracker.record("source1", Offset::Sequence(10)).unwrap();
657
658 assert_eq!(
660 tracker.get_all_committed().unwrap().get("source1"),
661 Some(&Offset::Sequence(10))
662 );
663 }
664
665 #[test]
666 fn test_tracker_periodic_commit() {
667 let store = Box::new(InMemoryOffsetStore::new());
668 let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Periodic(3));
669
670 tracker.record("source1", Offset::Sequence(1)).unwrap();
671 tracker.record("source1", Offset::Sequence(2)).unwrap();
672
673 assert!(tracker.get_all_committed().unwrap().is_empty());
675
676 tracker.record("source1", Offset::Sequence(3)).unwrap();
677
678 assert_eq!(
680 tracker.get_all_committed().unwrap().get("source1"),
681 Some(&Offset::Sequence(3))
682 );
683 }
684
685 #[test]
686 fn test_tracker_manual_commit() {
687 let store = Box::new(InMemoryOffsetStore::new());
688 let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Manual);
689
690 tracker.record("source1", Offset::Sequence(10)).unwrap();
691
692 assert!(tracker.get_all_committed().unwrap().is_empty());
694
695 assert_eq!(
697 tracker.get_all_pending().unwrap().get("source1"),
698 Some(&Offset::Sequence(10))
699 );
700
701 tracker.commit("source1").unwrap();
702
703 assert_eq!(
705 tracker.get_all_committed().unwrap().get("source1"),
706 Some(&Offset::Sequence(10))
707 );
708 }
709
710 #[test]
711 fn test_tracker_commit_all() {
712 let store = Box::new(InMemoryOffsetStore::new());
713 let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Manual);
714
715 tracker.record("source1", Offset::Sequence(10)).unwrap();
716 tracker.record("source2", Offset::Sequence(20)).unwrap();
717
718 tracker.commit_all().unwrap();
719
720 let committed = tracker.get_all_committed().unwrap();
721 assert_eq!(committed.get("source1"), Some(&Offset::Sequence(10)));
722 assert_eq!(committed.get("source2"), Some(&Offset::Sequence(20)));
723 }
724
725 #[test]
726 fn test_tracker_reset_policy_earliest() {
727 let store = Box::new(InMemoryOffsetStore::new());
728 let tracker = OffsetTracker::new(store).with_reset_policy(OffsetResetPolicy::Earliest);
729
730 assert_eq!(tracker.get_offset("unknown").unwrap(), Offset::Earliest);
731 }
732
733 #[test]
734 fn test_tracker_reset_policy_latest() {
735 let store = Box::new(InMemoryOffsetStore::new());
736 let tracker = OffsetTracker::new(store).with_reset_policy(OffsetResetPolicy::Latest);
737
738 assert_eq!(tracker.get_offset("unknown").unwrap(), Offset::Latest);
739 }
740
741 #[test]
742 fn test_tracker_reset_policy_none() {
743 let store = Box::new(InMemoryOffsetStore::new());
744 let tracker = OffsetTracker::new(store).with_reset_policy(OffsetResetPolicy::None);
745
746 let result = tracker.get_offset("unknown");
747 assert!(matches!(result, Err(OffsetError::SourceNotFound(_))));
748 }
749
750 #[test]
751 fn test_tracker_reset_offset() {
752 let store = Box::new(InMemoryOffsetStore::new());
753 let tracker = OffsetTracker::new(store);
754
755 tracker.record("source1", Offset::Sequence(100)).unwrap();
756 tracker.reset("source1", Offset::Sequence(0)).unwrap();
757
758 assert_eq!(tracker.get_offset("source1").unwrap(), Offset::Sequence(0));
759 }
760
761 #[test]
762 fn test_tracker_clear() {
763 let store = Box::new(InMemoryOffsetStore::new());
764 let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Manual);
765
766 tracker.record("source1", Offset::Sequence(100)).unwrap();
767 tracker.commit("source1").unwrap();
768 tracker.record("source1", Offset::Sequence(200)).unwrap();
769
770 tracker.clear("source1").unwrap();
771
772 assert!(tracker.get_all_committed().unwrap().is_empty());
774 assert!(tracker.get_all_pending().unwrap().is_empty());
775 }
776
777 #[test]
778 fn test_offset_error_display() {
779 let io_err = OffsetError::IoError(io::Error::new(io::ErrorKind::NotFound, "file not found"));
780 assert!(io_err.to_string().contains("IO error"));
781
782 let ser_err = OffsetError::SerializationError("bad json".to_string());
783 assert!(ser_err.to_string().contains("Serialization error"));
784
785 let not_found = OffsetError::SourceNotFound("src".to_string());
786 assert!(not_found.to_string().contains("Source not found"));
787
788 let lock_err = OffsetError::LockError("poisoned".to_string());
789 assert!(lock_err.to_string().contains("Lock error"));
790
791 let invalid = OffsetError::InvalidOffset("bad format".to_string());
792 assert!(invalid.to_string().contains("Invalid offset"));
793 }
794
795 #[test]
796 fn test_tracker_strategy_getters() {
797 let store = Box::new(InMemoryOffsetStore::new());
798 let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Periodic(5))
799 .with_reset_policy(OffsetResetPolicy::Latest);
800
801 assert_eq!(tracker.strategy(), CommitStrategy::Periodic(5));
802 assert_eq!(tracker.reset_policy(), OffsetResetPolicy::Latest);
803 }
804
805 #[test]
806 fn test_offset_constructors() {
807 let seq = Offset::sequence(42);
808 assert_eq!(seq, Offset::Sequence(42));
809
810 let ts = Utc::now();
811 let ts_offset = Offset::timestamp(ts);
812 assert!(matches!(ts_offset, Offset::Timestamp(_)));
813
814 let custom = Offset::custom("my-offset");
815 assert_eq!(custom, Offset::Custom("my-offset".to_string()));
816 }
817
818 #[test]
819 fn test_file_store_path() {
820 let dir = tempdir().unwrap();
821 let path = dir.path().join("offsets.json");
822
823 let store = FileOffsetStore::new(&path).unwrap();
824 assert_eq!(store.path(), path);
825 }
826
827 #[test]
828 fn test_in_memory_store_clone() {
829 let store = InMemoryOffsetStore::new();
830 store.commit("source1", Offset::Sequence(10)).unwrap();
831
832 let cloned = store.clone();
833
834 assert_eq!(cloned.get("source1").unwrap(), Some(Offset::Sequence(10)));
836
837 cloned.commit("source1", Offset::Sequence(20)).unwrap();
839 assert_eq!(store.get("source1").unwrap(), Some(Offset::Sequence(20)));
840 }
841}