database_replicator/xmin/
state.rs

1// ABOUTME: SyncState for xmin-based sync - tracks sync progress per table
2// ABOUTME: Persists high-water mark xmin values to enable incremental syncs
3
4use anyhow::{Context, Result};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::Path;
8use tokio::fs;
9
10/// Sync state for a single table, tracking the last synced xmin value.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct TableSyncState {
13    /// Schema name (e.g., "public")
14    pub schema: String,
15    /// Table name
16    pub table: String,
17    /// Last successfully synced xmin value (high-water mark)
18    /// Rows with xmin > this value need to be synced
19    pub last_xmin: u32,
20    /// Timestamp of last successful sync
21    pub last_sync_at: chrono::DateTime<chrono::Utc>,
22    /// Number of rows synced in last batch
23    pub last_row_count: u64,
24}
25
26impl TableSyncState {
27    /// Create a new TableSyncState with initial xmin of 0 (sync everything)
28    pub fn new(schema: &str, table: &str) -> Self {
29        Self {
30            schema: schema.to_string(),
31            table: table.to_string(),
32            last_xmin: 0,
33            last_sync_at: chrono::Utc::now(),
34            last_row_count: 0,
35        }
36    }
37
38    /// Update state after a successful sync
39    pub fn update(&mut self, new_xmin: u32, row_count: u64) {
40        self.last_xmin = new_xmin;
41        self.last_sync_at = chrono::Utc::now();
42        self.last_row_count = row_count;
43    }
44
45    /// Get the qualified table name (schema.table)
46    pub fn qualified_name(&self) -> String {
47        format!("{}.{}", self.schema, self.table)
48    }
49}
50
51/// Overall sync state for a database, containing state for all tracked tables.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct SyncState {
54    /// Source database URL (sanitized - no password)
55    pub source_url: String,
56    /// Target database URL (sanitized - no password)
57    pub target_url: String,
58    /// Per-table sync states, keyed by "schema.table"
59    pub tables: HashMap<String, TableSyncState>,
60    /// Version of the state format for future migrations
61    pub version: u32,
62    /// When this state was created
63    pub created_at: chrono::DateTime<chrono::Utc>,
64    /// When this state was last modified
65    pub updated_at: chrono::DateTime<chrono::Utc>,
66}
67
68impl SyncState {
69    /// Create a new empty SyncState
70    pub fn new(source_url: &str, target_url: &str) -> Self {
71        let now = chrono::Utc::now();
72        Self {
73            source_url: sanitize_url(source_url),
74            target_url: sanitize_url(target_url),
75            tables: HashMap::new(),
76            version: 1,
77            created_at: now,
78            updated_at: now,
79        }
80    }
81
82    /// Get or create state for a table
83    pub fn get_or_create_table(&mut self, schema: &str, table: &str) -> &mut TableSyncState {
84        let key = format!("{}.{}", schema, table);
85        self.tables
86            .entry(key)
87            .or_insert_with(|| TableSyncState::new(schema, table))
88    }
89
90    /// Get state for a table if it exists
91    pub fn get_table(&self, schema: &str, table: &str) -> Option<&TableSyncState> {
92        let key = format!("{}.{}", schema, table);
93        self.tables.get(&key)
94    }
95
96    /// Update state for a table after successful sync
97    pub fn update_table(&mut self, schema: &str, table: &str, new_xmin: u32, row_count: u64) {
98        let state = self.get_or_create_table(schema, table);
99        state.update(new_xmin, row_count);
100        self.updated_at = chrono::Utc::now();
101    }
102
103    /// Remove state for a table (e.g., if table was dropped)
104    pub fn remove_table(&mut self, schema: &str, table: &str) -> Option<TableSyncState> {
105        let key = format!("{}.{}", schema, table);
106        let removed = self.tables.remove(&key);
107        if removed.is_some() {
108            self.updated_at = chrono::Utc::now();
109        }
110        removed
111    }
112
113    /// Get all table names being tracked
114    pub fn tracked_tables(&self) -> Vec<&str> {
115        self.tables.keys().map(|s| s.as_str()).collect()
116    }
117
118    /// Load state from a JSON file
119    pub async fn load(path: &Path) -> Result<Self> {
120        let contents = fs::read_to_string(path)
121            .await
122            .with_context(|| format!("Failed to read sync state from {:?}", path))?;
123        let state: SyncState = serde_json::from_str(&contents)
124            .with_context(|| format!("Failed to parse sync state from {:?}", path))?;
125        Ok(state)
126    }
127
128    /// Save state to a JSON file
129    pub async fn save(&self, path: &Path) -> Result<()> {
130        // Ensure parent directory exists
131        if let Some(parent) = path.parent() {
132            fs::create_dir_all(parent)
133                .await
134                .with_context(|| format!("Failed to create directory {:?}", parent))?;
135        }
136
137        let contents =
138            serde_json::to_string_pretty(self).context("Failed to serialize sync state")?;
139        fs::write(path, contents)
140            .await
141            .with_context(|| format!("Failed to write sync state to {:?}", path))?;
142        Ok(())
143    }
144
145    /// Get the default state file path for the current directory
146    pub fn default_path() -> std::path::PathBuf {
147        std::path::PathBuf::from(".seren-replicator/xmin-sync-state.json")
148    }
149}
150
151/// Sanitize a database URL by removing the password component
152fn sanitize_url(url: &str) -> String {
153    // Try to parse as URL and remove password
154    if let Ok(mut parsed) = url::Url::parse(url) {
155        if parsed.password().is_some() {
156            let _ = parsed.set_password(Some("***"));
157        }
158        parsed.to_string()
159    } else {
160        // If not a valid URL, return as-is (might be a file path for SQLite)
161        url.to_string()
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_table_sync_state_new() {
        let state = TableSyncState::new("public", "users");
        assert_eq!(state.schema, "public");
        assert_eq!(state.table, "users");
        assert_eq!(state.last_xmin, 0);
        assert_eq!(state.last_row_count, 0);
    }

    #[test]
    fn test_table_sync_state_update() {
        let mut state = TableSyncState::new("public", "users");
        state.update(12345, 100);
        assert_eq!(state.last_xmin, 12345);
        assert_eq!(state.last_row_count, 100);
    }

    #[test]
    fn test_table_sync_state_qualified_name() {
        let state = TableSyncState::new("myschema", "mytable");
        assert_eq!(state.qualified_name(), "myschema.mytable");
    }

    #[test]
    fn test_sync_state_new() {
        let state = SyncState::new(
            "postgresql://user:pass@localhost/db",
            "postgresql://user:pass@remote/db",
        );
        assert!(state.tables.is_empty());
        assert_eq!(state.version, 1);
        // Passwords should be sanitized
        assert!(state.source_url.contains("***"));
        assert!(state.target_url.contains("***"));
    }

    #[test]
    fn test_sync_state_get_or_create() {
        let mut state = SyncState::new("source", "target");

        // First call creates
        let table_state = state.get_or_create_table("public", "users");
        assert_eq!(table_state.last_xmin, 0);

        // Update it
        table_state.update(100, 50);

        // Second call retrieves existing
        let table_state = state.get_or_create_table("public", "users");
        assert_eq!(table_state.last_xmin, 100);
    }

    #[test]
    fn test_sync_state_update_table() {
        let mut state = SyncState::new("source", "target");
        state.update_table("public", "users", 500, 200);

        let table_state = state.get_table("public", "users").unwrap();
        assert_eq!(table_state.last_xmin, 500);
        assert_eq!(table_state.last_row_count, 200);
    }

    #[test]
    fn test_sync_state_remove_table() {
        let mut state = SyncState::new("source", "target");
        state.update_table("public", "users", 100, 10);

        let removed = state.remove_table("public", "users");
        assert!(removed.is_some());
        assert!(state.get_table("public", "users").is_none());
    }

    #[test]
    fn test_sync_state_remove_missing_table() {
        let mut state = SyncState::new("source", "target");
        let before = state.updated_at;

        // Removing an untracked table is a no-op and must not bump updated_at.
        assert!(state.remove_table("public", "missing").is_none());
        assert_eq!(state.updated_at, before);
    }

    #[test]
    fn test_sync_state_tracked_tables() {
        let mut state = SyncState::new("source", "target");
        assert!(state.tracked_tables().is_empty());

        state.update_table("public", "users", 1, 1);
        state.update_table("audit", "events", 2, 2);

        // Sort locally so the test does not depend on the method's ordering.
        let mut tracked = state.tracked_tables();
        tracked.sort_unstable();
        assert_eq!(tracked, vec!["audit.events", "public.users"]);
    }

    #[test]
    fn test_sync_state_json_round_trip() {
        let mut state = SyncState::new("postgresql://user:pass@localhost/db", "target");
        state.update_table("public", "users", 42, 7);

        let json = serde_json::to_string(&state).unwrap();
        let restored: SyncState = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.source_url, state.source_url);
        assert_eq!(restored.target_url, state.target_url);
        assert_eq!(restored.version, state.version);
        let table = restored.get_table("public", "users").unwrap();
        assert_eq!(table.schema, "public");
        assert_eq!(table.table, "users");
        assert_eq!(table.last_xmin, 42);
        assert_eq!(table.last_row_count, 7);
    }

    #[test]
    fn test_sanitize_url() {
        assert_eq!(
            sanitize_url("postgresql://user:secret@localhost/db"),
            "postgresql://user:***@localhost/db"
        );
        assert_eq!(
            sanitize_url("postgresql://user@localhost/db"),
            "postgresql://user@localhost/db"
        );
        assert_eq!(sanitize_url("/path/to/db.sqlite"), "/path/to/db.sqlite");
    }
}