cooklang_sync_client/
syncer.rs

1use futures::{channel::mpsc::Receiver, try_join, StreamExt};
2use std::path::Path;
3
4use std::sync::Arc;
5use time::OffsetDateTime;
6use tokio::sync::Mutex;
7use tokio::time::Duration;
8use tokio_util::sync::CancellationToken;
9
10use log::{debug, error, trace};
11
12use crate::chunker::Chunker;
13use crate::connection::{get_connection, ConnectionPool};
14use crate::errors::SyncError;
15use crate::models;
16use crate::registry;
17use crate::remote::{CommitResultStatus, Remote};
18use crate::{SyncStatus, SyncStatusListener};
19
20type Result<T, E = SyncError> = std::result::Result<T, E>;
21
/// How long the upload loop idles between scans when nothing else wakes it (47 s).
const INTERVAL_CHECK_UPLOAD_SEC: Duration = Duration::from_millis(47_000);
/// Byte budget for one upload batch (3 MB).
/// TODO: this limit is duplicated elsewhere (presumably server-side) — keep them in sync.
const MAX_UPLOAD_SIZE: usize = 3 * 1_000_000;
25
26#[allow(clippy::too_many_arguments)]
27pub async fn run(
28    token: CancellationToken,
29    listener: Option<Arc<dyn SyncStatusListener>>,
30    pool: &ConnectionPool,
31    storage_path: &Path,
32    namespace_id: i32,
33    chunker: &mut Chunker,
34    remote: &Remote,
35    local_registry_updated_rx: Receiver<models::IndexerUpdateEvent>,
36    read_only: bool,
37) -> Result<()> {
38    let chunker = Arc::new(Mutex::new(chunker));
39
40    if read_only {
41        let _ = try_join!(download_loop(
42            token.clone(),
43            listener.clone(),
44            pool,
45            Arc::clone(&chunker),
46            remote,
47            storage_path,
48            namespace_id
49        ))?;
50    } else {
51        let _ = try_join!(
52            download_loop(
53                token.clone(),
54                listener.clone(),
55                pool,
56                Arc::clone(&chunker),
57                remote,
58                storage_path,
59                namespace_id
60            ),
61            upload_loop(
62                token.clone(),
63                listener.clone(),
64                pool,
65                Arc::clone(&chunker),
66                remote,
67                namespace_id,
68                local_registry_updated_rx
69            ),
70        )?;
71    }
72
73    Ok(())
74}
75
76async fn download_loop(
77    token: CancellationToken,
78    listener: Option<Arc<dyn SyncStatusListener>>,
79    pool: &ConnectionPool,
80    chunker: Arc<Mutex<&mut Chunker>>,
81    remote: &Remote,
82    storage_path: &Path,
83    namespace_id: i32,
84) -> Result<()> {
85    loop {
86        // Check for cancellation at loop start
87        if token.is_cancelled() {
88            debug!("Download loop received shutdown signal");
89            break;
90        }
91
92        // Notify that we're downloading
93        if let Some(ref cb) = listener {
94            cb.on_status_changed(SyncStatus::Downloading);
95        }
96
97        match check_download_once(
98            pool,
99            Arc::clone(&chunker),
100            remote,
101            storage_path,
102            namespace_id,
103        )
104        .await
105        {
106            Ok(v) => v,
107            Err(SyncError::Unauthorized) => return Err(SyncError::Unauthorized),
108            Err(e) => return Err(SyncError::Unknown(format!("Check download failed: {}", e))),
109        };
110
111        // Return to idle after downloading
112        if let Some(ref cb) = listener {
113            cb.on_status_changed(SyncStatus::Idle);
114        }
115
116        // need to be longer than request timeout to make sure we don't get
117        // client side timeout error
118        tokio::select! {
119            _ = token.cancelled() => {
120                debug!("Download loop shutting down");
121                break;
122            }
123            result = remote.poll() => {
124                result?;
125            }
126        }
127    }
128
129    Ok(())
130}
131
132pub async fn upload_loop(
133    token: CancellationToken,
134    listener: Option<Arc<dyn SyncStatusListener>>,
135    pool: &ConnectionPool,
136    chunker: Arc<Mutex<&mut Chunker>>,
137    remote: &Remote,
138    namespace_id: i32,
139    mut local_registry_updated_rx: Receiver<models::IndexerUpdateEvent>,
140) -> Result<()> {
141    // wait for indexer to work first
142    tokio::time::sleep(Duration::from_secs(5)).await;
143
144    loop {
145        // Check for cancellation at loop start
146        if token.is_cancelled() {
147            debug!("Upload loop received shutdown signal");
148            break;
149        }
150
151        // Notify that we're uploading
152        if let Some(ref cb) = listener {
153            cb.on_status_changed(SyncStatus::Uploading);
154        }
155
156        // need to wait only if we didn't upload anything
157        // otherwise it should re-run immideately
158        if check_upload_once(pool, Arc::clone(&chunker), remote, namespace_id).await? {
159            // Return to idle after uploading
160            if let Some(ref cb) = listener {
161                cb.on_status_changed(SyncStatus::Idle);
162            }
163
164            // TODO test that it doesn't cancle stream
165            tokio::select! {
166                _ = token.cancelled() => {
167                    debug!("Upload loop shutting down");
168                    break;
169                }
170                _ = tokio::time::sleep(INTERVAL_CHECK_UPLOAD_SEC) => {},
171                Some(_) = local_registry_updated_rx.next() => {},
172            };
173        } else {
174            // If we still have work to do, don't set to idle - keep uploading status
175            // and immediately continue the loop
176        }
177    }
178
179    Ok(())
180}
181
182pub async fn check_upload_once(
183    pool: &ConnectionPool,
184    chunker: Arc<Mutex<&mut Chunker>>,
185    remote: &Remote,
186    namespace_id: i32,
187) -> Result<bool> {
188    debug!("upload scan");
189
190    let conn = &mut get_connection(pool)?;
191    let to_upload = registry::updated_locally(conn, namespace_id)?;
192
193    let mut upload_queue: Vec<Vec<(String, Vec<u8>)>> = vec![vec![]];
194    let mut size = 0;
195    let mut last = upload_queue.last_mut().unwrap();
196    let mut all_commited = true;
197
198    for f in &to_upload {
199        trace!("to upload {:?}", f);
200        let mut chunker = chunker.lock().await;
201        let mut chunk_ids = vec![String::from("")];
202
203        if !f.deleted {
204            // Also warms up the cache
205            chunk_ids = chunker.hashify(&f.path).await?;
206        }
207
208        let r = remote
209            .commit(&f.path, f.deleted, &chunk_ids.join(","))
210            .await?;
211
212        match r {
213            CommitResultStatus::Success(jid) => {
214                trace!("commit success");
215                registry::update_jid(conn, f, jid)?;
216            }
217            CommitResultStatus::NeedChunks(chunks) => {
218                trace!("need chunks");
219
220                all_commited = false;
221
222                for c in chunks.split(',') {
223                    let data = chunker.read_chunk(c)?;
224                    size += data.len();
225                    last.push((c.into(), data));
226
227                    if size > MAX_UPLOAD_SIZE {
228                        upload_queue.push(vec![]);
229                        last = upload_queue.last_mut().unwrap();
230                        size = 0;
231                    }
232                }
233            }
234        }
235    }
236
237    for batch in upload_queue {
238        if !batch.is_empty() {
239            remote.upload_batch(batch).await?;
240        }
241    }
242
243    Ok(all_commited)
244}
245
246pub async fn check_download_once(
247    pool: &ConnectionPool,
248    chunker: Arc<Mutex<&mut Chunker>>,
249    remote: &Remote,
250    storage_path: &Path,
251    namespace_id: i32,
252) -> Result<bool> {
253    debug!("download scan");
254
255    let conn = &mut get_connection(pool)?;
256
257    let latest_local = registry::latest_jid(conn, namespace_id).unwrap_or(0);
258    let to_download = remote.list(latest_local).await?;
259    // TODO maybe should limit one download at a time and use batches
260    // it can also overflow in-memory cache
261    let mut download_queue: Vec<&str> = vec![];
262
263    for d in &to_download {
264        trace!("collecting needed chunks for {:?}", d);
265
266        if d.deleted {
267            continue;
268        }
269
270        let mut chunker = chunker.lock().await;
271
272        // Warm-up cache to include chunks from an old file
273        if chunker.exists(&d.path) {
274            chunker.hashify(&d.path).await?;
275        }
276
277        for c in d.chunk_ids.split(',') {
278            if chunker.check_chunk(c) {
279                continue;
280            }
281
282            download_queue.push(c);
283        }
284    }
285
286    if !download_queue.is_empty() {
287        let mut chunker = chunker.lock().await;
288
289        let mut downloaded = remote.download_batch(download_queue).await;
290
291        while let Some(result) = downloaded.next().await {
292            match result {
293                Ok((chunk_id, data)) => {
294                    chunker.save_chunk(&chunk_id, data)?;
295                }
296                Err(e) => {
297                    return Err(e);
298                }
299            }
300        }
301    }
302
303    for d in &to_download {
304        trace!("udpating downloaded files {:?}", d);
305
306        let mut chunker = chunker.lock().await;
307
308        if d.deleted {
309            let form = build_delete_form(&d.path, storage_path, d.id, namespace_id);
310            // TODO atomic?
311            registry::delete(conn, &vec![form])?;
312            if chunker.exists(&d.path) {
313                chunker.delete(&d.path).await?;
314            }
315        } else {
316            let chunks: Vec<&str> = d.chunk_ids.split(',').collect();
317            // TODO atomic? store in tmp first and then move?
318            // TODO should be after we create record in db
319            if let Err(e) = chunker.save(&d.path, chunks).await {
320                error!("{:?}", e);
321                return Err(e);
322            }
323
324            let form = build_file_record(&d.path, storage_path, d.id, namespace_id)?;
325            registry::create(conn, &vec![form])?;
326        }
327    }
328
329    Ok(!to_download.is_empty())
330}
331
332fn build_file_record(
333    path: &str,
334    base: &Path,
335    jid: i32,
336    namespace_id: i32,
337) -> Result<models::CreateForm, SyncError> {
338    let mut full_path = base.to_path_buf();
339    full_path.push(path);
340    let metadata = full_path
341        .metadata()
342        .map_err(|e| SyncError::from_io_error(path, e))?;
343    let size: i64 = metadata.len().try_into()?;
344    let time = metadata
345        .modified()
346        .map_err(|e| SyncError::from_io_error(path, e))?;
347    let modified_at = OffsetDateTime::from(time);
348
349    let form = models::CreateForm {
350        jid: Some(jid),
351        path: path.to_string(),
352        deleted: false,
353        size,
354        modified_at,
355        namespace_id,
356    };
357
358    Ok(form)
359}
360
361fn build_delete_form(path: &str, base: &Path, jid: i32, namespace_id: i32) -> models::DeleteForm {
362    let mut full_path = base.to_path_buf();
363    full_path.push(path);
364
365    models::DeleteForm {
366        path: path.to_string(),
367        jid: Some(jid),
368        deleted: true,
369        size: 0,
370        modified_at: OffsetDateTime::now_utc(),
371        namespace_id,
372    }
373}