credence_lib/coordinator/
task.rs

1use super::event::*;
2
3use {
4    kutil_std::error::*,
5    std::path::*,
6    tokio::{fs::*, sync::mpsc::*, task::*, *},
7    tokio_util::sync::*,
8};
9
10/// Spawn coordinator task.
11pub fn spawn_coordinator_task(
12    coordinator_path: PathBuf,
13    mut receiver: Receiver<Message>,
14    cancellation: CancellationToken,
15) -> JoinHandle<()> {
16    tracing::info!("starting coordinator task: {}", coordinator_path.display());
17
18    spawn(async move {
19        // Touch on start
20        touch(&coordinator_path).await;
21
22        loop {
23            select! {
24                Some(message) = receiver.recv() => {
25                    match message {
26                        Ok(event) => {
27                            if event.need_rescan() || event.kind.is_modify() {
28                                if tracing::enabled!(tracing::Level::INFO) {
29                                    for path in event.paths {
30                                        tracing::info!("modified: {}", path.display());
31                                    }
32                                }
33
34                                touch(&coordinator_path).await;
35                            }
36                        }
37
38                        Err(error) => tracing::error!("{:?}: {}", error.kind, error),
39                    }
40                }
41
42                _ = cancellation.cancelled() => {
43                    tracing::info!("cancelled coordinator task: {}", coordinator_path.display());
44                    break;
45                }
46            }
47        }
48
49        tracing::info!("coordinator task ended: {}", coordinator_path.display());
50
51        // https://docs.rs/tokio/latest/tokio/sync/mpsc/index.html#clean-shutdown
52        receiver.close();
53    })
54}
55
56async fn touch(path: &PathBuf) {
57    if let Err(error) = File::create(path).await.with_path(path) {
58        tracing::error!("{}", error)
59    }
60}