1use futures::{channel::mpsc::Receiver, try_join, StreamExt};
2use std::path::Path;
3
4use std::sync::Arc;
5use time::OffsetDateTime;
6use tokio::sync::Mutex;
7use tokio::time::Duration;
8use tokio_util::sync::CancellationToken;
9
10use log::{debug, error, trace};
11
12use crate::chunker::Chunker;
13use crate::connection::{get_connection, ConnectionPool};
14use crate::errors::SyncError;
15use crate::models;
16use crate::registry;
17use crate::remote::{CommitResultStatus, Remote};
18use crate::{SyncStatus, SyncStatusListener};
19
/// Module-local `Result` alias whose error type defaults to [`SyncError`].
type Result<T, E = SyncError> = std::result::Result<T, E>;

/// Longest time the upload loop stays idle before it rescans for local changes.
const INTERVAL_CHECK_UPLOAD_SEC: Duration = Duration::from_secs(47);
/// Accumulated chunk-data length (summed via `len()`) past which the upload scan
/// starts a new batch.
const MAX_UPLOAD_SIZE: usize = 3_000_000;
25
26#[allow(clippy::too_many_arguments)]
27pub async fn run(
28 token: CancellationToken,
29 listener: Option<Arc<dyn SyncStatusListener>>,
30 pool: &ConnectionPool,
31 storage_path: &Path,
32 namespace_id: i32,
33 chunker: &mut Chunker,
34 remote: &Remote,
35 local_registry_updated_rx: Receiver<models::IndexerUpdateEvent>,
36 read_only: bool,
37) -> Result<()> {
38 let chunker = Arc::new(Mutex::new(chunker));
39
40 if read_only {
41 let _ = try_join!(download_loop(
42 token.clone(),
43 listener.clone(),
44 pool,
45 Arc::clone(&chunker),
46 remote,
47 storage_path,
48 namespace_id
49 ))?;
50 } else {
51 let _ = try_join!(
52 download_loop(
53 token.clone(),
54 listener.clone(),
55 pool,
56 Arc::clone(&chunker),
57 remote,
58 storage_path,
59 namespace_id
60 ),
61 upload_loop(
62 token.clone(),
63 listener.clone(),
64 pool,
65 Arc::clone(&chunker),
66 remote,
67 namespace_id,
68 local_registry_updated_rx
69 ),
70 )?;
71 }
72
73 Ok(())
74}
75
76async fn download_loop(
77 token: CancellationToken,
78 listener: Option<Arc<dyn SyncStatusListener>>,
79 pool: &ConnectionPool,
80 chunker: Arc<Mutex<&mut Chunker>>,
81 remote: &Remote,
82 storage_path: &Path,
83 namespace_id: i32,
84) -> Result<()> {
85 loop {
86 if token.is_cancelled() {
88 debug!("Download loop received shutdown signal");
89 break;
90 }
91
92 if let Some(ref cb) = listener {
94 cb.on_status_changed(SyncStatus::Downloading);
95 }
96
97 match check_download_once(
98 pool,
99 Arc::clone(&chunker),
100 remote,
101 storage_path,
102 namespace_id,
103 )
104 .await
105 {
106 Ok(v) => v,
107 Err(SyncError::Unauthorized) => return Err(SyncError::Unauthorized),
108 Err(e) => return Err(SyncError::Unknown(format!("Check download failed: {}", e))),
109 };
110
111 if let Some(ref cb) = listener {
113 cb.on_status_changed(SyncStatus::Idle);
114 }
115
116 tokio::select! {
119 _ = token.cancelled() => {
120 debug!("Download loop shutting down");
121 break;
122 }
123 result = remote.poll() => {
124 result?;
125 }
126 }
127 }
128
129 Ok(())
130}
131
132pub async fn upload_loop(
133 token: CancellationToken,
134 listener: Option<Arc<dyn SyncStatusListener>>,
135 pool: &ConnectionPool,
136 chunker: Arc<Mutex<&mut Chunker>>,
137 remote: &Remote,
138 namespace_id: i32,
139 mut local_registry_updated_rx: Receiver<models::IndexerUpdateEvent>,
140) -> Result<()> {
141 tokio::time::sleep(Duration::from_secs(5)).await;
143
144 loop {
145 if token.is_cancelled() {
147 debug!("Upload loop received shutdown signal");
148 break;
149 }
150
151 if let Some(ref cb) = listener {
153 cb.on_status_changed(SyncStatus::Uploading);
154 }
155
156 if check_upload_once(pool, Arc::clone(&chunker), remote, namespace_id).await? {
159 if let Some(ref cb) = listener {
161 cb.on_status_changed(SyncStatus::Idle);
162 }
163
164 tokio::select! {
166 _ = token.cancelled() => {
167 debug!("Upload loop shutting down");
168 break;
169 }
170 _ = tokio::time::sleep(INTERVAL_CHECK_UPLOAD_SEC) => {},
171 Some(_) = local_registry_updated_rx.next() => {},
172 };
173 } else {
174 }
177 }
178
179 Ok(())
180}
181
182pub async fn check_upload_once(
183 pool: &ConnectionPool,
184 chunker: Arc<Mutex<&mut Chunker>>,
185 remote: &Remote,
186 namespace_id: i32,
187) -> Result<bool> {
188 debug!("upload scan");
189
190 let conn = &mut get_connection(pool)?;
191 let to_upload = registry::updated_locally(conn, namespace_id)?;
192
193 let mut upload_queue: Vec<Vec<(String, Vec<u8>)>> = vec![vec![]];
194 let mut size = 0;
195 let mut last = upload_queue.last_mut().unwrap();
196 let mut all_commited = true;
197
198 for f in &to_upload {
199 trace!("to upload {:?}", f);
200 let mut chunker = chunker.lock().await;
201 let mut chunk_ids = vec![String::from("")];
202
203 if !f.deleted {
204 chunk_ids = chunker.hashify(&f.path).await?;
206 }
207
208 let r = remote
209 .commit(&f.path, f.deleted, &chunk_ids.join(","))
210 .await?;
211
212 match r {
213 CommitResultStatus::Success(jid) => {
214 trace!("commit success");
215 registry::update_jid(conn, f, jid)?;
216 }
217 CommitResultStatus::NeedChunks(chunks) => {
218 trace!("need chunks");
219
220 all_commited = false;
221
222 for c in chunks.split(',') {
223 let data = chunker.read_chunk(c)?;
224 size += data.len();
225 last.push((c.into(), data));
226
227 if size > MAX_UPLOAD_SIZE {
228 upload_queue.push(vec![]);
229 last = upload_queue.last_mut().unwrap();
230 size = 0;
231 }
232 }
233 }
234 }
235 }
236
237 for batch in upload_queue {
238 if !batch.is_empty() {
239 remote.upload_batch(batch).await?;
240 }
241 }
242
243 Ok(all_commited)
244}
245
246pub async fn check_download_once(
247 pool: &ConnectionPool,
248 chunker: Arc<Mutex<&mut Chunker>>,
249 remote: &Remote,
250 storage_path: &Path,
251 namespace_id: i32,
252) -> Result<bool> {
253 debug!("download scan");
254
255 let conn = &mut get_connection(pool)?;
256
257 let latest_local = registry::latest_jid(conn, namespace_id).unwrap_or(0);
258 let to_download = remote.list(latest_local).await?;
259 let mut download_queue: Vec<&str> = vec![];
262
263 for d in &to_download {
264 trace!("collecting needed chunks for {:?}", d);
265
266 if d.deleted {
267 continue;
268 }
269
270 let mut chunker = chunker.lock().await;
271
272 if chunker.exists(&d.path) {
274 chunker.hashify(&d.path).await?;
275 }
276
277 for c in d.chunk_ids.split(',') {
278 if chunker.check_chunk(c) {
279 continue;
280 }
281
282 download_queue.push(c);
283 }
284 }
285
286 if !download_queue.is_empty() {
287 let mut chunker = chunker.lock().await;
288
289 let mut downloaded = remote.download_batch(download_queue).await;
290
291 while let Some(result) = downloaded.next().await {
292 match result {
293 Ok((chunk_id, data)) => {
294 chunker.save_chunk(&chunk_id, data)?;
295 }
296 Err(e) => {
297 return Err(e);
298 }
299 }
300 }
301 }
302
303 for d in &to_download {
304 trace!("udpating downloaded files {:?}", d);
305
306 let mut chunker = chunker.lock().await;
307
308 if d.deleted {
309 let form = build_delete_form(&d.path, storage_path, d.id, namespace_id);
310 registry::delete(conn, &vec![form])?;
312 if chunker.exists(&d.path) {
313 chunker.delete(&d.path).await?;
314 }
315 } else {
316 let chunks: Vec<&str> = d.chunk_ids.split(',').collect();
317 if let Err(e) = chunker.save(&d.path, chunks).await {
320 error!("{:?}", e);
321 return Err(e);
322 }
323
324 let form = build_file_record(&d.path, storage_path, d.id, namespace_id)?;
325 registry::create(conn, &vec![form])?;
326 }
327 }
328
329 Ok(!to_download.is_empty())
330}
331
332fn build_file_record(
333 path: &str,
334 base: &Path,
335 jid: i32,
336 namespace_id: i32,
337) -> Result<models::CreateForm, SyncError> {
338 let mut full_path = base.to_path_buf();
339 full_path.push(path);
340 let metadata = full_path
341 .metadata()
342 .map_err(|e| SyncError::from_io_error(path, e))?;
343 let size: i64 = metadata.len().try_into()?;
344 let time = metadata
345 .modified()
346 .map_err(|e| SyncError::from_io_error(path, e))?;
347 let modified_at = OffsetDateTime::from(time);
348
349 let form = models::CreateForm {
350 jid: Some(jid),
351 path: path.to_string(),
352 deleted: false,
353 size,
354 modified_at,
355 namespace_id,
356 };
357
358 Ok(form)
359}
360
361fn build_delete_form(path: &str, base: &Path, jid: i32, namespace_id: i32) -> models::DeleteForm {
362 let mut full_path = base.to_path_buf();
363 full_path.push(path);
364
365 models::DeleteForm {
366 path: path.to_string(),
367 jid: Some(jid),
368 deleted: true,
369 size: 0,
370 modified_at: OffsetDateTime::now_utc(),
371 namespace_id,
372 }
373}