csaf_walker/source/
http.rs

1use crate::{
2    discover::{DiscoveredAdvisory, DistributionContext},
3    metadata::{self, MetadataSource},
4    model::metadata::ProviderMetadata,
5    retrieve::RetrievedAdvisory,
6    rolie::{RolieSource, SourceFile},
7    source::Source,
8};
9use bytes::{BufMut, Bytes, BytesMut};
10use digest::Digest;
11use futures::try_join;
12use reqwest::Response;
13use sha2::{Sha256, Sha512};
14use std::{sync::Arc, time::SystemTime};
15use time::{OffsetDateTime, format_description::well_known::Rfc2822};
16use url::{ParseError, Url};
17use walker_common::utils::url::ensure_slash;
18use walker_common::{
19    changes::{self, ChangeEntry, ChangeSource},
20    fetcher::{self, DataProcessor, Fetcher},
21    retrieve::{RetrievalMetadata, RetrievedDigest, RetrievingDigest},
22    utils::openpgp::PublicKey,
23    validate::source::{Key, KeySource, KeySourceError},
24};
25
/// Options controlling how an [`HttpSource`] discovers advisories.
#[non_exhaustive]
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct HttpOptions {
    /// Optional lower bound on the modification time of advisories.
    ///
    /// When set, discovered advisories with an earlier modification time are
    /// dropped while loading the index.
    pub since: Option<SystemTime>,
}

