1use std::error::Error;
6use std::collections::HashSet;
7use std::marker::PhantomData;
8use std::sync::{Arc, Mutex};
9use std::fmt::{Display, Formatter};
10
11use url::Url;
12use itertools::Itertools;
13
14use crate::traits::{BaseCalendar, CalDavSource, DavCalendar};
15use crate::traits::CompleteCalendar;
16use crate::item::SyncStatus;
17
18pub mod sync_progress;
19use sync_progress::SyncProgress;
20use sync_progress::{FeedbackSender, SyncEvent};
21
/// Maximum number of item URLs handed to a single remote download request.
///
/// Regular builds use 30; test builds use 3 (presumably so that multi-batch code paths are
/// exercised with only a handful of items).
const DOWNLOAD_BATCH_SIZE: usize = if cfg!(test) { 3 } else { 30 };
28
/// The kind of remote-side difference a downloaded batch of items represents.
enum BatchDownloadType {
    /// Items that exist on the remote calendar but are unknown locally.
    RemoteAdditions,
    /// Items whose remote version differs from the one known locally.
    RemoteChanges,
}

impl Display for BatchDownloadType {
    /// Writes the lowercase, human-readable label used in the sync log messages.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let label = match self {
            Self::RemoteAdditions => "remote additions",
            Self::RemoteChanges => "remote changes",
        };
        f.write_str(label)
    }
}
44
45
/// Couples a local calendar source with a remote one and keeps them in sync.
///
/// * `L` is the local source; its calendars have type `T` (a [`CompleteCalendar`]).
/// * `R` is the remote source; its calendars have type `U` (a [`DavCalendar`]).
///
/// Use [`Provider::sync`] or [`Provider::sync_with_feedback`] to reconcile both sides.
#[derive(Debug)]
pub struct Provider<L, T, R, U>
where
    L: CalDavSource<T>,
    T: CompleteCalendar + Sync + Send,
    R: CalDavSource<U>,
    U: DavCalendar + Sync + Send,
{
    /// The remote source (accessible through `remote()`).
    remote: R,
    /// The local source (accessible through `local()` and `local_mut()`).
    local: L,

    // `T` and `U` only appear in the trait bounds above, never in a field.
    // These zero-sized markers are what allows the struct to be generic over them.
    phantom_t: PhantomData<T>,
    phantom_u: PhantomData<U>,
}
66
67impl<L, T, R, U> Provider<L, T, R, U>
68where
69 L: CalDavSource<T>,
70 T: CompleteCalendar + Sync + Send,
71 R: CalDavSource<U>,
72 U: DavCalendar + Sync + Send,
73{
74 pub fn new(remote: R, local: L) -> Self {
79 Self { remote, local,
80 phantom_t: PhantomData, phantom_u: PhantomData,
81 }
82 }
83
84 pub fn local(&self) -> &L { &self.local }
86 pub fn local_mut(&mut self) -> &mut L { &mut self.local }
88 pub fn remote(&self) -> &R { &self.remote }
94
95 pub async fn sync_with_feedback(&mut self, feedback_sender: FeedbackSender) -> bool {
104 let mut progress = SyncProgress::new_with_feedback_channel(feedback_sender);
105 self.run_sync(&mut progress).await
106 }
107
108 pub async fn sync(&mut self) -> bool {
112 let mut progress = SyncProgress::new();
113 self.run_sync(&mut progress).await
114 }
115
116 async fn run_sync(&mut self, progress: &mut SyncProgress) -> bool {
117 if let Err(err) = self.run_sync_inner(progress).await {
118 progress.error(&format!("Sync terminated because of an error: {}", err));
119 }
120 progress.feedback(SyncEvent::Finished{ success: progress.is_success() });
121 progress.is_success()
122 }
123
124 async fn run_sync_inner(&mut self, progress: &mut SyncProgress) -> Result<(), Box<dyn Error>> {
125 progress.info("Starting a sync.");
126 progress.feedback(SyncEvent::Started);
127
128 let mut handled_calendars = HashSet::new();
129
130 let cals_remote = self.remote.get_calendars().await?;
132 for (cal_url, cal_remote) in cals_remote {
133 let counterpart = match self.get_or_insert_local_counterpart_calendar(&cal_url, cal_remote.clone()).await {
134 Err(err) => {
135 progress.warn(&format!("Unable to get or insert local counterpart calendar for {} ({}). Skipping this time", cal_url, err));
136 continue;
137 },
138 Ok(arc) => arc,
139 };
140
141 if let Err(err) = Self::sync_calendar_pair(counterpart, cal_remote, progress).await {
142 progress.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_url, err));
143 continue;
144 }
145 handled_calendars.insert(cal_url);
146 }
147
148 let cals_local = self.local.get_calendars().await?;
150 for (cal_url, cal_local) in cals_local {
151 if handled_calendars.contains(&cal_url) {
152 continue;
153 }
154
155 let counterpart = match self.get_or_insert_remote_counterpart_calendar(&cal_url, cal_local.clone()).await {
156 Err(err) => {
157 progress.warn(&format!("Unable to get or insert remote counterpart calendar for {} ({}). Skipping this time", cal_url, err));
158 continue;
159 },
160 Ok(arc) => arc,
161 };
162
163 if let Err(err) = Self::sync_calendar_pair(cal_local, counterpart, progress).await {
164 progress.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_url, err));
165 continue;
166 }
167 }
168
169 progress.info("Sync ended");
170
171 Ok(())
172 }
173
174
175 async fn get_or_insert_local_counterpart_calendar(&mut self, cal_url: &Url, needle: Arc<Mutex<U>>) -> Result<Arc<Mutex<T>>, Box<dyn Error>> {
176 get_or_insert_counterpart_calendar("local", &mut self.local, cal_url, needle).await
177 }
178 async fn get_or_insert_remote_counterpart_calendar(&mut self, cal_url: &Url, needle: Arc<Mutex<T>>) -> Result<Arc<Mutex<U>>, Box<dyn Error>> {
179 get_or_insert_counterpart_calendar("remote", &mut self.remote, cal_url, needle).await
180 }
181
182
/// Synchronises one local calendar with its remote counterpart.
///
/// Both mutexes are locked on entry (remote first, then local) and the guards are kept until
/// the function returns, i.e. they are held across every `.await` below.
///
/// The work is done in two phases:
/// 1. **Classification** — every item is sorted into one of six sets (local/remote deletions,
///    local/remote changes, local/remote additions) by comparing the remote version tags with
///    the sync status stored on the local items.
/// 2. **Commit** — the sets are applied in a fixed order: local deletions, remote deletions,
///    remote additions, remote changes, local additions, local changes.
///
/// # Errors
/// Fails when the remote version tags or the local item URLs cannot be listed. Failures on
/// individual items are only reported through `progress`.
///
/// # Panics
/// Panics if either calendar mutex is poisoned.
async fn sync_calendar_pair(cal_local: Arc<Mutex<T>>, cal_remote: Arc<Mutex<U>>, progress: &mut SyncProgress) -> Result<(), Box<dyn Error>> {
    let mut cal_remote = cal_remote.lock().unwrap();
    let mut cal_local = cal_local.lock().unwrap();
    let cal_name = cal_local.name().to_string();

    progress.info(&format!("Syncing calendar {}", cal_name));
    progress.reset_counter();
    progress.feedback(SyncEvent::InProgress{
        calendar: cal_name.clone(),
        items_done_already: 0,
        details: "started".to_string()
    });

    // Phase 1: classification. Each set holds the URLs of the items falling into that category.
    progress.debug("Finding the differences to sync...");
    let mut local_del = HashSet::new();
    let mut remote_del = HashSet::new();
    let mut local_changes = HashSet::new();
    let mut remote_changes = HashSet::new();
    let mut local_additions = HashSet::new();
    let mut remote_additions = HashSet::new();

    let remote_items = cal_remote.get_item_version_tags().await?;
    progress.feedback(SyncEvent::InProgress{
        calendar: cal_name.clone(),
        items_done_already: 0,
        details: format!("{} remote items", remote_items.len()),
    });

    // Starts as the full set of local URLs; every URL that is also listed remotely is removed
    // in the loop below, so what is left afterwards are the items the remote does not list.
    let mut local_items_to_handle = cal_local.get_item_urls().await?;
    for (url, remote_tag) in remote_items {
        progress.trace(&format!("***** Considering remote item {}...", url));
        match cal_local.get_item_by_url(&url).await {
            None => {
                // Listed remotely, unknown locally
                progress.debug(&format!("* {} is a remote addition", url));
                remote_additions.insert(url);
            },
            Some(local_item) => {
                if local_items_to_handle.remove(&url) == false {
                    progress.error(&format!("Inconsistent state: missing task {} from the local tasks", url));
                }

                match local_item.sync_status() {
                    SyncStatus::NotSynced => {
                        // A never-synced local item should not already exist remotely
                        progress.error(&format!("URL reuse between remote and local sources ({}). Ignoring this item in the sync", url));
                        continue;
                    },
                    SyncStatus::Synced(local_tag) => {
                        // Identical tags mean both sides agree: nothing to do in that case
                        if &remote_tag != local_tag {
                            progress.debug(&format!("* {} is a remote change", url));
                            remote_changes.insert(url);
                        }
                    },
                    SyncStatus::LocallyModified(local_tag) => {
                        if &remote_tag == local_tag {
                            // Only the local side has changed
                            progress.debug(&format!("* {} is a local change", url));
                            local_changes.insert(url);
                        } else {
                            // Both sides have changed: the remote version wins
                            progress.info(&format!("Conflict: task {} has been modified in both sources. Using the remote version.", url));
                            progress.debug(&format!("* {} is considered a remote change", url));
                            remote_changes.insert(url);
                        }
                    },
                    SyncStatus::LocallyDeleted(local_tag) => {
                        if &remote_tag == local_tag {
                            // Deleted locally, untouched remotely
                            progress.debug(&format!("* {} is a local deletion", url));
                            local_del.insert(url);
                        } else {
                            // Deleted locally but modified remotely: the remote version wins
                            progress.info(&format!("Conflict: task {} has been locally deleted and remotely modified. Reverting to the remote version.", url));
                            progress.debug(&format!("* {} is a considered a remote change", url));
                            remote_changes.insert(url);
                        }
                    },
                }
            }
        }
    }

    // Local items that the remote did not list
    for url in local_items_to_handle {
        progress.trace(&format!("##### Considering local item {}...", url));
        let local_item = match cal_local.get_item_by_url(&url).await {
            None => {
                progress.error(&format!("Inconsistent state: missing task {} from the local tasks", url));
                continue;
            },
            Some(item) => item,
        };

        match local_item.sync_status() {
            SyncStatus::Synced(_) => {
                // It was synced once and is no longer listed remotely
                progress.debug(&format!("# {} is a deletion from the server", url));
                remote_del.insert(url);
            },
            SyncStatus::NotSynced => {
                // It has never been uploaded
                progress.debug(&format!("# {} has been locally created", url));
                local_additions.insert(url);
            },
            SyncStatus::LocallyDeleted(_) => {
                // Gone on both sides: handled like a remote deletion, so the local copy gets purged
                progress.debug(&format!("# {} has been deleted from both sources", url));
                remote_del.insert(url);
            },
            SyncStatus::LocallyModified(_) => {
                // The remote deletion wins over the local modification
                progress.info(&format!("Conflict: item {} has been deleted from the server and locally modified. Deleting the local copy", url));
                remote_del.insert(url);
            },
        }
    }


    // Phase 2: commit.
    progress.trace("Committing changes...");
    for url_del in local_del {
        progress.debug(&format!("> Pushing local deletion {} to the server", url_del));
        progress.increment_counter(1);
        progress.feedback(SyncEvent::InProgress{
            calendar: cal_name.clone(),
            items_done_already: progress.counter(),
            details: Self::item_name(&cal_local, &url_del).await,
        });

        match cal_remote.delete_item(&url_del).await {
            Err(err) => {
                progress.warn(&format!("Unable to delete remote item {}: {}", url_del, err));
            },
            Ok(()) => {
                // The local copy is only purged once the remote deletion has succeeded
                if let Err(err) = cal_local.immediately_delete_item(&url_del).await {
                    progress.error(&format!("Unable to permanently delete local item {}: {}", url_del, err));
                }
            },
        }
    }

    for url_del in remote_del {
        progress.debug(&format!("> Applying remote deletion {} locally", url_del));
        progress.increment_counter(1);
        progress.feedback(SyncEvent::InProgress{
            calendar: cal_name.clone(),
            items_done_already: progress.counter(),
            details: Self::item_name(&cal_local, &url_del).await,
        });
        if let Err(err) = cal_local.immediately_delete_item(&url_del).await {
            progress.warn(&format!("Unable to delete local item {}: {}", url_del, err));
        }
    }

    // Remote additions and remote changes are downloaded in batches by these helpers
    Self::apply_remote_additions(
        remote_additions,
        &mut *cal_local,
        &mut *cal_remote,
        progress,
        &cal_name
    ).await;

    Self::apply_remote_changes(
        remote_changes,
        &mut *cal_local,
        &mut *cal_remote,
        progress,
        &cal_name
    ).await;


    for url_add in local_additions {
        progress.debug(&format!("> Pushing local addition {} to the server", url_add));
        progress.increment_counter(1);
        progress.feedback(SyncEvent::InProgress{
            calendar: cal_name.clone(),
            items_done_already: progress.counter(),
            details: Self::item_name(&cal_local, &url_add).await,
        });
        match cal_local.get_item_by_url_mut(&url_add).await {
            None => {
                progress.error(&format!("Inconsistency: created item {} has been marked for upload but is locally missing", url_add));
                continue;
            },
            Some(item) => {
                match cal_remote.add_item(item.clone()).await {
                    Err(err) => progress.error(&format!("Unable to add item {} to remote calendar: {}", url_add, err)),
                    Ok(new_ss) => {
                        // Store the sync status returned by the remote on the local item
                        item.set_sync_status(new_ss);
                    },
                }
            },
        };
    }

    for url_change in local_changes {
        progress.debug(&format!("> Pushing local change {} to the server", url_change));
        progress.increment_counter(1);
        progress.feedback(SyncEvent::InProgress{
            calendar: cal_name.clone(),
            items_done_already: progress.counter(),
            details: Self::item_name(&cal_local, &url_change).await,
        });
        match cal_local.get_item_by_url_mut(&url_change).await {
            None => {
                progress.error(&format!("Inconsistency: modified item {} has been marked for upload but is locally missing", url_change));
                continue;
            },
            Some(item) => {
                match cal_remote.update_item(item.clone()).await {
                    Err(err) => progress.error(&format!("Unable to update item {} in remote calendar: {}", url_change, err)),
                    Ok(new_ss) => {
                        // Store the sync status returned by the remote on the local item
                        item.set_sync_status(new_ss);
                    },
                };
            }
        };
    }

    Ok(())
}
406
407
408 async fn item_name(cal: &T, url: &Url) -> String {
409 cal.get_item_by_url(url).await.map(|item| item.name()).unwrap_or_default().to_string()
410 }
411
412 async fn apply_remote_additions(
413 mut remote_additions: HashSet<Url>,
414 cal_local: &mut T,
415 cal_remote: &mut U,
416 progress: &mut SyncProgress,
417 cal_name: &str
418 ) {
419 for batch in remote_additions.drain().chunks(DOWNLOAD_BATCH_SIZE).into_iter() {
420 Self::fetch_batch_and_apply(BatchDownloadType::RemoteAdditions, batch, cal_local, cal_remote, progress, cal_name).await;
421 }
422 }
423
424 async fn apply_remote_changes(
425 mut remote_changes: HashSet<Url>,
426 cal_local: &mut T,
427 cal_remote: &mut U,
428 progress: &mut SyncProgress,
429 cal_name: &str
430 ) {
431 for batch in remote_changes.drain().chunks(DOWNLOAD_BATCH_SIZE).into_iter() {
432 Self::fetch_batch_and_apply(BatchDownloadType::RemoteChanges, batch, cal_local, cal_remote, progress, cal_name).await;
433 }
434 }
435
436 async fn fetch_batch_and_apply<I: Iterator<Item = Url>>(
437 batch_type: BatchDownloadType,
438 remote_additions: I,
439 cal_local: &mut T,
440 cal_remote: &mut U,
441 progress: &mut SyncProgress,
442 cal_name: &str
443 ) {
444 progress.debug(&format!("> Applying a batch of {} locally", batch_type) );
445
446 let list_of_additions: Vec<Url> = remote_additions.map(|url| url.clone()).collect();
447 match cal_remote.get_items_by_url(&list_of_additions).await {
448 Err(err) => {
449 progress.warn(&format!("Unable to get the batch of {} {:?}: {}. Skipping them.", batch_type, list_of_additions, err));
450 },
451 Ok(items) => {
452 for item in items {
453 match item {
454 None => {
455 progress.error(&format!("Inconsistency: an item from the batch has vanished from the remote end"));
456 continue;
457 },
458 Some(new_item) => {
459 let local_update_result = match batch_type {
460 BatchDownloadType::RemoteAdditions => cal_local.add_item(new_item.clone()).await,
461 BatchDownloadType::RemoteChanges => cal_local.update_item(new_item.clone()).await,
462 };
463 if let Err(err) = local_update_result {
464 progress.error(&format!("Not able to add item {} to local calendar: {}", new_item.url(), err));
465 }
466 },
467 }
468 }
469
470 let one_item_name = match list_of_additions.get(0) {
472 Some(url) => Self::item_name(&cal_local, &url).await,
473 None => String::from("<unable to get the name of the first batched item>"),
474 };
475 progress.increment_counter(list_of_additions.len());
476 progress.feedback(SyncEvent::InProgress{
477 calendar: cal_name.to_string(),
478 items_done_already: progress.counter(),
479 details: one_item_name,
480 });
481 },
482 }
483 }
484}
485
486
487async fn get_or_insert_counterpart_calendar<H, N, I>(haystack_descr: &str, haystack: &mut H, cal_url: &Url, needle: Arc<Mutex<N>>)
488 -> Result<Arc<Mutex<I>>, Box<dyn Error>>
489where
490 H: CalDavSource<I>,
491 I: BaseCalendar,
492 N: BaseCalendar,
493{
494 loop {
495 if let Some(cal) = haystack.get_calendar(&cal_url).await {
496 break Ok(cal);
497 }
498
499 log::debug!("Adding a {} calendar {}", haystack_descr, cal_url);
501 let src = needle.lock().unwrap();
502 let name = src.name().to_string();
503 let supported_comps = src.supported_components();
504 let color = src.color();
505 if let Err(err) = haystack.create_calendar(
506 cal_url.clone(),
507 name,
508 supported_comps,
509 color.cloned(),
510 ).await{
511 return Err(err);
512 }
513 }
514}
515