1use std::collections::HashMap;
33use std::fs::{self, File, OpenOptions};
34use std::io::{Read, Write};
35use std::path::{Path, PathBuf};
36use std::sync::{Arc, RwLock};
37
38use crate::error::{ClientError, Result};
39
/// Storage backend for consumer offsets, keyed by `(topic_id, consumer_id)`.
///
/// The `Send + Sync` supertraits allow a single store to be shared between threads.
pub trait OffsetStore: Send + Sync {
    /// Returns the offset stored for the topic/consumer pair, or `None` when there is none.
    fn load(&self, topic_id: u32, consumer_id: u64) -> Result<Option<u64>>;

    /// Stores `offset` for the topic/consumer pair.
    fn save(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()>;

    /// Removes the offset stored for the topic/consumer pair.
    fn delete(&self, topic_id: u32, consumer_id: u64) -> Result<()>;

    /// Returns a map of every known offset, keyed by `(topic_id, consumer_id)`.
    fn list_all(&self) -> Result<HashMap<(u32, u64), u64>>;
}
65
/// Offset store that keeps everything in process memory.
///
/// The map lives behind an `Arc`, so cloning the store yields another handle to the same
/// offsets rather than an independent copy.
#[derive(Debug, Default, Clone)]
pub struct MemoryOffsetStore {
    /// Offsets keyed by `(topic_id, consumer_id)`.
    offsets: Arc<RwLock<HashMap<(u32, u64), u64>>>,
}

impl MemoryOffsetStore {
    /// Creates a store that holds no offsets.
    pub fn new() -> Self {
        // The derived `Default` builds exactly the empty `Arc<RwLock<HashMap>>` we want.
        Self::default()
    }
}
83
84impl OffsetStore for MemoryOffsetStore {
85 fn load(&self, topic_id: u32, consumer_id: u64) -> Result<Option<u64>> {
86 let offsets = self.offsets.read().map_err(|e| {
87 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
88 })?;
89 Ok(offsets.get(&(topic_id, consumer_id)).copied())
90 }
91
92 fn save(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()> {
93 let mut offsets = self.offsets.write().map_err(|e| {
94 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
95 })?;
96 offsets.insert((topic_id, consumer_id), offset);
97 Ok(())
98 }
99
100 fn delete(&self, topic_id: u32, consumer_id: u64) -> Result<()> {
101 let mut offsets = self.offsets.write().map_err(|e| {
102 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
103 })?;
104 offsets.remove(&(topic_id, consumer_id));
105 Ok(())
106 }
107
108 fn list_all(&self) -> Result<HashMap<(u32, u64), u64>> {
109 let offsets = self.offsets.read().map_err(|e| {
110 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
111 })?;
112 Ok(offsets.clone())
113 }
114}
115
/// File-backed offset store that keeps one `.offset` file per topic/consumer pair and
/// mirrors the values in an in-memory cache.
#[derive(Debug)]
pub struct LockFileOffsetStore {
    /// Directory holding this consumer's offset and lock files.
    base_dir: PathBuf,
    /// Name of the consumer this store was opened for.
    consumer_name: String,
    /// In-memory copy of the offsets, keyed by `(topic_id, consumer_id)`.
    cache: RwLock<HashMap<(u32, u64), u64>>,
}
133
134impl LockFileOffsetStore {
135 pub fn open(base_dir: &Path, consumer_name: &str) -> Result<Self> {
149 let dir = base_dir.join(consumer_name);
150 fs::create_dir_all(&dir).map_err(ClientError::IoError)?;
151
152 let store = Self {
153 base_dir: dir,
154 consumer_name: consumer_name.to_string(),
155 cache: RwLock::new(HashMap::new()),
156 };
157
158 store.load_all_into_cache()?;
160
161 Ok(store)
162 }
163
164 fn offset_file_path(&self, topic_id: u32, consumer_id: u64) -> PathBuf {
166 self.base_dir.join(format!(
167 "topic-{}-consumer-{}.offset",
168 topic_id, consumer_id
169 ))
170 }
171
172 fn lock_file_path(&self, topic_id: u32, consumer_id: u64) -> PathBuf {
174 self.base_dir
175 .join(format!("topic-{}-consumer-{}.lock", topic_id, consumer_id))
176 }
177
178 fn load_all_into_cache(&self) -> Result<()> {
180 let mut cache = self.cache.write().map_err(|e| {
181 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
182 })?;
183
184 let entries = match fs::read_dir(&self.base_dir) {
185 Ok(entries) => entries,
186 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
187 Err(e) => return Err(ClientError::IoError(e)),
188 };
189
190 for entry in entries.flatten() {
191 let path = entry.path();
192 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
193 if name.ends_with(".offset") {
194 if let Some((topic_id, consumer_id)) = Self::parse_offset_filename(name) {
196 if let Ok(offset) = Self::read_offset_file(&path) {
197 cache.insert((topic_id, consumer_id), offset);
198 }
199 }
200 }
201 }
202 }
203
204 Ok(())
205 }
206
207 fn parse_offset_filename(name: &str) -> Option<(u32, u64)> {
209 let name = name.strip_suffix(".offset")?;
211 let parts: Vec<&str> = name.split('-').collect();
212 if parts.len() >= 4 && parts[0] == "topic" && parts[2] == "consumer" {
213 let topic_id: u32 = parts[1].parse().ok()?;
214 let consumer_id: u64 = parts[3].parse().ok()?;
215 Some((topic_id, consumer_id))
216 } else {
217 None
218 }
219 }
220
221 fn read_offset_file(path: &Path) -> Result<u64> {
223 let mut file = File::open(path).map_err(ClientError::IoError)?;
224 let mut content = String::new();
225 file.read_to_string(&mut content)
226 .map_err(ClientError::IoError)?;
227 content
228 .trim()
229 .parse()
230 .map_err(|e| ClientError::IoError(std::io::Error::other(e)))
231 }
232
233 fn write_offset_file(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()> {
235 let path = self.offset_file_path(topic_id, consumer_id);
236 let lock_path = self.lock_file_path(topic_id, consumer_id);
237 let temp_path = path.with_extension("offset.tmp");
238
239 let lock_file = OpenOptions::new()
241 .write(true)
242 .create(true)
243 .truncate(true)
244 .open(&lock_path)
245 .map_err(ClientError::IoError)?;
246
247 #[cfg(unix)]
249 {
250 use std::os::unix::io::AsRawFd;
251 let fd = lock_file.as_raw_fd();
252 unsafe {
254 if libc::flock(fd, libc::LOCK_EX) != 0 {
255 return Err(ClientError::IoError(std::io::Error::last_os_error()));
256 }
257 }
258 }
259
260 #[cfg(windows)]
261 {
262 use std::os::windows::io::AsRawHandle;
263 use windows_sys::Win32::Foundation::HANDLE;
264 use windows_sys::Win32::Storage::FileSystem::{LOCKFILE_EXCLUSIVE_LOCK, LockFileEx};
265 let handle = lock_file.as_raw_handle() as HANDLE;
266 unsafe {
268 let mut overlapped =
269 std::mem::zeroed::<windows_sys::Win32::System::IO::OVERLAPPED>();
270 if LockFileEx(
271 handle,
272 LOCKFILE_EXCLUSIVE_LOCK,
273 0,
274 u32::MAX,
275 u32::MAX,
276 &mut overlapped,
277 ) == 0
278 {
279 return Err(ClientError::IoError(std::io::Error::last_os_error()));
280 }
281 }
282 }
283
284 let mut temp_file = File::create(&temp_path).map_err(ClientError::IoError)?;
286 writeln!(temp_file, "{}", offset).map_err(ClientError::IoError)?;
287 temp_file.sync_all().map_err(ClientError::IoError)?;
288 drop(temp_file);
289
290 fs::rename(&temp_path, &path).map_err(ClientError::IoError)?;
292
293 drop(lock_file);
295
296 Ok(())
297 }
298
299 pub fn consumer_name(&self) -> &str {
301 &self.consumer_name
302 }
303
304 pub fn base_dir(&self) -> &Path {
306 &self.base_dir
307 }
308}
309
310impl OffsetStore for LockFileOffsetStore {
311 fn load(&self, topic_id: u32, consumer_id: u64) -> Result<Option<u64>> {
312 {
314 let cache = self.cache.read().map_err(|e| {
315 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
316 })?;
317 if let Some(&offset) = cache.get(&(topic_id, consumer_id)) {
318 return Ok(Some(offset));
319 }
320 }
321
322 let path = self.offset_file_path(topic_id, consumer_id);
324 if !path.exists() {
325 return Ok(None);
326 }
327
328 let offset = Self::read_offset_file(&path)?;
329
330 {
332 let mut cache = self.cache.write().map_err(|e| {
333 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
334 })?;
335 cache.insert((topic_id, consumer_id), offset);
336 }
337
338 Ok(Some(offset))
339 }
340
341 fn save(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()> {
342 self.write_offset_file(topic_id, consumer_id, offset)?;
344
345 {
347 let mut cache = self.cache.write().map_err(|e| {
348 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
349 })?;
350 cache.insert((topic_id, consumer_id), offset);
351 }
352
353 Ok(())
354 }
355
356 fn delete(&self, topic_id: u32, consumer_id: u64) -> Result<()> {
357 let path = self.offset_file_path(topic_id, consumer_id);
358 let lock_path = self.lock_file_path(topic_id, consumer_id);
359
360 {
362 let mut cache = self.cache.write().map_err(|e| {
363 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
364 })?;
365 cache.remove(&(topic_id, consumer_id));
366 }
367
368 let _ = fs::remove_file(&path);
370 let _ = fs::remove_file(&lock_path);
371
372 Ok(())
373 }
374
375 fn list_all(&self) -> Result<HashMap<(u32, u64), u64>> {
376 let cache = self.cache.read().map_err(|e| {
377 ClientError::IoError(std::io::Error::other(format!("Lock poisoned: {}", e)))
378 })?;
379 Ok(cache.clone())
380 }
381}
382
/// Details of one offset commit, as handed to a [`PostCommitHook`].
#[derive(Debug, Clone)]
pub struct CommitInfo {
    /// Topic the offset belongs to.
    pub topic_id: u32,
    /// Consumer the offset belongs to.
    pub consumer_id: u64,
    /// Offset that was committed.
    pub offset: u64,
    /// Offset known for this pair before the commit, if any.
    pub previous_offset: Option<u64>,
    /// Time at which this record was created.
    pub timestamp: std::time::SystemTime,
}
397
/// Callback that is notified about offset commits.
///
/// The `Send + Sync` supertraits allow hooks to be shared between threads.
pub trait PostCommitHook: Send + Sync {
    /// Called with the details of a commit; an `Err` is returned to whoever invoked the hook.
    fn on_commit(&self, info: &CommitInfo) -> Result<()>;
}
409
/// Wrapper around another [`OffsetStore`] that carries a list of [`PostCommitHook`]s.
pub struct HookedOffsetStore<S: OffsetStore> {
    /// The wrapped store that actually holds the offsets.
    inner: S,
    /// Registered hooks, in the order they were added.
    hooks: Vec<Arc<dyn PostCommitHook>>,
    /// Offsets remembered per `(topic_id, consumer_id)` pair by this wrapper.
    previous_offsets: RwLock<HashMap<(u32, u64), u64>>,
}
418
419impl<S: OffsetStore> HookedOffsetStore<S> {
420 pub fn new(inner: S) -> Self {
422 Self {
423 inner,
424 hooks: Vec::new(),
425 previous_offsets: RwLock::new(HashMap::new()),
426 }
427 }
428
429 pub fn add_hook(mut self, hook: Arc<dyn PostCommitHook>) -> Self {
431 self.hooks.push(hook);
432 self
433 }
434
435 pub fn with_hooks(mut self, hooks: Vec<Arc<dyn PostCommitHook>>) -> Self {
437 self.hooks.extend(hooks);
438 self
439 }
440
441 pub fn inner(&self) -> &S {
443 &self.inner
444 }
445
446 fn invoke_hooks(&self, info: &CommitInfo) -> Result<()> {
448 for hook in &self.hooks {
449 hook.on_commit(info)?;
450 }
451 Ok(())
452 }
453}
454
455impl<S: OffsetStore> OffsetStore for HookedOffsetStore<S> {
456 fn load(&self, topic_id: u32, consumer_id: u64) -> Result<Option<u64>> {
457 let offset = self.inner.load(topic_id, consumer_id)?;
458
459 if let Some(off) = offset {
461 if let Ok(mut prev) = self.previous_offsets.write() {
462 prev.insert((topic_id, consumer_id), off);
463 }
464 }
465
466 Ok(offset)
467 }
468
469 fn save(&self, topic_id: u32, consumer_id: u64, offset: u64) -> Result<()> {
470 let previous_offset = self
472 .previous_offsets
473 .read()
474 .ok()
475 .and_then(|prev| prev.get(&(topic_id, consumer_id)).copied());
476
477 self.inner.save(topic_id, consumer_id, offset)?;
479
480 if let Ok(mut prev) = self.previous_offsets.write() {
482 prev.insert((topic_id, consumer_id), offset);
483 }
484
485 let info = CommitInfo {
487 topic_id,
488 consumer_id,
489 offset,
490 previous_offset,
491 timestamp: std::time::SystemTime::now(),
492 };
493 self.invoke_hooks(&info)?;
494
495 Ok(())
496 }
497
498 fn delete(&self, topic_id: u32, consumer_id: u64) -> Result<()> {
499 if let Ok(mut prev) = self.previous_offsets.write() {
501 prev.remove(&(topic_id, consumer_id));
502 }
503
504 self.inner.delete(topic_id, consumer_id)
505 }
506
507 fn list_all(&self) -> Result<HashMap<(u32, u64), u64>> {
508 self.inner.list_all()
509 }
510}
511
512#[derive(Debug, Default)]
514pub struct LoggingCommitHook;
515
516impl PostCommitHook for LoggingCommitHook {
517 fn on_commit(&self, info: &CommitInfo) -> Result<()> {
518 tracing::debug!(
519 topic_id = info.topic_id,
520 consumer_id = info.consumer_id,
521 offset = info.offset,
522 previous = ?info.previous_offset,
523 "Offset committed"
524 );
525 Ok(())
526 }
527}
528
/// Hook that records every commit it is told about in memory.
#[derive(Debug, Default)]
pub struct CollectingCommitHook {
    /// Recorded commits, oldest first.
    commits: RwLock<Vec<CommitInfo>>,
}
534
535impl CollectingCommitHook {
536 pub fn new() -> Self {
538 Self::default()
539 }
540
541 pub fn commits(&self) -> Vec<CommitInfo> {
543 self.commits.read().map(|c| c.clone()).unwrap_or_default()
544 }
545
546 pub fn clear(&self) {
548 if let Ok(mut commits) = self.commits.write() {
549 commits.clear();
550 }
551 }
552}
553
554impl PostCommitHook for CollectingCommitHook {
555 fn on_commit(&self, info: &CommitInfo) -> Result<()> {
556 if let Ok(mut commits) = self.commits.write() {
557 commits.push(info.clone());
558 }
559 Ok(())
560 }
561}
562
/// Unit tests for the offset stores and commit hooks defined in this file.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Exercises load/save/delete/list_all on the in-memory store.
    #[test]
    fn test_memory_offset_store() {
        let store = MemoryOffsetStore::new();

        // Nothing stored yet.
        assert!(store.load(1, 100).unwrap().is_none());

        // First save is visible to load.
        store.save(1, 100, 42).unwrap();
        assert_eq!(store.load(1, 100).unwrap(), Some(42));

        // Saving again overwrites the value.
        store.save(1, 100, 100).unwrap();
        assert_eq!(store.load(1, 100).unwrap(), Some(100));

        // A second key does not disturb the first.
        store.save(2, 200, 999).unwrap();
        assert_eq!(store.load(2, 200).unwrap(), Some(999));
        assert_eq!(store.load(1, 100).unwrap(), Some(100));

        // list_all reports both keys.
        let all = store.list_all().unwrap();
        assert_eq!(all.len(), 2);
        assert_eq!(all.get(&(1, 100)), Some(&100));
        assert_eq!(all.get(&(2, 200)), Some(&999));

        // Deleting one key leaves the other in place.
        store.delete(1, 100).unwrap();
        assert!(store.load(1, 100).unwrap().is_none());
        assert_eq!(store.load(2, 200).unwrap(), Some(999));
    }

    /// Exercises save/load/delete on the file-backed store and checks the on-disk file.
    #[test]
    fn test_lock_file_offset_store() {
        let temp_dir = TempDir::new().unwrap();
        let store = LockFileOffsetStore::open(temp_dir.path(), "test-consumer").unwrap();

        // Nothing stored yet.
        assert!(store.load(1, 100).unwrap().is_none());

        store.save(1, 100, 12345).unwrap();
        assert_eq!(store.load(1, 100).unwrap(), Some(12345));

        // The offset file must exist on disk after a save.
        let file_path = store.offset_file_path(1, 100);
        assert!(file_path.exists());

        // The file holds the offset as decimal text.
        let content = fs::read_to_string(&file_path).unwrap();
        assert_eq!(content.trim(), "12345");

        // Saving again overwrites the value.
        store.save(1, 100, 67890).unwrap();
        assert_eq!(store.load(1, 100).unwrap(), Some(67890));

        // Delete removes both the cached value and the file.
        store.delete(1, 100).unwrap();
        assert!(store.load(1, 100).unwrap().is_none());
        assert!(!file_path.exists());
    }

    /// An offset saved by one store instance is visible to a later instance on the same directory.
    #[test]
    fn test_lock_file_offset_store_persistence() {
        let temp_dir = TempDir::new().unwrap();

        {
            // First instance writes the offset and is dropped.
            let store = LockFileOffsetStore::open(temp_dir.path(), "persist-test").unwrap();
            store.save(5, 500, 99999).unwrap();
        }

        {
            // Second instance must see what the first one wrote.
            let store = LockFileOffsetStore::open(temp_dir.path(), "persist-test").unwrap();
            assert_eq!(store.load(5, 500).unwrap(), Some(99999));
        }
    }

    /// Well-formed names parse to their ids; malformed names are rejected.
    #[test]
    fn test_parse_offset_filename() {
        assert_eq!(
            LockFileOffsetStore::parse_offset_filename("topic-1-consumer-100.offset"),
            Some((1, 100))
        );
        assert_eq!(
            LockFileOffsetStore::parse_offset_filename("topic-999-consumer-12345.offset"),
            Some((999, 12345))
        );
        assert_eq!(
            LockFileOffsetStore::parse_offset_filename("invalid.offset"),
            None
        );
        assert_eq!(
            LockFileOffsetStore::parse_offset_filename("topic-abc-consumer-100.offset"),
            None
        );
    }

    /// Hooks see every save, including the previously committed offset.
    #[test]
    fn test_hooked_offset_store() {
        let inner = MemoryOffsetStore::new();
        let hook = Arc::new(CollectingCommitHook::new());
        let store = HookedOffsetStore::new(inner).add_hook(hook.clone());

        // First commit for the key: no previous offset is known.
        store.save(1, 100, 42).unwrap();

        let commits = hook.commits();
        assert_eq!(commits.len(), 1);
        assert_eq!(commits[0].topic_id, 1);
        assert_eq!(commits[0].consumer_id, 100);
        assert_eq!(commits[0].offset, 42);
        assert!(commits[0].previous_offset.is_none());

        // Second commit for the same key reports the first one as previous.
        store.save(1, 100, 100).unwrap();

        let commits = hook.commits();
        assert_eq!(commits.len(), 2);
        assert_eq!(commits[1].offset, 100);
        assert_eq!(commits[1].previous_offset, Some(42));

        // A different key also triggers the hook.
        store.save(2, 200, 500).unwrap();
        assert_eq!(hook.commits().len(), 3);

        // Loads go through to the wrapped store.
        assert_eq!(store.load(1, 100).unwrap(), Some(100));
        assert_eq!(store.load(2, 200).unwrap(), Some(500));

        // Clearing the hook discards the recorded commits.
        hook.clear();
        assert!(hook.commits().is_empty());
    }

    /// The collecting hook records what it is given and can be cleared.
    #[test]
    fn test_collecting_commit_hook() {
        let hook = CollectingCommitHook::new();

        let info = CommitInfo {
            topic_id: 1,
            consumer_id: 100,
            offset: 42,
            previous_offset: None,
            timestamp: std::time::SystemTime::now(),
        };

        hook.on_commit(&info).unwrap();

        let commits = hook.commits();
        assert_eq!(commits.len(), 1);
        assert_eq!(commits[0].topic_id, 1);

        hook.clear();
        assert!(hook.commits().is_empty());
    }
}