Skip to main content

watch_path/
ftp.rs

1use std::collections::HashMap;
2use std::time::Instant;
3
4use suppaftp::FtpStream;
5
6use crate::url::WatchTarget;
7use crate::watcher::{
8    ConnectionState, PathWatcher, WatchError, WatchEvent, WatchEventKind, WatchOptions,
9};
10
11pub struct FtpWatcher {
12    stream: FtpStream,
13    target: WatchTarget,
14    known_mtimes: HashMap<String, String>,
15    last_poll: Instant,
16    poll_interval: std::time::Duration,
17    loss_timeout: std::time::Duration,
18    last_success: Instant,
19    pending: Vec<WatchEvent>,
20}
21
22impl FtpWatcher {
23    pub fn connect(target: WatchTarget, options: &WatchOptions) -> Result<Self, WatchError> {
24        let host = target
25            .host
26            .as_deref()
27            .ok_or_else(|| WatchError::InvalidUrl("FTP requires a host".to_string()))?;
28
29        let port = target.port.unwrap_or(21);
30        let addr = format!("{host}:{port}");
31
32        let mut stream = FtpStream::connect(&addr).map_err(|e| WatchError::Ftp(e.to_string()))?;
33
34        let user = target.user.as_deref().unwrap_or("anonymous");
35        let pass = options.password.as_deref().unwrap_or("anonymous@");
36
37        stream
38            .login(user, pass)
39            .map_err(|e| WatchError::Ftp(e.to_string()))?;
40
41        Ok(Self {
42            stream,
43            target,
44            known_mtimes: HashMap::new(),
45            last_poll: Instant::now() - options.poll_interval,
46            poll_interval: options.poll_interval,
47            loss_timeout: options.loss_timeout,
48            last_success: Instant::now(),
49            pending: Vec::new(),
50        })
51    }
52}
53
54impl PathWatcher for FtpWatcher {
55    fn poll(&mut self) -> Result<Vec<WatchEvent>, WatchError> {
56        if self.last_poll.elapsed() < self.poll_interval {
57            return Ok(Vec::new());
58        }
59        self.last_poll = Instant::now();
60
61        let listing = self
62            .stream
63            .nlst(Some(&self.target.path))
64            .map_err(|e| WatchError::Ftp(e.to_string()))?;
65
66        self.last_success = Instant::now();
67
68        let mut current: HashMap<String, String> = HashMap::new();
69        for file_path in &listing {
70            let mdtm = self
71                .stream
72                .mdtm(file_path)
73                .map(|dt| dt.to_string())
74                .unwrap_or_default();
75            current.insert(file_path.clone(), mdtm);
76        }
77
78        for (file_path, mtime) in &current {
79            let changed = match self.known_mtimes.get(file_path) {
80                Some(old_mtime) => mtime != old_mtime,
81                None => true,
82            };
83            if changed {
84                let kind = if self.known_mtimes.contains_key(file_path) {
85                    WatchEventKind::Modified
86                } else {
87                    WatchEventKind::Created
88                };
89                self.pending.push(WatchEvent {
90                    path: file_path.clone(),
91                    kind,
92                });
93            }
94        }
95
96        self.known_mtimes = current;
97        Ok(std::mem::take(&mut self.pending))
98    }
99
100    fn read(&mut self, path: &str) -> Result<Vec<u8>, WatchError> {
101        let cursor = self
102            .stream
103            .retr_as_buffer(path)
104            .map_err(|e| WatchError::Ftp(e.to_string()))?;
105
106        self.last_success = Instant::now();
107        Ok(cursor.into_inner())
108    }
109
110    fn has_pending(&self) -> bool {
111        !self.pending.is_empty()
112    }
113
114    fn connection_state(&self) -> ConnectionState {
115        let elapsed = self.last_success.elapsed();
116        if elapsed < self.poll_interval * 2 {
117            ConnectionState::Connected
118        } else if elapsed < self.loss_timeout {
119            ConnectionState::Degraded
120        } else {
121            ConnectionState::Lost
122        }
123    }
124}