email/folder/sync/
mod.rs

1//! # Folder synchronization
2//!
3//! This module contains everything you need to synchronize remote
4//! folders with local ones.
5
6pub mod config;
7pub mod hunk;
8pub mod patch;
9pub mod report;
10
11use std::{collections::HashSet, sync::Arc};
12
13use futures::{stream::FuturesUnordered, StreamExt};
14use tracing::{debug, trace};
15
16use self::{hunk::FolderSyncHunk, report::FolderSyncReport};
17use super::{
18    add::AddFolder, delete::DeleteFolder, expunge::ExpungeFolder, list::ListFolders, Folder,
19};
20#[doc(inline)]
21pub use super::{Error, Result};
22use crate::{
23    backend::context::BackendContextBuilder,
24    sync::{pool::SyncPoolContext, SyncDestination, SyncEvent},
25};
26
27pub(crate) async fn sync<L, R>(
28    ctx_ref: Arc<SyncPoolContext<L::Context, R::Context>>,
29) -> Result<FolderSyncReport>
30where
31    L: BackendContextBuilder + 'static,
32    R: BackendContextBuilder + 'static,
33{
34    let mut report = FolderSyncReport::default();
35
36    let ctx = ctx_ref.clone();
37    let left_cached_folders = tokio::spawn(async move {
38        let folders = ctx
39            .left_cache
40            .list_folders()
41            .await
42            .map_err(Error::ListLeftFoldersCachedError)?;
43        let names = HashSet::<String>::from_iter(
44            folders
45                .iter()
46                .map(Folder::get_kind_or_name)
47                // TODO: instead of fetching all the folders then
48                // filtering them here, it could be better to filter
49                // them at the source directly, which implies to add a
50                // new backend fn called `search_folders` and to set
51                // up a common search API across backends.
52                .filter_map(|folder| {
53                    if ctx.folder_filters.matches(folder) {
54                        Some(folder.to_owned())
55                    } else {
56                        None
57                    }
58                }),
59        );
60
61        SyncEvent::ListedLeftCachedFolders(names.len())
62            .emit(&ctx.handler)
63            .await;
64
65        Result::Ok(names)
66    });
67
68    let ctx = ctx_ref.clone();
69    let left_folders = tokio::spawn(async move {
70        let folders = ctx
71            .left
72            .list_folders()
73            .await
74            .map_err(Error::ListLeftFoldersError)?;
75        let names = HashSet::<String>::from_iter(
76            folders
77                .iter()
78                .map(Folder::get_kind_or_name)
79                // TODO: instead of fetching all the folders then
80                // filtering them here, it could be better to filter
81                // them at the source directly, which implies to add a
82                // new backend fn called `search_folders` and to set
83                // up a common search API across backends.
84                .filter_map(|folder| {
85                    if ctx.folder_filters.matches(folder) {
86                        Some(folder.to_owned())
87                    } else {
88                        None
89                    }
90                }),
91        );
92
93        SyncEvent::ListedLeftFolders(names.len())
94            .emit(&ctx.handler)
95            .await;
96
97        Result::Ok(names)
98    });
99
100    let ctx = ctx_ref.clone();
101    let right_cached_folders = tokio::spawn(async move {
102        let folders = ctx
103            .right_cache
104            .list_folders()
105            .await
106            .map_err(Error::ListRightFoldersCachedError)?;
107        let names = HashSet::<String>::from_iter(
108            folders
109                .iter()
110                .map(Folder::get_kind_or_name)
111                // TODO: instead of fetching all the folders then
112                // filtering them here, it could be better to filter
113                // them at the source directly, which implies to add a
114                // new backend fn called `search_folders` and to set
115                // up a common search API across backends.
116                .filter_map(|folder| {
117                    if ctx.folder_filters.matches(folder) {
118                        Some(folder.to_owned())
119                    } else {
120                        None
121                    }
122                }),
123        );
124
125        SyncEvent::ListedRightCachedFolders(names.len())
126            .emit(&ctx.handler)
127            .await;
128
129        Result::Ok(names)
130    });
131
132    let ctx = ctx_ref.clone();
133    let right_folders = tokio::spawn(async move {
134        let folders = ctx
135            .right
136            .list_folders()
137            .await
138            .map_err(Error::ListRightFoldersError)?;
139        let names: HashSet<String> = HashSet::from_iter(
140            folders
141                .iter()
142                .map(Folder::get_kind_or_name)
143                // TODO: instead of fetching all the folders then
144                // filtering them here, it could be better to filter
145                // them at the source directly, which implies to add a
146                // new backend fn called `search_folders` and to set
147                // up a common search API across backends.
148                .filter_map(|folder| {
149                    if ctx.folder_filters.matches(folder) {
150                        Some(folder.to_owned())
151                    } else {
152                        None
153                    }
154                }),
155        );
156
157        SyncEvent::ListedRightFolders(names.len())
158            .emit(&ctx.handler)
159            .await;
160
161        Result::Ok(names)
162    });
163
164    let (left_cached_folders, left_folders, right_cached_folders, right_folders) =
165        tokio::try_join!(
166            left_cached_folders,
167            left_folders,
168            right_cached_folders,
169            right_folders
170        )
171        .map_err(Error::FolderTasksFailed)?;
172
173    SyncEvent::ListedAllFolders.emit(&ctx_ref.handler).await;
174
175    let mut patch = patch::build(
176        left_cached_folders?,
177        left_folders?,
178        right_cached_folders?,
179        right_folders?,
180    );
181
182    ctx_ref.apply_folder_permissions(&mut patch);
183
184    SyncEvent::GeneratedFolderPatch(patch.clone())
185        .emit(&ctx_ref.handler)
186        .await;
187
188    let (folders, patch) = patch.into_iter().fold(
189        (HashSet::default(), vec![]),
190        |(mut folders, mut patch), (folder, hunks)| {
191            folders.insert(folder);
192            patch.extend(hunks);
193            (folders, patch)
194        },
195    );
196
197    report.names = folders;
198    report.patch = FuturesUnordered::from_iter(patch.into_iter().map(|hunk| {
199        let ctx = ctx_ref.clone();
200        tokio::spawn(async move {
201            let hunk_clone = hunk.clone();
202            let handler = ctx.handler.clone();
203            let task = async move {
204                if ctx.dry_run {
205                    return Ok(());
206                }
207
208                match hunk_clone {
209                    FolderSyncHunk::Cache(folder, SyncDestination::Left) => {
210                        ctx.left_cache.add_folder(&folder).await?;
211                    }
212                    FolderSyncHunk::Create(folder, SyncDestination::Left) => {
213                        ctx.left.add_folder(&folder).await?;
214                    }
215                    FolderSyncHunk::Cache(folder, SyncDestination::Right) => {
216                        ctx.right_cache.add_folder(&folder).await?;
217                    }
218                    FolderSyncHunk::Create(folder, SyncDestination::Right) => {
219                        ctx.right.add_folder(&folder).await?;
220                    }
221                    FolderSyncHunk::Uncache(folder, SyncDestination::Left) => {
222                        ctx.left_cache.delete_folder(&folder).await?;
223                    }
224                    FolderSyncHunk::Delete(folder, SyncDestination::Left) => {
225                        ctx.left.delete_folder(&folder).await?;
226                    }
227                    FolderSyncHunk::Uncache(folder, SyncDestination::Right) => {
228                        ctx.right_cache.delete_folder(&folder).await?;
229                    }
230                    FolderSyncHunk::Delete(folder, SyncDestination::Right) => {
231                        ctx.right.delete_folder(&folder).await?;
232                    }
233                };
234
235                Ok(())
236            };
237
238            let output = task.await;
239
240            SyncEvent::ProcessedFolderHunk(hunk.clone())
241                .emit(&handler)
242                .await;
243
244            match output {
245                Ok(()) => (hunk, None),
246                Err(err) => (hunk, Some(err)),
247            }
248        })
249    }))
250    .filter_map(|hunk| async {
251        match hunk {
252            Ok(hunk) => Some(hunk),
253            Err(err) => {
254                debug!("cannot process folder hunk: {err}");
255                trace!("{err:?}");
256                None
257            }
258        }
259    })
260    .collect::<Vec<_>>()
261    .await;
262
263    SyncEvent::ProcessedAllFolderHunks
264        .emit(&ctx_ref.handler)
265        .await;
266
267    Ok(report)
268}
269
270pub(crate) async fn expunge<L, R>(
271    ctx_ref: Arc<SyncPoolContext<L::Context, R::Context>>,
272    folders: &HashSet<String>,
273) where
274    L: BackendContextBuilder + 'static,
275    R: BackendContextBuilder + 'static,
276{
277    FuturesUnordered::from_iter(folders.iter().map(|folder_ref| {
278        let ctx = ctx_ref.clone();
279        let folder = folder_ref.clone();
280        let left_cached_expunge = async move {
281            if ctx.dry_run {
282                Ok(())
283            } else {
284                ctx.left_cache.expunge_folder(&folder).await
285            }
286        };
287
288        let ctx = ctx_ref.clone();
289        let folder = folder_ref.clone();
290        let left_expunge = async move {
291            if ctx.dry_run {
292                Ok(())
293            } else {
294                ctx.left.expunge_folder(&folder).await
295            }
296        };
297
298        let ctx = ctx_ref.clone();
299        let folder = folder_ref.clone();
300        let right_cached_expunge = async move {
301            if ctx.dry_run {
302                Ok(())
303            } else {
304                ctx.right_cache.expunge_folder(&folder).await
305            }
306        };
307
308        let ctx = ctx_ref.clone();
309        let folder = folder_ref.clone();
310        let right_expunge = async move {
311            if ctx.dry_run {
312                Ok(())
313            } else {
314                ctx.right.expunge_folder(&folder).await
315            }
316        };
317
318        async {
319            tokio::try_join!(
320                left_cached_expunge,
321                left_expunge,
322                right_cached_expunge,
323                right_expunge
324            )
325        }
326    }))
327    .for_each(|task| async {
328        if let Err(err) = task {
329            debug!("cannot expunge folders: {err}");
330            trace!("{err:?}");
331        }
332    })
333    .await;
334
335    SyncEvent::ExpungedAllFolders.emit(&ctx_ref.handler).await
336}