Skip to main content

csaf_walker/source/
http.rs

1use crate::{
2    discover::{DiscoveredAdvisory, DistributionContext},
3    metadata::{self, MetadataSource},
4    model::metadata::ProviderMetadata,
5    retrieve::RetrievedAdvisory,
6    rolie::{RolieSource, SourceFile},
7    source::Source,
8};
9use bytes::{BufMut, Bytes, BytesMut};
10use digest::Digest;
11use futures::try_join;
12use reqwest::Response;
13use sha2::{Sha256, Sha512};
14use std::{sync::Arc, time::SystemTime};
15use time::{OffsetDateTime, format_description::well_known::Rfc2822};
16use url::{ParseError, Url};
17use walker_common::utils::url::ensure_slash;
18use walker_common::{
19    changes::{self, ChangeEntry, ChangeSource},
20    fetcher::{self, DataProcessor, Fetcher},
21    retrieve::{RetrievalMetadata, RetrievedDigest, RetrievingDigest},
22    utils::openpgp::PublicKey,
23    validate::source::{Key, KeySource, KeySourceError},
24};
25
/// Options controlling how an [`HttpSource`] discovers advisories.
#[non_exhaustive]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct HttpOptions {
    /// When set, index entries whose `modified` timestamp is earlier than this instant are
    /// filtered out. `None` (the default) disables the filter.
    pub since: Option<SystemTime>,
}

