// upstream_rs/providers/http/direct_adapter.rs
use anyhow::{Result, anyhow, bail};
use chrono::{DateTime, Datelike, Timelike, Utc};
use std::path::Path;

use crate::models::common::Version;
use crate::models::provider::{Asset, Release};
use crate::providers::http::http_client::{ConditionalProbeResult, HttpClient};
8
9#[derive(Debug, Clone)]
10pub struct DirectAdapter {
11 client: HttpClient,
12}
13
14impl DirectAdapter {
15 fn parse_version_from_filename(filename: &str) -> Option<Version> {
16 Version::from_filename(filename).ok()
17 }
18
19 fn version_from_last_modified(dt: DateTime<Utc>) -> Version {
20 let major = dt.year_ce().1;
21 let minor = dt.ordinal();
22 let patch = dt.num_seconds_from_midnight();
23 Version::new(major, minor, patch, false)
24 }
25
26 pub fn new(client: HttpClient) -> Self {
27 Self { client }
28 }
29
30 pub async fn download_asset<F>(
31 &self,
32 asset: &Asset,
33 destination_path: &Path,
34 dl_callback: &mut Option<F>,
35 ) -> Result<()>
36 where
37 F: FnMut(u64, u64),
38 {
39 self.client
40 .download_file(&asset.download_url, destination_path, dl_callback)
41 .await
42 }
43
44 pub async fn get_release_by_tag(&self, _slug: &str, _tag: &str) -> Result<Release> {
45 bail!("Direct provider does not support tagged releases")
46 }
47
48 pub async fn get_latest_release(&self, slug: &str) -> Result<Release> {
49 self.get_latest_release_if_modified_since(slug, None)
50 .await?
51 .ok_or_else(|| anyhow!("Unexpected not-modified response for direct provider"))
52 }
53
54 pub async fn get_latest_release_if_modified_since(
55 &self,
56 slug: &str,
57 last_upgraded: Option<DateTime<Utc>>,
58 ) -> Result<Option<Release>> {
59 let probe = self
60 .client
61 .probe_asset_if_modified_since(slug, last_upgraded)
62 .await?;
63 let info = match probe {
64 ConditionalProbeResult::NotModified => return Ok(None),
65 ConditionalProbeResult::Asset(info) => info,
66 };
67 let published_at = info.last_modified.unwrap_or_else(Utc::now);
68 let version = Self::parse_version_from_filename(&info.name)
69 .or_else(|| info.last_modified.map(Self::version_from_last_modified))
70 .unwrap_or_else(|| Version::new(0, 0, 0, false));
71
72 let asset = Asset::new(
73 info.download_url,
74 1,
75 info.name.clone(),
76 info.size,
77 published_at,
78 );
79
80 let release_name = if let Some(etag) = info.etag {
81 format!("{} [{}]", info.name, etag)
82 } else {
83 info.name
84 };
85
86 Ok(Some(Release {
87 id: 1,
88 tag: "direct".to_string(),
89 name: release_name,
90 body: "Direct HTTP asset".to_string(),
91 is_draft: false,
92 is_prerelease: false,
93 assets: vec![asset],
94 version,
95 published_at,
96 }))
97 }
98
99 pub async fn get_releases(
100 &self,
101 slug: &str,
102 _per_page: Option<u32>,
103 _max_total: Option<u32>,
104 ) -> Result<Vec<Release>> {
105 Ok(vec![self.get_latest_release(slug).await?])
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::DirectAdapter;
    use crate::providers::http::HttpClient;
    use chrono::Utc;
    use std::io::{BufRead, BufReader, Write};
    use std::net::TcpListener;
    use std::sync::mpsc;
    use std::thread;

    /// Starts a throwaway HTTP server on a background thread and returns its base URL.
    ///
    /// The server binds to an OS-assigned port on `127.0.0.1` and serves at most
    /// `max_requests` connections. For each connection it reads the request line
    /// and headers, then writes back whatever `handler(method, path)` returns.
    fn spawn_test_server<F>(max_requests: usize, handler: F) -> String
    where
        F: Fn(&str, &str) -> String + Send + 'static,
    {
        let (addr_tx, addr_rx) = mpsc::channel();

        thread::spawn(move || {
            let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
            let bound = listener.local_addr().expect("resolve local addr");
            addr_tx.send(bound).expect("send test server addr");

            for connection in listener.incoming().take(max_requests) {
                let mut stream = connection.expect("accept request");
                let read_half = stream.try_clone().expect("clone stream");
                let mut reader = BufReader::new(read_half);

                let mut request_line = String::new();
                reader
                    .read_line(&mut request_line)
                    .expect("read request line");
                let mut tokens = request_line.split_whitespace();
                let method = tokens.next().unwrap_or("");
                let path = tokens.next().unwrap_or("/");

                // Consume the headers up to the blank separator line (or end of stream).
                let mut header = String::new();
                loop {
                    header.clear();
                    let bytes_read = reader.read_line(&mut header).expect("read request headers");
                    if bytes_read == 0 || header == "\r\n" {
                        break;
                    }
                }

                let response = handler(method, path);
                stream
                    .write_all(response.as_bytes())
                    .expect("write response");
                stream.flush().expect("flush response");
            }
        });

        let addr = addr_rx.recv().expect("receive server address");
        format!("http://{}", addr)
    }

    /// Assembles a raw HTTP response: status line, one `key: value` line per
    /// header, a blank line, then the body.
    fn http_response(status_line: &str, headers: &[(&str, &str)], body: &str) -> String {
        let header_block: String = headers
            .iter()
            .map(|(k, v)| format!("{k}: {v}\r\n"))
            .collect();
        let mut out = format!("{status_line}\r\n");
        out.push_str(&header_block);
        out.push_str("\r\n");
        out.push_str(body);
        out
    }

    #[test]
    fn parse_version_from_filename_extracts_semver_triplet() {
        let parsed = DirectAdapter::parse_version_from_filename("tool-v1.2.3-linux-x86_64.tar.gz");
        let version = parsed.expect("parsed version");
        assert_eq!(version.major, 1);
        assert_eq!(version.minor, 2);
        assert_eq!(version.patch, 3);
    }

    #[tokio::test]
    async fn get_latest_release_builds_release_from_probe_metadata() {
        let etag = String::from("\"etag-value\"");
        let server = spawn_test_server(1, move |method, _| {
            assert_eq!(method, "HEAD");
            http_response(
                "HTTP/1.1 200 OK",
                &[
                    ("Connection", "close"),
                    ("Content-Length", "42"),
                    ("ETag", etag.as_str()),
                    ("Last-Modified", "Tue, 10 Feb 2026 15:04:05 GMT"),
                ],
                "",
            )
        });
        let adapter = DirectAdapter::new(HttpClient::new().expect("http client"));
        let asset_url = format!("{server}/tool-v2.3.4.tar.gz");

        let release = adapter
            .get_latest_release(&asset_url)
            .await
            .expect("release");

        // The version comes from the file name in the URL; the ETag ends up in the name.
        assert_eq!(release.assets.len(), 1);
        assert_eq!(release.version.major, 2);
        assert_eq!(release.version.minor, 3);
        assert_eq!(release.version.patch, 4);
        assert!(release.name.contains("etag-value"));
    }

    #[tokio::test]
    async fn conditional_latest_release_returns_none_on_not_modified() {
        let server = spawn_test_server(1, move |method, _| {
            assert_eq!(method, "HEAD");
            http_response("HTTP/1.1 304 Not Modified", &[("Connection", "close")], "")
        });
        let adapter = DirectAdapter::new(HttpClient::new().expect("http client"));

        let outcome = adapter
            .get_latest_release_if_modified_since(&server, Some(Utc::now()))
            .await
            .expect("conditional release");

        assert!(outcome.is_none());
    }
}