credence_lib/coordinator/
coordinator.rs1use super::{event::*, task::*};
2
3use {
4 notify::*,
5 std::{path::*, result::Result},
6 tokio::{sync::mpsc::*, task::*},
7 tokio_util::sync::*,
8};
9
10#[derive(Debug)]
16pub struct Coordinator {
17 pub watcher: RecommendedWatcher,
19
20 pub cancellation: CancellationToken,
22
23 pub task: JoinHandle<()>,
25}
26
27impl Coordinator {
28 pub fn new(
30 coordinator_path: PathBuf,
31 follow_symlinks: bool,
32 compare_contents: bool,
33 queue_size: usize,
34 ) -> Result<Self, Error> {
35 let (sender, receiver) = channel(queue_size);
36
37 let watcher = RecommendedWatcher::new(
38 SenderEventHandler(sender.clone()),
39 Config::default().with_follow_symlinks(follow_symlinks).with_compare_contents(compare_contents),
40 )?;
41
42 let cancellation = CancellationToken::default();
43 let task = spawn_coordinator_task(coordinator_path, receiver, cancellation.clone());
44
45 Ok(Self { watcher, cancellation, task })
46 }
47
48 pub async fn shutdown(self) -> Result<(), JoinError> {
50 self.cancellation.cancel();
51 self.task.await
52 }
53
54 pub fn add<PathT>(&mut self, path: PathT) -> Result<(), Error>
58 where
59 PathT: AsRef<Path>,
60 {
61 let path = path.as_ref();
62 self.watcher.watch(
63 path,
64 if path.is_dir() {
65 tracing::info!("coordinating (recursively): {}", path.display());
66 notify::RecursiveMode::Recursive
67 } else {
68 tracing::info!("coordinating: {}", path.display());
69 notify::RecursiveMode::NonRecursive
70 },
71 )
72 }
73
74 #[allow(dead_code)]
76 pub fn remove<PathT>(&mut self, path: PathT) -> Result<(), Error>
77 where
78 PathT: AsRef<Path>,
79 {
80 let path = path.as_ref();
81 tracing::info!("no longer coordinating: {}", path.display());
82 self.watcher.unwatch(path)
83 }
84}