distant_core/client/
watcher.rs

1use std::path::{Path, PathBuf};
2use std::{fmt, io};
3
4use distant_net::common::Request;
5use log::*;
6use tokio::sync::mpsc;
7use tokio::task::JoinHandle;
8
9use crate::client::{DistantChannel, DistantChannelExt};
10use crate::constants::CLIENT_WATCHER_CAPACITY;
11use crate::protocol::{self, Change, ChangeKindSet};
12
/// Represents a watcher of some path on a remote machine
///
/// Created through [`Watcher::watch`]; changes are pulled one at a time with
/// [`Watcher::next`], and the watch is cancelled with [`Watcher::unwatch`].
pub struct Watcher {
    /// Channel the watch request was sent over; reused to send the unwatch request
    channel: DistantChannel,
    /// Remote path being watched
    path: PathBuf,
    /// Background task that forwards change events from the response mailbox into `rx`
    task: JoinHandle<()>,
    /// Receiving end of the queue of changes fed by `task`
    rx: mpsc::Receiver<Change>,
    /// Set to true on creation and flipped to false once the path is successfully unwatched
    active: bool,
}
21
22impl fmt::Debug for Watcher {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.debug_struct("Watcher").field("path", &self.path).finish()
25    }
26}
27
28impl Watcher {
29    /// Creates a watcher for some remote path
30    pub async fn watch(
31        mut channel: DistantChannel,
32        path: impl Into<PathBuf>,
33        recursive: bool,
34        only: impl Into<ChangeKindSet>,
35        except: impl Into<ChangeKindSet>,
36    ) -> io::Result<Self> {
37        let path = path.into();
38        let only = only.into();
39        let except = except.into();
40        trace!(
41            "Watching {:?} (recursive = {}){}{}",
42            path,
43            recursive,
44            if only.is_empty() {
45                String::new()
46            } else {
47                format!(" (only = {only})")
48            },
49            if except.is_empty() {
50                String::new()
51            } else {
52                format!(" (except = {except})")
53            },
54        );
55
56        // Submit our run request and get back a mailbox for responses
57        let mut mailbox = channel
58            .mail(Request::new(protocol::Msg::Single(
59                protocol::Request::Watch {
60                    path: path.to_path_buf(),
61                    recursive,
62                    only: only.into_sorted_vec(),
63                    except: except.into_sorted_vec(),
64                },
65            )))
66            .await?;
67
68        let (tx, rx) = mpsc::channel(CLIENT_WATCHER_CAPACITY);
69
70        // Wait to get the confirmation of watch as either ok or error
71        let mut queue: Vec<Change> = Vec::new();
72        let mut confirmed = false;
73        while let Some(res) = mailbox.next().await {
74            for data in res.payload.into_vec() {
75                match data {
76                    protocol::Response::Changed(change) => queue.push(change),
77                    protocol::Response::Ok => {
78                        confirmed = true;
79                    }
80                    protocol::Response::Error(x) => return Err(io::Error::from(x)),
81                    x => {
82                        return Err(io::Error::new(
83                            io::ErrorKind::Other,
84                            format!("Unexpected response: {x:?}"),
85                        ))
86                    }
87                }
88            }
89
90            // Exit if we got the confirmation
91            // NOTE: Doing this later because we want to make sure the entire payload is processed
92            //       first before exiting the loop
93            if confirmed {
94                break;
95            }
96        }
97
98        // Send out any of our queued changes that we got prior to the acknowledgement
99        trace!("Forwarding {} queued changes for {:?}", queue.len(), path);
100        for change in queue {
101            if tx.send(change).await.is_err() {
102                return Err(io::Error::new(io::ErrorKind::Other, "Queue change dropped"));
103            }
104        }
105
106        // If we never received an acknowledgement of watch before the mailbox closed,
107        // fail with a missing confirmation error
108        if !confirmed {
109            return Err(io::Error::new(io::ErrorKind::Other, "Missing confirmation"));
110        }
111
112        // Spawn a task that continues to look for change events, discarding anything
113        // else that it gets
114        let task = tokio::spawn({
115            let path = path.clone();
116            async move {
117                while let Some(res) = mailbox.next().await {
118                    for data in res.payload.into_vec() {
119                        match data {
120                            protocol::Response::Changed(change) => {
121                                // If we can't queue up a change anymore, we've
122                                // been closed and therefore want to quit
123                                if tx.is_closed() {
124                                    break;
125                                }
126
127                                // Otherwise, send over the change
128                                if let Err(x) = tx.send(change).await {
129                                    error!(
130                                        "Watcher for {:?} failed to send change {:?}",
131                                        path, x.0
132                                    );
133                                    break;
134                                }
135                            }
136                            _ => continue,
137                        }
138                    }
139                }
140            }
141        });
142
143        Ok(Self {
144            path,
145            channel,
146            task,
147            rx,
148            active: true,
149        })
150    }
151
152    /// Returns a reference to the path this watcher is monitoring
153    pub fn path(&self) -> &Path {
154        self.path.as_path()
155    }
156
157    /// Returns true if the watcher is still actively watching for changes
158    pub fn is_active(&self) -> bool {
159        self.active
160    }
161
162    /// Returns the next change detected by the watcher, or none if the watcher has concluded
163    pub async fn next(&mut self) -> Option<Change> {
164        self.rx.recv().await
165    }
166
167    /// Unwatches the path being watched, closing out the watcher
168    pub async fn unwatch(&mut self) -> io::Result<()> {
169        trace!("Unwatching {:?}", self.path);
170        self.channel.unwatch(self.path.to_path_buf()).await?;
171
172        // Kill our task that processes inbound changes if we have successfully unwatched the path
173        self.task.abort();
174        self.active = false;
175
176        Ok(())
177    }
178}
179
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use distant_net::common::{FramedTransport, InmemoryTransport, Response};
    use distant_net::Client;
    use test_log::test;
    use tokio::sync::Mutex;

    use super::*;
    use crate::protocol::ChangeKind;
    use crate::DistantClient;

    /// Creates an in-memory session, returning the transport that plays the server
    /// side alongside the client connected to it
    fn make_session() -> (FramedTransport<InmemoryTransport>, DistantClient) {
        let (t1, t2) = FramedTransport::pair(100);
        (t1, Client::spawn_inmemory(t2, Default::default()))
    }

    /// Starts a recursive, unfiltered watch of `path` in its own task
    ///
    /// `Watcher::watch` does not return until the request has been answered, so it has
    /// to run separately from the test body that reads the request and writes the
    /// response.
    fn spawn_watch(
        session: DistantClient,
        path: &'static Path,
    ) -> JoinHandle<io::Result<Watcher>> {
        tokio::spawn(async move {
            Watcher::watch(
                session.clone_channel(),
                path,
                true,
                ChangeKindSet::empty(),
                ChangeKindSet::empty(),
            )
            .await
        })
    }

    /// Waits for the next request from the session, answers it with an `Ok`
    /// acknowledgement, and returns the request so its id can be reused
    async fn ack_next_request(
        transport: &mut FramedTransport<InmemoryTransport>,
    ) -> Request<protocol::Request> {
        let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();

        transport
            .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok))
            .await
            .unwrap();

        req
    }

    #[test(tokio::test)]
    async fn watcher_should_have_path_reflect_watched_path() {
        let (mut transport, session) = make_session();
        let test_path = Path::new("/some/test/path");

        // Start watching and acknowledge that a watcher was created
        let watch_task = spawn_watch(session, test_path);
        let _ = ack_next_request(&mut transport).await;

        // Get the watcher and verify the path
        let watcher = watch_task.await.unwrap().unwrap();
        assert_eq!(watcher.path(), test_path);
    }

    #[test(tokio::test)]
    async fn watcher_should_support_getting_next_change() {
        let (mut transport, session) = make_session();
        let test_path = Path::new("/some/test/path");
        let change = |timestamp, kind| Change {
            timestamp,
            kind,
            path: test_path.to_path_buf(),
            details: Default::default(),
        };

        // Start watching and acknowledge that a watcher was created
        let watch_task = spawn_watch(session, test_path);
        let req = ack_next_request(&mut transport).await;

        // Get the watcher
        let mut watcher = watch_task.await.unwrap().unwrap();

        // Send some changes related to the file
        transport
            .write_frame_for(&Response::new(
                req.id,
                vec![
                    protocol::Response::Changed(change(0, ChangeKind::Access)),
                    protocol::Response::Changed(change(1, ChangeKind::Modify)),
                ],
            ))
            .await
            .unwrap();

        // Verify that the watcher gets the changes, one at a time
        let next = watcher.next().await.expect("Watcher closed unexpectedly");
        assert_eq!(next, change(0, ChangeKind::Access));

        let next = watcher.next().await.expect("Watcher closed unexpectedly");
        assert_eq!(next, change(1, ChangeKind::Modify));
    }

    #[test(tokio::test)]
    async fn watcher_should_distinguish_change_events_and_only_receive_changes_for_itself() {
        let (mut transport, session) = make_session();
        let test_path = Path::new("/some/test/path");
        let change = |timestamp, kind| Change {
            timestamp,
            kind,
            path: test_path.to_path_buf(),
            details: Default::default(),
        };

        // Start watching and acknowledge that a watcher was created
        let watch_task = spawn_watch(session, test_path);
        let req = ack_next_request(&mut transport).await;

        // Get the watcher
        let mut watcher = watch_task.await.unwrap().unwrap();

        // Send a change from the appropriate origin
        transport
            .write_frame_for(&Response::new(
                req.id.clone(),
                protocol::Response::Changed(change(0, ChangeKind::Access)),
            ))
            .await
            .unwrap();

        // Send a change from a different origin
        transport
            .write_frame_for(&Response::new(
                req.id.clone() + "1",
                protocol::Response::Changed(change(1, ChangeKind::Modify)),
            ))
            .await
            .unwrap();

        // Send a change from the appropriate origin
        transport
            .write_frame_for(&Response::new(
                req.id,
                protocol::Response::Changed(change(2, ChangeKind::Delete)),
            ))
            .await
            .unwrap();

        // Verify that the watcher gets only the changes from its own origin, one at a time
        let next = watcher.next().await.expect("Watcher closed unexpectedly");
        assert_eq!(next, change(0, ChangeKind::Access));

        let next = watcher.next().await.expect("Watcher closed unexpectedly");
        assert_eq!(next, change(2, ChangeKind::Delete));
    }

    #[test(tokio::test)]
    async fn watcher_should_stop_receiving_events_if_unwatched() {
        let (mut transport, session) = make_session();
        let test_path = Path::new("/some/test/path");
        let change = |timestamp, kind| Change {
            timestamp,
            kind,
            path: test_path.to_path_buf(),
            details: Default::default(),
        };

        // Start watching and acknowledge that a watcher was created
        //
        // NOTE: The watch request is kept around (and never shadowed) because its id is
        //       the origin that every change for this watcher must carry, including the
        //       one sent after unwatching further below
        let watch_task = spawn_watch(session, test_path);
        let watch_req = ack_next_request(&mut transport).await;

        // Send some changes from the appropriate origin
        transport
            .write_frame_for(&Response::new(
                watch_req.id.clone(),
                vec![
                    protocol::Response::Changed(change(0, ChangeKind::Access)),
                    protocol::Response::Changed(change(1, ChangeKind::Modify)),
                    protocol::Response::Changed(change(2, ChangeKind::Delete)),
                ],
            ))
            .await
            .unwrap();

        // Wait a little bit for all changes to be queued
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Get the watcher, shared behind a lock so that it can also be used from the
        // task that performs the unwatch
        let watcher = Arc::new(Mutex::new(watch_task.await.unwrap().unwrap()));

        // Verify that the watcher gets the first change
        let first = watcher
            .lock()
            .await
            .next()
            .await
            .expect("Watcher closed unexpectedly");
        assert_eq!(first, change(0, ChangeKind::Access));

        // Unwatch in a separate task as we need to handle the request and a response
        // here, verify the request is sent out, and respond with ok
        let watcher_2 = Arc::clone(&watcher);
        let unwatch_task = tokio::spawn(async move { watcher_2.lock().await.unwatch().await });
        let _ = ack_next_request(&mut transport).await;

        // Wait for the unwatch to complete
        unwatch_task.await.unwrap().unwrap();

        // Send another change from the watch's origin, which must no longer be delivered
        transport
            .write_frame_for(&Response::new(
                watch_req.id,
                protocol::Response::Changed(change(3, ChangeKind::Unknown)),
            ))
            .await
            .unwrap();

        // Verify that we get any remaining changes that were received before unwatched,
        // but nothing new after that
        assert_eq!(
            watcher.lock().await.next().await,
            Some(change(1, ChangeKind::Modify))
        );
        assert_eq!(
            watcher.lock().await.next().await,
            Some(change(2, ChangeKind::Delete))
        );
        assert_eq!(watcher.lock().await.next().await, None);
    }
}