credence_lib/coordinator/
task.rs1use super::event::*;
2
3use {
4 kutil_std::error::*,
5 std::path::*,
6 tokio::{fs::*, sync::mpsc::*, task::*, *},
7 tokio_util::sync::*,
8};
9
10pub fn spawn_coordinator_task(
12 coordinator_path: PathBuf,
13 mut receiver: Receiver<Message>,
14 cancellation: CancellationToken,
15) -> JoinHandle<()> {
16 tracing::info!("starting coordinator task: {}", coordinator_path.display());
17
18 spawn(async move {
19 touch(&coordinator_path).await;
21
22 loop {
23 select! {
24 Some(message) = receiver.recv() => {
25 match message {
26 Ok(event) => {
27 if event.need_rescan() || event.kind.is_modify() {
28 if tracing::enabled!(tracing::Level::INFO) {
29 for path in event.paths {
30 tracing::info!("modified: {}", path.display());
31 }
32 }
33
34 touch(&coordinator_path).await;
35 }
36 }
37
38 Err(error) => tracing::error!("{:?}: {}", error.kind, error),
39 }
40 }
41
42 _ = cancellation.cancelled() => {
43 tracing::info!("cancelled coordinator task: {}", coordinator_path.display());
44 break;
45 }
46 }
47 }
48
49 tracing::info!("coordinator task ended: {}", coordinator_path.display());
50
51 receiver.close();
53 })
54}
55
56async fn touch(path: &PathBuf) {
57 if let Err(error) = File::create(path).await.with_path(path) {
58 tracing::error!("{}", error)
59 }
60}