kitchen_fridge/provider/
mod.rs

1//! This modules abstracts data sources and merges them in a single virtual one
2//!
3//! It is also responsible for syncing them together
4
5use std::error::Error;
6use std::collections::HashSet;
7use std::marker::PhantomData;
8use std::sync::{Arc, Mutex};
9use std::fmt::{Display, Formatter};
10
11use url::Url;
12use itertools::Itertools;
13
14use crate::traits::{BaseCalendar, CalDavSource, DavCalendar};
15use crate::traits::CompleteCalendar;
16use crate::item::SyncStatus;
17
18pub mod sync_progress;
19use sync_progress::SyncProgress;
20use sync_progress::{FeedbackSender, SyncEvent};
21
22/// How many items will be batched in a single HTTP request when downloading from the server
23#[cfg(not(test))]
24const DOWNLOAD_BATCH_SIZE: usize = 30;
25/// How many items will be batched in a single HTTP request when downloading from the server
26#[cfg(test)]
27const DOWNLOAD_BATCH_SIZE: usize = 3;
28
29// I am too lazy to actually make `fetch_and_apply` generic over an async closure.
30// Let's work around by passing an enum, so that `fetch_and_apply` will know what to do
31enum BatchDownloadType {
32    RemoteAdditions,
33    RemoteChanges,
34}
35
36impl Display for BatchDownloadType {
37    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
38        match self {
39            Self::RemoteAdditions => write!(f, "remote additions"),
40            Self::RemoteChanges => write!(f, "remote changes"),
41        }
42    }
43}
44
45
46/// A data source that combines two `CalDavSource`s, which is able to sync both sources.
47///
48/// Usually, you will only need to use a provider between a server and a local cache, that is to say a [`CalDavProvider`](crate::CalDavProvider), i.e. a `Provider<Cache, CachedCalendar, Client, RemoteCalendar>`. \
49/// However, providers can be used for integration tests, where the remote source is mocked by a `Cache`.
50#[derive(Debug)]
51pub struct Provider<L, T, R, U>
52where
53    L: CalDavSource<T>,
54    T: CompleteCalendar + Sync + Send,
55    R: CalDavSource<U>,
56    U: DavCalendar + Sync + Send,
57{
58    /// The remote source (usually a server)
59    remote: R,
60    /// The local cache
61    local: L,
62
63    phantom_t: PhantomData<T>,
64    phantom_u: PhantomData<U>,
65}
66
67impl<L, T, R, U> Provider<L, T, R, U>
68where
69    L: CalDavSource<T>,
70    T: CompleteCalendar + Sync + Send,
71    R: CalDavSource<U>,
72    U: DavCalendar + Sync + Send,
73{
74    /// Create a provider.
75    ///
76    /// `remote` is usually a [`Client`](crate::client::Client), `local` is usually a [`Cache`](crate::cache::Cache).
77    /// However, both can be interchangeable. The only difference is that `remote` always wins in case of a sync conflict
78    pub fn new(remote: R, local: L) -> Self {
79        Self { remote, local,
80            phantom_t: PhantomData, phantom_u: PhantomData,
81        }
82    }
83
84    /// Returns the data source described as `local`
85    pub fn local(&self)  -> &L { &self.local }
86    /// Returns the data source described as `local`
87    pub fn local_mut(&mut self)  -> &mut L { &mut self.local }
88    /// Returns the data source described as `remote`.
89    ///
90    /// Apart from tests, there are very few (if any) reasons to access `remote` directly.
91    /// Usually, you should rather use the `local` source, which (usually) is a much faster local cache.
92    /// To be sure `local` accurately mirrors the `remote` source, you can run [`Provider::sync`]
93    pub fn remote(&self) -> &R { &self.remote }
94
95    /// Performs a synchronisation between `local` and `remote`, and provide feeedback to the user about the progress.
96    ///
97    /// This bidirectional sync applies additions/deletions made on a source to the other source.
98    /// In case of conflicts (the same item has been modified on both ends since the last sync, `remote` always wins).
99    ///
100    /// It returns whether the sync was totally successful (details about errors are logged using the `log::*` macros).
101    /// In case errors happened, the sync might have been partially executed but your data will never be correupted (either locally nor in the server).
102    /// Simply run this function again, it will re-start a sync, picking up where it failed.
103    pub async fn sync_with_feedback(&mut self, feedback_sender: FeedbackSender) -> bool {
104        let mut progress = SyncProgress::new_with_feedback_channel(feedback_sender);
105        self.run_sync(&mut progress).await
106    }
107
108    /// Performs a synchronisation between `local` and `remote`, without giving any feedback.
109    ///
110    /// See [`Self::sync_with_feedback`]
111    pub async fn sync(&mut self) -> bool {
112        let mut progress = SyncProgress::new();
113        self.run_sync(&mut progress).await
114    }
115
116    async fn run_sync(&mut self, progress: &mut SyncProgress) -> bool {
117        if let Err(err) = self.run_sync_inner(progress).await {
118            progress.error(&format!("Sync terminated because of an error: {}", err));
119        }
120        progress.feedback(SyncEvent::Finished{ success: progress.is_success() });
121        progress.is_success()
122    }
123
124    async fn run_sync_inner(&mut self, progress: &mut SyncProgress) -> Result<(), Box<dyn Error>> {
125        progress.info("Starting a sync.");
126        progress.feedback(SyncEvent::Started);
127
128        let mut handled_calendars = HashSet::new();
129
130        // Sync every remote calendar
131        let cals_remote = self.remote.get_calendars().await?;
132        for (cal_url, cal_remote) in cals_remote {
133            let counterpart = match self.get_or_insert_local_counterpart_calendar(&cal_url, cal_remote.clone()).await {
134                Err(err) => {
135                    progress.warn(&format!("Unable to get or insert local counterpart calendar for {} ({}). Skipping this time", cal_url, err));
136                    continue;
137                },
138                Ok(arc) => arc,
139            };
140
141            if let Err(err) = Self::sync_calendar_pair(counterpart, cal_remote, progress).await {
142                progress.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_url, err));
143                continue;
144            }
145            handled_calendars.insert(cal_url);
146        }
147
148        // Sync every local calendar that would not be in the remote yet
149        let cals_local = self.local.get_calendars().await?;
150        for (cal_url, cal_local) in cals_local {
151            if handled_calendars.contains(&cal_url) {
152                continue;
153            }
154
155            let counterpart = match self.get_or_insert_remote_counterpart_calendar(&cal_url, cal_local.clone()).await {
156                Err(err) => {
157                    progress.warn(&format!("Unable to get or insert remote counterpart calendar for {} ({}). Skipping this time", cal_url, err));
158                    continue;
159                },
160                Ok(arc) => arc,
161            };
162
163            if let Err(err) = Self::sync_calendar_pair(cal_local, counterpart, progress).await {
164                progress.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_url, err));
165                continue;
166            }
167        }
168
169        progress.info("Sync ended");
170
171        Ok(())
172    }
173
174
175    async fn get_or_insert_local_counterpart_calendar(&mut self, cal_url: &Url, needle: Arc<Mutex<U>>) -> Result<Arc<Mutex<T>>, Box<dyn Error>> {
176        get_or_insert_counterpart_calendar("local", &mut self.local, cal_url, needle).await
177    }
178    async fn get_or_insert_remote_counterpart_calendar(&mut self, cal_url: &Url, needle: Arc<Mutex<T>>) -> Result<Arc<Mutex<U>>, Box<dyn Error>> {
179        get_or_insert_counterpart_calendar("remote", &mut self.remote, cal_url, needle).await
180    }
181
182
183    async fn sync_calendar_pair(cal_local: Arc<Mutex<T>>, cal_remote: Arc<Mutex<U>>, progress: &mut SyncProgress) -> Result<(), Box<dyn Error>> {
184        let mut cal_remote = cal_remote.lock().unwrap();
185        let mut cal_local = cal_local.lock().unwrap();
186        let cal_name = cal_local.name().to_string();
187
188        progress.info(&format!("Syncing calendar {}", cal_name));
189        progress.reset_counter();
190        progress.feedback(SyncEvent::InProgress{
191            calendar: cal_name.clone(),
192            items_done_already: 0,
193            details: "started".to_string()
194        });
195
196        // Step 1 - find the differences
197        progress.debug("Finding the differences to sync...");
198        let mut local_del = HashSet::new();
199        let mut remote_del = HashSet::new();
200        let mut local_changes = HashSet::new();
201        let mut remote_changes = HashSet::new();
202        let mut local_additions = HashSet::new();
203        let mut remote_additions = HashSet::new();
204
205        let remote_items = cal_remote.get_item_version_tags().await?;
206        progress.feedback(SyncEvent::InProgress{
207            calendar: cal_name.clone(),
208            items_done_already: 0,
209            details: format!("{} remote items", remote_items.len()),
210        });
211
212        let mut local_items_to_handle = cal_local.get_item_urls().await?;
213        for (url, remote_tag) in remote_items {
214            progress.trace(&format!("***** Considering remote item {}...", url));
215            match cal_local.get_item_by_url(&url).await {
216                None => {
217                    // This was created on the remote
218                    progress.debug(&format!("*   {} is a remote addition", url));
219                    remote_additions.insert(url);
220                },
221                Some(local_item) => {
222                    if local_items_to_handle.remove(&url) == false {
223                        progress.error(&format!("Inconsistent state: missing task {} from the local tasks", url));
224                    }
225
226                    match local_item.sync_status() {
227                        SyncStatus::NotSynced => {
228                            progress.error(&format!("URL reuse between remote and local sources ({}). Ignoring this item in the sync", url));
229                            continue;
230                        },
231                        SyncStatus::Synced(local_tag) => {
232                            if &remote_tag != local_tag {
233                                // This has been modified on the remote
234                                progress.debug(&format!("*   {} is a remote change", url));
235                                remote_changes.insert(url);
236                            }
237                        },
238                        SyncStatus::LocallyModified(local_tag) => {
239                            if &remote_tag == local_tag {
240                                // This has been changed locally
241                                progress.debug(&format!("*   {} is a local change", url));
242                                local_changes.insert(url);
243                            } else {
244                                progress.info(&format!("Conflict: task {} has been modified in both sources. Using the remote version.", url));
245                                progress.debug(&format!("*   {} is considered a remote change", url));
246                                remote_changes.insert(url);
247                            }
248                        },
249                        SyncStatus::LocallyDeleted(local_tag) => {
250                            if &remote_tag == local_tag {
251                                // This has been locally deleted
252                                progress.debug(&format!("*   {} is a local deletion", url));
253                                local_del.insert(url);
254                            } else {
255                                progress.info(&format!("Conflict: task {} has been locally deleted and remotely modified. Reverting to the remote version.", url));
256                                progress.debug(&format!("*   {} is a considered a remote change", url));
257                                remote_changes.insert(url);
258                            }
259                        },
260                    }
261                }
262            }
263        }
264
265        // Also iterate on the local tasks that are not on the remote
266        for url in local_items_to_handle {
267            progress.trace(&format!("##### Considering local item {}...", url));
268            let local_item = match cal_local.get_item_by_url(&url).await {
269                None => {
270                    progress.error(&format!("Inconsistent state: missing task {} from the local tasks", url));
271                    continue;
272                },
273                Some(item) => item,
274            };
275
276            match local_item.sync_status() {
277                SyncStatus::Synced(_) => {
278                    // This item has been removed from the remote
279                    progress.debug(&format!("#   {} is a deletion from the server", url));
280                    remote_del.insert(url);
281                },
282                SyncStatus::NotSynced => {
283                    // This item has just been locally created
284                    progress.debug(&format!("#   {} has been locally created", url));
285                    local_additions.insert(url);
286                },
287                SyncStatus::LocallyDeleted(_) => {
288                    // This item has been deleted from both sources
289                    progress.debug(&format!("#   {} has been deleted from both sources", url));
290                    remote_del.insert(url);
291                },
292                SyncStatus::LocallyModified(_) => {
293                    progress.info(&format!("Conflict: item {} has been deleted from the server and locally modified. Deleting the local copy", url));
294                    remote_del.insert(url);
295                },
296            }
297        }
298
299
300        // Step 2 - commit changes
301        progress.trace("Committing changes...");
302        for url_del in local_del {
303            progress.debug(&format!("> Pushing local deletion {} to the server", url_del));
304            progress.increment_counter(1);
305            progress.feedback(SyncEvent::InProgress{
306                calendar: cal_name.clone(),
307                items_done_already: progress.counter(),
308                details: Self::item_name(&cal_local, &url_del).await,
309            });
310
311            match cal_remote.delete_item(&url_del).await {
312                Err(err) => {
313                    progress.warn(&format!("Unable to delete remote item {}: {}", url_del, err));
314                },
315                Ok(()) => {
316                    // Change the local copy from "marked to deletion" to "actually deleted"
317                    if let Err(err) = cal_local.immediately_delete_item(&url_del).await {
318                        progress.error(&format!("Unable to permanently delete local item {}: {}", url_del, err));
319                    }
320                },
321            }
322        }
323
324        for url_del in remote_del {
325            progress.debug(&format!("> Applying remote deletion {} locally", url_del));
326            progress.increment_counter(1);
327            progress.feedback(SyncEvent::InProgress{
328                calendar: cal_name.clone(),
329                items_done_already: progress.counter(),
330                details: Self::item_name(&cal_local, &url_del).await,
331            });
332            if let Err(err) = cal_local.immediately_delete_item(&url_del).await {
333                progress.warn(&format!("Unable to delete local item {}: {}", url_del, err));
334            }
335        }
336
337        Self::apply_remote_additions(
338            remote_additions,
339            &mut *cal_local,
340            &mut *cal_remote,
341            progress,
342            &cal_name
343        ).await;
344
345        Self::apply_remote_changes(
346            remote_changes,
347            &mut *cal_local,
348            &mut *cal_remote,
349            progress,
350            &cal_name
351        ).await;
352
353
354        for url_add in local_additions {
355            progress.debug(&format!("> Pushing local addition {} to the server", url_add));
356            progress.increment_counter(1);
357            progress.feedback(SyncEvent::InProgress{
358                calendar: cal_name.clone(),
359                items_done_already: progress.counter(),
360                details: Self::item_name(&cal_local, &url_add).await,
361            });
362            match cal_local.get_item_by_url_mut(&url_add).await {
363                None => {
364                    progress.error(&format!("Inconsistency: created item {} has been marked for upload but is locally missing", url_add));
365                    continue;
366                },
367                Some(item) => {
368                    match cal_remote.add_item(item.clone()).await {
369                        Err(err) => progress.error(&format!("Unable to add item {} to remote calendar: {}", url_add, err)),
370                        Ok(new_ss) => {
371                            // Update local sync status
372                            item.set_sync_status(new_ss);
373                        },
374                    }
375                },
376            };
377        }
378
379        for url_change in local_changes {
380            progress.debug(&format!("> Pushing local change {} to the server", url_change));
381            progress.increment_counter(1);
382            progress.feedback(SyncEvent::InProgress{
383                calendar: cal_name.clone(),
384                items_done_already: progress.counter(),
385                details: Self::item_name(&cal_local, &url_change).await,
386            });
387            match cal_local.get_item_by_url_mut(&url_change).await {
388                None => {
389                    progress.error(&format!("Inconsistency: modified item {} has been marked for upload but is locally missing", url_change));
390                    continue;
391                },
392                Some(item) => {
393                    match cal_remote.update_item(item.clone()).await {
394                        Err(err) => progress.error(&format!("Unable to update item {} in remote calendar: {}", url_change, err)),
395                        Ok(new_ss) => {
396                            // Update local sync status
397                            item.set_sync_status(new_ss);
398                        },
399                    };
400                }
401            };
402        }
403
404        Ok(())
405    }
406
407
408    async fn item_name(cal: &T, url: &Url) -> String {
409        cal.get_item_by_url(url).await.map(|item| item.name()).unwrap_or_default().to_string()
410    }
411
412    async fn apply_remote_additions(
413        mut remote_additions: HashSet<Url>,
414        cal_local: &mut T,
415        cal_remote: &mut U,
416        progress: &mut SyncProgress,
417        cal_name: &str
418    ) {
419        for batch in remote_additions.drain().chunks(DOWNLOAD_BATCH_SIZE).into_iter() {
420            Self::fetch_batch_and_apply(BatchDownloadType::RemoteAdditions, batch, cal_local, cal_remote, progress, cal_name).await;
421        }
422    }
423
424    async fn apply_remote_changes(
425        mut remote_changes: HashSet<Url>,
426        cal_local: &mut T,
427        cal_remote: &mut U,
428        progress: &mut SyncProgress,
429        cal_name: &str
430    ) {
431        for batch in remote_changes.drain().chunks(DOWNLOAD_BATCH_SIZE).into_iter() {
432            Self::fetch_batch_and_apply(BatchDownloadType::RemoteChanges, batch, cal_local, cal_remote, progress, cal_name).await;
433        }
434    }
435
436    async fn fetch_batch_and_apply<I: Iterator<Item = Url>>(
437        batch_type: BatchDownloadType,
438        remote_additions: I,
439        cal_local: &mut T,
440        cal_remote: &mut U,
441        progress: &mut SyncProgress,
442        cal_name: &str
443    ) {
444        progress.debug(&format!("> Applying a batch of {} locally", batch_type) /* too bad Chunks does not implement ExactSizeIterator, that could provide useful debug info. See https://github.com/rust-itertools/itertools/issues/171 */);
445
446        let list_of_additions: Vec<Url> = remote_additions.map(|url| url.clone()).collect();
447        match cal_remote.get_items_by_url(&list_of_additions).await {
448            Err(err) => {
449                progress.warn(&format!("Unable to get the batch of {} {:?}: {}. Skipping them.", batch_type, list_of_additions, err));
450            },
451            Ok(items) => {
452                for item in items {
453                    match item {
454                        None => {
455                            progress.error(&format!("Inconsistency: an item from the batch has vanished from the remote end"));
456                            continue;
457                        },
458                        Some(new_item) => {
459                            let local_update_result = match batch_type {
460                                BatchDownloadType::RemoteAdditions => cal_local.add_item(new_item.clone()).await,
461                                BatchDownloadType::RemoteChanges => cal_local.update_item(new_item.clone()).await,
462                            };
463                            if let Err(err) = local_update_result {
464                                progress.error(&format!("Not able to add item {} to local calendar: {}", new_item.url(), err));
465                            }
466                        },
467                    }
468                }
469
470                // Notifying every item at the same time would not make sense. Let's notify only one of them
471                let one_item_name = match list_of_additions.get(0) {
472                    Some(url) => Self::item_name(&cal_local, &url).await,
473                    None => String::from("<unable to get the name of the first batched item>"),
474                };
475                progress.increment_counter(list_of_additions.len());
476                progress.feedback(SyncEvent::InProgress{
477                    calendar: cal_name.to_string(),
478                    items_done_already: progress.counter(),
479                    details: one_item_name,
480                });
481            },
482        }
483    }
484}
485
486
487async fn get_or_insert_counterpart_calendar<H, N, I>(haystack_descr: &str, haystack: &mut H, cal_url: &Url, needle: Arc<Mutex<N>>)
488    -> Result<Arc<Mutex<I>>, Box<dyn Error>>
489where
490    H: CalDavSource<I>,
491    I: BaseCalendar,
492    N: BaseCalendar,
493{
494    loop {
495        if let Some(cal) = haystack.get_calendar(&cal_url).await {
496            break Ok(cal);
497        }
498
499        // This calendar does not exist locally yet, let's add it
500        log::debug!("Adding a {} calendar {}", haystack_descr, cal_url);
501        let src = needle.lock().unwrap();
502        let name = src.name().to_string();
503        let supported_comps = src.supported_components();
504        let color = src.color();
505        if let Err(err) = haystack.create_calendar(
506            cal_url.clone(),
507            name,
508            supported_comps,
509            color.cloned(),
510        ).await{
511            return Err(err);
512        }
513    }
514}
515