database_replicator/xmin/
state.rs1use anyhow::{Context, Result};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::Path;
8use tokio::fs;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct TableSyncState {
13 pub schema: String,
15 pub table: String,
17 pub last_xmin: u32,
20 pub last_sync_at: chrono::DateTime<chrono::Utc>,
22 pub last_row_count: u64,
24}
25
26impl TableSyncState {
27 pub fn new(schema: &str, table: &str) -> Self {
29 Self {
30 schema: schema.to_string(),
31 table: table.to_string(),
32 last_xmin: 0,
33 last_sync_at: chrono::Utc::now(),
34 last_row_count: 0,
35 }
36 }
37
38 pub fn update(&mut self, new_xmin: u32, row_count: u64) {
40 self.last_xmin = new_xmin;
41 self.last_sync_at = chrono::Utc::now();
42 self.last_row_count = row_count;
43 }
44
45 pub fn qualified_name(&self) -> String {
47 format!("{}.{}", self.schema, self.table)
48 }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct SyncState {
54 pub source_url: String,
56 pub target_url: String,
58 pub tables: HashMap<String, TableSyncState>,
60 pub version: u32,
62 pub created_at: chrono::DateTime<chrono::Utc>,
64 pub updated_at: chrono::DateTime<chrono::Utc>,
66}
67
68impl SyncState {
69 pub fn new(source_url: &str, target_url: &str) -> Self {
71 let now = chrono::Utc::now();
72 Self {
73 source_url: sanitize_url(source_url),
74 target_url: sanitize_url(target_url),
75 tables: HashMap::new(),
76 version: 1,
77 created_at: now,
78 updated_at: now,
79 }
80 }
81
82 pub fn get_or_create_table(&mut self, schema: &str, table: &str) -> &mut TableSyncState {
84 let key = format!("{}.{}", schema, table);
85 self.tables
86 .entry(key)
87 .or_insert_with(|| TableSyncState::new(schema, table))
88 }
89
90 pub fn get_table(&self, schema: &str, table: &str) -> Option<&TableSyncState> {
92 let key = format!("{}.{}", schema, table);
93 self.tables.get(&key)
94 }
95
96 pub fn update_table(&mut self, schema: &str, table: &str, new_xmin: u32, row_count: u64) {
98 let state = self.get_or_create_table(schema, table);
99 state.update(new_xmin, row_count);
100 self.updated_at = chrono::Utc::now();
101 }
102
103 pub fn remove_table(&mut self, schema: &str, table: &str) -> Option<TableSyncState> {
105 let key = format!("{}.{}", schema, table);
106 let removed = self.tables.remove(&key);
107 if removed.is_some() {
108 self.updated_at = chrono::Utc::now();
109 }
110 removed
111 }
112
113 pub fn tracked_tables(&self) -> Vec<&str> {
115 self.tables.keys().map(|s| s.as_str()).collect()
116 }
117
118 pub async fn load(path: &Path) -> Result<Self> {
120 let contents = fs::read_to_string(path)
121 .await
122 .with_context(|| format!("Failed to read sync state from {:?}", path))?;
123 let state: SyncState = serde_json::from_str(&contents)
124 .with_context(|| format!("Failed to parse sync state from {:?}", path))?;
125 Ok(state)
126 }
127
128 pub async fn save(&self, path: &Path) -> Result<()> {
130 if let Some(parent) = path.parent() {
132 fs::create_dir_all(parent)
133 .await
134 .with_context(|| format!("Failed to create directory {:?}", parent))?;
135 }
136
137 let contents =
138 serde_json::to_string_pretty(self).context("Failed to serialize sync state")?;
139 fs::write(path, contents)
140 .await
141 .with_context(|| format!("Failed to write sync state to {:?}", path))?;
142 Ok(())
143 }
144
145 pub fn default_path() -> std::path::PathBuf {
147 std::path::PathBuf::from(".seren-replicator/xmin-sync-state.json")
148 }
149}
150
151fn sanitize_url(url: &str) -> String {
153 if let Ok(mut parsed) = url::Url::parse(url) {
155 if parsed.password().is_some() {
156 let _ = parsed.set_password(Some("***"));
157 }
158 parsed.to_string()
159 } else {
160 url.to_string()
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168
169 #[test]
170 fn test_table_sync_state_new() {
171 let state = TableSyncState::new("public", "users");
172 assert_eq!(state.schema, "public");
173 assert_eq!(state.table, "users");
174 assert_eq!(state.last_xmin, 0);
175 assert_eq!(state.last_row_count, 0);
176 }
177
178 #[test]
179 fn test_table_sync_state_update() {
180 let mut state = TableSyncState::new("public", "users");
181 state.update(12345, 100);
182 assert_eq!(state.last_xmin, 12345);
183 assert_eq!(state.last_row_count, 100);
184 }
185
186 #[test]
187 fn test_table_sync_state_qualified_name() {
188 let state = TableSyncState::new("myschema", "mytable");
189 assert_eq!(state.qualified_name(), "myschema.mytable");
190 }
191
192 #[test]
193 fn test_sync_state_new() {
194 let state = SyncState::new(
195 "postgresql://user:pass@localhost/db",
196 "postgresql://user:pass@remote/db",
197 );
198 assert!(state.tables.is_empty());
199 assert_eq!(state.version, 1);
200 assert!(state.source_url.contains("***"));
202 assert!(state.target_url.contains("***"));
203 }
204
205 #[test]
206 fn test_sync_state_get_or_create() {
207 let mut state = SyncState::new("source", "target");
208
209 let table_state = state.get_or_create_table("public", "users");
211 assert_eq!(table_state.last_xmin, 0);
212
213 table_state.update(100, 50);
215
216 let table_state = state.get_or_create_table("public", "users");
218 assert_eq!(table_state.last_xmin, 100);
219 }
220
221 #[test]
222 fn test_sync_state_update_table() {
223 let mut state = SyncState::new("source", "target");
224 state.update_table("public", "users", 500, 200);
225
226 let table_state = state.get_table("public", "users").unwrap();
227 assert_eq!(table_state.last_xmin, 500);
228 assert_eq!(table_state.last_row_count, 200);
229 }
230
231 #[test]
232 fn test_sync_state_remove_table() {
233 let mut state = SyncState::new("source", "target");
234 state.update_table("public", "users", 100, 10);
235
236 let removed = state.remove_table("public", "users");
237 assert!(removed.is_some());
238 assert!(state.get_table("public", "users").is_none());
239 }
240
241 #[test]
242 fn test_sanitize_url() {
243 assert_eq!(
244 sanitize_url("postgresql://user:secret@localhost/db"),
245 "postgresql://user:***@localhost/db"
246 );
247 assert_eq!(
248 sanitize_url("postgresql://user@localhost/db"),
249 "postgresql://user@localhost/db"
250 );
251 assert_eq!(sanitize_url("/path/to/db.sqlite"), "/path/to/db.sqlite");
252 }
253}