credence_lib/coordinator/
coordinator.rs

1use super::{event::*, task::*};
2
3use {
4    notify::*,
5    std::{path::*, result::Result},
6    tokio::{sync::mpsc::*, task::*},
7    tokio_util::sync::*,
8};
9
10//
11// Coordinator
12//
13
14/// File modification coordinator.
15#[derive(Debug)]
16pub struct Coordinator {
17    /// Watcher.
18    pub watcher: RecommendedWatcher,
19
20    /// [CancellationToken] for task.
21    pub cancellation: CancellationToken,
22
23    /// [JoinHandle] for task.
24    pub task: JoinHandle<()>,
25}
26
27impl Coordinator {
28    /// Constructor.
29    pub fn new(
30        coordinator_path: PathBuf,
31        follow_symlinks: bool,
32        compare_contents: bool,
33        queue_size: usize,
34    ) -> Result<Self, Error> {
35        let (sender, receiver) = channel(queue_size);
36
37        let watcher = RecommendedWatcher::new(
38            SenderEventHandler(sender.clone()),
39            Config::default().with_follow_symlinks(follow_symlinks).with_compare_contents(compare_contents),
40        )?;
41
42        let cancellation = CancellationToken::new();
43        let task = spawn_coordinator_task(coordinator_path, receiver, cancellation.clone());
44
45        Ok(Self { watcher, cancellation, task })
46    }
47
48    /// Shutdown (and wait for shutdown to complete).
49    pub async fn shutdown(self) -> Result<(), JoinError> {
50        self.cancellation.cancel();
51        self.task.await
52    }
53
54    /// Add a coordinated path.
55    ///
56    /// If it's a directory it will be recursive.
57    pub fn add<PathT>(&mut self, path: PathT) -> Result<(), Error>
58    where
59        PathT: AsRef<Path>,
60    {
61        let path = path.as_ref();
62        self.watcher.watch(
63            path,
64            if path.is_dir() {
65                tracing::info!("coordinating (recursively): {}", path.display());
66                notify::RecursiveMode::Recursive
67            } else {
68                tracing::info!("coordinating: {}", path.display());
69                notify::RecursiveMode::NonRecursive
70            },
71        )
72    }
73
74    /// Remove a coordinated path.
75    #[allow(dead_code)]
76    pub fn remove<PathT>(&mut self, path: PathT) -> Result<(), Error>
77    where
78        PathT: AsRef<Path>,
79    {
80        let path = path.as_ref();
81        tracing::info!("no longer coordinating: {}", path.display());
82        self.watcher.unwatch(path)
83    }
84}