1use crate::metadata::MetadataSource;
2use crate::{
3 discover::{DiscoveredAdvisory, DistributionContext},
4 metadata,
5 model::metadata::ProviderMetadata,
6 retrieve::RetrievedAdvisory,
7 rolie::{RolieSource, SourceFile},
8 source::Source,
9};
10use bytes::{BufMut, Bytes, BytesMut};
11use digest::Digest;
12use futures::try_join;
13use reqwest::Response;
14use sha2::{Sha256, Sha512};
15use std::sync::Arc;
16use std::time::SystemTime;
17use time::{OffsetDateTime, format_description::well_known::Rfc2822};
18use url::{ParseError, Url};
19use walker_common::{
20 changes::{self, ChangeEntry, ChangeSource},
21 fetcher::{self, DataProcessor, Fetcher},
22 retrieve::{RetrievalMetadata, RetrievedDigest, RetrievingDigest},
23 utils::openpgp::PublicKey,
24 validate::source::{Key, KeySource, KeySourceError},
25};
26
/// Options controlling how [`HttpSource`] discovers advisories.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct HttpOptions {
    /// Optional lower bound on the modification time of discovered advisories.
    ///
    /// `None` (the default) disables the bound.
    pub since: Option<SystemTime>,
}

impl HttpOptions {
    /// Creates options with every setting at its default value.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets (or, when given `None`, clears) the `since` bound and returns the updated options.
    pub fn since(self, since: impl Into<Option<SystemTime>>) -> Self {
        let mut updated = self;
        updated.since = since.into();
        updated
    }
}
43
44#[derive(Clone, Debug)]
45pub struct HttpSource {
46 fetcher: Fetcher,
47 metadata_source: Arc<dyn MetadataSource>,
48 options: HttpOptions,
49}
50
51impl HttpSource {
52 pub fn new<M: MetadataSource + 'static>(
53 metadata: M,
54 fetcher: Fetcher,
55 options: HttpOptions,
56 ) -> Self {
57 Self {
58 metadata_source: Arc::new(metadata),
59 fetcher,
60 options,
61 }
62 }
63}
64
65#[derive(Debug, thiserror::Error)]
66pub enum HttpSourceError {
67 #[error("Metadata discovery error: {0}")]
68 Metadata(#[from] metadata::Error),
69 #[error("Fetch error: {0}")]
70 Fetcher(#[from] fetcher::Error),
71 #[error("URL error: {0}")]
72 Url(#[from] ParseError),
73 #[error("CSV error: {0}")]
74 Csv(#[from] csv::Error),
75 #[error("JSON parse error: {0}")]
76 Json(#[from] serde_json::Error),
77}
78
79impl From<changes::Error> for HttpSourceError {
80 fn from(value: changes::Error) -> Self {
81 match value {
82 changes::Error::Fetcher(err) => Self::Fetcher(err),
83 changes::Error::Url(err) => Self::Url(err),
84 changes::Error::Csv(err) => Self::Csv(err),
85 }
86 }
87}
88
89impl walker_common::source::Source for HttpSource {
90 type Error = HttpSourceError;
91 type Retrieved = RetrievedAdvisory;
92}
93
94impl Source for HttpSource {
95 async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
96 Ok(self.metadata_source.load_metadata(&self.fetcher).await?)
97 }
98
99 async fn load_index(
100 &self,
101 context: DistributionContext,
102 ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
103 let discover_context = Arc::new(context);
104
105 let since_filter = |advisory: &Result<_, _>| match (advisory, &self.options.since) {
107 (
108 Ok(DiscoveredAdvisory {
109 url: _,
110 context: _,
111 modified,
112 }),
113 Some(since),
114 ) => modified >= since,
115 _ => true,
116 };
117
118 match discover_context.as_ref() {
119 DistributionContext::Directory(base) => {
120 let has_slash = base.to_string().ends_with('/');
121
122 let join_url = |mut s: &str| {
123 if has_slash && s.ends_with('/') {
124 s = &s[1..];
125 }
126 Url::parse(&format!("{}{s}", base))
127 };
128
129 let changes = ChangeSource::retrieve(&self.fetcher, &base.clone()).await?;
130
131 Ok(changes
132 .entries
133 .into_iter()
134 .map(|ChangeEntry { file, timestamp }| {
135 let modified = timestamp.into();
136 let url = join_url(&file)?;
137
138 Ok::<_, ParseError>(DiscoveredAdvisory {
139 context: discover_context.clone(),
140 url,
141 modified,
142 })
143 })
144 .filter(since_filter)
145 .collect::<Result<_, _>>()?)
146 }
147
148 DistributionContext::Feed(feed) => {
149 let source_files = RolieSource::retrieve(&self.fetcher, feed.clone()).await?;
150 Ok(source_files
151 .files
152 .into_iter()
153 .map(|SourceFile { file, timestamp }| {
154 let modified = timestamp.into();
155 let url = Url::parse(&file)?;
156
157 Ok::<_, ParseError>(DiscoveredAdvisory {
158 context: discover_context.clone(),
159 url,
160 modified,
161 })
162 })
163 .filter(since_filter)
164 .collect::<Result<_, _>>()?)
165 }
166 }
167 }
168
169 async fn load_advisory(
170 &self,
171 discovered: DiscoveredAdvisory,
172 ) -> Result<RetrievedAdvisory, Self::Error> {
173 let (signature, sha256, sha512) = try_join!(
174 self.fetcher
175 .fetch::<Option<String>>(format!("{url}.asc", url = discovered.url)),
176 self.fetcher
177 .fetch::<Option<String>>(format!("{url}.sha256", url = discovered.url)),
178 self.fetcher
179 .fetch::<Option<String>>(format!("{url}.sha512", url = discovered.url)),
180 )?;
181
182 let sha256 = sha256
183 .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
185 .map(|expected| RetrievingDigest {
186 expected,
187 current: Sha256::new(),
188 });
189 let sha512 = sha512
190 .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
192 .map(|expected| RetrievingDigest {
193 expected,
194 current: Sha512::new(),
195 });
196
197 let advisory = self
198 .fetcher
199 .fetch_processed(
200 discovered.url.clone(),
201 FetchingRetrievedAdvisory { sha256, sha512 },
202 )
203 .await?;
204
205 Ok(advisory.into_retrieved(discovered, signature))
206 }
207}
208
209pub struct FetchedRetrievedAdvisory {
210 data: Bytes,
211 sha256: Option<RetrievedDigest<Sha256>>,
212 sha512: Option<RetrievedDigest<Sha512>>,
213 metadata: RetrievalMetadata,
214}
215
216impl FetchedRetrievedAdvisory {
217 fn into_retrieved(
218 self,
219 discovered: DiscoveredAdvisory,
220 signature: Option<String>,
221 ) -> RetrievedAdvisory {
222 RetrievedAdvisory {
223 discovered,
224 data: self.data,
225 signature,
226 sha256: self.sha256,
227 sha512: self.sha512,
228 metadata: self.metadata,
229 }
230 }
231}
232
233pub struct FetchingRetrievedAdvisory {
234 pub sha256: Option<RetrievingDigest<Sha256>>,
235 pub sha512: Option<RetrievingDigest<Sha512>>,
236}
237
238impl DataProcessor for FetchingRetrievedAdvisory {
239 type Type = FetchedRetrievedAdvisory;
240
241 async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
242 let mut response = response.error_for_status()?;
243
244 let mut data = BytesMut::new();
245 let mut sha256 = self.sha256.clone();
246 let mut sha512 = self.sha512.clone();
247
248 while let Some(chunk) = response.chunk().await? {
249 if let Some(d) = &mut sha256 {
250 d.update(&chunk);
251 }
252 if let Some(d) = &mut sha512 {
253 d.update(&chunk);
254 }
255 data.put(chunk);
256 }
257
258 let etag = response
259 .headers()
260 .get(reqwest::header::ETAG)
261 .and_then(|s| s.to_str().ok())
262 .map(ToString::to_string);
263
264 let last_modification = response
265 .headers()
266 .get(reqwest::header::LAST_MODIFIED)
267 .and_then(|s| s.to_str().ok())
268 .and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
269
270 Ok(FetchedRetrievedAdvisory {
271 data: data.freeze(),
272 sha256: sha256.map(|d| d.into()),
273 sha512: sha512.map(|d| d.into()),
274 metadata: RetrievalMetadata {
275 last_modification,
276 etag,
277 },
278 })
279 }
280}
281
282impl KeySource for HttpSource {
283 type Error = fetcher::Error;
284
285 async fn load_public_key(
286 &self,
287 key_source: Key<'_>,
288 ) -> Result<PublicKey, KeySourceError<Self::Error>> {
289 self.fetcher.load_public_key(key_source).await
290 }
291}