csaf_walker/source/http.rs

1use crate::{
2    discover::{DiscoveredAdvisory, DistributionContext},
3    metadata::{self, MetadataSource},
4    model::metadata::ProviderMetadata,
5    retrieve::RetrievedAdvisory,
6    rolie::{RolieSource, SourceFile},
7    source::Source,
8};
9use bytes::{BufMut, Bytes, BytesMut};
10use digest::Digest;
11use futures::try_join;
12use reqwest::Response;
13use sha2::{Sha256, Sha512};
14use std::{sync::Arc, time::SystemTime};
15use time::{OffsetDateTime, format_description::well_known::Rfc2822};
16use url::{ParseError, Url};
17use walker_common::{
18    changes::{self, ChangeEntry, ChangeSource},
19    fetcher::{self, DataProcessor, Fetcher},
20    retrieve::{RetrievalMetadata, RetrievedDigest, RetrievingDigest},
21    utils::openpgp::PublicKey,
22    validate::source::{Key, KeySource, KeySourceError},
23};
24
/// Options controlling how an [`HttpSource`] discovers advisories.
#[non_exhaustive]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct HttpOptions {
    /// When set, only advisories whose modification time is at or after this
    /// point in time are reported during discovery.
    pub since: Option<SystemTime>,
}

impl HttpOptions {
    /// Create options with all settings at their defaults (no `since` filter).
    pub fn new() -> Self {
        Default::default()
    }

