1use crate::{
2 discover::{DiscoveredAdvisory, DistributionContext},
3 metadata::{self, MetadataSource},
4 model::metadata::ProviderMetadata,
5 retrieve::RetrievedAdvisory,
6 rolie::{RolieSource, SourceFile},
7 source::Source,
8};
9use bytes::{BufMut, Bytes, BytesMut};
10use digest::Digest;
11use futures::try_join;
12use reqwest::Response;
13use sha2::{Sha256, Sha512};
14use std::{sync::Arc, time::SystemTime};
15use time::{OffsetDateTime, format_description::well_known::Rfc2822};
16use url::{ParseError, Url};
17use walker_common::utils::url::ensure_slash;
18use walker_common::{
19 changes::{self, ChangeEntry, ChangeSource},
20 fetcher::{self, DataProcessor, Fetcher},
21 retrieve::{RetrievalMetadata, RetrievedDigest, RetrievingDigest},
22 utils::openpgp::PublicKey,
23 validate::source::{Key, KeySource, KeySourceError},
24};
25
26#[non_exhaustive]
27#[derive(Clone, Debug, Default, PartialEq, Eq)]
28pub struct HttpOptions {
29 pub since: Option<SystemTime>,
30}
31
32impl HttpOptions {
33 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn since(mut self, since: impl Into<Option<SystemTime>>) -> Self {
38 self.since = since.into();
39 self
40 }
41}
42
43#[derive(Clone, Debug)]
44pub struct HttpSource {
45 fetcher: Fetcher,
46 metadata_source: Arc<dyn MetadataSource>,
47 options: HttpOptions,
48}
49
50impl HttpSource {
51 pub fn new<M: MetadataSource + 'static>(
52 metadata: M,
53 fetcher: Fetcher,
54 options: HttpOptions,
55 ) -> Self {
56 Self {
57 metadata_source: Arc::new(metadata),
58 fetcher,
59 options,
60 }
61 }
62}
63
64#[derive(Debug, thiserror::Error)]
65pub enum HttpSourceError {
66 #[error("Metadata discovery error: {0}")]
67 Metadata(#[from] metadata::Error),
68 #[error("Fetch error: {0}")]
69 Fetcher(#[from] fetcher::Error),
70 #[error("URL error: {0}")]
71 Url(#[from] ParseError),
72 #[error("CSV error: {0}")]
73 Csv(#[from] csv::Error),
74 #[error("JSON parse error: {0}")]
75 Json(#[from] serde_json::Error),
76}
77
78impl From<changes::Error> for HttpSourceError {
79 fn from(value: changes::Error) -> Self {
80 match value {
81 changes::Error::Fetcher(err) => Self::Fetcher(err),
82 changes::Error::Url(err) => Self::Url(err),
83 changes::Error::Csv(err) => Self::Csv(err),
84 }
85 }
86}
87
88impl walker_common::source::Source for HttpSource {
89 type Error = HttpSourceError;
90 type Retrieved = RetrievedAdvisory;
91}
92
93impl Source for HttpSource {
94 async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
95 Ok(self.metadata_source.load_metadata(&self.fetcher).await?)
96 }
97
98 async fn load_index(
99 &self,
100 context: DistributionContext,
101 ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
102 let discover_context = Arc::new(context);
103
104 let since_filter = |advisory: &Result<_, _>| match (advisory, &self.options.since) {
106 (
107 Ok(DiscoveredAdvisory {
108 url: _,
109 context: _,
110 digest: _,
111 signature: _,
112 modified,
113 }),
114 Some(since),
115 ) => modified >= since,
116 _ => true,
117 };
118
119 match discover_context.as_ref() {
120 DistributionContext::Directory(base) => {
121 let base = ensure_slash(base.clone());
122 let changes = ChangeSource::retrieve(&self.fetcher, &base).await?;
123
124 Ok(changes
125 .entries
126 .into_iter()
127 .map(|ChangeEntry { file, timestamp }| {
128 let modified = timestamp.into();
129 let url = base.join(&file)?;
130
131 Ok::<_, ParseError>(DiscoveredAdvisory {
132 context: discover_context.clone(),
133 url,
134 modified,
135 signature: None,
136 digest: None,
137 })
138 })
139 .filter(since_filter)
140 .collect::<Result<_, _>>()?)
141 }
142
143 DistributionContext::Feed(feed) => {
144 let source_files = RolieSource::retrieve(&self.fetcher, feed.clone()).await?;
145 Ok(source_files
146 .files
147 .into_iter()
148 .map(
149 |SourceFile {
150 file,
151 timestamp,
152 digest,
153 signature,
154 }| {
155 let modified = timestamp.into();
156 let url = Url::parse(&file)?;
157 let digest = digest.map(|digest| Url::parse(&digest)).transpose()?;
158 let signature = signature
159 .map(|signature| Url::parse(&signature))
160 .transpose()?;
161
162 Ok::<_, ParseError>(DiscoveredAdvisory {
163 context: discover_context.clone(),
164 url,
165 digest,
166 signature,
167 modified,
168 })
169 },
170 )
171 .filter(since_filter)
172 .collect::<Result<_, _>>()?)
173 }
174 }
175 }
176
177 async fn load_advisory(
178 &self,
179 discovered: DiscoveredAdvisory,
180 ) -> Result<RetrievedAdvisory, Self::Error> {
181 let (signature, sha256, sha512) = try_join!(
182 async {
183 match discovered.signature.clone() {
185 Some(signature) => self.fetcher.fetch::<Option<String>>(signature).await,
186 None => {
187 self.fetcher
188 .fetch::<Option<String>>(format!("{url}.asc", url = discovered.url))
189 .await
190 }
191 }
192 },
193 async {
194 match discovered.digest.clone() {
195 Some(digest) if digest.as_str().ends_with(".sha256") => {
196 self.fetcher.fetch::<Option<String>>(digest).await
197 }
198 Some(_) => Ok(None),
199 None => {
200 self.fetcher
201 .fetch::<Option<String>>(format!("{url}.sha256", url = discovered.url))
202 .await
203 }
204 }
205 },
206 async {
207 match discovered.digest.clone() {
208 Some(digest) if digest.as_str().ends_with(".sha512") => {
209 self.fetcher.fetch::<Option<String>>(digest).await
210 }
211 Some(_) => Ok(None),
212 None => {
213 self.fetcher
214 .fetch::<Option<String>>(format!("{url}.sha512", url = discovered.url))
215 .await
216 }
217 }
218 },
219 )?;
220
221 let sha256 = sha256
222 .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
224 .map(|expected| RetrievingDigest {
225 expected,
226 current: Sha256::new(),
227 });
228 let sha512 = sha512
229 .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
231 .map(|expected| RetrievingDigest {
232 expected,
233 current: Sha512::new(),
234 });
235
236 let advisory = self
237 .fetcher
238 .fetch_processed(
239 discovered.url.clone(),
240 FetchingRetrievedAdvisory { sha256, sha512 },
241 )
242 .await?;
243
244 Ok(advisory.into_retrieved(discovered, signature))
245 }
246}
247
248pub struct FetchedRetrievedAdvisory {
249 data: Bytes,
250 sha256: Option<RetrievedDigest<Sha256>>,
251 sha512: Option<RetrievedDigest<Sha512>>,
252 metadata: RetrievalMetadata,
253}
254
255impl FetchedRetrievedAdvisory {
256 fn into_retrieved(
257 self,
258 discovered: DiscoveredAdvisory,
259 signature: Option<String>,
260 ) -> RetrievedAdvisory {
261 RetrievedAdvisory {
262 discovered,
263 data: self.data,
264 signature,
265 sha256: self.sha256,
266 sha512: self.sha512,
267 metadata: self.metadata,
268 }
269 }
270}
271
272pub struct FetchingRetrievedAdvisory {
273 pub sha256: Option<RetrievingDigest<Sha256>>,
274 pub sha512: Option<RetrievingDigest<Sha512>>,
275}
276
277impl DataProcessor for FetchingRetrievedAdvisory {
278 type Type = FetchedRetrievedAdvisory;
279
280 async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
281 let mut response = response.error_for_status()?;
282
283 let mut data = BytesMut::new();
284 let mut sha256 = self.sha256.clone();
285 let mut sha512 = self.sha512.clone();
286
287 while let Some(chunk) = response.chunk().await? {
288 if let Some(d) = &mut sha256 {
289 d.update(&chunk);
290 }
291 if let Some(d) = &mut sha512 {
292 d.update(&chunk);
293 }
294 data.put(chunk);
295 }
296
297 let etag = response
298 .headers()
299 .get(reqwest::header::ETAG)
300 .and_then(|s| s.to_str().ok())
301 .map(ToString::to_string);
302
303 let last_modification = response
304 .headers()
305 .get(reqwest::header::LAST_MODIFIED)
306 .and_then(|s| s.to_str().ok())
307 .and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
308
309 Ok(FetchedRetrievedAdvisory {
310 data: data.freeze(),
311 sha256: sha256.map(|d| d.into()),
312 sha512: sha512.map(|d| d.into()),
313 metadata: RetrievalMetadata {
314 last_modification,
315 etag,
316 },
317 })
318 }
319}
320
321impl KeySource for HttpSource {
322 type Error = fetcher::Error;
323
324 async fn load_public_key(
325 &self,
326 key_source: Key<'_>,
327 ) -> Result<PublicKey, KeySourceError<Self::Error>> {
328 self.fetcher.load_public_key(key_source).await
329 }
330}