1use crate::{
2 discover::{DiscoveredAdvisory, DistributionContext},
3 metadata::{self, MetadataSource},
4 model::metadata::ProviderMetadata,
5 retrieve::RetrievedAdvisory,
6 rolie::{RolieSource, SourceFile},
7 source::Source,
8};
9use bytes::{BufMut, Bytes, BytesMut};
10use digest::Digest;
11use futures::try_join;
12use reqwest::Response;
13use sha2::{Sha256, Sha512};
14use std::{sync::Arc, time::SystemTime};
15use time::{OffsetDateTime, format_description::well_known::Rfc2822};
16use url::{ParseError, Url};
17use walker_common::{
18 changes::{self, ChangeEntry, ChangeSource},
19 fetcher::{self, DataProcessor, Fetcher},
20 retrieve::{RetrievalMetadata, RetrievedDigest, RetrievingDigest},
21 utils::openpgp::PublicKey,
22 validate::source::{Key, KeySource, KeySourceError},
23};
24
25#[non_exhaustive]
26#[derive(Clone, Debug, Default, PartialEq, Eq)]
27pub struct HttpOptions {
28 pub since: Option<SystemTime>,
29}
30
31impl HttpOptions {
32 pub fn new() -> Self {
33 Self::default()
34 }
35
36 pub fn since(mut self, since: impl Into<Option<SystemTime>>) -> Self {
37 self.since = since.into();
38 self
39 }
40}
41
42#[derive(Clone, Debug)]
43pub struct HttpSource {
44 fetcher: Fetcher,
45 metadata_source: Arc<dyn MetadataSource>,
46 options: HttpOptions,
47}
48
49impl HttpSource {
50 pub fn new<M: MetadataSource + 'static>(
51 metadata: M,
52 fetcher: Fetcher,
53 options: HttpOptions,
54 ) -> Self {
55 Self {
56 metadata_source: Arc::new(metadata),
57 fetcher,
58 options,
59 }
60 }
61}
62
63#[derive(Debug, thiserror::Error)]
64pub enum HttpSourceError {
65 #[error("Metadata discovery error: {0}")]
66 Metadata(#[from] metadata::Error),
67 #[error("Fetch error: {0}")]
68 Fetcher(#[from] fetcher::Error),
69 #[error("URL error: {0}")]
70 Url(#[from] ParseError),
71 #[error("CSV error: {0}")]
72 Csv(#[from] csv::Error),
73 #[error("JSON parse error: {0}")]
74 Json(#[from] serde_json::Error),
75}
76
77impl From<changes::Error> for HttpSourceError {
78 fn from(value: changes::Error) -> Self {
79 match value {
80 changes::Error::Fetcher(err) => Self::Fetcher(err),
81 changes::Error::Url(err) => Self::Url(err),
82 changes::Error::Csv(err) => Self::Csv(err),
83 }
84 }
85}
86
87impl walker_common::source::Source for HttpSource {
88 type Error = HttpSourceError;
89 type Retrieved = RetrievedAdvisory;
90}
91
92impl Source for HttpSource {
93 async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
94 Ok(self.metadata_source.load_metadata(&self.fetcher).await?)
95 }
96
97 async fn load_index(
98 &self,
99 context: DistributionContext,
100 ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
101 let discover_context = Arc::new(context);
102
103 let since_filter = |advisory: &Result<_, _>| match (advisory, &self.options.since) {
105 (
106 Ok(DiscoveredAdvisory {
107 url: _,
108 context: _,
109 digest: _,
110 signature: _,
111 modified,
112 }),
113 Some(since),
114 ) => modified >= since,
115 _ => true,
116 };
117
118 match discover_context.as_ref() {
119 DistributionContext::Directory(base) => {
120 let has_slash = base.to_string().ends_with('/');
121
122 let join_url = |mut s: &str| {
123 if has_slash && s.ends_with('/') {
124 s = &s[1..];
125 }
126 Url::parse(&format!("{}{s}", base))
127 };
128
129 let changes = ChangeSource::retrieve(&self.fetcher, &base.clone()).await?;
130
131 Ok(changes
132 .entries
133 .into_iter()
134 .map(|ChangeEntry { file, timestamp }| {
135 let modified = timestamp.into();
136 let url = join_url(&file)?;
137
138 Ok::<_, ParseError>(DiscoveredAdvisory {
139 context: discover_context.clone(),
140 url,
141 modified,
142 signature: None,
143 digest: None,
144 })
145 })
146 .filter(since_filter)
147 .collect::<Result<_, _>>()?)
148 }
149
150 DistributionContext::Feed(feed) => {
151 let source_files = RolieSource::retrieve(&self.fetcher, feed.clone()).await?;
152 Ok(source_files
153 .files
154 .into_iter()
155 .map(
156 |SourceFile {
157 file,
158 timestamp,
159 digest,
160 signature,
161 }| {
162 let modified = timestamp.into();
163 let url = Url::parse(&file)?;
164 let digest = digest.map(|digest| Url::parse(&digest)).transpose()?;
165 let signature = signature
166 .map(|signature| Url::parse(&signature))
167 .transpose()?;
168
169 Ok::<_, ParseError>(DiscoveredAdvisory {
170 context: discover_context.clone(),
171 url,
172 digest,
173 signature,
174 modified,
175 })
176 },
177 )
178 .filter(since_filter)
179 .collect::<Result<_, _>>()?)
180 }
181 }
182 }
183
184 async fn load_advisory(
185 &self,
186 discovered: DiscoveredAdvisory,
187 ) -> Result<RetrievedAdvisory, Self::Error> {
188 let (signature, sha256, sha512) = try_join!(
189 async {
190 match discovered.signature.clone() {
192 Some(signature) => self.fetcher.fetch::<Option<String>>(signature).await,
193 None => {
194 self.fetcher
195 .fetch::<Option<String>>(format!("{url}.asc", url = discovered.url))
196 .await
197 }
198 }
199 },
200 async {
201 match discovered.digest.clone() {
202 Some(digest) if digest.as_str().ends_with(".sha256") => {
203 self.fetcher.fetch::<Option<String>>(digest).await
204 }
205 Some(_) => Ok(None),
206 None => {
207 self.fetcher
208 .fetch::<Option<String>>(format!("{url}.sha256", url = discovered.url))
209 .await
210 }
211 }
212 },
213 async {
214 match discovered.digest.clone() {
215 Some(digest) if digest.as_str().ends_with(".sha512") => {
216 self.fetcher.fetch::<Option<String>>(digest).await
217 }
218 Some(_) => Ok(None),
219 None => {
220 self.fetcher
221 .fetch::<Option<String>>(format!("{url}.sha512", url = discovered.url))
222 .await
223 }
224 }
225 },
226 )?;
227
228 let sha256 = sha256
229 .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
231 .map(|expected| RetrievingDigest {
232 expected,
233 current: Sha256::new(),
234 });
235 let sha512 = sha512
236 .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
238 .map(|expected| RetrievingDigest {
239 expected,
240 current: Sha512::new(),
241 });
242
243 let advisory = self
244 .fetcher
245 .fetch_processed(
246 discovered.url.clone(),
247 FetchingRetrievedAdvisory { sha256, sha512 },
248 )
249 .await?;
250
251 Ok(advisory.into_retrieved(discovered, signature))
252 }
253}
254
255pub struct FetchedRetrievedAdvisory {
256 data: Bytes,
257 sha256: Option<RetrievedDigest<Sha256>>,
258 sha512: Option<RetrievedDigest<Sha512>>,
259 metadata: RetrievalMetadata,
260}
261
262impl FetchedRetrievedAdvisory {
263 fn into_retrieved(
264 self,
265 discovered: DiscoveredAdvisory,
266 signature: Option<String>,
267 ) -> RetrievedAdvisory {
268 RetrievedAdvisory {
269 discovered,
270 data: self.data,
271 signature,
272 sha256: self.sha256,
273 sha512: self.sha512,
274 metadata: self.metadata,
275 }
276 }
277}
278
279pub struct FetchingRetrievedAdvisory {
280 pub sha256: Option<RetrievingDigest<Sha256>>,
281 pub sha512: Option<RetrievingDigest<Sha512>>,
282}
283
284impl DataProcessor for FetchingRetrievedAdvisory {
285 type Type = FetchedRetrievedAdvisory;
286
287 async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
288 let mut response = response.error_for_status()?;
289
290 let mut data = BytesMut::new();
291 let mut sha256 = self.sha256.clone();
292 let mut sha512 = self.sha512.clone();
293
294 while let Some(chunk) = response.chunk().await? {
295 if let Some(d) = &mut sha256 {
296 d.update(&chunk);
297 }
298 if let Some(d) = &mut sha512 {
299 d.update(&chunk);
300 }
301 data.put(chunk);
302 }
303
304 let etag = response
305 .headers()
306 .get(reqwest::header::ETAG)
307 .and_then(|s| s.to_str().ok())
308 .map(ToString::to_string);
309
310 let last_modification = response
311 .headers()
312 .get(reqwest::header::LAST_MODIFIED)
313 .and_then(|s| s.to_str().ok())
314 .and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
315
316 Ok(FetchedRetrievedAdvisory {
317 data: data.freeze(),
318 sha256: sha256.map(|d| d.into()),
319 sha512: sha512.map(|d| d.into()),
320 metadata: RetrievalMetadata {
321 last_modification,
322 etag,
323 },
324 })
325 }
326}
327
328impl KeySource for HttpSource {
329 type Error = fetcher::Error;
330
331 async fn load_public_key(
332 &self,
333 key_source: Key<'_>,
334 ) -> Result<PublicKey, KeySourceError<Self::Error>> {
335 self.fetcher.load_public_key(key_source).await
336 }
337}