1use std::path::{Path, PathBuf};
2use std::{fmt, io};
3
4use distant_net::common::Request;
5use log::*;
6use tokio::sync::mpsc;
7use tokio::task::JoinHandle;
8
9use crate::client::{DistantChannel, DistantChannelExt};
10use crate::constants::CLIENT_WATCHER_CAPACITY;
11use crate::protocol::{self, Change, ChangeKindSet};
12
/// Handle to a watch registered for a path over a [`DistantChannel`].
///
/// Changes reported for the watch are forwarded by a background task into an
/// internal queue and read one at a time through `next`.
pub struct Watcher {
    /// Channel used to send the watch request and, later, the unwatch request.
    channel: DistantChannel,
    /// Path that was supplied when the watch was created.
    path: PathBuf,
    /// Background task forwarding incoming changes into the queue read by `rx`;
    /// aborted by `unwatch`.
    task: JoinHandle<()>,
    /// Receiving end of the queue of changes, read by `next`.
    rx: mpsc::Receiver<Change>,
    /// `true` once the watch is created, `false` after a successful `unwatch`.
    active: bool,
}
21
22impl fmt::Debug for Watcher {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("Watcher").field("path", &self.path).finish()
25 }
26}
27
28impl Watcher {
29 pub async fn watch(
31 mut channel: DistantChannel,
32 path: impl Into<PathBuf>,
33 recursive: bool,
34 only: impl Into<ChangeKindSet>,
35 except: impl Into<ChangeKindSet>,
36 ) -> io::Result<Self> {
37 let path = path.into();
38 let only = only.into();
39 let except = except.into();
40 trace!(
41 "Watching {:?} (recursive = {}){}{}",
42 path,
43 recursive,
44 if only.is_empty() {
45 String::new()
46 } else {
47 format!(" (only = {only})")
48 },
49 if except.is_empty() {
50 String::new()
51 } else {
52 format!(" (except = {except})")
53 },
54 );
55
56 let mut mailbox = channel
58 .mail(Request::new(protocol::Msg::Single(
59 protocol::Request::Watch {
60 path: path.to_path_buf(),
61 recursive,
62 only: only.into_sorted_vec(),
63 except: except.into_sorted_vec(),
64 },
65 )))
66 .await?;
67
68 let (tx, rx) = mpsc::channel(CLIENT_WATCHER_CAPACITY);
69
70 let mut queue: Vec<Change> = Vec::new();
72 let mut confirmed = false;
73 while let Some(res) = mailbox.next().await {
74 for data in res.payload.into_vec() {
75 match data {
76 protocol::Response::Changed(change) => queue.push(change),
77 protocol::Response::Ok => {
78 confirmed = true;
79 }
80 protocol::Response::Error(x) => return Err(io::Error::from(x)),
81 x => {
82 return Err(io::Error::new(
83 io::ErrorKind::Other,
84 format!("Unexpected response: {x:?}"),
85 ))
86 }
87 }
88 }
89
90 if confirmed {
94 break;
95 }
96 }
97
98 trace!("Forwarding {} queued changes for {:?}", queue.len(), path);
100 for change in queue {
101 if tx.send(change).await.is_err() {
102 return Err(io::Error::new(io::ErrorKind::Other, "Queue change dropped"));
103 }
104 }
105
106 if !confirmed {
109 return Err(io::Error::new(io::ErrorKind::Other, "Missing confirmation"));
110 }
111
112 let task = tokio::spawn({
115 let path = path.clone();
116 async move {
117 while let Some(res) = mailbox.next().await {
118 for data in res.payload.into_vec() {
119 match data {
120 protocol::Response::Changed(change) => {
121 if tx.is_closed() {
124 break;
125 }
126
127 if let Err(x) = tx.send(change).await {
129 error!(
130 "Watcher for {:?} failed to send change {:?}",
131 path, x.0
132 );
133 break;
134 }
135 }
136 _ => continue,
137 }
138 }
139 }
140 }
141 });
142
143 Ok(Self {
144 path,
145 channel,
146 task,
147 rx,
148 active: true,
149 })
150 }
151
152 pub fn path(&self) -> &Path {
154 self.path.as_path()
155 }
156
157 pub fn is_active(&self) -> bool {
159 self.active
160 }
161
162 pub async fn next(&mut self) -> Option<Change> {
164 self.rx.recv().await
165 }
166
167 pub async fn unwatch(&mut self) -> io::Result<()> {
169 trace!("Unwatching {:?}", self.path);
170 self.channel.unwatch(self.path.to_path_buf()).await?;
171
172 self.task.abort();
174 self.active = false;
175
176 Ok(())
177 }
178}
179
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use distant_net::common::{FramedTransport, InmemoryTransport, Response};
    use distant_net::Client;
    use test_log::test;
    use tokio::sync::Mutex;

    use super::*;
    use crate::protocol::ChangeKind;
    use crate::DistantClient;

    /// Builds an in-memory transport pair: the first half stays with the test to play
    /// the remote side, the second half backs the returned client.
    fn make_session() -> (FramedTransport<InmemoryTransport>, DistantClient) {
        let (remote_side, client_side) = FramedTransport::pair(100);
        (
            remote_side,
            Client::spawn_inmemory(client_side, Default::default()),
        )
    }

    /// Starts `Watcher::watch` for `path` (recursive, no filters) in its own task, since
    /// the call only finishes after the test has answered the request.
    fn spawn_watch(session: DistantClient, path: &'static Path) -> JoinHandle<io::Result<Watcher>> {
        tokio::spawn(async move {
            Watcher::watch(
                session.clone_channel(),
                path,
                true,
                ChangeKindSet::empty(),
                ChangeKindSet::empty(),
            )
            .await
        })
    }

    /// Reads the next request from `transport`, answers it with `Ok`, and returns the
    /// request so that later responses can reuse its id.
    async fn acknowledge_next_request(
        transport: &mut FramedTransport<InmemoryTransport>,
    ) -> Request<protocol::Request> {
        let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();

        transport
            .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok))
            .await
            .unwrap();

        req
    }

    #[test(tokio::test)]
    async fn watcher_should_have_path_reflect_watched_path() {
        let (mut transport, session) = make_session();
        let test_path = Path::new("/some/test/path");

        // Create the watcher in the background and confirm its request
        let pending_watcher = spawn_watch(session, test_path);
        let _req = acknowledge_next_request(&mut transport).await;

        // The finished watcher reports the path it was created with
        let watcher = pending_watcher.await.unwrap().unwrap();
        assert_eq!(watcher.path(), test_path);
    }

    #[test(tokio::test)]
    async fn watcher_should_support_getting_next_change() {
        let (mut transport, session) = make_session();
        let test_path = Path::new("/some/test/path");
        let change = |timestamp, kind| Change {
            timestamp,
            kind,
            path: test_path.to_path_buf(),
            details: Default::default(),
        };

        // Create the watcher in the background and confirm its request
        let pending_watcher = spawn_watch(session, test_path);
        let req = acknowledge_next_request(&mut transport).await;
        let mut watcher = pending_watcher.await.unwrap().unwrap();

        // Deliver two changes in a single payload
        transport
            .write_frame_for(&Response::new(
                req.id,
                vec![
                    protocol::Response::Changed(change(0, ChangeKind::Access)),
                    protocol::Response::Changed(change(1, ChangeKind::Modify)),
                ],
            ))
            .await
            .unwrap();

        // Both changes come out in the order they were sent
        let received = watcher.next().await.expect("Watcher closed unexpectedly");
        assert_eq!(received, change(0, ChangeKind::Access));

        let received = watcher.next().await.expect("Watcher closed unexpectedly");
        assert_eq!(received, change(1, ChangeKind::Modify));
    }

    #[test(tokio::test)]
    async fn watcher_should_distinguish_change_events_and_only_receive_changes_for_itself() {
        let (mut transport, session) = make_session();
        let test_path = Path::new("/some/test/path");
        let change = |timestamp, kind| Change {
            timestamp,
            kind,
            path: test_path.to_path_buf(),
            details: Default::default(),
        };

        // Create the watcher in the background and confirm its request
        let pending_watcher = spawn_watch(session, test_path);
        let req = acknowledge_next_request(&mut transport).await;
        let mut watcher = pending_watcher.await.unwrap().unwrap();

        // A change addressed to this watcher's request
        transport
            .write_frame_for(&Response::new(
                req.id.clone(),
                protocol::Response::Changed(change(0, ChangeKind::Access)),
            ))
            .await
            .unwrap();

        // A change addressed to a different origin id
        transport
            .write_frame_for(&Response::new(
                req.id.clone() + "1",
                protocol::Response::Changed(change(1, ChangeKind::Modify)),
            ))
            .await
            .unwrap();

        // Another change addressed to this watcher's request
        transport
            .write_frame_for(&Response::new(
                req.id,
                protocol::Response::Changed(change(2, ChangeKind::Delete)),
            ))
            .await
            .unwrap();

        // Only the two changes carrying the watcher's own id are seen
        let received = watcher.next().await.expect("Watcher closed unexpectedly");
        assert_eq!(received, change(0, ChangeKind::Access));

        let received = watcher.next().await.expect("Watcher closed unexpectedly");
        assert_eq!(received, change(2, ChangeKind::Delete));
    }

    #[test(tokio::test)]
    async fn watcher_should_stop_receiving_events_if_unwatched() {
        let (mut transport, session) = make_session();
        let test_path = Path::new("/some/test/path");
        let change = |timestamp, kind| Change {
            timestamp,
            kind,
            path: test_path.to_path_buf(),
            details: Default::default(),
        };

        // Create the watcher in the background and confirm its request
        let pending_watcher = spawn_watch(session, test_path);
        let req = acknowledge_next_request(&mut transport).await;

        // Deliver three changes right after the confirmation
        transport
            .write_frame_for(&Response::new(
                req.id,
                vec![
                    protocol::Response::Changed(change(0, ChangeKind::Access)),
                    protocol::Response::Changed(change(1, ChangeKind::Modify)),
                    protocol::Response::Changed(change(2, ChangeKind::Delete)),
                ],
            ))
            .await
            .unwrap();

        // Give the changes a moment to reach the watcher's queue
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Shared so that the unwatch call can run in a separate task
        let watcher = Arc::new(Mutex::new(pending_watcher.await.unwrap().unwrap()));

        // Consume only the first change before unwatching
        let received = watcher
            .lock()
            .await
            .next()
            .await
            .expect("Watcher closed unexpectedly");
        assert_eq!(received, change(0, ChangeKind::Access));

        // Unwatch in the background, since it waits for a reply from the test
        let unwatch_task = {
            let watcher = Arc::clone(&watcher);
            tokio::spawn(async move { watcher.lock().await.unwatch().await })
        };

        let req = acknowledge_next_request(&mut transport).await;

        unwatch_task.await.unwrap().unwrap();

        // A change sent after the unwatch completed
        transport
            .write_frame_for(&Response::new(
                req.id,
                protocol::Response::Changed(change(3, ChangeKind::Unknown)),
            ))
            .await
            .unwrap();

        // Changes queued before the unwatch are still readable, the late one is not
        assert_eq!(
            watcher.lock().await.next().await,
            Some(change(1, ChangeKind::Modify))
        );
        assert_eq!(
            watcher.lock().await.next().await,
            Some(change(2, ChangeKind::Delete))
        );
        assert_eq!(watcher.lock().await.next().await, None);
    }
}