1use std::collections::HashMap;
7
8use crate::fs::VirtualFs;
9use crate::fslog::FsLog;
10use crate::merge::merge;
11use crate::types::{
12 FsError, SyncError, SyncFile, SyncRequest, SyncResponse, DIR_MEDIA, DIR_USER_ROOT, MD_EXT,
13 STATUS_MERGED, STATUS_NOT_MODIFIED, STATUS_OK, STATUS_UPDATED_ON_SERVER,
14};
15
/// Settings consumed by [`SyncEngine`].
#[derive(Clone, Debug)]
pub struct SyncConfig {
    /// Path of the configuration file; batch sync never writes a client upload to this path.
    pub config_filename: String,
    /// Storage root; combined with the user id to form the prefix used for rename-log lookups.
    pub storage_dir: String,
}
24
25impl Default for SyncConfig {
26 fn default() -> Self {
27 Self {
28 config_filename: "config.json".to_string(),
29 storage_dir: String::new(),
30 }
31 }
32}
33
34pub struct SyncEngine {
36 fs: VirtualFs,
37 config: SyncConfig,
38 fslog: FsLog,
39}
40
41impl SyncEngine {
42 pub fn new(fs: VirtualFs, config: SyncConfig, fslog: FsLog) -> Self {
44 Self { fs, config, fslog }
45 }
46
47 pub fn fs(&self) -> &VirtualFs {
49 &self.fs
50 }
51
52 pub fn sync_filenames(
60 &self,
61 user_id: i64,
62 request: SyncRequest,
63 ) -> Result<SyncResponse, SyncError> {
64 let mut files_to_send: Vec<SyncFile> = Vec::new();
65 let mut dir_timestamps: HashMap<String, i64> = HashMap::new();
66
67 let mut last_sync: i64 = 0;
68 for ts in request.timestamps.values() {
69 if *ts > last_sync {
70 last_sync = *ts;
71 }
72 }
73
74 let renames = if last_sync != 0 {
75 let user_prefix = format!("{}/{}/", self.config.storage_dir, user_id);
76 self.fslog.renames_since(&user_prefix, last_sync)
77 } else {
78 HashMap::new()
79 };
80
81 for path in &request.deleted {
83 let rel = path.trim_start_matches('/');
84 let _ = self.fs.del(DIR_USER_ROOT, rel);
85 }
86
87 for client_file in &request.modified {
89 let rel = client_file.path.trim_start_matches('/');
90 let server_mtime = self.fs.mtime(DIR_USER_ROOT, rel).ok();
91 let mut content = client_file.content.clone();
92
93 if let Some(server_modified) = server_mtime {
95 if server_modified > client_file.last_modified {
96 if let Ok(server_content) = self.fs.read(DIR_USER_ROOT, rel) {
97 content = merge(&server_content, &client_file.content);
98 }
99 }
100 }
101
102 if client_file.path == self.config.config_filename {
104 continue;
105 }
106
107 match self.fs.write(DIR_USER_ROOT, rel, &content) {
108 Err(FsError::QuotaExceeded) => return Err(SyncError::QuotaExceeded),
109 Err(e) => tracing::warn!(path = %rel, error = %e, "Sync write failed"),
110 Ok(_) => {}
111 }
112 }
113
114 let server_timestamps = self
116 .fs
117 .mtimes(DIR_USER_ROOT, &[MD_EXT, ".txt"])
118 .map_err(|e| SyncError::Storage(e.to_string()))?;
119
120 for (path, server_time) in &server_timestamps {
121 let parts: Vec<&str> = path.split('/').collect();
122 let dir = if parts.len() == 1 { "." } else { parts[0] };
123 let client_dir_time = request.timestamps.get(dir).copied().unwrap_or(0);
124
125 if server_time > &client_dir_time {
126 if let Ok(content) = self.fs.read(DIR_USER_ROOT, path) {
127 files_to_send.push(SyncFile {
128 status: STATUS_OK.to_string(),
129 path: path.clone(),
130 last_modified: *server_time,
131 client_last_modified: 0,
132 client_last_synced: 0,
133 content,
134 });
135 }
136 }
137
138 let existing = dir_timestamps.get(dir).copied().unwrap_or(0);
139 if *server_time > existing {
140 dir_timestamps.insert(dir.to_string(), *server_time);
141 }
142 }
143
144 Ok(SyncResponse {
145 status: STATUS_OK.to_string(),
146 files: files_to_send,
147 timestamps: dir_timestamps,
148 renames,
149 })
150 }
151
152 pub fn sync_file(
154 &self,
155 _user_id: i64,
156 client_file: SyncFile,
157 ) -> Result<SyncResponse, SyncError> {
158 let rel = client_file.path.trim_start_matches('/');
159 let server_content = self.fs.read(DIR_USER_ROOT, rel).ok();
160 let server_mtime = self.fs.mtime(DIR_USER_ROOT, rel).ok().unwrap_or(0);
161
162 if let Some(ref content) = server_content {
164 if *content == client_file.content {
165 return Ok(SyncResponse {
166 status: STATUS_NOT_MODIFIED.to_string(),
167 ..SyncResponse::default()
168 });
169 }
170 }
171
172 let mut status = STATUS_OK.to_string();
173 let mut content = client_file.content.clone();
174 let mut should_update = true;
175
176 if let Some(ref server_content) = server_content {
177 let not_modified_on_client = client_file.client_last_synced != 0
178 && client_file.client_last_modified == client_file.client_last_synced;
179 let modified_on_server = server_mtime > client_file.last_modified;
180
181 if modified_on_server && not_modified_on_client {
182 content = server_content.clone();
183 should_update = false;
184 } else if modified_on_server {
185 content = merge(server_content, &client_file.content);
186 status = STATUS_MERGED.to_string();
187 }
188 }
189
190 if should_update {
191 self.fs
192 .write(DIR_USER_ROOT, rel, &content)
193 .map_err(|e| SyncError::Storage(e.to_string()))?;
194 return Ok(SyncResponse {
195 status: STATUS_UPDATED_ON_SERVER.to_string(),
196 ..SyncResponse::default()
197 });
198 }
199
200 let final_mtime = self.fs.mtime(DIR_USER_ROOT, rel).unwrap_or(0);
201 Ok(SyncResponse {
202 status: status.clone(),
203 files: vec![SyncFile {
204 status,
205 path: client_file.path,
206 last_modified: final_mtime,
207 client_last_modified: client_file.last_modified,
208 client_last_synced: client_file.client_last_synced,
209 content,
210 }],
211 ..SyncResponse::default()
212 })
213 }
214}
215
/// One media file reported by a media listing sync.
#[derive(Clone, Debug)]
pub struct MediaEntry {
    /// Name of the file as returned by the media directory listing.
    pub filename: String,
    /// Modification time of the file as reported by the filesystem.
    pub last_modified: i64,
}
226
227#[derive(Debug, Clone)]
229pub struct MediaSyncResponse {
230 pub files: Vec<MediaEntry>,
232 pub timestamp: i64,
234}
235
236impl SyncEngine {
237 pub fn sync_media_filenames(
242 &self,
243 since_timestamp: i64,
244 ) -> Result<MediaSyncResponse, SyncError> {
245 let mtimes = self
246 .fs
247 .mtimes(DIR_MEDIA, &[])
248 .map_err(|e| SyncError::Storage(e.to_string()))?;
249
250 let mut files: Vec<MediaEntry> = Vec::new();
251 let mut latest_timestamp: i64 = 0;
252
253 for (filename, mod_time) in &mtimes {
254 if *mod_time <= since_timestamp {
255 continue;
256 }
257 if *mod_time > latest_timestamp {
258 latest_timestamp = *mod_time;
259 }
260 files.push(MediaEntry {
261 filename: filename.clone(),
262 last_modified: *mod_time,
263 });
264 }
265
266 Ok(MediaSyncResponse {
267 files,
268 timestamp: latest_timestamp,
269 })
270 }
271
272 pub fn sync_media_upload(&self, filename: &str, data: &[u8]) -> Result<(), SyncError> {
274 let exists = self
275 .fs
276 .exists(DIR_MEDIA, filename)
277 .map_err(|e| SyncError::Storage(e.to_string()))?;
278
279 if exists {
280 return Ok(());
282 }
283
284 self.fs
285 .write_bytes(DIR_MEDIA, filename, data)
286 .map_err(|e| match e {
287 FsError::QuotaExceeded => SyncError::QuotaExceeded,
288 other => SyncError::Storage(other.to_string()),
289 })?;
290
291 Ok(())
292 }
293
294 pub fn sync_media_read(&self, filename: &str) -> Result<Vec<u8>, SyncError> {
296 self.fs
297 .read_bytes(DIR_MEDIA, filename)
298 .map_err(|e| SyncError::Storage(e.to_string()))
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an engine rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the duration
    /// of the test.
    fn test_engine() -> (SyncEngine, TempDir) {
        let tmp = TempDir::new().unwrap();
        let fs = VirtualFs::new(tmp.path().to_path_buf()).unwrap();
        let fslog = FsLog::new(tmp.path().join("fslog"));
        let storage_dir = tmp.path().to_string_lossy().to_string();
        let config = SyncConfig {
            config_filename: "config.json".into(),
            storage_dir,
        };
        (SyncEngine::new(fs, config, fslog), tmp)
    }

    /// Client file with the given path and content and all timestamps zeroed.
    fn client_file(path: &str, content: &str) -> SyncFile {
        SyncFile {
            status: String::new(),
            path: path.into(),
            last_modified: 0,
            client_last_modified: 0,
            client_last_synced: 0,
            content: content.into(),
        }
    }

    #[test]
    fn test_sync_file_new() {
        let (engine, _t) = test_engine();

        let resp = engine.sync_file(1, client_file("test.md", "hello")).unwrap();

        assert_eq!(resp.status, STATUS_UPDATED_ON_SERVER);
    }

    #[test]
    fn test_sync_file_not_modified() {
        let (engine, _t) = test_engine();
        engine.fs.write(DIR_USER_ROOT, "test.md", "hello").unwrap();

        let resp = engine.sync_file(1, client_file("test.md", "hello")).unwrap();

        assert_eq!(resp.status, STATUS_NOT_MODIFIED);
    }

    #[test]
    fn test_batch_sync_creates_files() {
        let (engine, _t) = test_engine();
        let request = SyncRequest {
            modified: vec![client_file("new.md", "new content")],
            deleted: vec![],
            timestamps: HashMap::new(),
        };

        let resp = engine.sync_filenames(1, request).unwrap();

        assert_eq!(resp.status, STATUS_OK);
        assert!(engine.fs.exists(DIR_USER_ROOT, "new.md").unwrap());
    }

    #[test]
    fn test_sync_media_upload_and_read() {
        let (engine, _t) = test_engine();
        engine.fs.make_dir(DIR_MEDIA).unwrap();

        // Arbitrary binary payload, including bytes that are not valid UTF-8.
        let data: &[u8] = &[0x89, 0x50, 0x4E, 0x47, 0xFF, 0xD8, 0x00];

        engine.sync_media_upload("photo.png", data).unwrap();

        let read_back = engine.sync_media_read("photo.png").unwrap();
        assert_eq!(read_back, data);
    }

    #[test]
    fn test_sync_media_upload_skips_existing() {
        let (engine, _t) = test_engine();
        engine.fs.make_dir(DIR_MEDIA).unwrap();

        // The second upload under the same name must not replace the first.
        engine.sync_media_upload("file.bin", b"original").unwrap();
        engine.sync_media_upload("file.bin", b"updated").unwrap();

        let content = engine.sync_media_read("file.bin").unwrap();
        assert_eq!(content, b"original");
    }
}