upstream_rs/providers/http/
direct_adapter.rs1use anyhow::{Result, anyhow, bail};
2use chrono::{DateTime, Datelike, Timelike, Utc};
3use std::path::Path;
4
5use crate::models::common::Version;
6use crate::models::provider::{Asset, Release};
7use crate::providers::http::http_client::{ConditionalProbeResult, HttpClient};
8use crate::providers::release_provider::ReleaseProvider;
9
10#[derive(Debug, Clone)]
11pub struct DirectAdapter {
12 client: HttpClient,
13}
14
15impl DirectAdapter {
16 fn parse_version_from_filename(filename: &str) -> Option<Version> {
17 Version::from_filename(filename).ok()
18 }
19
20 fn version_from_last_modified(dt: DateTime<Utc>) -> Version {
21 let major = dt.year_ce().1;
22 let minor = dt.ordinal();
23 let patch = dt.num_seconds_from_midnight();
24 Version::new(major, minor, patch, false)
25 }
26
27 pub fn new(client: HttpClient) -> Self {
28 Self { client }
29 }
30
31 pub async fn download_asset<F>(
32 &self,
33 asset: &Asset,
34 destination_path: &Path,
35 dl_callback: &mut Option<F>,
36 ) -> Result<()>
37 where
38 F: FnMut(u64, u64),
39 {
40 self.client
41 .download_file(&asset.download_url, destination_path, dl_callback)
42 .await
43 }
44
45 pub async fn get_release_by_tag(&self, _slug: &str, _tag: &str) -> Result<Release> {
46 bail!("Direct provider does not support tagged releases")
47 }
48
49 pub async fn get_latest_release(&self, slug: &str) -> Result<Release> {
50 self.get_latest_release_if_modified_since(slug, None)
51 .await?
52 .ok_or_else(|| anyhow!("Unexpected not-modified response for direct provider"))
53 }
54
55 pub async fn get_latest_release_if_modified_since(
56 &self,
57 slug: &str,
58 last_upgraded: Option<DateTime<Utc>>,
59 ) -> Result<Option<Release>> {
60 let probe = self
61 .client
62 .probe_asset_if_modified_since(slug, last_upgraded)
63 .await?;
64 let info = match probe {
65 ConditionalProbeResult::NotModified => return Ok(None),
66 ConditionalProbeResult::Asset(info) => info,
67 };
68 let published_at = info
69 .last_modified
70 .unwrap_or_else(|| last_upgraded.unwrap_or_else(Utc::now));
71 let version = Self::parse_version_from_filename(&info.name)
72 .or_else(|| info.last_modified.map(Self::version_from_last_modified))
73 .unwrap_or_else(|| Version::new(0, 0, 0, false));
74
75 let asset = Asset::new(
76 info.download_url,
77 1,
78 info.name.clone(),
79 info.size,
80 published_at,
81 );
82
83 let release_name = if let Some(etag) = info.etag {
84 format!("{} [{}]", info.name, etag)
85 } else {
86 info.name
87 };
88
89 Ok(Some(Release {
90 id: 1,
91 tag: "direct".to_string(),
92 name: release_name,
93 body: "Direct HTTP asset".to_string(),
94 is_draft: false,
95 is_prerelease: false,
96 assets: vec![asset],
97 version,
98 published_at,
99 }))
100 }
101
102 pub async fn get_releases(
103 &self,
104 slug: &str,
105 _per_page: Option<u32>,
106 _max_total: Option<u32>,
107 ) -> Result<Vec<Release>> {
108 Ok(vec![self.get_latest_release(slug).await?])
109 }
110}
111
112#[async_trait::async_trait(?Send)]
113impl ReleaseProvider for DirectAdapter {
114 async fn get_latest_release(&self, slug: &str) -> Result<Release> {
115 DirectAdapter::get_latest_release(self, slug).await
116 }
117
118 async fn get_releases(
119 &self,
120 slug: &str,
121 per_page: Option<u32>,
122 max_total: Option<u32>,
123 ) -> Result<Vec<Release>> {
124 DirectAdapter::get_releases(self, slug, per_page, max_total).await
125 }
126
127 async fn get_release_by_tag(&self, slug: &str, tag: &str) -> Result<Release> {
128 DirectAdapter::get_release_by_tag(self, slug, tag).await
129 }
130
131 async fn get_latest_release_if_modified_since(
132 &self,
133 slug: &str,
134 last_upgraded: Option<DateTime<Utc>>,
135 ) -> Result<Option<Release>> {
136 DirectAdapter::get_latest_release_if_modified_since(self, slug, last_upgraded).await
137 }
138
139 async fn download_asset(
140 &self,
141 asset: &Asset,
142 destination_path: &Path,
143 dl_callback: Option<&mut (dyn FnMut(u64, u64) + '_)>,
144 ) -> Result<()> {
145 let mut forwarded = dl_callback;
146 DirectAdapter::download_asset(self, asset, destination_path, &mut forwarded).await
147 }
148}
149
#[cfg(test)]
mod tests {
    use super::DirectAdapter;
    use crate::providers::http::HttpClient;
    use chrono::Utc;
    use std::io::{BufRead, BufReader, Write};
    use std::net::TcpListener;
    use std::sync::mpsc;
    use std::thread;

    /// Starts a throwaway HTTP server on a background thread and returns its
    /// base URL (`http://<addr>`).
    ///
    /// The server binds to an OS-assigned port on 127.0.0.1 and serves at most
    /// `max_requests` connections. For each one it reads the request line,
    /// skips the headers, and replies with whatever `handler(method, path)`
    /// returns.
    fn spawn_test_server<F>(max_requests: usize, handler: F) -> String
    where
        F: Fn(&str, &str) -> String + Send + 'static,
    {
        let (addr_tx, addr_rx) = mpsc::channel();

        thread::spawn(move || {
            let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
            let bound = listener.local_addr().expect("resolve local addr");
            addr_tx.send(bound).expect("send test server addr");

            for connection in listener.incoming().take(max_requests) {
                let mut stream = connection.expect("accept request");
                let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));

                // Request line: "<METHOD> <PATH> <VERSION>".
                let mut request_line = String::new();
                reader
                    .read_line(&mut request_line)
                    .expect("read request line");
                let mut tokens = request_line.split_whitespace();
                let method = tokens.next().unwrap_or("");
                let path = tokens.next().unwrap_or("/");

                // Drain headers up to the blank line (or end of stream).
                let mut header = String::new();
                loop {
                    header.clear();
                    let read = reader
                        .read_line(&mut header)
                        .expect("read request headers");
                    if read == 0 || header == "\r\n" {
                        break;
                    }
                }

                let response = handler(method, path);
                stream
                    .write_all(response.as_bytes())
                    .expect("write response");
                stream.flush().expect("flush response");
            }
        });

        let addr = addr_rx.recv().expect("receive server address");
        format!("http://{}", addr)
    }

    /// Renders a raw HTTP response: status line, one `name: value` line per
    /// header, a blank line, then `body`.
    fn http_response(status_line: &str, headers: &[(&str, &str)], body: &str) -> String {
        let mut response = format!("{status_line}\r\n");
        response.extend(headers.iter().map(|(k, v)| format!("{k}: {v}\r\n")));
        response.push_str("\r\n");
        response.push_str(body);
        response
    }

    /// Builds an adapter around a default-configured HTTP client.
    fn test_adapter() -> DirectAdapter {
        let client = HttpClient::new(Default::default()).expect("http client");
        DirectAdapter::new(client)
    }

    #[test]
    fn parse_version_from_filename_extracts_semver_triplet() {
        let version = DirectAdapter::parse_version_from_filename("tool-v1.2.3-linux-x86_64.tar.gz")
            .expect("parsed version");
        assert_eq!(
            (version.major, version.minor, version.patch),
            (1, 2, 3)
        );
    }

    #[tokio::test]
    async fn get_latest_release_builds_release_from_probe_metadata() {
        let etag = String::from("\"etag-value\"");
        let server = spawn_test_server(1, move |method, _path| {
            assert_eq!(method, "HEAD");
            let headers = [
                ("Connection", "close"),
                ("Content-Length", "42"),
                ("ETag", etag.as_str()),
                ("Last-Modified", "Tue, 10 Feb 2026 15:04:05 GMT"),
            ];
            http_response("HTTP/1.1 200 OK", &headers, "")
        });
        let adapter = test_adapter();

        let url = format!("{server}/tool-v2.3.4.tar.gz");
        let release = adapter.get_latest_release(&url).await.expect("release");

        assert_eq!(release.assets.len(), 1);
        assert_eq!(
            (
                release.version.major,
                release.version.minor,
                release.version.patch
            ),
            (2, 3, 4)
        );
        assert!(release.name.contains("etag-value"));
    }

    #[tokio::test]
    async fn conditional_latest_release_returns_none_on_not_modified() {
        let server = spawn_test_server(1, move |method, _path| {
            assert_eq!(method, "HEAD");
            http_response("HTTP/1.1 304 Not Modified", &[("Connection", "close")], "")
        });
        let adapter = test_adapter();

        let outcome = adapter
            .get_latest_release_if_modified_since(&server, Some(Utc::now()))
            .await
            .expect("conditional release");
        assert!(outcome.is_none());
    }
}