impl HttpOptions {
    /// Creates a new set of options, equal to [`HttpOptions::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the `since` bound and returns the options for chaining.
    ///
    /// Accepts either a plain [`SystemTime`] or an `Option<SystemTime>`;
    /// passing `None` clears the bound.
    pub fn since(mut self, since: impl Into<Option<SystemTime>>) -> Self {
        let bound: Option<SystemTime> = since.into();
        self.since = bound;
        self
    }
}
42
43#[derive(Clone, Debug)]
44pub struct HttpSource {
45    fetcher: Fetcher,
46    metadata_source: Arc<dyn MetadataSource>,
47    options: HttpOptions,
48}
49
50impl HttpSource {
51    pub fn new<M: MetadataSource + 'static>(
52        metadata: M,
53        fetcher: Fetcher,
54        options: HttpOptions,
55    ) -> Self {
56        Self {
57            metadata_source: Arc::new(metadata),
58            fetcher,
59            options,
60        }
61    }
62}
63
64#[derive(Debug, thiserror::Error)]
65pub enum HttpSourceError {
66    #[error("Metadata discovery error: {0}")]
67    Metadata(#[from] metadata::Error),
68    #[error("Fetch error: {0}")]
69    Fetcher(#[from] fetcher::Error),
70    #[error("URL error: {0}")]
71    Url(#[from] ParseError),
72    #[error("CSV error: {0}")]
73    Csv(#[from] csv::Error),
74    #[error("JSON parse error: {0}")]
75    Json(#[from] serde_json::Error),
76}
77
78impl From<changes::Error> for HttpSourceError {
79    fn from(value: changes::Error) -> Self {
80        match value {
81            changes::Error::Fetcher(err) => Self::Fetcher(err),
82            changes::Error::Url(err) => Self::Url(err),
83            changes::Error::Csv(err) => Self::Csv(err),
84        }
85    }
86}
87
88impl walker_common::source::Source for HttpSource {
89    type Error = HttpSourceError;
90    type Retrieved = RetrievedAdvisory;
91}
92
93impl Source for HttpSource {
94    async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
95        Ok(self.metadata_source.load_metadata(&self.fetcher).await?)
96    }
97
98    async fn load_index(
99        &self,
100        context: DistributionContext,
101    ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
102        let discover_context = Arc::new(context);
103
104        // filter out advisories based on since, but only if we can be sure
105        let since_filter = |advisory: &Result<_, _>| match (advisory, &self.options.since) {
106            (
107                Ok(DiscoveredAdvisory {
108                    url: _,
109                    context: _,
110                    digest: _,
111                    signature: _,
112                    modified,
113                }),
114                Some(since),
115            ) => modified >= since,
116            _ => true,
117        };
118
119        match discover_context.as_ref() {
120            DistributionContext::Directory(base) => {
121                let base = ensure_slash(base.clone());
122                let changes = ChangeSource::retrieve(&self.fetcher, &base).await?;
123
124                Ok(changes
125                    .entries
126                    .into_iter()
127                    .map(|ChangeEntry { file, timestamp }| {
128                        let modified = timestamp.into();
129                        let url = base.join(&file)?;
130
131                        Ok::<_, ParseError>(DiscoveredAdvisory {
132                            context: discover_context.clone(),
133                            url,
134                            modified,
135                            signature: None,
136                            digest: None,
137                        })
138                    })
139                    .filter(since_filter)
140                    .collect::<Result<_, _>>()?)
141            }
142
143            DistributionContext::Feed(feed) => {
144                let source_files = RolieSource::retrieve(&self.fetcher, feed.clone()).await?;
145                Ok(source_files
146                    .files
147                    .into_iter()
148                    .map(
149                        |SourceFile {
150                             file,
151                             timestamp,
152                             digest,
153                             signature,
154                         }| {
155                            let modified = timestamp.into();
156                            let url = Url::parse(&file)?;
157                            let digest = digest.map(|digest| Url::parse(&digest)).transpose()?;
158                            let signature = signature
159                                .map(|signature| Url::parse(&signature))
160                                .transpose()?;
161
162                            Ok::<_, ParseError>(DiscoveredAdvisory {
163                                context: discover_context.clone(),
164                                url,
165                                digest,
166                                signature,
167                                modified,
168                            })
169                        },
170                    )
171                    .filter(since_filter)
172                    .collect::<Result<_, _>>()?)
173            }
174        }
175    }
176
177    async fn load_advisory(
178        &self,
179        discovered: DiscoveredAdvisory,
180    ) -> Result<RetrievedAdvisory, Self::Error> {
181        let (signature, sha256, sha512) = try_join!(
182            async {
183                // If we have a signature source, use it. Otherwise, guess.
184                match discovered.signature.clone() {
185                    Some(signature) => self.fetcher.fetch::<Option<String>>(signature).await,
186                    None => {
187                        self.fetcher
188                            .fetch::<Option<String>>(format!("{url}.asc", url = discovered.url))
189                            .await
190                    }
191                }
192            },
193            async {
194                match discovered.digest.clone() {
195                    Some(digest) if digest.as_str().ends_with(".sha256") => {
196                        self.fetcher.fetch::<Option<String>>(digest).await
197                    }
198                    Some(_) => Ok(None),
199                    None => {
200                        self.fetcher
201                            .fetch::<Option<String>>(format!("{url}.sha256", url = discovered.url))
202                            .await
203                    }
204                }
205            },
206            async {
207                match discovered.digest.clone() {
208                    Some(digest) if digest.as_str().ends_with(".sha512") => {
209                        self.fetcher.fetch::<Option<String>>(digest).await
210                    }
211                    Some(_) => Ok(None),
212                    None => {
213                        self.fetcher
214                            .fetch::<Option<String>>(format!("{url}.sha512", url = discovered.url))
215                            .await
216                    }
217                }
218            },
219        )?;
220
221        let sha256 = sha256
222            // take the first "word" from the line
223            .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
224            .map(|expected| RetrievingDigest {
225                expected,
226                current: Sha256::new(),
227            });
228        let sha512 = sha512
229            // take the first "word" from the line
230            .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
231            .map(|expected| RetrievingDigest {
232                expected,
233                current: Sha512::new(),
234            });
235
236        let advisory = self
237            .fetcher
238            .fetch_processed(
239                discovered.url.clone(),
240                FetchingRetrievedAdvisory { sha256, sha512 },
241            )
242            .await?;
243
244        Ok(advisory.into_retrieved(discovered, signature))
245    }
246}
247
248pub struct FetchedRetrievedAdvisory {
249    data: Bytes,
250    sha256: Option<RetrievedDigest<Sha256>>,
251    sha512: Option<RetrievedDigest<Sha512>>,
252    metadata: RetrievalMetadata,
253}
254
255impl FetchedRetrievedAdvisory {
256    fn into_retrieved(
257        self,
258        discovered: DiscoveredAdvisory,
259        signature: Option<String>,
260    ) -> RetrievedAdvisory {
261        RetrievedAdvisory {
262            discovered,
263            data: self.data,
264            signature,
265            sha256: self.sha256,
266            sha512: self.sha512,
267            metadata: self.metadata,
268        }
269    }
270}
271
272pub struct FetchingRetrievedAdvisory {
273    pub sha256: Option<RetrievingDigest<Sha256>>,
274    pub sha512: Option<RetrievingDigest<Sha512>>,
275}
276
277impl DataProcessor for FetchingRetrievedAdvisory {
278    type Type = FetchedRetrievedAdvisory;
279
280    async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
281        let mut response = response.error_for_status()?;
282
283        let mut data = BytesMut::new();
284        let mut sha256 = self.sha256.clone();
285        let mut sha512 = self.sha512.clone();
286
287        while let Some(chunk) = response.chunk().await? {
288            if let Some(d) = &mut sha256 {
289                d.update(&chunk);
290            }
291            if let Some(d) = &mut sha512 {
292                d.update(&chunk);
293            }
294            data.put(chunk);
295        }
296
297        let etag = response
298            .headers()
299            .get(reqwest::header::ETAG)
300            .and_then(|s| s.to_str().ok())
301            .map(ToString::to_string);
302
303        let last_modification = response
304            .headers()
305            .get(reqwest::header::LAST_MODIFIED)
306            .and_then(|s| s.to_str().ok())
307            .and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
308
309        Ok(FetchedRetrievedAdvisory {
310            data: data.freeze(),
311            sha256: sha256.map(|d| d.into()),
312            sha512: sha512.map(|d| d.into()),
313            metadata: RetrievalMetadata {
314                last_modification,
315                etag,
316            },
317        })
318    }
319}
320
321impl KeySource for HttpSource {
322    type Error = fetcher::Error;
323
324    async fn load_public_key(
325        &self,
326        key_source: Key<'_>,
327    ) -> Result<PublicKey, KeySourceError<Self::Error>> {
328        self.fetcher.load_public_key(key_source).await
329    }
330}