Skip to main content

engram/sync/
worker.rs

1//! Background sync worker with debouncing (RML-875)
2
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6
7use chrono::Utc;
8use parking_lot::Mutex;
9use rusqlite::{params, Connection};
10use tokio::sync::mpsc;
11use tokio::time::{interval, Instant};
12
13use super::{CloudStorage, SyncDirection};
14use crate::error::{EngramError, Result};
15use crate::types::SyncStatus;
16
17/// Commands sent over the channel to the background sync worker task
18#[derive(Debug)]
19pub enum SyncCommand {
20    /// Run a sync immediately with the given (direction, force).
    /// After the sync the worker clears any pending debounced sync.
    /// `force` is handed to `do_sync`, which currently does not read it.
21    Sync(SyncDirection, bool),
22    /// Record that local data changed. The worker pushes once the debounce
    /// period has elapsed since the most recent `MarkDirty` (checked on a
    /// one-second tick).
23    MarkDirty,
24    /// Perform one final push, then terminate the worker task
25    Stop,
26}
27
28/// Handle to the background sync worker task spawned by `SyncWorker::start`
29pub struct SyncWorker {
    /// Sending half of the bounded command channel; the spawned task owns the
    /// receiving half and acts on each `SyncCommand` it receives.
30    sender: mpsc::Sender<SyncCommand>,
31}
32
33impl SyncWorker {
34    /// Start the sync worker
35    pub async fn start(
36        db_path: PathBuf,
37        cloud_uri: String,
38        encrypt: bool,
39        debounce_ms: u64,
40        conn: Arc<Mutex<Connection>>,
41    ) -> Result<Self> {
42        let (sender, mut receiver) = mpsc::channel::<SyncCommand>(100);
43
44        let cloud = CloudStorage::from_uri(&cloud_uri, encrypt).await?;
45        let debounce = Duration::from_millis(debounce_ms);
46
47        // Spawn worker task
48        tokio::spawn(async move {
49            let mut last_dirty: Option<Instant> = None;
50            let mut check_interval = interval(Duration::from_secs(1));
51
52            loop {
53                tokio::select! {
54                    Some(cmd) = receiver.recv() => {
55                        match cmd {
56                            SyncCommand::Sync(direction, force) => {
57                                Self::do_sync(&db_path, &cloud, &conn, direction, force).await;
58                                last_dirty = None;
59                            }
60                            SyncCommand::MarkDirty => {
61                                last_dirty = Some(Instant::now());
62                            }
63                            SyncCommand::Stop => {
64                                // Final sync before stopping
65                                Self::do_sync(&db_path, &cloud, &conn, SyncDirection::Push, false).await;
66                                break;
67                            }
68                        }
69                    }
70                    _ = check_interval.tick() => {
71                        // Check if debounce period has passed
72                        if let Some(dirty_time) = last_dirty {
73                            if dirty_time.elapsed() >= debounce {
74                                Self::do_sync(&db_path, &cloud, &conn, SyncDirection::Push, false).await;
75                                last_dirty = None;
76                            }
77                        }
78                    }
79                }
80            }
81
82            tracing::info!("Sync worker stopped");
83        });
84
85        Ok(Self { sender })
86    }
87
88    /// Perform the actual sync operation
89    async fn do_sync(
90        db_path: &PathBuf,
91        cloud: &CloudStorage,
92        conn: &Arc<Mutex<Connection>>,
93        direction: SyncDirection,
94        _force: bool,
95    ) {
96        let started_at = Utc::now();
97
98        // Update sync state to syncing
99        {
100            let conn = conn.lock();
101            let _ = conn.execute("UPDATE sync_state SET is_syncing = 1 WHERE id = 1", []);
102        }
103
104        let result = match direction {
105            SyncDirection::Push => cloud.upload(db_path).await,
106            SyncDirection::Pull => cloud.download(db_path).await,
107            SyncDirection::Bidirectional => {
108                // Check which is newer
109                match cloud.metadata().await {
110                    Ok(_remote_meta) => {
111                        let _local_modified =
112                            std::fs::metadata(db_path).and_then(|m| m.modified()).ok();
113
114                        // Simple heuristic: push if local is newer or no remote
115                        cloud.upload(db_path).await
116                    }
117                    Err(_) => {
118                        // No remote, push
119                        cloud.upload(db_path).await
120                    }
121                }
122            }
123        };
124
125        let completed_at = Utc::now();
126
127        // Update sync state
128        {
129            let conn = conn.lock();
130            match &result {
131                Ok(_) => {
132                    let _ = conn.execute(
133                        "UPDATE sync_state SET
134                            is_syncing = 0,
135                            last_sync = ?,
136                            pending_changes = 0,
137                            last_error = NULL
138                         WHERE id = 1",
139                        params![completed_at.to_rfc3339()],
140                    );
141                }
142                Err(e) => {
143                    let _ = conn.execute(
144                        "UPDATE sync_state SET
145                            is_syncing = 0,
146                            last_error = ?
147                         WHERE id = 1",
148                        params![e.to_string()],
149                    );
150                }
151            }
152        }
153
154        match result {
155            Ok(bytes) => {
156                tracing::info!(
157                    "Sync {:?} completed: {} bytes in {:?}",
158                    direction,
159                    bytes,
160                    completed_at - started_at
161                );
162            }
163            Err(e) => {
164                tracing::error!("Sync {:?} failed: {}", direction, e);
165            }
166        }
167    }
168
169    /// Trigger a sync
170    pub async fn sync(&self, direction: SyncDirection, force: bool) -> Result<()> {
171        self.sender
172            .send(SyncCommand::Sync(direction, force))
173            .await
174            .map_err(|_| EngramError::Sync("Worker channel closed".to_string()))?;
175        Ok(())
176    }
177
178    /// Mark data as dirty (triggers debounced sync)
179    pub async fn mark_dirty(&self) -> Result<()> {
180        self.sender
181            .send(SyncCommand::MarkDirty)
182            .await
183            .map_err(|_| EngramError::Sync("Worker channel closed".to_string()))?;
184        Ok(())
185    }
186
187    /// Stop the worker
188    pub async fn stop(&self) -> Result<()> {
189        self.sender
190            .send(SyncCommand::Stop)
191            .await
192            .map_err(|_| EngramError::Sync("Worker channel closed".to_string()))?;
193        Ok(())
194    }
195}
196
197/// Get current sync status from database
198pub fn get_sync_status(conn: &Connection) -> Result<SyncStatus> {
199    let row = conn.query_row(
200        "SELECT pending_changes, last_sync, last_error, is_syncing FROM sync_state WHERE id = 1",
201        [],
202        |row| {
203            let pending: i64 = row.get(0)?;
204            let last_sync: Option<String> = row.get(1)?;
205            let last_error: Option<String> = row.get(2)?;
206            let is_syncing: i32 = row.get(3)?;
207
208            Ok(SyncStatus {
209                pending_changes: pending,
210                last_sync: last_sync.and_then(|s| {
211                    chrono::DateTime::parse_from_rfc3339(&s)
212                        .map(|dt| dt.with_timezone(&Utc))
213                        .ok()
214                }),
215                last_error,
216                is_syncing: is_syncing != 0,
217            })
218        },
219    )?;
220
221    Ok(row)
222}
223
224/// Increment pending changes counter
225#[allow(dead_code)]
226pub fn increment_pending_changes(conn: &Connection) -> Result<()> {
227    conn.execute(
228        "UPDATE sync_state SET pending_changes = pending_changes + 1 WHERE id = 1",
229        [],
230    )?;
231    Ok(())
232}