1use crate::{
2 discover::{DiscoveredAdvisory, DistributionContext},
3 metadata::{self, MetadataSource},
4 model::metadata::ProviderMetadata,
5 retrieve::RetrievedAdvisory,
6 rolie::{RolieSource, SourceFile},
7 source::Source,
8};
9use bytes::{BufMut, Bytes, BytesMut};
10use digest::Digest;
11use futures::try_join;
12use reqwest::Response;
13use sha2::{Sha256, Sha512};
14use std::{sync::Arc, time::SystemTime};
15use time::{OffsetDateTime, format_description::well_known::Rfc2822};
16use url::{ParseError, Url};
17use walker_common::utils::url::ensure_slash;
18use walker_common::{
19 changes::{self, ChangeEntry, ChangeSource},
20 fetcher::{self, DataProcessor, Fetcher},
21 retrieve::{RetrievalMetadata, RetrievedDigest, RetrievingDigest},
22 utils::openpgp::PublicKey,
23 validate::source::{Key, KeySource, KeySourceError},
24};
25
26#[non_exhaustive]
27#[derive(Clone, Debug, Default, PartialEq, Eq)]
28pub struct HttpOptions {
29 pub since: Option<SystemTime>,
30}
31
32impl HttpOptions {
33 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn since(mut self, since: impl Into<Option<SystemTime>>) -> Self {
38 self.since = since.into();
39 self
40 }
41}
42
43#[derive(Clone, Debug)]
44pub struct HttpSource {
45 fetcher: Fetcher,
46 metadata_source: Arc<dyn MetadataSource>,
47 options: HttpOptions,
48}
49
50impl HttpSource {
51 pub fn new<M: MetadataSource + 'static>(
52 metadata: M,
53 fetcher: Fetcher,
54 options: HttpOptions,
55 ) -> Self {
56 Self {
57 metadata_source: Arc::new(metadata),
58 fetcher,
59 options,
60 }
61 }
62}
63
64#[derive(Debug, thiserror::Error)]
65pub enum HttpSourceError {
66 #[error("Metadata discovery error: {0}")]
67 Metadata(#[from] metadata::Error),
68 #[error("Fetch error: {0}")]
69 Fetcher(#[from] fetcher::Error),
70 #[error("URL error: {0}")]
71 Url(#[from] ParseError),
72 #[error("CSV error: {0}")]
73 Csv(#[from] csv::Error),
74 #[error("JSON parse error: {0}")]
75 Json(#[from] serde_json::Error),
76}
77
78impl From<changes::Error> for HttpSourceError {
79 fn from(value: changes::Error) -> Self {
80 match value {
81 changes::Error::Fetcher(err) => Self::Fetcher(err),
82 changes::Error::Url(err) => Self::Url(err),
83 changes::Error::Csv(err) => Self::Csv(err),
84 }
85 }
86}
87
88impl walker_common::source::Source for HttpSource {
89 type Error = HttpSourceError;
90 type Retrieved = RetrievedAdvisory;
91}
92
93impl Source for HttpSource {
94 async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
95 Ok(self.metadata_source.load_metadata(&self.fetcher).await?)
96 }
97
98 async fn load_index(
99 &self,
100 context: DistributionContext,
101 ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
102 let discover_context = Arc::new(context);
103
104 let since_filter = |advisory: &Result<_, _>| match (advisory, &self.options.since) {
106 (
107 Ok(DiscoveredAdvisory {
108 url: _,
109 context: _,
110 digest: _,
111 signature: _,
112 modified,
113 }),
114 Some(since),
115 ) => modified >= since,
116 _ => true,
117 };
118
119 match discover_context.as_ref() {
120 DistributionContext::Directory(base) => {
121 let base = ensure_slash(base.clone());
122 let changes = ChangeSource::retrieve(&self.fetcher, &base).await?;
123
124 Ok(changes
125 .entries
126 .into_iter()
127 .map(|ChangeEntry { file, timestamp }| {
128 let modified = timestamp.into();
129 let url = base.join(&file)?;
130
131 Ok::<_, ParseError>(DiscoveredAdvisory {
132 context: discover_context.clone(),
133 url,
134 modified,
135 signature: None,
136 digest: None,
137 })
138 })
139 .filter(since_filter)
140 .collect::<Result<_, _>>()?)
141 }
142
143 DistributionContext::Feed(feed) => {
144 let source_files = RolieSource::retrieve(&self.fetcher, feed.clone()).await?;
145 Ok(source_files
146 .files
147 .into_iter()
148 .map(
149 |SourceFile {
150 file,
151 timestamp,
152 digest,
153 signature,
154 }| {
155 let modified = timestamp.into();
156 let url = Url::parse(&file)?;
157 let digest = digest.map(|digest| Url::parse(&digest)).transpose()?;
158 let signature = signature
159 .map(|signature| Url::parse(&signature))
160 .transpose()?;
161
162 Ok::<_, ParseError>(DiscoveredAdvisory {
163 context: discover_context.clone(),
164 url,
165 digest,
166 signature,
167 modified,
168 })
169 },
170 )
171 .filter(since_filter)
172 .collect::<Result<_, _>>()?)
173 }
174 }
175 }
176
177 async fn load_advisory(
178 &self,
179 discovered: DiscoveredAdvisory,
180 ) -> Result<RetrievedAdvisory, Self::Error> {
181 let digest_result = try_join!(
182 async {
183 match discovered.signature.clone() {
185 Some(signature) => self.fetcher.fetch::<Option<String>>(signature).await,
186 None => {
187 self.fetcher
188 .fetch::<Option<String>>(format!("{url}.asc", url = discovered.url))
189 .await
190 }
191 }
192 },
193 async {
194 match discovered.digest.clone() {
195 Some(digest) if digest.as_str().ends_with(".sha256") => {
196 self.fetcher.fetch::<Option<String>>(digest).await
197 }
198 Some(_) => Ok(None),
199 None => {
200 self.fetcher
201 .fetch::<Option<String>>(format!("{url}.sha256", url = discovered.url))
202 .await
203 }
204 }
205 },
206 async {
207 match discovered.digest.clone() {
208 Some(digest) if digest.as_str().ends_with(".sha512") => {
209 self.fetcher.fetch::<Option<String>>(digest).await
210 }
211 Some(_) => Ok(None),
212 None => {
213 self.fetcher
214 .fetch::<Option<String>>(format!("{url}.sha512", url = discovered.url))
215 .await
216 }
217 }
218 },
219 );
220
221 let (signature, sha256, sha512) = digest_result.map_err(HttpSourceError::Fetcher)?;
222
223 let sha256 = sha256
224 .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
226 .map(|expected| RetrievingDigest {
227 expected,
228 current: Sha256::new(),
229 });
230 let sha512 = sha512
231 .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
233 .map(|expected| RetrievingDigest {
234 expected,
235 current: Sha512::new(),
236 });
237
238 let advisory = self
239 .fetcher
240 .fetch_processed(
241 discovered.url.clone(),
242 FetchingRetrievedAdvisory { sha256, sha512 },
243 )
244 .await?;
245
246 Ok(advisory.into_retrieved(discovered, signature))
247 }
248}
249
250pub struct FetchedRetrievedAdvisory {
251 data: Bytes,
252 sha256: Option<RetrievedDigest<Sha256>>,
253 sha512: Option<RetrievedDigest<Sha512>>,
254 metadata: RetrievalMetadata,
255}
256
257impl FetchedRetrievedAdvisory {
258 fn into_retrieved(
259 self,
260 discovered: DiscoveredAdvisory,
261 signature: Option<String>,
262 ) -> RetrievedAdvisory {
263 RetrievedAdvisory {
264 discovered,
265 data: self.data,
266 signature,
267 sha256: self.sha256,
268 sha512: self.sha512,
269 metadata: self.metadata,
270 }
271 }
272}
273
274pub struct FetchingRetrievedAdvisory {
275 pub sha256: Option<RetrievingDigest<Sha256>>,
276 pub sha512: Option<RetrievingDigest<Sha512>>,
277}
278
279impl DataProcessor for FetchingRetrievedAdvisory {
280 type Type = FetchedRetrievedAdvisory;
281
282 async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
283 let mut response = response.error_for_status()?;
284
285 let mut data = BytesMut::new();
286 let mut sha256 = self.sha256.clone();
287 let mut sha512 = self.sha512.clone();
288
289 while let Some(chunk) = response.chunk().await? {
290 if let Some(d) = &mut sha256 {
291 d.update(&chunk);
292 }
293 if let Some(d) = &mut sha512 {
294 d.update(&chunk);
295 }
296 data.put(chunk);
297 }
298
299 let etag = response
300 .headers()
301 .get(reqwest::header::ETAG)
302 .and_then(|s| s.to_str().ok())
303 .map(ToString::to_string);
304
305 let last_modification = response
306 .headers()
307 .get(reqwest::header::LAST_MODIFIED)
308 .and_then(|s| s.to_str().ok())
309 .and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
310
311 Ok(FetchedRetrievedAdvisory {
312 data: data.freeze(),
313 sha256: sha256.map(|d| d.into()),
314 sha512: sha512.map(|d| d.into()),
315 metadata: RetrievalMetadata {
316 last_modification,
317 etag,
318 },
319 })
320 }
321}
322
323impl KeySource for HttpSource {
324 type Error = fetcher::Error;
325
326 async fn load_public_key(
327 &self,
328 key_source: Key<'_>,
329 ) -> Result<PublicKey, KeySourceError<Self::Error>> {
330 self.fetcher.load_public_key(key_source).await
331 }
332}