// syncor_core/sync/state.rs

1use crate::error::Result;
2use rusqlite::{params, Connection};
3use std::path::Path;
4
5pub struct StateDb {
6    conn: Connection,
7}
8
/// One row of the `sync_state` table: the last recorded sync position of a link.
///
/// Every field except `link_id` is nullable in the schema and is therefore an
/// `Option` here.
#[derive(Clone, Debug)]
pub struct SyncState {
    /// Primary key of the row; identifies the link this state belongs to.
    pub link_id: String,
    /// Value of the `last_local_snapshot` column, if set.
    pub last_local_snapshot: Option<String>,
    /// Value of the `last_remote_revision` column, if set.
    pub last_remote_revision: Option<String>,
    /// Value of the `last_synced_snapshot_id` column, if set.
    pub last_synced_snapshot_id: Option<String>,
    /// Value of the `last_sync_at` column, if set. Stored as free-form TEXT;
    /// this module does not enforce a timestamp format.
    pub last_sync_at: Option<String>,
}
17
/// One row of the `conflicts` table, without its auto-generated row id.
///
/// The three hash columns are nullable in the schema and are therefore
/// `Option`s here.
#[derive(Clone, Debug)]
pub struct ConflictRecord {
    /// Link the conflicting file belongs to.
    pub link_id: String,
    /// Path of the conflicting file, as stored in the `file_path` column.
    pub file_path: String,
    /// Value of the `local_hash` column, if set.
    pub local_hash: Option<String>,
    /// Value of the `remote_hash` column, if set.
    pub remote_hash: Option<String>,
    /// Value of the `base_hash` column, if set.
    pub base_hash: Option<String>,
}
26
/// One row of the `sync_log` table.
#[derive(Clone, Debug)]
pub struct SyncLogEntry {
    /// Auto-incremented row id assigned by SQLite.
    pub id: i64,
    /// Link the entry was logged for.
    pub link_id: String,
    /// Value of the `action` column (free-form text chosen by the caller).
    pub action: String,
    /// Value of the `status` column (free-form text chosen by the caller).
    pub status: String,
    /// Optional detail message; `None` when the column is NULL.
    pub message: Option<String>,
    /// Value of the `created_at` column. The schema defaults it to
    /// `datetime('now')` when the inserting statement does not supply one.
    pub created_at: String,
}
36
37impl StateDb {
38    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
39        let conn = Connection::open(path)?;
40
41        // Enable WAL mode for better concurrency
42        conn.execute_batch("PRAGMA journal_mode=WAL;")?;
43
44        conn.execute_batch(
45            "CREATE TABLE IF NOT EXISTS sync_state (
46                link_id TEXT PRIMARY KEY,
47                last_local_snapshot TEXT,
48                last_remote_revision TEXT,
49                last_synced_snapshot_id TEXT,
50                last_sync_at TEXT
51            );
52
53            CREATE TABLE IF NOT EXISTS conflicts (
54                id INTEGER PRIMARY KEY AUTOINCREMENT,
55                link_id TEXT NOT NULL,
56                file_path TEXT NOT NULL,
57                local_hash TEXT,
58                remote_hash TEXT,
59                base_hash TEXT
60            );
61
62            CREATE TABLE IF NOT EXISTS sync_log (
63                id INTEGER PRIMARY KEY AUTOINCREMENT,
64                link_id TEXT NOT NULL,
65                action TEXT NOT NULL,
66                status TEXT NOT NULL,
67                message TEXT,
68                created_at TEXT NOT NULL DEFAULT (datetime('now'))
69            );",
70        )?;
71
72        Ok(Self { conn })
73    }
74
75    pub fn get_sync_state(&self, link_id: &str) -> Result<Option<SyncState>> {
76        let mut stmt = self.conn.prepare(
77            "SELECT link_id, last_local_snapshot, last_remote_revision, last_synced_snapshot_id, last_sync_at
78             FROM sync_state WHERE link_id = ?1",
79        )?;
80
81        let mut rows = stmt.query(params![link_id])?;
82        if let Some(row) = rows.next()? {
83            Ok(Some(SyncState {
84                link_id: row.get(0)?,
85                last_local_snapshot: row.get(1)?,
86                last_remote_revision: row.get(2)?,
87                last_synced_snapshot_id: row.get(3)?,
88                last_sync_at: row.get(4)?,
89            }))
90        } else {
91            Ok(None)
92        }
93    }
94
95    pub fn upsert_sync_state(&self, state: &SyncState) -> Result<()> {
96        self.conn.execute(
97            "INSERT INTO sync_state (link_id, last_local_snapshot, last_remote_revision, last_synced_snapshot_id, last_sync_at)
98             VALUES (?1, ?2, ?3, ?4, ?5)
99             ON CONFLICT(link_id) DO UPDATE SET
100                last_local_snapshot = excluded.last_local_snapshot,
101                last_remote_revision = excluded.last_remote_revision,
102                last_synced_snapshot_id = excluded.last_synced_snapshot_id,
103                last_sync_at = excluded.last_sync_at",
104            params![
105                state.link_id,
106                state.last_local_snapshot,
107                state.last_remote_revision,
108                state.last_synced_snapshot_id,
109                state.last_sync_at,
110            ],
111        )?;
112        Ok(())
113    }
114
115    pub fn insert_conflict(&self, conflict: &ConflictRecord) -> Result<()> {
116        self.conn.execute(
117            "INSERT INTO conflicts (link_id, file_path, local_hash, remote_hash, base_hash)
118             VALUES (?1, ?2, ?3, ?4, ?5)",
119            params![
120                conflict.link_id,
121                conflict.file_path,
122                conflict.local_hash,
123                conflict.remote_hash,
124                conflict.base_hash,
125            ],
126        )?;
127        Ok(())
128    }
129
130    pub fn list_conflicts(&self, link_id: &str) -> Result<Vec<ConflictRecord>> {
131        let mut stmt = self.conn.prepare(
132            "SELECT link_id, file_path, local_hash, remote_hash, base_hash
133             FROM conflicts WHERE link_id = ?1",
134        )?;
135
136        let records = stmt.query_map(params![link_id], |row| {
137            Ok(ConflictRecord {
138                link_id: row.get(0)?,
139                file_path: row.get(1)?,
140                local_hash: row.get(2)?,
141                remote_hash: row.get(3)?,
142                base_hash: row.get(4)?,
143            })
144        })?;
145
146        let mut result = Vec::new();
147        for record in records {
148            result.push(record?);
149        }
150        Ok(result)
151    }
152
153    pub fn has_conflicts(&self, link_id: &str) -> Result<bool> {
154        let count: i64 = self.conn.query_row(
155            "SELECT COUNT(*) FROM conflicts WHERE link_id = ?1",
156            params![link_id],
157            |row| row.get(0),
158        )?;
159        Ok(count > 0)
160    }
161
162    pub fn clear_conflicts(&self, link_id: &str) -> Result<()> {
163        self.conn
164            .execute("DELETE FROM conflicts WHERE link_id = ?1", params![link_id])?;
165        Ok(())
166    }
167
168    pub fn append_log(
169        &self,
170        link_id: &str,
171        action: &str,
172        status: &str,
173        message: Option<&str>,
174    ) -> Result<()> {
175        self.conn.execute(
176            "INSERT INTO sync_log (link_id, action, status, message)
177             VALUES (?1, ?2, ?3, ?4)",
178            params![link_id, action, status, message],
179        )?;
180        Ok(())
181    }
182
183    pub fn list_log(&self, link_id: &str, limit: Option<usize>) -> Result<Vec<SyncLogEntry>> {
184        let limit_val = limit.unwrap_or(100) as i64;
185        let mut stmt = self.conn.prepare(
186            "SELECT id, link_id, action, status, message, created_at
187             FROM sync_log WHERE link_id = ?1
188             ORDER BY id DESC LIMIT ?2",
189        )?;
190
191        let entries = stmt.query_map(params![link_id, limit_val], |row| {
192            Ok(SyncLogEntry {
193                id: row.get(0)?,
194                link_id: row.get(1)?,
195                action: row.get(2)?,
196                status: row.get(3)?,
197                message: row.get(4)?,
198                created_at: row.get(5)?,
199            })
200        })?;
201
202        let mut result = Vec::new();
203        for entry in entries {
204            result.push(entry?);
205        }
206        Ok(result)
207    }
208
209    pub fn delete_state(&self, link_id: &str) -> Result<()> {
210        self.conn.execute(
211            "DELETE FROM sync_state WHERE link_id = ?1",
212            params![link_id],
213        )?;
214        self.conn
215            .execute("DELETE FROM conflicts WHERE link_id = ?1", params![link_id])?;
216        self.conn
217            .execute("DELETE FROM sync_log WHERE link_id = ?1", params![link_id])?;
218        Ok(())
219    }
220}