1pub mod config;
7pub mod hunk;
8pub mod patch;
9pub mod report;
10
11use std::{collections::HashSet, sync::Arc};
12
13use futures::{stream::FuturesUnordered, StreamExt};
14use tracing::{debug, trace};
15
16use self::{hunk::FolderSyncHunk, report::FolderSyncReport};
17use super::{
18 add::AddFolder, delete::DeleteFolder, expunge::ExpungeFolder, list::ListFolders, Folder,
19};
20#[doc(inline)]
21pub use super::{Error, Result};
22use crate::{
23 backend::context::BackendContextBuilder,
24 sync::{pool::SyncPoolContext, SyncDestination, SyncEvent},
25};
26
27pub(crate) async fn sync<L, R>(
28 ctx_ref: Arc<SyncPoolContext<L::Context, R::Context>>,
29) -> Result<FolderSyncReport>
30where
31 L: BackendContextBuilder + 'static,
32 R: BackendContextBuilder + 'static,
33{
34 let mut report = FolderSyncReport::default();
35
36 let ctx = ctx_ref.clone();
37 let left_cached_folders = tokio::spawn(async move {
38 let folders = ctx
39 .left_cache
40 .list_folders()
41 .await
42 .map_err(Error::ListLeftFoldersCachedError)?;
43 let names = HashSet::<String>::from_iter(
44 folders
45 .iter()
46 .map(Folder::get_kind_or_name)
47 .filter_map(|folder| {
53 if ctx.folder_filters.matches(folder) {
54 Some(folder.to_owned())
55 } else {
56 None
57 }
58 }),
59 );
60
61 SyncEvent::ListedLeftCachedFolders(names.len())
62 .emit(&ctx.handler)
63 .await;
64
65 Result::Ok(names)
66 });
67
68 let ctx = ctx_ref.clone();
69 let left_folders = tokio::spawn(async move {
70 let folders = ctx
71 .left
72 .list_folders()
73 .await
74 .map_err(Error::ListLeftFoldersError)?;
75 let names = HashSet::<String>::from_iter(
76 folders
77 .iter()
78 .map(Folder::get_kind_or_name)
79 .filter_map(|folder| {
85 if ctx.folder_filters.matches(folder) {
86 Some(folder.to_owned())
87 } else {
88 None
89 }
90 }),
91 );
92
93 SyncEvent::ListedLeftFolders(names.len())
94 .emit(&ctx.handler)
95 .await;
96
97 Result::Ok(names)
98 });
99
100 let ctx = ctx_ref.clone();
101 let right_cached_folders = tokio::spawn(async move {
102 let folders = ctx
103 .right_cache
104 .list_folders()
105 .await
106 .map_err(Error::ListRightFoldersCachedError)?;
107 let names = HashSet::<String>::from_iter(
108 folders
109 .iter()
110 .map(Folder::get_kind_or_name)
111 .filter_map(|folder| {
117 if ctx.folder_filters.matches(folder) {
118 Some(folder.to_owned())
119 } else {
120 None
121 }
122 }),
123 );
124
125 SyncEvent::ListedRightCachedFolders(names.len())
126 .emit(&ctx.handler)
127 .await;
128
129 Result::Ok(names)
130 });
131
132 let ctx = ctx_ref.clone();
133 let right_folders = tokio::spawn(async move {
134 let folders = ctx
135 .right
136 .list_folders()
137 .await
138 .map_err(Error::ListRightFoldersError)?;
139 let names: HashSet<String> = HashSet::from_iter(
140 folders
141 .iter()
142 .map(Folder::get_kind_or_name)
143 .filter_map(|folder| {
149 if ctx.folder_filters.matches(folder) {
150 Some(folder.to_owned())
151 } else {
152 None
153 }
154 }),
155 );
156
157 SyncEvent::ListedRightFolders(names.len())
158 .emit(&ctx.handler)
159 .await;
160
161 Result::Ok(names)
162 });
163
164 let (left_cached_folders, left_folders, right_cached_folders, right_folders) =
165 tokio::try_join!(
166 left_cached_folders,
167 left_folders,
168 right_cached_folders,
169 right_folders
170 )
171 .map_err(Error::FolderTasksFailed)?;
172
173 SyncEvent::ListedAllFolders.emit(&ctx_ref.handler).await;
174
175 let mut patch = patch::build(
176 left_cached_folders?,
177 left_folders?,
178 right_cached_folders?,
179 right_folders?,
180 );
181
182 ctx_ref.apply_folder_permissions(&mut patch);
183
184 SyncEvent::GeneratedFolderPatch(patch.clone())
185 .emit(&ctx_ref.handler)
186 .await;
187
188 let (folders, patch) = patch.into_iter().fold(
189 (HashSet::default(), vec![]),
190 |(mut folders, mut patch), (folder, hunks)| {
191 folders.insert(folder);
192 patch.extend(hunks);
193 (folders, patch)
194 },
195 );
196
197 report.names = folders;
198 report.patch = FuturesUnordered::from_iter(patch.into_iter().map(|hunk| {
199 let ctx = ctx_ref.clone();
200 tokio::spawn(async move {
201 let hunk_clone = hunk.clone();
202 let handler = ctx.handler.clone();
203 let task = async move {
204 if ctx.dry_run {
205 return Ok(());
206 }
207
208 match hunk_clone {
209 FolderSyncHunk::Cache(folder, SyncDestination::Left) => {
210 ctx.left_cache.add_folder(&folder).await?;
211 }
212 FolderSyncHunk::Create(folder, SyncDestination::Left) => {
213 ctx.left.add_folder(&folder).await?;
214 }
215 FolderSyncHunk::Cache(folder, SyncDestination::Right) => {
216 ctx.right_cache.add_folder(&folder).await?;
217 }
218 FolderSyncHunk::Create(folder, SyncDestination::Right) => {
219 ctx.right.add_folder(&folder).await?;
220 }
221 FolderSyncHunk::Uncache(folder, SyncDestination::Left) => {
222 ctx.left_cache.delete_folder(&folder).await?;
223 }
224 FolderSyncHunk::Delete(folder, SyncDestination::Left) => {
225 ctx.left.delete_folder(&folder).await?;
226 }
227 FolderSyncHunk::Uncache(folder, SyncDestination::Right) => {
228 ctx.right_cache.delete_folder(&folder).await?;
229 }
230 FolderSyncHunk::Delete(folder, SyncDestination::Right) => {
231 ctx.right.delete_folder(&folder).await?;
232 }
233 };
234
235 Ok(())
236 };
237
238 let output = task.await;
239
240 SyncEvent::ProcessedFolderHunk(hunk.clone())
241 .emit(&handler)
242 .await;
243
244 match output {
245 Ok(()) => (hunk, None),
246 Err(err) => (hunk, Some(err)),
247 }
248 })
249 }))
250 .filter_map(|hunk| async {
251 match hunk {
252 Ok(hunk) => Some(hunk),
253 Err(err) => {
254 debug!("cannot process folder hunk: {err}");
255 trace!("{err:?}");
256 None
257 }
258 }
259 })
260 .collect::<Vec<_>>()
261 .await;
262
263 SyncEvent::ProcessedAllFolderHunks
264 .emit(&ctx_ref.handler)
265 .await;
266
267 Ok(report)
268}
269
270pub(crate) async fn expunge<L, R>(
271 ctx_ref: Arc<SyncPoolContext<L::Context, R::Context>>,
272 folders: &HashSet<String>,
273) where
274 L: BackendContextBuilder + 'static,
275 R: BackendContextBuilder + 'static,
276{
277 FuturesUnordered::from_iter(folders.iter().map(|folder_ref| {
278 let ctx = ctx_ref.clone();
279 let folder = folder_ref.clone();
280 let left_cached_expunge = async move {
281 if ctx.dry_run {
282 Ok(())
283 } else {
284 ctx.left_cache.expunge_folder(&folder).await
285 }
286 };
287
288 let ctx = ctx_ref.clone();
289 let folder = folder_ref.clone();
290 let left_expunge = async move {
291 if ctx.dry_run {
292 Ok(())
293 } else {
294 ctx.left.expunge_folder(&folder).await
295 }
296 };
297
298 let ctx = ctx_ref.clone();
299 let folder = folder_ref.clone();
300 let right_cached_expunge = async move {
301 if ctx.dry_run {
302 Ok(())
303 } else {
304 ctx.right_cache.expunge_folder(&folder).await
305 }
306 };
307
308 let ctx = ctx_ref.clone();
309 let folder = folder_ref.clone();
310 let right_expunge = async move {
311 if ctx.dry_run {
312 Ok(())
313 } else {
314 ctx.right.expunge_folder(&folder).await
315 }
316 };
317
318 async {
319 tokio::try_join!(
320 left_cached_expunge,
321 left_expunge,
322 right_cached_expunge,
323 right_expunge
324 )
325 }
326 }))
327 .for_each(|task| async {
328 if let Err(err) = task {
329 debug!("cannot expunge folders: {err}");
330 trace!("{err:?}");
331 }
332 })
333 .await;
334
335 SyncEvent::ExpungedAllFolders.emit(&ctx_ref.handler).await
336}