Skip to main content

brume_daemon/
synchro_list.rs

1use std::{any::Any, collections::HashMap, sync::Arc, thread::sleep, time::Duration};
2
3use brume::{
4    concrete::{
5        FSBackend, FsBackendError, Named,
6        local::{LocalDir, LocalSyncInfo},
7        nextcloud::{NextcloudFs, NextcloudSyncInfo},
8    },
9    filesystem::FileSystem,
10    synchro::{ConflictResolutionState, FullSyncStatus, Synchro, SynchroSide},
11    vfs::{Vfs, VirtualPath},
12};
13use futures::{StreamExt, future::join_all, stream};
14use log::{debug, error, info};
15use serde::Serialize;
16use thiserror::Error;
17use tokio::sync::{Mutex, RwLock, RwLockReadGuard};
18use uuid::Uuid;
19
20use brume_daemon_proto::{
21    AnyFsCreationInfo, AnyFsDescription, AnyFsRef, AnySynchroCreationInfo, AnySynchroRef,
22    SynchroId, SynchroState, SynchroStatus,
23};
24
25use crate::db::{Database, DatabaseError};
26
27#[derive(Error, Debug)]
28pub enum SyncError {
29    #[error("Error during sync process")]
30    SyncFailed(#[from] SynchroFailed),
31    #[error("Invalid synchro")]
32    InvalidSynchroState(#[from] InvalidSynchro),
33    #[error("Synchro not found: {0:?}")]
34    SynchroNotFound(SynchroId),
35    #[error("Database error")]
36    Database(#[from] DatabaseError),
37}
38
39#[derive(Error, Debug)]
40#[error("Failed to synchronize {synchro}")]
41pub struct SynchroFailed {
42    synchro: AnySynchroRef,
43    source: brume::Error,
44}
45
46#[derive(Error, Debug)]
47#[error("Synchro in invalid state: {synchro}")]
48pub struct InvalidSynchro {
49    synchro: AnySynchroRef,
50}
51
52impl From<AnySynchroRef> for InvalidSynchro {
53    fn from(value: AnySynchroRef) -> Self {
54        InvalidSynchro { synchro: value }
55    }
56}
57
58#[derive(Error, Debug)]
59pub enum SynchroCreationError {
60    #[error("The filesystems are already synchronized")]
61    AlreadyPresent,
62    #[error("Failed to instantiate filesystem object")]
63    FileSystemCreationError(#[from] FsBackendError),
64    #[error("The provided synchro is not of the expected type")]
65    InvalidType { expected: String, found: String },
66}
67
68impl SynchroCreationError {
69    fn invalid_type<Expected: Named, Found: Named>() -> Self {
70        Self::InvalidType {
71            expected: Expected::TYPE_NAME.to_string(),
72            found: Found::TYPE_NAME.to_string(),
73        }
74    }
75}
76
77#[derive(Error, Debug)]
78pub enum SynchroDeletionError {
79    #[error("Invalid synchro")]
80    InvalidSynchroState(#[from] InvalidSynchro),
81    #[error("Synchro not found: {0:?}")]
82    SynchroNotFound(SynchroId),
83}
84
85#[derive(Error, Debug)]
86pub enum SynchroModificationError {
87    #[error("Invalid synchro")]
88    InvalidSynchroState(#[from] InvalidSynchro),
89    #[error("Synchro not found: {0:?}")]
90    SynchroNotFound(SynchroId),
91}
92
93/// Result of a synchro creation
94#[derive(Clone)]
95pub struct CreatedSynchro {
96    id: SynchroId,
97    name: String,
98    local_id: Uuid,
99    remote_id: Uuid,
100}
101
102impl CreatedSynchro {
103    pub fn id(&self) -> SynchroId {
104        self.id
105    }
106
107    pub fn name(&self) -> &str {
108        &self.name
109    }
110
111    pub fn local_id(&self) -> Uuid {
112        self.local_id
113    }
114
115    pub fn remote_id(&self) -> Uuid {
116        self.remote_id
117    }
118}
119
120/// A [`SynchroList`] that allows only read-only access.
121///
122/// The list content cannot be modified, but since the underlying [`FileSystems`] are locked behind
123/// mutexes, they are themselves modifiabled.
124///
125/// [`FileSystems`]: FileSystem
126#[derive(Clone)]
127pub struct ReadOnlySynchroList {
128    maps: Arc<RwLock<SynchroList>>,
129}
130
131impl ReadOnlySynchroList {
132    pub async fn read(&self) -> RwLockReadGuard<SynchroList> {
133        self.maps.read().await
134    }
135
136    pub async fn len(&self) -> usize {
137        self.read().await.len()
138    }
139
140    pub async fn is_empty(&self) -> bool {
141        self.len().await == 0
142    }
143}
144
145/// A synchronized Filesystem pair where both filesystems are in a Mutex
146struct SynchroMutex<
147    'local,
148    'remote,
149    LocalBackend: FSBackend + 'static,
150    RemoteBackend: FSBackend + 'static,
151> {
152    local: &'local Mutex<FileSystem<LocalBackend>>,
153    remote: &'remote Mutex<FileSystem<RemoteBackend>>,
154}
155
156/// Allow to easily convert the given type into [`Any`] for runtime downcast
157trait DynTyped {
158    fn as_any(&self) -> &dyn Any;
159}
160
161impl<T: FSBackend + 'static> DynTyped for Mutex<FileSystem<T>> {
162    fn as_any(&self) -> &dyn Any {
163        self
164    }
165}
166
167/// Holds a list of pair of [`FileSystems`] that can be synchronized using [`Synchro::full_sync`].
168///
169/// The filesystems can be any of the [`supported types`]
170///
171/// [`FileSystems`]: FileSystem
172/// [`supported types`]: brume_daemon_proto::AnyFsCreationInfo
173#[derive(Default)]
174pub struct SynchroList {
175    synchros: HashMap<SynchroId, RwLock<AnySynchroRef>>,
176    nextcloud_list: HashMap<Uuid, Mutex<FileSystem<NextcloudFs>>>,
177    local_dir_list: HashMap<Uuid, Mutex<FileSystem<LocalDir>>>,
178}
179
180impl SynchroList {
181    /// Create a new empty list
182    pub fn new() -> Self {
183        Self::default()
184    }
185
186    pub fn len(&self) -> usize {
187        self.synchros.len()
188    }
189
190    pub fn is_empty(&self) -> bool {
191        self.len() == 0
192    }
193
194    pub fn synchro_ref_list(&self) -> &HashMap<SynchroId, RwLock<AnySynchroRef>> {
195        &self.synchros
196    }
197
198    pub(crate) fn synchro_ref_list_mut(
199        &mut self,
200    ) -> &mut HashMap<SynchroId, RwLock<AnySynchroRef>> {
201        &mut self.synchros
202    }
203
204    fn create_and_insert_fs(
205        &mut self,
206        fs_info: AnyFsCreationInfo,
207    ) -> Result<AnyFsRef, FsBackendError> {
208        let fs_ref: AnyFsRef = fs_info.clone().into();
209        match fs_info {
210            AnyFsCreationInfo::LocalDir(info) => {
211                let concrete = info.try_into()?;
212                self.local_dir_list
213                    .insert(fs_ref.id(), Mutex::new(FileSystem::new(concrete)));
214                Ok(fs_ref)
215            }
216            AnyFsCreationInfo::Nextcloud(info) => {
217                let concrete = info.try_into()?;
218                self.nextcloud_list
219                    .insert(fs_ref.id(), Mutex::new(FileSystem::new(concrete)));
220                Ok(fs_ref)
221            }
222        }
223    }
224
225    pub(crate) fn insert_existing_fs<SyncInfo: Named + 'static>(
226        &mut self,
227        fs_info: AnyFsCreationInfo,
228        vfs: &Vfs<SyncInfo>,
229        id: Uuid,
230    ) -> Result<(), SynchroCreationError> {
231        match fs_info {
232            AnyFsCreationInfo::LocalDir(info) => {
233                let concrete = info.try_into().map_err(FsBackendError::from)?;
234                let mut fs = FileSystem::new(concrete);
235                *fs.vfs_mut() = (vfs as &dyn Any)
236                    .downcast_ref::<Vfs<LocalSyncInfo>>()
237                    .ok_or_else(|| SynchroCreationError::invalid_type::<LocalSyncInfo, SyncInfo>())?
238                    .clone();
239                self.local_dir_list.insert(id, Mutex::new(fs));
240                Ok(())
241            }
242            AnyFsCreationInfo::Nextcloud(info) => {
243                let concrete = info.try_into().map_err(FsBackendError::from)?;
244                let mut fs = FileSystem::new(concrete);
245                *fs.vfs_mut() = (vfs as &dyn Any)
246                    .downcast_ref::<Vfs<NextcloudSyncInfo>>()
247                    .ok_or_else(|| {
248                        SynchroCreationError::invalid_type::<NextcloudSyncInfo, SyncInfo>()
249                    })?
250                    .clone();
251                self.nextcloud_list.insert(id, Mutex::new(fs));
252                Ok(())
253            }
254        }
255    }
256
257    /// Checks if the two Filesystems in the pair are already synchronized together.
258    pub async fn is_synchronized(
259        &self,
260        local_desc: &AnyFsDescription,
261        remote_desc: &AnyFsDescription,
262    ) -> bool {
263        for sync in self.synchros.values() {
264            let sync = sync.read().await;
265            if (sync.local().description() == local_desc
266                || sync.local().description() == remote_desc)
267                && (sync.remote().description() == local_desc
268                    || sync.remote().description() == remote_desc)
269            {
270                return true;
271            }
272        }
273
274        false
275    }
276
277    async fn name_is_unique(&self, name: &str) -> bool {
278        for sync in self.synchros.values() {
279            if sync.read().await.name() == name {
280                return false;
281            }
282        }
283        true
284    }
285
286    async fn make_unique_name(&self, name: &str) -> String {
287        if self.name_is_unique(name).await {
288            return name.to_string();
289        }
290
291        let mut counter = 1;
292        loop {
293            let new_name = format!("{}{}", name, counter);
294            if self.name_is_unique(name).await {
295                return new_name;
296            }
297            counter += 1;
298        }
299    }
300
301    /// Generates a unique and simple name for a synchro, based on the name of the remote and local
302    /// fs
303    async fn unique_synchro_name(&self, local_name: &str, remote_name: &str) -> String {
304        let same_remote: Vec<_> = stream::iter(self.synchros.values())
305            .filter(|sync| async { sync.read().await.remote().name() == remote_name })
306            .collect()
307            .await;
308
309        if same_remote.is_empty() && self.name_is_unique(remote_name).await {
310            return remote_name.to_string();
311        }
312
313        let same_local: Vec<_> = stream::iter(same_remote)
314            .filter(|sync| async { sync.read().await.local().name() == local_name })
315            .collect()
316            .await;
317
318        let name = format!("{local_name}-{remote_name}");
319        if same_local.is_empty() && self.name_is_unique(&name).await {
320            return name;
321        }
322
323        self.make_unique_name(&name).await
324    }
325
326    /// Creates and inserts a new filesystem Synchro in the list
327    pub async fn insert(
328        &mut self,
329        sync_info: AnySynchroCreationInfo,
330    ) -> Result<CreatedSynchro, SynchroCreationError> {
331        let local_desc = sync_info.local().clone().into();
332        let remote_desc = sync_info.remote().clone().into();
333
334        if self.is_synchronized(&local_desc, &remote_desc).await {
335            return Err(SynchroCreationError::AlreadyPresent);
336        }
337        let id = SynchroId::new();
338
339        let local_ref = self.create_and_insert_fs(sync_info.local().clone())?;
340        let remote_ref = self.create_and_insert_fs(sync_info.remote().clone())?;
341        let name = if let Some(name) = sync_info.name() {
342            self.make_unique_name(name).await
343        } else {
344            self.unique_synchro_name(local_ref.name(), remote_ref.name())
345                .await
346        };
347
348        info!("Synchro created: name: {name}, id: {id:?}");
349        let res = CreatedSynchro {
350            id,
351            name: name.clone(),
352            local_id: local_ref.id(),
353            remote_id: remote_ref.id(),
354        };
355
356        let synchro = AnySynchroRef::new(local_ref, remote_ref, name);
357
358        self.synchros.insert(id, RwLock::new(synchro.clone()));
359
360        Ok(res)
361    }
362
363    /// Deletes a synchronization from the list
364    pub fn remove(&mut self, id: SynchroId) -> Result<(), SynchroDeletionError> {
365        if let Some(sync) = self.synchros.remove(&id) {
366            let mut res = Ok(());
367            let sync = sync.into_inner();
368
369            if !self.remove_fs(sync.local()) {
370                res = Err(SynchroDeletionError::InvalidSynchroState(
371                    sync.clone().into(),
372                ));
373            }
374            if !self.remove_fs(sync.remote()) {
375                res = Err(SynchroDeletionError::InvalidSynchroState(
376                    sync.clone().into(),
377                ));
378            }
379
380            info!("Synchro deleted: {id:?}");
381            res
382        } else {
383            Err(SynchroDeletionError::SynchroNotFound(id))
384        }
385    }
386
387    pub async fn resolve_conflict(
388        &self,
389        id: SynchroId,
390        path: &VirtualPath,
391        side: SynchroSide,
392        db: &Database,
393    ) -> Result<(), SyncError> {
394        let synchro = self
395            .synchros
396            .get(&id)
397            .ok_or_else(|| SyncError::SynchroNotFound(id))?;
398        let local_desc = synchro.read().await.local().description().clone();
399        let remote_desc = synchro.read().await.remote().description().clone();
400        match (local_desc, remote_desc) {
401            (AnyFsDescription::LocalDir(_), AnyFsDescription::LocalDir(_)) => {
402                self.resolve_conflict_sync::<LocalDir, LocalDir>(id, synchro, path, side, db)
403                    .await
404            }
405            (AnyFsDescription::LocalDir(_), AnyFsDescription::Nextcloud(_)) => {
406                self.resolve_conflict_sync::<LocalDir, NextcloudFs>(id, synchro, path, side, db)
407                    .await
408            }
409            (AnyFsDescription::Nextcloud(_), AnyFsDescription::LocalDir(_)) => {
410                self.resolve_conflict_sync::<NextcloudFs, LocalDir>(id, synchro, path, side, db)
411                    .await
412            }
413            (AnyFsDescription::Nextcloud(_), AnyFsDescription::Nextcloud(_)) => {
414                self.resolve_conflict_sync::<NextcloudFs, NextcloudFs>(id, synchro, path, side, db)
415                    .await
416            }
417        }
418    }
419
420    fn get_fs<Backend: FSBackend + 'static>(
421        &self,
422        fs: &AnyFsRef,
423    ) -> Option<&Mutex<FileSystem<Backend>>> {
424        match fs.description() {
425            AnyFsDescription::LocalDir(_) => self
426                .local_dir_list
427                .get(&fs.id())
428                .and_then(|fs| fs.as_any().downcast_ref::<Mutex<FileSystem<Backend>>>()),
429            AnyFsDescription::Nextcloud(_) => self
430                .nextcloud_list
431                .get(&fs.id())
432                .and_then(|fs| fs.as_any().downcast_ref::<Mutex<FileSystem<Backend>>>()),
433        }
434    }
435
436    fn remove_fs(&mut self, fs: &AnyFsRef) -> bool {
437        match fs.description() {
438            AnyFsDescription::LocalDir(_) => self.local_dir_list.remove(&fs.id()).is_some(),
439            AnyFsDescription::Nextcloud(_) => self.nextcloud_list.remove(&fs.id()).is_some(),
440        }
441    }
442
443    fn get_sync<LocalBackend: FSBackend + 'static, RemoteBackend: FSBackend + 'static>(
444        &self,
445        synchro: &AnySynchroRef,
446    ) -> Option<SynchroMutex<LocalBackend, RemoteBackend>> {
447        let local = self.get_fs::<LocalBackend>(synchro.local())?;
448        let remote = self.get_fs::<RemoteBackend>(synchro.remote())?;
449
450        Some(SynchroMutex { local, remote })
451    }
452
453    /// Resolves a conflict on a synchro in the list by applying the update from the chose side
454    pub async fn resolve_conflict_sync<
455        LocalBackend: FSBackend + 'static,
456        RemoteBackend: FSBackend + 'static,
457    >(
458        &self,
459        id: SynchroId,
460        synchro_lock: &RwLock<AnySynchroRef>,
461        path: &VirtualPath,
462        side: SynchroSide,
463        db: &Database,
464    ) -> Result<(), SyncError>
465    where
466        LocalBackend::SyncInfo: Serialize,
467        RemoteBackend::SyncInfo: Serialize,
468    {
469        // Wait for synchro to be ready
470        loop {
471            {
472                let mut synchro = synchro_lock.write().await;
473
474                if synchro.status().is_synchronizable() {
475                    let status = SynchroStatus::SyncInProgress;
476                    synchro.set_status(status);
477                    db.set_synchro_status(id, status).await?;
478                    break;
479                }
480            }
481            sleep(Duration::from_secs(1)); // TODO: make configurable
482        }
483
484        let res = {
485            let synchro = synchro_lock.read().await;
486
487            let synchro_mutex = self
488                .get_sync::<LocalBackend, RemoteBackend>(&synchro)
489                .ok_or_else(|| InvalidSynchro::from(synchro.clone()))
490                .unwrap();
491
492            let mut local_fs = synchro_mutex.local.lock().await;
493            let mut remote_fs = synchro_mutex.remote.lock().await;
494
495            let mut sync: Synchro<'_, '_, LocalBackend, RemoteBackend> =
496                Synchro::new(&mut local_fs, &mut remote_fs);
497            sync.resolve_conflict(path, side).await
498        };
499
500        match res {
501            Ok(conflict_result) => {
502                let status = conflict_result.status().into();
503                synchro_lock.write().await.set_status(status);
504                db.set_synchro_status(id, status).await?;
505
506                match conflict_result.state() {
507                    ConflictResolutionState::Local(node_state) => {
508                        db.update_vfs_node_state(
509                            synchro_lock.read().await.local().id(),
510                            path,
511                            node_state,
512                        )
513                        .await?
514                    }
515                    ConflictResolutionState::Remote(node_state) => {
516                        db.update_vfs_node_state(
517                            synchro_lock.read().await.remote().id(),
518                            path,
519                            node_state,
520                        )
521                        .await?
522                    }
523                    ConflictResolutionState::None => {
524                        db.delete_vfs_node(synchro_lock.read().await.local().id(), path)
525                            .await?
526                    }
527                }
528                Ok(())
529            }
530
531            Err(err) => {
532                let mut synchro = synchro_lock.write().await;
533                let status = FullSyncStatus::from(&err).into();
534                synchro.set_status(status);
535                db.set_synchro_status(id, status).await?;
536                Err(SynchroFailed {
537                    synchro: synchro.to_owned(),
538                    source: err,
539                }
540                .into())
541            }
542        }
543    }
544
545    /// Performs a [`full_sync`] on the provided synchro, that should be in the list
546    ///
547    /// [`full_sync`]: brume::synchro::Synchro::full_sync
548    pub async fn sync_one<LocalBackend: FSBackend + 'static, RemoteBackend: FSBackend + 'static>(
549        &self,
550        id: SynchroId,
551        synchro_lock: &RwLock<AnySynchroRef>,
552        db: &Database,
553    ) -> Result<(), SyncError>
554    where
555        LocalBackend::SyncInfo: Serialize,
556        RemoteBackend::SyncInfo: Serialize,
557    {
558        {
559            let mut synchro = synchro_lock.write().await;
560
561            // Skip synchro that are already identified as desynchronized until the user fixes it
562            if !synchro.status().is_synchronizable() {
563                return Ok(());
564            }
565            let status = SynchroStatus::SyncInProgress;
566            synchro.set_status(status);
567            db.set_synchro_status(id, status).await?;
568        }
569
570        let res = {
571            let synchro = synchro_lock.read().await;
572            debug!("Starting full_sync for Synchro {}", synchro.name());
573
574            let synchro_mutex = self
575                .get_sync::<LocalBackend, RemoteBackend>(&synchro)
576                .ok_or_else(|| InvalidSynchro::from(synchro.clone()))?;
577
578            let mut local_fs = synchro_mutex.local.lock().await;
579            let mut remote_fs = synchro_mutex.remote.lock().await;
580            let mut sync = Synchro::new(&mut local_fs, &mut remote_fs);
581            sync.full_sync().await
582        };
583
584        match res {
585            Ok(sync_res) => {
586                // Propagate updates to the db
587                for update in sync_res.local_updates() {
588                    db.update_vfs(synchro_lock.read().await.local().id(), update)
589                        .await?;
590                }
591
592                for update in sync_res.remote_updates() {
593                    db.update_vfs(synchro_lock.read().await.remote().id(), update)
594                        .await?;
595                }
596
597                let status = sync_res.status();
598                let mut synchro = synchro_lock.write().await;
599                debug!(
600                    "Synchro {} returned with status: {status:?}",
601                    synchro.name()
602                );
603                let status = status.into();
604                synchro.set_status(status);
605                db.set_synchro_status(id, status).await?;
606                Ok(())
607            }
608            Err(err) => {
609                let mut synchro = synchro_lock.write().await;
610                error!("Synchro {} returned an error: {err}", synchro.name());
611                let status = FullSyncStatus::from(&err).into();
612                synchro.set_status(status);
613                db.set_synchro_status(id, status).await?;
614                Err(SynchroFailed {
615                    synchro: synchro.to_owned(),
616                    source: err,
617                }
618                .into())
619            }
620        }
621    }
622
623    /// Performs a [`full_sync`] on all the synchro in the list
624    ///
625    /// [`full_sync`]: brume::synchro::Synchro::full_sync
626    pub async fn sync_all(&self, db: &Database) -> Vec<Result<(), SyncError>> {
627        let futures: Vec<_> = self
628            .synchros
629            .iter()
630            .map(|(id, synchro)| async move {
631                if matches!(synchro.read().await.state(), SynchroState::Paused) {
632                    return Ok(());
633                }
634
635                let (local_desc, remote_desc) = {
636                    let sync = synchro.read().await;
637                    (
638                        sync.local().description().clone(),
639                        sync.remote().description().clone(),
640                    )
641                };
642                match (local_desc, remote_desc) {
643                    (AnyFsDescription::LocalDir(_), AnyFsDescription::LocalDir(_)) => {
644                        self.sync_one::<LocalDir, LocalDir>(*id, synchro, db).await
645                    }
646                    (AnyFsDescription::LocalDir(_), AnyFsDescription::Nextcloud(_)) => {
647                        self.sync_one::<LocalDir, NextcloudFs>(*id, synchro, db)
648                            .await
649                    }
650                    (AnyFsDescription::Nextcloud(_), AnyFsDescription::LocalDir(_)) => {
651                        self.sync_one::<NextcloudFs, LocalDir>(*id, synchro, db)
652                            .await
653                    }
654                    (AnyFsDescription::Nextcloud(_), AnyFsDescription::Nextcloud(_)) => {
655                        self.sync_one::<NextcloudFs, NextcloudFs>(*id, synchro, db)
656                            .await
657                    }
658                }
659            })
660            .collect();
661
662        // TODO: switch to FutureUnordered ?
663        join_all(futures).await
664    }
665
666    /// Sets the state of a synchro in the list
667    pub async fn set_state(
668        &self,
669        id: SynchroId,
670        state: SynchroState,
671    ) -> Result<(), SynchroModificationError> {
672        let sync = self
673            .synchros
674            .get(&id)
675            .ok_or(SynchroModificationError::SynchroNotFound(id))?;
676
677        sync.write().await.set_state(state);
678        Ok(())
679    }
680
681    pub async fn get_vfs(&self, id: SynchroId, side: SynchroSide) -> Result<Vfs<()>, SyncError> {
682        let synchro = self
683            .synchros
684            .get(&id)
685            .ok_or_else(|| SyncError::SynchroNotFound(id))?
686            .read()
687            .await;
688
689        let (desc, id) = match side {
690            SynchroSide::Local => (synchro.local().description().clone(), synchro.local().id()),
691            SynchroSide::Remote => (
692                synchro.remote().description().clone(),
693                synchro.remote().id(),
694            ),
695        };
696
697        match desc {
698            AnyFsDescription::LocalDir(_) => {
699                let fs = self
700                    .local_dir_list
701                    .get(&id)
702                    .ok_or_else(|| InvalidSynchro::from(synchro.clone()))?
703                    .lock()
704                    .await;
705
706                Ok(fs.vfs().into())
707            }
708            AnyFsDescription::Nextcloud(_) => {
709                let fs = self
710                    .nextcloud_list
711                    .get(&id)
712                    .ok_or_else(|| InvalidSynchro::from(synchro.clone()))?
713                    .lock()
714                    .await;
715
716                Ok(fs.vfs().into())
717            }
718        }
719    }
720}
721
722/// A [`SynchroList`] that allows read-write access and can be shared between threads.
723#[derive(Clone)]
724pub struct ReadWriteSynchroList {
725    maps: Arc<RwLock<SynchroList>>,
726}
727
728impl Default for ReadWriteSynchroList {
729    fn default() -> Self {
730        Self::new()
731    }
732}
733
734impl From<SynchroList> for ReadWriteSynchroList {
735    fn from(value: SynchroList) -> Self {
736        Self {
737            maps: Arc::new(RwLock::new(value)),
738        }
739    }
740}
741
742impl ReadWriteSynchroList {
743    pub async fn read(&self) -> RwLockReadGuard<SynchroList> {
744        self.maps.read().await
745    }
746
747    /// Inserts a new synchronized pair of filesystem in the list
748    pub async fn insert(
749        &self,
750        sync_info: AnySynchroCreationInfo,
751    ) -> Result<CreatedSynchro, SynchroCreationError> {
752        self.maps.write().await.insert(sync_info).await
753    }
754
755    /// Deletes a synchronization from the list
756    pub async fn remove(&self, id: SynchroId) -> Result<(), SynchroDeletionError> {
757        self.maps.write().await.remove(id)
758    }
759
760    /// Forces a synchro of all the filesystems in the list, and updates the db accordingly
761    pub async fn sync_all(&self, db: &Database) -> Vec<Result<(), SyncError>> {
762        let maps = self.maps.read().await;
763
764        maps.sync_all(db).await
765    }
766
767    /// Resolves a conflict on a path inside a synchro, by applying the update from `side`.
768    /// Updates the db accordingly
769    pub async fn resolve_conflict(
770        &self,
771        id: SynchroId,
772        path: &VirtualPath,
773        side: SynchroSide,
774        db: &Database,
775    ) -> Result<(), SyncError> {
776        let maps = self.maps.read().await;
777
778        maps.resolve_conflict(id, path, side, db).await
779    }
780
781    /// Returns a read-only view of the list
782    pub fn as_read_only(&self) -> ReadOnlySynchroList {
783        ReadOnlySynchroList {
784            maps: self.maps.clone(),
785        }
786    }
787
788    /// Creates a new empty list
789    pub fn new() -> Self {
790        Self {
791            maps: Arc::new(RwLock::new(SynchroList::new())),
792        }
793    }
794
795    pub async fn len(&self) -> usize {
796        self.read().await.len()
797    }
798
799    pub async fn is_empty(&self) -> bool {
800        self.len().await == 0
801    }
802}
803
804#[cfg(test)]
805mod test {
806    use brume::concrete::{local::LocalDirCreationInfo, nextcloud::NextcloudFsCreationInfo};
807    use brume_daemon_proto::{AnyFsCreationInfo, AnySynchroCreationInfo};
808
809    use super::*;
810
811    #[tokio::test]
812    async fn test_insert_remove() {
813        let mut list = SynchroList::new();
814
815        let loc_a = LocalDirCreationInfo::new("/a");
816        let loc_b = LocalDirCreationInfo::new("/b");
817        let sync1 = AnySynchroCreationInfo::new(
818            AnyFsCreationInfo::LocalDir(loc_a),
819            AnyFsCreationInfo::LocalDir(loc_b),
820            None,
821        );
822
823        let id1 = list.insert(sync1).await.unwrap().id();
824
825        assert_eq!(list.synchros.len(), 1);
826        assert_eq!(list.local_dir_list.len(), 2);
827        assert_eq!(list.nextcloud_list.len(), 0);
828
829        let nx_a = NextcloudFsCreationInfo::new("https://cloud.com", "user", "user");
830        let loc_b = LocalDirCreationInfo::new("/b");
831        let sync2 = AnySynchroCreationInfo::new(
832            AnyFsCreationInfo::LocalDir(loc_b),
833            AnyFsCreationInfo::Nextcloud(nx_a),
834            None,
835        );
836
837        list.insert(sync2).await.unwrap();
838
839        assert_eq!(list.synchros.len(), 2);
840        assert_eq!(list.local_dir_list.len(), 3);
841        assert_eq!(list.nextcloud_list.len(), 1);
842
843        list.remove(id1).unwrap();
844
845        assert_eq!(list.synchros.len(), 1);
846        assert_eq!(list.local_dir_list.len(), 1);
847        assert_eq!(list.nextcloud_list.len(), 1);
848    }
849}