Skip to main content

watch_path/
http.rs

1use std::collections::HashMap;
2use std::time::Instant;
3
4use crate::url::WatchTarget;
5use crate::watcher::{
6    ConnectionState, PathWatcher, WatchError, WatchEvent, WatchEventKind, WatchOptions,
7};
8
9pub struct HttpWatcher {
10    client: reqwest::blocking::Client,
11    base_url: String,
12    known_etags: HashMap<String, String>,
13    last_poll: Instant,
14    poll_interval: std::time::Duration,
15    loss_timeout: std::time::Duration,
16    last_success: Instant,
17    pending: Vec<WatchEvent>,
18}
19
20impl HttpWatcher {
21    pub fn connect(target: &WatchTarget, options: &WatchOptions) -> Result<Self, WatchError> {
22        let scheme = match target.protocol {
23            crate::url::Protocol::Https => "https",
24            _ => "http",
25        };
26        let host = target
27            .host
28            .as_deref()
29            .ok_or_else(|| WatchError::InvalidUrl("HTTP requires a host".to_string()))?;
30
31        let base_url = match target.port {
32            Some(port) => format!("{scheme}://{host}:{port}{}", target.path),
33            None => format!("{scheme}://{host}{}", target.path),
34        };
35
36        let client = reqwest::blocking::Client::new();
37
38        let response = client
39            .head(&base_url)
40            .send()
41            .map_err(|e| WatchError::Http(e.to_string()))?;
42
43        if !response.status().is_success() {
44            return Err(WatchError::Connection(format!(
45                "HTTP {}: {}",
46                response.status(),
47                base_url
48            )));
49        }
50
51        Ok(Self {
52            client,
53            base_url,
54            known_etags: HashMap::new(),
55            last_poll: Instant::now() - options.poll_interval,
56            poll_interval: options.poll_interval,
57            loss_timeout: options.loss_timeout,
58            last_success: Instant::now(),
59            pending: Vec::new(),
60        })
61    }
62}
63
64impl PathWatcher for HttpWatcher {
65    fn poll(&mut self) -> Result<Vec<WatchEvent>, WatchError> {
66        if self.last_poll.elapsed() < self.poll_interval {
67            return Ok(Vec::new());
68        }
69        self.last_poll = Instant::now();
70
71        let response = self
72            .client
73            .head(&self.base_url)
74            .send()
75            .map_err(|e| WatchError::Http(e.to_string()))?;
76
77        if !response.status().is_success() {
78            return Err(WatchError::Http(format!(
79                "HTTP {} from {}",
80                response.status(),
81                self.base_url
82            )));
83        }
84
85        self.last_success = Instant::now();
86
87        let etag = response
88            .headers()
89            .get("etag")
90            .or_else(|| response.headers().get("last-modified"))
91            .map(|v| v.to_str().unwrap_or("").to_string())
92            .unwrap_or_default();
93
94        if !etag.is_empty() {
95            let changed = match self.known_etags.get(&self.base_url) {
96                Some(old_etag) => &etag != old_etag,
97                None => true,
98            };
99            if changed {
100                let kind = if self.known_etags.contains_key(&self.base_url) {
101                    WatchEventKind::Modified
102                } else {
103                    WatchEventKind::Created
104                };
105                self.pending.push(WatchEvent {
106                    path: self.base_url.clone(),
107                    kind,
108                });
109                self.known_etags.insert(self.base_url.clone(), etag);
110            }
111        }
112
113        Ok(std::mem::take(&mut self.pending))
114    }
115
116    fn read(&mut self, path: &str) -> Result<Vec<u8>, WatchError> {
117        let url = if path.starts_with("http://") || path.starts_with("https://") {
118            path.to_string()
119        } else {
120            format!("{}/{}", self.base_url.trim_end_matches('/'), path)
121        };
122
123        let response = self
124            .client
125            .get(&url)
126            .send()
127            .map_err(|e| WatchError::Http(e.to_string()))?;
128
129        if !response.status().is_success() {
130            return Err(WatchError::Http(format!(
131                "HTTP {} reading {}",
132                response.status(),
133                url
134            )));
135        }
136
137        self.last_success = Instant::now();
138        response
139            .bytes()
140            .map(|b| b.to_vec())
141            .map_err(|e| WatchError::Http(e.to_string()))
142    }
143
144    fn has_pending(&self) -> bool {
145        !self.pending.is_empty()
146    }
147
148    fn connection_state(&self) -> ConnectionState {
149        let elapsed = self.last_success.elapsed();
150        if elapsed < self.poll_interval * 2 {
151            ConnectionState::Connected
152        } else if elapsed < self.loss_timeout {
153            ConnectionState::Degraded
154        } else {
155            ConnectionState::Lost
156        }
157    }
158}