Skip to main content

kafka_backup_core/offset_store/
mod.rs

1//! Offset storage for tracking backup progress.
2//!
3//! This module provides persistent offset storage to enable backup resumption
4//! after process crashes. Offsets are stored locally in SQLite and periodically
5//! synced to remote storage (S3) for durability.
6
7mod sqlite;
8
9pub use sqlite::SqliteOffsetStore;
10
11use async_trait::async_trait;
12
13use crate::storage::StorageBackend;
14use crate::Result;
15
16/// Offset information for a topic-partition
17#[derive(Debug, Clone)]
18pub struct OffsetInfo {
19    pub topic: String,
20    pub partition: i32,
21    pub last_offset: i64,
22    pub checkpoint_ts: i64,
23}
24
25/// Trait for offset storage backends
26#[async_trait]
27pub trait OffsetStore: Send + Sync {
28    /// Get the last committed offset for a topic-partition
29    async fn get_offset(&self, backup_id: &str, topic: &str, partition: i32)
30        -> Result<Option<i64>>;
31
32    /// Set the offset for a topic-partition
33    async fn set_offset(
34        &self,
35        backup_id: &str,
36        topic: &str,
37        partition: i32,
38        offset: i64,
39    ) -> Result<()>;
40
41    /// Get all offsets for a backup
42    async fn get_all_offsets(&self, backup_id: &str) -> Result<Vec<OffsetInfo>>;
43
44    /// Checkpoint all pending writes to durable storage
45    async fn checkpoint(&self) -> Result<()>;
46
47    /// Sync the local database to remote storage
48    async fn sync_to_storage(&self, storage: &dyn StorageBackend, s3_key: &str) -> Result<()>;
49
50    /// Load the database from remote storage
51    async fn load_from_storage(&self, storage: &dyn StorageBackend, s3_key: &str) -> Result<bool>;
52
53    /// Get or create a backup job record
54    async fn get_or_create_job(&self, backup_id: &str, cluster_id: Option<&str>) -> Result<()>;
55
56    /// Update job status
57    async fn update_job_status(&self, backup_id: &str, status: &str) -> Result<()>;
58
59    /// Update job heartbeat
60    async fn heartbeat(&self, backup_id: &str) -> Result<()>;
61}
62
63/// Configuration for offset storage
64#[derive(Debug, Clone)]
65pub struct OffsetStoreConfig {
66    /// Path to the local SQLite database
67    pub db_path: std::path::PathBuf,
68    /// S3 key for syncing the database (optional)
69    pub s3_key: Option<String>,
70    /// Checkpoint interval in seconds (default: 5)
71    pub checkpoint_interval_secs: u64,
72    /// Sync to S3 interval in seconds (default: 30)
73    pub sync_interval_secs: u64,
74}
75
76impl Default for OffsetStoreConfig {
77    fn default() -> Self {
78        Self {
79            db_path: std::path::PathBuf::from("./offsets.db"),
80            s3_key: None,
81            checkpoint_interval_secs: 5,
82            sync_interval_secs: 30,
83        }
84    }
85}