1use std::collections::HashMap;
2use std::time::Instant;
3
4use suppaftp::FtpStream;
5
6use crate::url::WatchTarget;
7use crate::watcher::{
8 ConnectionState, PathWatcher, WatchError, WatchEvent, WatchEventKind, WatchOptions,
9};
10
11pub struct FtpWatcher {
12 stream: FtpStream,
13 target: WatchTarget,
14 known_mtimes: HashMap<String, String>,
15 last_poll: Instant,
16 poll_interval: std::time::Duration,
17 loss_timeout: std::time::Duration,
18 last_success: Instant,
19 pending: Vec<WatchEvent>,
20}
21
22impl FtpWatcher {
23 pub fn connect(target: WatchTarget, options: &WatchOptions) -> Result<Self, WatchError> {
24 let host = target
25 .host
26 .as_deref()
27 .ok_or_else(|| WatchError::InvalidUrl("FTP requires a host".to_string()))?;
28
29 let port = target.port.unwrap_or(21);
30 let addr = format!("{host}:{port}");
31
32 let mut stream = FtpStream::connect(&addr).map_err(|e| WatchError::Ftp(e.to_string()))?;
33
34 let user = target.user.as_deref().unwrap_or("anonymous");
35 let pass = options.password.as_deref().unwrap_or("anonymous@");
36
37 stream
38 .login(user, pass)
39 .map_err(|e| WatchError::Ftp(e.to_string()))?;
40
41 Ok(Self {
42 stream,
43 target,
44 known_mtimes: HashMap::new(),
45 last_poll: Instant::now() - options.poll_interval,
46 poll_interval: options.poll_interval,
47 loss_timeout: options.loss_timeout,
48 last_success: Instant::now(),
49 pending: Vec::new(),
50 })
51 }
52}
53
54impl PathWatcher for FtpWatcher {
55 fn poll(&mut self) -> Result<Vec<WatchEvent>, WatchError> {
56 if self.last_poll.elapsed() < self.poll_interval {
57 return Ok(Vec::new());
58 }
59 self.last_poll = Instant::now();
60
61 let listing = self
62 .stream
63 .nlst(Some(&self.target.path))
64 .map_err(|e| WatchError::Ftp(e.to_string()))?;
65
66 self.last_success = Instant::now();
67
68 let mut current: HashMap<String, String> = HashMap::new();
69 for file_path in &listing {
70 let mdtm = self
71 .stream
72 .mdtm(file_path)
73 .map(|dt| dt.to_string())
74 .unwrap_or_default();
75 current.insert(file_path.clone(), mdtm);
76 }
77
78 for (file_path, mtime) in ¤t {
79 let changed = match self.known_mtimes.get(file_path) {
80 Some(old_mtime) => mtime != old_mtime,
81 None => true,
82 };
83 if changed {
84 let kind = if self.known_mtimes.contains_key(file_path) {
85 WatchEventKind::Modified
86 } else {
87 WatchEventKind::Created
88 };
89 self.pending.push(WatchEvent {
90 path: file_path.clone(),
91 kind,
92 });
93 }
94 }
95
96 self.known_mtimes = current;
97 Ok(std::mem::take(&mut self.pending))
98 }
99
100 fn read(&mut self, path: &str) -> Result<Vec<u8>, WatchError> {
101 let cursor = self
102 .stream
103 .retr_as_buffer(path)
104 .map_err(|e| WatchError::Ftp(e.to_string()))?;
105
106 self.last_success = Instant::now();
107 Ok(cursor.into_inner())
108 }
109
110 fn has_pending(&self) -> bool {
111 !self.pending.is_empty()
112 }
113
114 fn connection_state(&self) -> ConnectionState {
115 let elapsed = self.last_success.elapsed();
116 if elapsed < self.poll_interval * 2 {
117 ConnectionState::Connected
118 } else if elapsed < self.loss_timeout {
119 ConnectionState::Degraded
120 } else {
121 ConnectionState::Lost
122 }
123 }
124}