Skip to main content

oxios_markdown/
sync.rs

1//! Sync engine for client-server file synchronization.
2//!
3//! Ported from files.md (`server/sync/sync.go`) by Artem Zakirullin.
4//! Implements mtime-based 3-way merge synchronization with conflict resolution.
5
6use std::collections::HashMap;
7
8use crate::fs::VirtualFs;
9use crate::fslog::FsLog;
10use crate::merge::merge;
11use crate::types::{
12    FsError, SyncError, SyncFile, SyncRequest, SyncResponse, DIR_MEDIA, DIR_USER_ROOT, MD_EXT,
13    STATUS_MERGED, STATUS_NOT_MODIFIED, STATUS_OK, STATUS_UPDATED_ON_SERVER,
14};
15
16/// Configuration for the sync engine.
17#[derive(Debug, Clone)]
18pub struct SyncConfig {
19    /// Knowledge base config filename (usually "config.json").
20    pub config_filename: String,
21    /// Storage directory prefix for user data.
22    pub storage_dir: String,
23}
24
25impl Default for SyncConfig {
26    fn default() -> Self {
27        Self {
28            config_filename: "config.json".to_string(),
29            storage_dir: String::new(),
30        }
31    }
32}
33
34/// Sync engine: handles batch and single-file synchronization.
35pub struct SyncEngine {
36    fs: VirtualFs,
37    config: SyncConfig,
38    fslog: FsLog,
39}
40
41impl SyncEngine {
42    /// Create a new sync engine.
43    pub fn new(fs: VirtualFs, config: SyncConfig, fslog: FsLog) -> Self {
44        Self { fs, config, fslog }
45    }
46
47    /// Get a reference to the underlying filesystem.
48    pub fn fs(&self) -> &VirtualFs {
49        &self.fs
50    }
51
52    /// Perform batch file synchronization.
53    ///
54    /// Algorithm:
55    /// 1. Apply client deletions
56    /// 2. Save client modifications (merge on conflict)
57    /// 3. Send server files that the client doesn't have
58    /// 4. Include rename log entries
59    pub fn sync_filenames(
60        &self,
61        user_id: i64,
62        request: SyncRequest,
63    ) -> Result<SyncResponse, SyncError> {
64        let mut files_to_send: Vec<SyncFile> = Vec::new();
65        let mut dir_timestamps: HashMap<String, i64> = HashMap::new();
66
67        let mut last_sync: i64 = 0;
68        for ts in request.timestamps.values() {
69            if *ts > last_sync {
70                last_sync = *ts;
71            }
72        }
73
74        let renames = if last_sync != 0 {
75            let user_prefix = format!("{}/{}/", self.config.storage_dir, user_id);
76            self.fslog.renames_since(&user_prefix, last_sync)
77        } else {
78            HashMap::new()
79        };
80
81        // Process deletions
82        for path in &request.deleted {
83            let rel = path.trim_start_matches('/');
84            let _ = self.fs.del(DIR_USER_ROOT, rel);
85        }
86
87        // Process modifications
88        for client_file in &request.modified {
89            let rel = client_file.path.trim_start_matches('/');
90            let server_mtime = self.fs.mtime(DIR_USER_ROOT, rel).ok();
91            let mut content = client_file.content.clone();
92
93            // If server file is newer, merge with client content
94            if let Some(server_modified) = server_mtime {
95                if server_modified > client_file.last_modified {
96                    if let Ok(server_content) = self.fs.read(DIR_USER_ROOT, rel) {
97                        content = merge(&server_content, &client_file.content);
98                    }
99                }
100            }
101
102            // Skip config file
103            if client_file.path == self.config.config_filename {
104                continue;
105            }
106
107            match self.fs.write(DIR_USER_ROOT, rel, &content) {
108                Err(FsError::QuotaExceeded) => return Err(SyncError::QuotaExceeded),
109                Err(e) => tracing::warn!(path = %rel, error = %e, "Sync write failed"),
110                Ok(_) => {}
111            }
112        }
113
114        // Build response with files the client needs
115        let server_timestamps = self
116            .fs
117            .mtimes(DIR_USER_ROOT, &[MD_EXT, ".txt"])
118            .map_err(|e| SyncError::Storage(e.to_string()))?;
119
120        for (path, server_time) in &server_timestamps {
121            let parts: Vec<&str> = path.split('/').collect();
122            let dir = if parts.len() == 1 { "." } else { parts[0] };
123            let client_dir_time = request.timestamps.get(dir).copied().unwrap_or(0);
124
125            if server_time > &client_dir_time {
126                if let Ok(content) = self.fs.read(DIR_USER_ROOT, path) {
127                    files_to_send.push(SyncFile {
128                        status: STATUS_OK.to_string(),
129                        path: path.clone(),
130                        last_modified: *server_time,
131                        client_last_modified: 0,
132                        client_last_synced: 0,
133                        content,
134                    });
135                }
136            }
137
138            let existing = dir_timestamps.get(dir).copied().unwrap_or(0);
139            if *server_time > existing {
140                dir_timestamps.insert(dir.to_string(), *server_time);
141            }
142        }
143
144        Ok(SyncResponse {
145            status: STATUS_OK.to_string(),
146            files: files_to_send,
147            timestamps: dir_timestamps,
148            renames,
149        })
150    }
151
152    /// Synchronize a single file.
153    pub fn sync_file(
154        &self,
155        _user_id: i64,
156        client_file: SyncFile,
157    ) -> Result<SyncResponse, SyncError> {
158        let rel = client_file.path.trim_start_matches('/');
159        let server_content = self.fs.read(DIR_USER_ROOT, rel).ok();
160        let server_mtime = self.fs.mtime(DIR_USER_ROOT, rel).ok().unwrap_or(0);
161
162        // Already up to date?
163        if let Some(ref content) = server_content {
164            if *content == client_file.content {
165                return Ok(SyncResponse {
166                    status: STATUS_NOT_MODIFIED.to_string(),
167                    ..SyncResponse::default()
168                });
169            }
170        }
171
172        let mut status = STATUS_OK.to_string();
173        let mut content = client_file.content.clone();
174        let mut should_update = true;
175
176        if let Some(ref server_content) = server_content {
177            let not_modified_on_client = client_file.client_last_synced != 0
178                && client_file.client_last_modified == client_file.client_last_synced;
179            let modified_on_server = server_mtime > client_file.last_modified;
180
181            if modified_on_server && not_modified_on_client {
182                content = server_content.clone();
183                should_update = false;
184            } else if modified_on_server {
185                content = merge(server_content, &client_file.content);
186                status = STATUS_MERGED.to_string();
187            }
188        }
189
190        if should_update {
191            self.fs
192                .write(DIR_USER_ROOT, rel, &content)
193                .map_err(|e| SyncError::Storage(e.to_string()))?;
194            return Ok(SyncResponse {
195                status: STATUS_UPDATED_ON_SERVER.to_string(),
196                ..SyncResponse::default()
197            });
198        }
199
200        let final_mtime = self.fs.mtime(DIR_USER_ROOT, rel).unwrap_or(0);
201        Ok(SyncResponse {
202            status: status.clone(),
203            files: vec![SyncFile {
204                status,
205                path: client_file.path,
206                last_modified: final_mtime,
207                client_last_modified: client_file.last_modified,
208                client_last_synced: client_file.client_last_synced,
209                content,
210            }],
211            ..SyncResponse::default()
212        })
213    }
214}
215
216// ── Media types and methods ───────────────────────────────
217
218/// Media file entry.
219#[derive(Debug, Clone)]
220pub struct MediaEntry {
221    /// Filename within the media directory.
222    pub filename: String,
223    /// Last modified timestamp (millis since epoch).
224    pub last_modified: i64,
225}
226
227/// Media sync response.
228#[derive(Debug, Clone)]
229pub struct MediaSyncResponse {
230    /// Media files modified since the given timestamp.
231    pub files: Vec<MediaEntry>,
232    /// Latest modification timestamp among returned files.
233    pub timestamp: i64,
234}
235
236impl SyncEngine {
237    /// List media files modified since a given timestamp.
238    ///
239    /// Returns all media files whose mtime > `since_timestamp`,
240    /// along with the latest timestamp for incremental sync.
241    pub fn sync_media_filenames(
242        &self,
243        since_timestamp: i64,
244    ) -> Result<MediaSyncResponse, SyncError> {
245        let mtimes = self
246            .fs
247            .mtimes(DIR_MEDIA, &[])
248            .map_err(|e| SyncError::Storage(e.to_string()))?;
249
250        let mut files: Vec<MediaEntry> = Vec::new();
251        let mut latest_timestamp: i64 = 0;
252
253        for (filename, mod_time) in &mtimes {
254            if *mod_time <= since_timestamp {
255                continue;
256            }
257            if *mod_time > latest_timestamp {
258                latest_timestamp = *mod_time;
259            }
260            files.push(MediaEntry {
261                filename: filename.clone(),
262                last_modified: *mod_time,
263            });
264        }
265
266        Ok(MediaSyncResponse {
267            files,
268            timestamp: latest_timestamp,
269        })
270    }
271
272    /// Upload a media file (from raw bytes).
273    pub fn sync_media_upload(&self, filename: &str, data: &[u8]) -> Result<(), SyncError> {
274        let exists = self
275            .fs
276            .exists(DIR_MEDIA, filename)
277            .map_err(|e| SyncError::Storage(e.to_string()))?;
278
279        if exists {
280            // File already exists, skip
281            return Ok(());
282        }
283
284        self.fs
285            .write_bytes(DIR_MEDIA, filename, data)
286            .map_err(|e| match e {
287                FsError::QuotaExceeded => SyncError::QuotaExceeded,
288                other => SyncError::Storage(other.to_string()),
289            })?;
290
291        Ok(())
292    }
293
294    /// Read a media file as raw bytes.
295    pub fn sync_media_read(&self, filename: &str) -> Result<Vec<u8>, SyncError> {
296        self.fs
297            .read_bytes(DIR_MEDIA, filename)
298            .map_err(|e| SyncError::Storage(e.to_string()))
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305    use tempfile::TempDir;
306
307    fn test_engine() -> (SyncEngine, TempDir) {
308        let dir = TempDir::new().unwrap();
309        let fs = VirtualFs::new(dir.path().to_path_buf()).unwrap();
310        let fslog = FsLog::new(dir.path().join("fslog"));
311        let config = SyncConfig {
312            config_filename: "config.json".into(),
313            storage_dir: dir.path().to_string_lossy().to_string(),
314        };
315        (SyncEngine::new(fs, config, fslog), dir)
316    }
317
318    #[test]
319    fn test_sync_file_new() {
320        let (engine, _t) = test_engine();
321        let resp = engine
322            .sync_file(
323                1,
324                SyncFile {
325                    status: String::new(),
326                    path: "test.md".into(),
327                    last_modified: 0,
328                    client_last_modified: 0,
329                    client_last_synced: 0,
330                    content: "hello".into(),
331                },
332            )
333            .unwrap();
334        assert_eq!(resp.status, STATUS_UPDATED_ON_SERVER);
335    }
336
337    #[test]
338    fn test_sync_file_not_modified() {
339        let (engine, _t) = test_engine();
340        engine.fs.write(DIR_USER_ROOT, "test.md", "hello").unwrap();
341        let resp = engine
342            .sync_file(
343                1,
344                SyncFile {
345                    status: String::new(),
346                    path: "test.md".into(),
347                    last_modified: 0,
348                    client_last_modified: 0,
349                    client_last_synced: 0,
350                    content: "hello".into(),
351                },
352            )
353            .unwrap();
354        assert_eq!(resp.status, STATUS_NOT_MODIFIED);
355    }
356
357    #[test]
358    fn test_batch_sync_creates_files() {
359        let (engine, _t) = test_engine();
360        let resp = engine
361            .sync_filenames(
362                1,
363                SyncRequest {
364                    modified: vec![SyncFile {
365                        status: String::new(),
366                        path: "new.md".into(),
367                        last_modified: 0,
368                        client_last_modified: 0,
369                        client_last_synced: 0,
370                        content: "new content".into(),
371                    }],
372                    deleted: vec![],
373                    timestamps: HashMap::new(),
374                },
375            )
376            .unwrap();
377        assert_eq!(resp.status, STATUS_OK);
378        assert!(engine.fs.exists(DIR_USER_ROOT, "new.md").unwrap());
379    }
380
381    #[test]
382    fn test_sync_media_upload_and_read() {
383        let (engine, _t) = test_engine();
384        engine.fs.make_dir(DIR_MEDIA).unwrap();
385
386        // Binary data that is NOT valid UTF-8
387        let data: &[u8] = &[0x89, 0x50, 0x4E, 0x47, 0xFF, 0xD8, 0x00];
388
389        engine.sync_media_upload("photo.png", data).unwrap();
390
391        let read_back = engine.sync_media_read("photo.png").unwrap();
392        assert_eq!(read_back, data);
393    }
394
395    #[test]
396    fn test_sync_media_upload_skips_existing() {
397        let (engine, _t) = test_engine();
398        engine.fs.make_dir(DIR_MEDIA).unwrap();
399
400        engine.sync_media_upload("file.bin", b"original").unwrap();
401        // Uploading again should skip (no overwrite)
402        engine.sync_media_upload("file.bin", b"updated").unwrap();
403
404        let content = engine.sync_media_read("file.bin").unwrap();
405        assert_eq!(content, b"original");
406    }
407}