1use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6
7use chrono::Utc;
8use parking_lot::Mutex;
9use rusqlite::{params, Connection};
10use tokio::sync::mpsc;
11use tokio::time::{interval, Instant};
12
13use super::{CloudStorage, SyncDirection};
14use crate::error::{EngramError, Result};
15use crate::types::SyncStatus;
16
17#[derive(Debug)]
19pub enum SyncCommand {
20 Sync(SyncDirection, bool),
22 MarkDirty,
24 Stop,
26}
27
28pub struct SyncWorker {
30 sender: mpsc::Sender<SyncCommand>,
31}
32
33impl SyncWorker {
34 pub async fn start(
36 db_path: PathBuf,
37 cloud_uri: String,
38 encrypt: bool,
39 debounce_ms: u64,
40 conn: Arc<Mutex<Connection>>,
41 ) -> Result<Self> {
42 let (sender, mut receiver) = mpsc::channel::<SyncCommand>(100);
43
44 let cloud = CloudStorage::from_uri(&cloud_uri, encrypt).await?;
45 let debounce = Duration::from_millis(debounce_ms);
46
47 tokio::spawn(async move {
49 let mut last_dirty: Option<Instant> = None;
50 let mut check_interval = interval(Duration::from_secs(1));
51
52 loop {
53 tokio::select! {
54 Some(cmd) = receiver.recv() => {
55 match cmd {
56 SyncCommand::Sync(direction, force) => {
57 Self::do_sync(&db_path, &cloud, &conn, direction, force).await;
58 last_dirty = None;
59 }
60 SyncCommand::MarkDirty => {
61 last_dirty = Some(Instant::now());
62 }
63 SyncCommand::Stop => {
64 Self::do_sync(&db_path, &cloud, &conn, SyncDirection::Push, false).await;
66 break;
67 }
68 }
69 }
70 _ = check_interval.tick() => {
71 if let Some(dirty_time) = last_dirty {
73 if dirty_time.elapsed() >= debounce {
74 Self::do_sync(&db_path, &cloud, &conn, SyncDirection::Push, false).await;
75 last_dirty = None;
76 }
77 }
78 }
79 }
80 }
81
82 tracing::info!("Sync worker stopped");
83 });
84
85 Ok(Self { sender })
86 }
87
88 async fn do_sync(
90 db_path: &PathBuf,
91 cloud: &CloudStorage,
92 conn: &Arc<Mutex<Connection>>,
93 direction: SyncDirection,
94 _force: bool,
95 ) {
96 let started_at = Utc::now();
97
98 {
100 let conn = conn.lock();
101 let _ = conn.execute("UPDATE sync_state SET is_syncing = 1 WHERE id = 1", []);
102 }
103
104 let result = match direction {
105 SyncDirection::Push => cloud.upload(db_path).await,
106 SyncDirection::Pull => cloud.download(db_path).await,
107 SyncDirection::Bidirectional => {
108 match cloud.metadata().await {
110 Ok(_remote_meta) => {
111 let _local_modified =
112 std::fs::metadata(db_path).and_then(|m| m.modified()).ok();
113
114 cloud.upload(db_path).await
116 }
117 Err(_) => {
118 cloud.upload(db_path).await
120 }
121 }
122 }
123 };
124
125 let completed_at = Utc::now();
126
127 {
129 let conn = conn.lock();
130 match &result {
131 Ok(_) => {
132 let _ = conn.execute(
133 "UPDATE sync_state SET
134 is_syncing = 0,
135 last_sync = ?,
136 pending_changes = 0,
137 last_error = NULL
138 WHERE id = 1",
139 params![completed_at.to_rfc3339()],
140 );
141 }
142 Err(e) => {
143 let _ = conn.execute(
144 "UPDATE sync_state SET
145 is_syncing = 0,
146 last_error = ?
147 WHERE id = 1",
148 params![e.to_string()],
149 );
150 }
151 }
152 }
153
154 match result {
155 Ok(bytes) => {
156 tracing::info!(
157 "Sync {:?} completed: {} bytes in {:?}",
158 direction,
159 bytes,
160 completed_at - started_at
161 );
162 }
163 Err(e) => {
164 tracing::error!("Sync {:?} failed: {}", direction, e);
165 }
166 }
167 }
168
169 pub async fn sync(&self, direction: SyncDirection, force: bool) -> Result<()> {
171 self.sender
172 .send(SyncCommand::Sync(direction, force))
173 .await
174 .map_err(|_| EngramError::Sync("Worker channel closed".to_string()))?;
175 Ok(())
176 }
177
178 pub async fn mark_dirty(&self) -> Result<()> {
180 self.sender
181 .send(SyncCommand::MarkDirty)
182 .await
183 .map_err(|_| EngramError::Sync("Worker channel closed".to_string()))?;
184 Ok(())
185 }
186
187 pub async fn stop(&self) -> Result<()> {
189 self.sender
190 .send(SyncCommand::Stop)
191 .await
192 .map_err(|_| EngramError::Sync("Worker channel closed".to_string()))?;
193 Ok(())
194 }
195}
196
197pub fn get_sync_status(conn: &Connection) -> Result<SyncStatus> {
199 let row = conn.query_row(
200 "SELECT pending_changes, last_sync, last_error, is_syncing FROM sync_state WHERE id = 1",
201 [],
202 |row| {
203 let pending: i64 = row.get(0)?;
204 let last_sync: Option<String> = row.get(1)?;
205 let last_error: Option<String> = row.get(2)?;
206 let is_syncing: i32 = row.get(3)?;
207
208 Ok(SyncStatus {
209 pending_changes: pending,
210 last_sync: last_sync.and_then(|s| {
211 chrono::DateTime::parse_from_rfc3339(&s)
212 .map(|dt| dt.with_timezone(&Utc))
213 .ok()
214 }),
215 last_error,
216 is_syncing: is_syncing != 0,
217 })
218 },
219 )?;
220
221 Ok(row)
222}
223
224#[allow(dead_code)]
226pub fn increment_pending_changes(conn: &Connection) -> Result<()> {
227 conn.execute(
228 "UPDATE sync_state SET pending_changes = pending_changes + 1 WHERE id = 1",
229 [],
230 )?;
231 Ok(())
232}