    /// Builder-style setter for the `since` filter.
    ///
    /// Accepts either a plain [`SystemTime`] or an `Option<SystemTime>`; passing
    /// `None` clears a previously configured filter.
    pub fn since(mut self, since: impl Into<Option<SystemTime>>) -> Self {
        let since: Option<SystemTime> = since.into();
        self.since = since;
        self
    }
}
41
42#[derive(Clone, Debug)]
43pub struct HttpSource {
44    fetcher: Fetcher,
45    metadata_source: Arc<dyn MetadataSource>,
46    options: HttpOptions,
47}
48
49impl HttpSource {
50    pub fn new<M: MetadataSource + 'static>(
51        metadata: M,
52        fetcher: Fetcher,
53        options: HttpOptions,
54    ) -> Self {
55        Self {
56            metadata_source: Arc::new(metadata),
57            fetcher,
58            options,
59        }
60    }
61}
62
63#[derive(Debug, thiserror::Error)]
64pub enum HttpSourceError {
65    #[error("Metadata discovery error: {0}")]
66    Metadata(#[from] metadata::Error),
67    #[error("Fetch error: {0}")]
68    Fetcher(#[from] fetcher::Error),
69    #[error("URL error: {0}")]
70    Url(#[from] ParseError),
71    #[error("CSV error: {0}")]
72    Csv(#[from] csv::Error),
73    #[error("JSON parse error: {0}")]
74    Json(#[from] serde_json::Error),
75}
76
77impl From<changes::Error> for HttpSourceError {
78    fn from(value: changes::Error) -> Self {
79        match value {
80            changes::Error::Fetcher(err) => Self::Fetcher(err),
81            changes::Error::Url(err) => Self::Url(err),
82            changes::Error::Csv(err) => Self::Csv(err),
83        }
84    }
85}
86
87impl walker_common::source::Source for HttpSource {
88    type Error = HttpSourceError;
89    type Retrieved = RetrievedAdvisory;
90}
91
92impl Source for HttpSource {
93    async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
94        Ok(self.metadata_source.load_metadata(&self.fetcher).await?)
95    }
96
97    async fn load_index(
98        &self,
99        context: DistributionContext,
100    ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
101        let discover_context = Arc::new(context);
102
103        // filter out advisories based on since, but only if we can be sure
104        let since_filter = |advisory: &Result<_, _>| match (advisory, &self.options.since) {
105            (
106                Ok(DiscoveredAdvisory {
107                    url: _,
108                    context: _,
109                    digest: _,
110                    signature: _,
111                    modified,
112                }),
113                Some(since),
114            ) => modified >= since,
115            _ => true,
116        };
117
118        match discover_context.as_ref() {
119            DistributionContext::Directory(base) => {
120                let has_slash = base.to_string().ends_with('/');
121
122                let join_url = |mut s: &str| {
123                    if has_slash && s.ends_with('/') {
124                        s = &s[1..];
125                    }
126                    Url::parse(&format!("{}{s}", base))
127                };
128
129                let changes = ChangeSource::retrieve(&self.fetcher, &base.clone()).await?;
130
131                Ok(changes
132                    .entries
133                    .into_iter()
134                    .map(|ChangeEntry { file, timestamp }| {
135                        let modified = timestamp.into();
136                        let url = join_url(&file)?;
137
138                        Ok::<_, ParseError>(DiscoveredAdvisory {
139                            context: discover_context.clone(),
140                            url,
141                            modified,
142                            signature: None,
143                            digest: None,
144                        })
145                    })
146                    .filter(since_filter)
147                    .collect::<Result<_, _>>()?)
148            }
149
150            DistributionContext::Feed(feed) => {
151                let source_files = RolieSource::retrieve(&self.fetcher, feed.clone()).await?;
152                Ok(source_files
153                    .files
154                    .into_iter()
155                    .map(
156                        |SourceFile {
157                             file,
158                             timestamp,
159                             digest,
160                             signature,
161                         }| {
162                            let modified = timestamp.into();
163                            let url = Url::parse(&file)?;
164                            let digest = digest.map(|digest| Url::parse(&digest)).transpose()?;
165                            let signature = signature
166                                .map(|signature| Url::parse(&signature))
167                                .transpose()?;
168
169                            Ok::<_, ParseError>(DiscoveredAdvisory {
170                                context: discover_context.clone(),
171                                url,
172                                digest,
173                                signature,
174                                modified,
175                            })
176                        },
177                    )
178                    .filter(since_filter)
179                    .collect::<Result<_, _>>()?)
180            }
181        }
182    }
183
184    async fn load_advisory(
185        &self,
186        discovered: DiscoveredAdvisory,
187    ) -> Result<RetrievedAdvisory, Self::Error> {
188        let (signature, sha256, sha512) = try_join!(
189            async {
190                // If we have a signature source, use it. Otherwise, guess.
191                match discovered.signature.clone() {
192                    Some(signature) => self.fetcher.fetch::<Option<String>>(signature).await,
193                    None => {
194                        self.fetcher
195                            .fetch::<Option<String>>(format!("{url}.asc", url = discovered.url))
196                            .await
197                    }
198                }
199            },
200            async {
201                match discovered.digest.clone() {
202                    Some(digest) if digest.as_str().ends_with(".sha256") => {
203                        self.fetcher.fetch::<Option<String>>(digest).await
204                    }
205                    Some(_) => Ok(None),
206                    None => {
207                        self.fetcher
208                            .fetch::<Option<String>>(format!("{url}.sha256", url = discovered.url))
209                            .await
210                    }
211                }
212            },
213            async {
214                match discovered.digest.clone() {
215                    Some(digest) if digest.as_str().ends_with(".sha512") => {
216                        self.fetcher.fetch::<Option<String>>(digest).await
217                    }
218                    Some(_) => Ok(None),
219                    None => {
220                        self.fetcher
221                            .fetch::<Option<String>>(format!("{url}.sha512", url = discovered.url))
222                            .await
223                    }
224                }
225            },
226        )?;
227
228        let sha256 = sha256
229            // take the first "word" from the line
230            .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
231            .map(|expected| RetrievingDigest {
232                expected,
233                current: Sha256::new(),
234            });
235        let sha512 = sha512
236            // take the first "word" from the line
237            .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
238            .map(|expected| RetrievingDigest {
239                expected,
240                current: Sha512::new(),
241            });
242
243        let advisory = self
244            .fetcher
245            .fetch_processed(
246                discovered.url.clone(),
247                FetchingRetrievedAdvisory { sha256, sha512 },
248            )
249            .await?;
250
251        Ok(advisory.into_retrieved(discovered, signature))
252    }
253}
254
255pub struct FetchedRetrievedAdvisory {
256    data: Bytes,
257    sha256: Option<RetrievedDigest<Sha256>>,
258    sha512: Option<RetrievedDigest<Sha512>>,
259    metadata: RetrievalMetadata,
260}
261
262impl FetchedRetrievedAdvisory {
263    fn into_retrieved(
264        self,
265        discovered: DiscoveredAdvisory,
266        signature: Option<String>,
267    ) -> RetrievedAdvisory {
268        RetrievedAdvisory {
269            discovered,
270            data: self.data,
271            signature,
272            sha256: self.sha256,
273            sha512: self.sha512,
274            metadata: self.metadata,
275        }
276    }
277}
278
279pub struct FetchingRetrievedAdvisory {
280    pub sha256: Option<RetrievingDigest<Sha256>>,
281    pub sha512: Option<RetrievingDigest<Sha512>>,
282}
283
284impl DataProcessor for FetchingRetrievedAdvisory {
285    type Type = FetchedRetrievedAdvisory;
286
287    async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
288        let mut response = response.error_for_status()?;
289
290        let mut data = BytesMut::new();
291        let mut sha256 = self.sha256.clone();
292        let mut sha512 = self.sha512.clone();
293
294        while let Some(chunk) = response.chunk().await? {
295            if let Some(d) = &mut sha256 {
296                d.update(&chunk);
297            }
298            if let Some(d) = &mut sha512 {
299                d.update(&chunk);
300            }
301            data.put(chunk);
302        }
303
304        let etag = response
305            .headers()
306            .get(reqwest::header::ETAG)
307            .and_then(|s| s.to_str().ok())
308            .map(ToString::to_string);
309
310        let last_modification = response
311            .headers()
312            .get(reqwest::header::LAST_MODIFIED)
313            .and_then(|s| s.to_str().ok())
314            .and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
315
316        Ok(FetchedRetrievedAdvisory {
317            data: data.freeze(),
318            sha256: sha256.map(|d| d.into()),
319            sha512: sha512.map(|d| d.into()),
320            metadata: RetrievalMetadata {
321                last_modification,
322                etag,
323            },
324        })
325    }
326}
327
328impl KeySource for HttpSource {
329    type Error = fetcher::Error;
330
331    async fn load_public_key(
332        &self,
333        key_source: Key<'_>,
334    ) -> Result<PublicKey, KeySourceError<Self::Error>> {
335        self.fetcher.load_public_key(key_source).await
336    }
337}