impl HttpOptions {
    /// Creates options with every setting at its default (no `since` filter).
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter for [`HttpOptions::since`].
    ///
    /// Accepts a `SystemTime` or an `Option<SystemTime>`; passing `None` clears the filter.
    pub fn since(self, since: impl Into<Option<SystemTime>>) -> Self {
        let mut options = self;
        options.since = since.into();
        options
    }
}
42
43#[derive(Clone, Debug)]
44pub struct HttpSource {
45    fetcher: Fetcher,
46    metadata_source: Arc<dyn MetadataSource>,
47    options: HttpOptions,
48}
49
50impl HttpSource {
51    pub fn new<M: MetadataSource + 'static>(
52        metadata: M,
53        fetcher: Fetcher,
54        options: HttpOptions,
55    ) -> Self {
56        Self {
57            metadata_source: Arc::new(metadata),
58            fetcher,
59            options,
60        }
61    }
62}
63
64#[derive(Debug, thiserror::Error)]
65pub enum HttpSourceError {
66    #[error("Metadata discovery error: {0}")]
67    Metadata(#[from] metadata::Error),
68    #[error("Fetch error: {0}")]
69    Fetcher(#[from] fetcher::Error),
70    #[error("URL error: {0}")]
71    Url(#[from] ParseError),
72    #[error("CSV error: {0}")]
73    Csv(#[from] csv::Error),
74    #[error("JSON parse error: {0}")]
75    Json(#[from] serde_json::Error),
76}
77
78impl From<changes::Error> for HttpSourceError {
79    fn from(value: changes::Error) -> Self {
80        match value {
81            changes::Error::Fetcher(err) => Self::Fetcher(err),
82            changes::Error::Url(err) => Self::Url(err),
83            changes::Error::Csv(err) => Self::Csv(err),
84        }
85    }
86}
87
88impl walker_common::source::Source for HttpSource {
89    type Error = HttpSourceError;
90    type Retrieved = RetrievedAdvisory;
91}
92
93impl Source for HttpSource {
94    async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
95        Ok(self.metadata_source.load_metadata(&self.fetcher).await?)
96    }
97
98    async fn load_index(
99        &self,
100        context: DistributionContext,
101    ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
102        let discover_context = Arc::new(context);
103
104        // filter out advisories based on since, but only if we can be sure
105        let since_filter = |advisory: &Result<_, _>| match (advisory, &self.options.since) {
106            (
107                Ok(DiscoveredAdvisory {
108                    url: _,
109                    context: _,
110                    digest: _,
111                    signature: _,
112                    modified,
113                }),
114                Some(since),
115            ) => modified >= since,
116            _ => true,
117        };
118
119        match discover_context.as_ref() {
120            DistributionContext::Directory(base) => {
121                let base = ensure_slash(base.clone());
122                let changes = ChangeSource::retrieve(&self.fetcher, &base).await?;
123
124                Ok(changes
125                    .entries
126                    .into_iter()
127                    .map(|ChangeEntry { file, timestamp }| {
128                        let modified = timestamp.into();
129                        let url = base.join(&file)?;
130
131                        Ok::<_, ParseError>(DiscoveredAdvisory {
132                            context: discover_context.clone(),
133                            url,
134                            modified,
135                            signature: None,
136                            digest: None,
137                        })
138                    })
139                    .filter(since_filter)
140                    .collect::<Result<_, _>>()?)
141            }
142
143            DistributionContext::Feed(feed) => {
144                let source_files = RolieSource::retrieve(&self.fetcher, feed.clone()).await?;
145                Ok(source_files
146                    .files
147                    .into_iter()
148                    .map(
149                        |SourceFile {
150                             file,
151                             timestamp,
152                             digest,
153                             signature,
154                         }| {
155                            let modified = timestamp.into();
156                            let url = Url::parse(&file)?;
157                            let digest = digest.map(|digest| Url::parse(&digest)).transpose()?;
158                            let signature = signature
159                                .map(|signature| Url::parse(&signature))
160                                .transpose()?;
161
162                            Ok::<_, ParseError>(DiscoveredAdvisory {
163                                context: discover_context.clone(),
164                                url,
165                                digest,
166                                signature,
167                                modified,
168                            })
169                        },
170                    )
171                    .filter(since_filter)
172                    .collect::<Result<_, _>>()?)
173            }
174        }
175    }
176
177    async fn load_advisory(
178        &self,
179        discovered: DiscoveredAdvisory,
180    ) -> Result<RetrievedAdvisory, Self::Error> {
181        let digest_result = try_join!(
182            async {
183                // If we have a signature source, use it. Otherwise, guess.
184                match discovered.signature.clone() {
185                    Some(signature) => self.fetcher.fetch::<Option<String>>(signature).await,
186                    None => {
187                        self.fetcher
188                            .fetch::<Option<String>>(format!("{url}.asc", url = discovered.url))
189                            .await
190                    }
191                }
192            },
193            async {
194                match discovered.digest.clone() {
195                    Some(digest) if digest.as_str().ends_with(".sha256") => {
196                        self.fetcher.fetch::<Option<String>>(digest).await
197                    }
198                    Some(_) => Ok(None),
199                    None => {
200                        self.fetcher
201                            .fetch::<Option<String>>(format!("{url}.sha256", url = discovered.url))
202                            .await
203                    }
204                }
205            },
206            async {
207                match discovered.digest.clone() {
208                    Some(digest) if digest.as_str().ends_with(".sha512") => {
209                        self.fetcher.fetch::<Option<String>>(digest).await
210                    }
211                    Some(_) => Ok(None),
212                    None => {
213                        self.fetcher
214                            .fetch::<Option<String>>(format!("{url}.sha512", url = discovered.url))
215                            .await
216                    }
217                }
218            },
219        );
220
221        let (signature, sha256, sha512) = digest_result.map_err(HttpSourceError::Fetcher)?;
222
223        let sha256 = sha256
224            // take the first "word" from the line
225            .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
226            .map(|expected| RetrievingDigest {
227                expected,
228                current: Sha256::new(),
229            });
230        let sha512 = sha512
231            // take the first "word" from the line
232            .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
233            .map(|expected| RetrievingDigest {
234                expected,
235                current: Sha512::new(),
236            });
237
238        let advisory = self
239            .fetcher
240            .fetch_processed(
241                discovered.url.clone(),
242                FetchingRetrievedAdvisory { sha256, sha512 },
243            )
244            .await?;
245
246        Ok(advisory.into_retrieved(discovered, signature))
247    }
248}
249
250pub struct FetchedRetrievedAdvisory {
251    data: Bytes,
252    sha256: Option<RetrievedDigest<Sha256>>,
253    sha512: Option<RetrievedDigest<Sha512>>,
254    metadata: RetrievalMetadata,
255}
256
257impl FetchedRetrievedAdvisory {
258    fn into_retrieved(
259        self,
260        discovered: DiscoveredAdvisory,
261        signature: Option<String>,
262    ) -> RetrievedAdvisory {
263        RetrievedAdvisory {
264            discovered,
265            data: self.data,
266            signature,
267            sha256: self.sha256,
268            sha512: self.sha512,
269            metadata: self.metadata,
270        }
271    }
272}
273
274pub struct FetchingRetrievedAdvisory {
275    pub sha256: Option<RetrievingDigest<Sha256>>,
276    pub sha512: Option<RetrievingDigest<Sha512>>,
277}
278
279impl DataProcessor for FetchingRetrievedAdvisory {
280    type Type = FetchedRetrievedAdvisory;
281
282    async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
283        let mut response = response.error_for_status()?;
284
285        let mut data = BytesMut::new();
286        let mut sha256 = self.sha256.clone();
287        let mut sha512 = self.sha512.clone();
288
289        while let Some(chunk) = response.chunk().await? {
290            if let Some(d) = &mut sha256 {
291                d.update(&chunk);
292            }
293            if let Some(d) = &mut sha512 {
294                d.update(&chunk);
295            }
296            data.put(chunk);
297        }
298
299        let etag = response
300            .headers()
301            .get(reqwest::header::ETAG)
302            .and_then(|s| s.to_str().ok())
303            .map(ToString::to_string);
304
305        let last_modification = response
306            .headers()
307            .get(reqwest::header::LAST_MODIFIED)
308            .and_then(|s| s.to_str().ok())
309            .and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
310
311        Ok(FetchedRetrievedAdvisory {
312            data: data.freeze(),
313            sha256: sha256.map(|d| d.into()),
314            sha512: sha512.map(|d| d.into()),
315            metadata: RetrievalMetadata {
316                last_modification,
317                etag,
318            },
319        })
320    }
321}
322
323impl KeySource for HttpSource {
324    type Error = fetcher::Error;
325
326    async fn load_public_key(
327        &self,
328        key_source: Key<'_>,
329    ) -> Result<PublicKey, KeySourceError<Self::Error>> {
330        self.fetcher.load_public_key(key_source).await
331    }
332}