Skip to main content

// upstream_rs/providers/http/direct_adapter.rs

1use anyhow::{Result, anyhow, bail};
2use chrono::{DateTime, Datelike, Timelike, Utc};
3use std::path::Path;
4
5use crate::models::common::Version;
6use crate::models::provider::{Asset, Release};
7use crate::providers::http::http_client::{ConditionalProbeResult, HttpClient};
8use crate::providers::release_provider::ReleaseProvider;
9
10#[derive(Debug, Clone)]
11pub struct DirectAdapter {
12    client: HttpClient,
13}
14
15impl DirectAdapter {
16    fn parse_version_from_filename(filename: &str) -> Option<Version> {
17        Version::from_filename(filename).ok()
18    }
19
20    fn version_from_last_modified(dt: DateTime<Utc>) -> Version {
21        let major = dt.year_ce().1;
22        let minor = dt.ordinal();
23        let patch = dt.num_seconds_from_midnight();
24        Version::new(major, minor, patch, false)
25    }
26
27    pub fn new(client: HttpClient) -> Self {
28        Self { client }
29    }
30
31    pub async fn download_asset<F>(
32        &self,
33        asset: &Asset,
34        destination_path: &Path,
35        dl_callback: &mut Option<F>,
36    ) -> Result<()>
37    where
38        F: FnMut(u64, u64),
39    {
40        self.client
41            .download_file(&asset.download_url, destination_path, dl_callback)
42            .await
43    }
44
45    pub async fn get_release_by_tag(&self, _slug: &str, _tag: &str) -> Result<Release> {
46        bail!("Direct provider does not support tagged releases")
47    }
48
49    pub async fn get_latest_release(&self, slug: &str) -> Result<Release> {
50        self.get_latest_release_if_modified_since(slug, None)
51            .await?
52            .ok_or_else(|| anyhow!("Unexpected not-modified response for direct provider"))
53    }
54
55    pub async fn get_latest_release_if_modified_since(
56        &self,
57        slug: &str,
58        last_upgraded: Option<DateTime<Utc>>,
59    ) -> Result<Option<Release>> {
60        let probe = self
61            .client
62            .probe_asset_if_modified_since(slug, last_upgraded)
63            .await?;
64        let info = match probe {
65            ConditionalProbeResult::NotModified => return Ok(None),
66            ConditionalProbeResult::Asset(info) => info,
67        };
68        let published_at = info
69            .last_modified
70            .unwrap_or_else(|| last_upgraded.unwrap_or_else(Utc::now));
71        let version = Self::parse_version_from_filename(&info.name)
72            .or_else(|| info.last_modified.map(Self::version_from_last_modified))
73            .unwrap_or_else(|| Version::new(0, 0, 0, false));
74
75        let asset = Asset::new(
76            info.download_url,
77            1,
78            info.name.clone(),
79            info.size,
80            published_at,
81        );
82
83        let release_name = if let Some(etag) = info.etag {
84            format!("{} [{}]", info.name, etag)
85        } else {
86            info.name
87        };
88
89        Ok(Some(Release {
90            id: 1,
91            tag: "direct".to_string(),
92            name: release_name,
93            body: "Direct HTTP asset".to_string(),
94            is_draft: false,
95            is_prerelease: false,
96            assets: vec![asset],
97            version,
98            published_at,
99        }))
100    }
101
102    pub async fn get_releases(
103        &self,
104        slug: &str,
105        _per_page: Option<u32>,
106        _max_total: Option<u32>,
107    ) -> Result<Vec<Release>> {
108        Ok(vec![self.get_latest_release(slug).await?])
109    }
110}
111
112#[async_trait::async_trait(?Send)]
113impl ReleaseProvider for DirectAdapter {
114    async fn get_latest_release(&self, slug: &str) -> Result<Release> {
115        DirectAdapter::get_latest_release(self, slug).await
116    }
117
118    async fn get_releases(
119        &self,
120        slug: &str,
121        per_page: Option<u32>,
122        max_total: Option<u32>,
123    ) -> Result<Vec<Release>> {
124        DirectAdapter::get_releases(self, slug, per_page, max_total).await
125    }
126
127    async fn get_release_by_tag(&self, slug: &str, tag: &str) -> Result<Release> {
128        DirectAdapter::get_release_by_tag(self, slug, tag).await
129    }
130
131    async fn get_latest_release_if_modified_since(
132        &self,
133        slug: &str,
134        last_upgraded: Option<DateTime<Utc>>,
135    ) -> Result<Option<Release>> {
136        DirectAdapter::get_latest_release_if_modified_since(self, slug, last_upgraded).await
137    }
138
139    async fn download_asset(
140        &self,
141        asset: &Asset,
142        destination_path: &Path,
143        dl_callback: Option<&mut (dyn FnMut(u64, u64) + '_)>,
144    ) -> Result<()> {
145        let mut forwarded = dl_callback;
146        DirectAdapter::download_asset(self, asset, destination_path, &mut forwarded).await
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::DirectAdapter;
    use crate::providers::http::HttpClient;
    use chrono::Utc;
    use std::io::{BufRead, BufReader, Write};
    use std::net::TcpListener;
    use std::sync::mpsc;
    use std::thread;

    /// Starts a throwaway HTTP server on an ephemeral localhost port and
    /// returns its base URL (`http://<addr>`).
    ///
    /// The server thread handles `max_requests` connections one after another.
    /// For each one it reads the request line and the headers, calls `handler`
    /// with the method and path, and writes the returned string back verbatim.
    fn spawn_test_server<F>(max_requests: usize, handler: F) -> String
    where
        F: Fn(&str, &str) -> String + Send + 'static,
    {
        let (addr_tx, addr_rx) = mpsc::channel();
        thread::spawn(move || {
            let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
            let bound = listener.local_addr().expect("resolve local addr");
            addr_tx.send(bound).expect("send test server addr");

            for connection in listener.incoming().take(max_requests) {
                let mut stream = connection.expect("accept request");
                let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));

                let mut request_line = String::new();
                reader
                    .read_line(&mut request_line)
                    .expect("read request line");
                let mut tokens = request_line.split_whitespace();
                let method = tokens.next().unwrap_or("");
                let path = tokens.next().unwrap_or("/");

                // Consume the headers: stop at the blank line that ends them,
                // or at end of stream (zero bytes read).
                let mut header = String::new();
                loop {
                    header.clear();
                    let bytes_read = reader.read_line(&mut header).expect("read request headers");
                    if bytes_read == 0 || header == "\r\n" {
                        break;
                    }
                }

                let reply = handler(method, path);
                stream.write_all(reply.as_bytes()).expect("write response");
                stream.flush().expect("flush response");
            }
        });

        let addr = addr_rx.recv().expect("receive server address");
        format!("http://{}", addr)
    }

    /// Assembles a raw HTTP response: status line, one `name: value` line per
    /// header, a blank line, then the body. Lines are terminated with CRLF.
    fn http_response(status_line: &str, headers: &[(&str, &str)], body: &str) -> String {
        let head = format!("{status_line}\r\n");
        let fields: String = headers
            .iter()
            .map(|(k, v)| format!("{k}: {v}\r\n"))
            .collect();
        [head.as_str(), fields.as_str(), "\r\n", body].concat()
    }

    /// A `major.minor.patch` triplet embedded in a file name is picked up.
    #[test]
    fn parse_version_from_filename_extracts_semver_triplet() {
        let parsed = DirectAdapter::parse_version_from_filename("tool-v1.2.3-linux-x86_64.tar.gz")
            .expect("parsed version");
        assert_eq!(parsed.major, 1);
        assert_eq!(parsed.minor, 2);
        assert_eq!(parsed.patch, 3);
    }

    /// A successful HEAD probe yields a one-asset release whose version comes
    /// from the file name and whose name includes the ETag.
    #[tokio::test]
    async fn get_latest_release_builds_release_from_probe_metadata() {
        let etag = "\"etag-value\"".to_string();
        let server = spawn_test_server(1, move |method, _| {
            assert_eq!(method, "HEAD");
            let headers = [
                ("Connection", "close"),
                ("Content-Length", "42"),
                ("ETag", etag.as_str()),
                ("Last-Modified", "Tue, 10 Feb 2026 15:04:05 GMT"),
            ];
            http_response("HTTP/1.1 200 OK", &headers, "")
        });
        let adapter = DirectAdapter::new(HttpClient::new().expect("http client"));
        let url = format!("{server}/tool-v2.3.4.tar.gz");
        let release = adapter.get_latest_release(&url).await.expect("release");

        assert_eq!(release.assets.len(), 1);
        assert_eq!(release.version.major, 2);
        assert_eq!(release.version.minor, 3);
        assert_eq!(release.version.patch, 4);
        assert!(release.name.contains("etag-value"));
    }

    /// A `304 Not Modified` answer to the conditional probe yields `None`.
    #[tokio::test]
    async fn conditional_latest_release_returns_none_on_not_modified() {
        let server = spawn_test_server(1, move |method, _| {
            assert_eq!(method, "HEAD");
            http_response("HTTP/1.1 304 Not Modified", &[("Connection", "close")], "")
        });
        let adapter = DirectAdapter::new(HttpClient::new().expect("http client"));

        let outcome = adapter
            .get_latest_release_if_modified_since(&server, Some(Utc::now()))
            .await
            .expect("conditional release");
        assert!(outcome.is_none());
    }
}