1use std::collections::HashMap;
2use std::time::Instant;
3
4use crate::url::WatchTarget;
5use crate::watcher::{
6 ConnectionState, PathWatcher, WatchError, WatchEvent, WatchEventKind, WatchOptions,
7};
8
9pub struct HttpWatcher {
10 client: reqwest::blocking::Client,
11 base_url: String,
12 known_etags: HashMap<String, String>,
13 last_poll: Instant,
14 poll_interval: std::time::Duration,
15 loss_timeout: std::time::Duration,
16 last_success: Instant,
17 pending: Vec<WatchEvent>,
18}
19
20impl HttpWatcher {
21 pub fn connect(target: &WatchTarget, options: &WatchOptions) -> Result<Self, WatchError> {
22 let scheme = match target.protocol {
23 crate::url::Protocol::Https => "https",
24 _ => "http",
25 };
26 let host = target
27 .host
28 .as_deref()
29 .ok_or_else(|| WatchError::InvalidUrl("HTTP requires a host".to_string()))?;
30
31 let base_url = match target.port {
32 Some(port) => format!("{scheme}://{host}:{port}{}", target.path),
33 None => format!("{scheme}://{host}{}", target.path),
34 };
35
36 let client = reqwest::blocking::Client::new();
37
38 let response = client
39 .head(&base_url)
40 .send()
41 .map_err(|e| WatchError::Http(e.to_string()))?;
42
43 if !response.status().is_success() {
44 return Err(WatchError::Connection(format!(
45 "HTTP {}: {}",
46 response.status(),
47 base_url
48 )));
49 }
50
51 Ok(Self {
52 client,
53 base_url,
54 known_etags: HashMap::new(),
55 last_poll: Instant::now() - options.poll_interval,
56 poll_interval: options.poll_interval,
57 loss_timeout: options.loss_timeout,
58 last_success: Instant::now(),
59 pending: Vec::new(),
60 })
61 }
62}
63
64impl PathWatcher for HttpWatcher {
65 fn poll(&mut self) -> Result<Vec<WatchEvent>, WatchError> {
66 if self.last_poll.elapsed() < self.poll_interval {
67 return Ok(Vec::new());
68 }
69 self.last_poll = Instant::now();
70
71 let response = self
72 .client
73 .head(&self.base_url)
74 .send()
75 .map_err(|e| WatchError::Http(e.to_string()))?;
76
77 if !response.status().is_success() {
78 return Err(WatchError::Http(format!(
79 "HTTP {} from {}",
80 response.status(),
81 self.base_url
82 )));
83 }
84
85 self.last_success = Instant::now();
86
87 let etag = response
88 .headers()
89 .get("etag")
90 .or_else(|| response.headers().get("last-modified"))
91 .map(|v| v.to_str().unwrap_or("").to_string())
92 .unwrap_or_default();
93
94 if !etag.is_empty() {
95 let changed = match self.known_etags.get(&self.base_url) {
96 Some(old_etag) => &etag != old_etag,
97 None => true,
98 };
99 if changed {
100 let kind = if self.known_etags.contains_key(&self.base_url) {
101 WatchEventKind::Modified
102 } else {
103 WatchEventKind::Created
104 };
105 self.pending.push(WatchEvent {
106 path: self.base_url.clone(),
107 kind,
108 });
109 self.known_etags.insert(self.base_url.clone(), etag);
110 }
111 }
112
113 Ok(std::mem::take(&mut self.pending))
114 }
115
116 fn read(&mut self, path: &str) -> Result<Vec<u8>, WatchError> {
117 let url = if path.starts_with("http://") || path.starts_with("https://") {
118 path.to_string()
119 } else {
120 format!("{}/{}", self.base_url.trim_end_matches('/'), path)
121 };
122
123 let response = self
124 .client
125 .get(&url)
126 .send()
127 .map_err(|e| WatchError::Http(e.to_string()))?;
128
129 if !response.status().is_success() {
130 return Err(WatchError::Http(format!(
131 "HTTP {} reading {}",
132 response.status(),
133 url
134 )));
135 }
136
137 self.last_success = Instant::now();
138 response
139 .bytes()
140 .map(|b| b.to_vec())
141 .map_err(|e| WatchError::Http(e.to_string()))
142 }
143
144 fn has_pending(&self) -> bool {
145 !self.pending.is_empty()
146 }
147
148 fn connection_state(&self) -> ConnectionState {
149 let elapsed = self.last_success.elapsed();
150 if elapsed < self.poll_interval * 2 {
151 ConnectionState::Connected
152 } else if elapsed < self.loss_timeout {
153 ConnectionState::Degraded
154 } else {
155 ConnectionState::Lost
156 }
157 }
158}