csaf_walker/source/
http.rs

1use crate::metadata::MetadataSource;
2use crate::{
3    discover::{DiscoveredAdvisory, DistributionContext},
4    metadata,
5    model::metadata::ProviderMetadata,
6    retrieve::RetrievedAdvisory,
7    rolie::{RolieSource, SourceFile},
8    source::Source,
9};
10use bytes::{BufMut, Bytes, BytesMut};
11use digest::Digest;
12use futures::try_join;
13use reqwest::Response;
14use sha2::{Sha256, Sha512};
15use std::sync::Arc;
16use std::time::SystemTime;
17use time::{OffsetDateTime, format_description::well_known::Rfc2822};
18use url::{ParseError, Url};
19use walker_common::{
20    changes::{self, ChangeEntry, ChangeSource},
21    fetcher::{self, DataProcessor, Fetcher},
22    retrieve::{RetrievalMetadata, RetrievedDigest, RetrievingDigest},
23    utils::openpgp::PublicKey,
24    validate::source::{Key, KeySource, KeySourceError},
25};
26
/// Options controlling how an [`HttpSource`] discovers advisories.
#[non_exhaustive]
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct HttpOptions {
    /// Optional cut-off: when set, `HttpSource::load_index` only keeps advisories whose
    /// modification time is at or after this instant.
    pub since: Option<SystemTime>,
}
32
33impl HttpOptions {
34    pub fn new() -> Self {
35        Self::default()
36    }
37
38    pub fn since(mut self, since: impl Into<Option<SystemTime>>) -> Self {
39        self.since = since.into();
40        self
41    }
42}
43
44#[derive(Clone, Debug)]
45pub struct HttpSource {
46    fetcher: Fetcher,
47    metadata_source: Arc<dyn MetadataSource>,
48    options: HttpOptions,
49}
50
51impl HttpSource {
52    pub fn new<M: MetadataSource + 'static>(
53        metadata: M,
54        fetcher: Fetcher,
55        options: HttpOptions,
56    ) -> Self {
57        Self {
58            metadata_source: Arc::new(metadata),
59            fetcher,
60            options,
61        }
62    }
63}
64
65#[derive(Debug, thiserror::Error)]
66pub enum HttpSourceError {
67    #[error("Metadata discovery error: {0}")]
68    Metadata(#[from] metadata::Error),
69    #[error("Fetch error: {0}")]
70    Fetcher(#[from] fetcher::Error),
71    #[error("URL error: {0}")]
72    Url(#[from] ParseError),
73    #[error("CSV error: {0}")]
74    Csv(#[from] csv::Error),
75    #[error("JSON parse error: {0}")]
76    Json(#[from] serde_json::Error),
77}
78
79impl From<changes::Error> for HttpSourceError {
80    fn from(value: changes::Error) -> Self {
81        match value {
82            changes::Error::Fetcher(err) => Self::Fetcher(err),
83            changes::Error::Url(err) => Self::Url(err),
84            changes::Error::Csv(err) => Self::Csv(err),
85        }
86    }
87}
88
89impl walker_common::source::Source for HttpSource {
90    type Error = HttpSourceError;
91    type Retrieved = RetrievedAdvisory;
92}
93
94impl Source for HttpSource {
95    async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
96        Ok(self.metadata_source.load_metadata(&self.fetcher).await?)
97    }
98
99    async fn load_index(
100        &self,
101        context: DistributionContext,
102    ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
103        let discover_context = Arc::new(context);
104
105        // filter out advisories based on since, but only if we can be sure
106        let since_filter = |advisory: &Result<_, _>| match (advisory, &self.options.since) {
107            (
108                Ok(DiscoveredAdvisory {
109                    url: _,
110                    context: _,
111                    modified,
112                }),
113                Some(since),
114            ) => modified >= since,
115            _ => true,
116        };
117
118        match discover_context.as_ref() {
119            DistributionContext::Directory(base) => {
120                let has_slash = base.to_string().ends_with('/');
121
122                let join_url = |mut s: &str| {
123                    if has_slash && s.ends_with('/') {
124                        s = &s[1..];
125                    }
126                    Url::parse(&format!("{}{s}", base))
127                };
128
129                let changes = ChangeSource::retrieve(&self.fetcher, &base.clone()).await?;
130
131                Ok(changes
132                    .entries
133                    .into_iter()
134                    .map(|ChangeEntry { file, timestamp }| {
135                        let modified = timestamp.into();
136                        let url = join_url(&file)?;
137
138                        Ok::<_, ParseError>(DiscoveredAdvisory {
139                            context: discover_context.clone(),
140                            url,
141                            modified,
142                        })
143                    })
144                    .filter(since_filter)
145                    .collect::<Result<_, _>>()?)
146            }
147
148            DistributionContext::Feed(feed) => {
149                let source_files = RolieSource::retrieve(&self.fetcher, feed.clone()).await?;
150                Ok(source_files
151                    .files
152                    .into_iter()
153                    .map(|SourceFile { file, timestamp }| {
154                        let modified = timestamp.into();
155                        let url = Url::parse(&file)?;
156
157                        Ok::<_, ParseError>(DiscoveredAdvisory {
158                            context: discover_context.clone(),
159                            url,
160                            modified,
161                        })
162                    })
163                    .filter(since_filter)
164                    .collect::<Result<_, _>>()?)
165            }
166        }
167    }
168
169    async fn load_advisory(
170        &self,
171        discovered: DiscoveredAdvisory,
172    ) -> Result<RetrievedAdvisory, Self::Error> {
173        let (signature, sha256, sha512) = try_join!(
174            self.fetcher
175                .fetch::<Option<String>>(format!("{url}.asc", url = discovered.url)),
176            self.fetcher
177                .fetch::<Option<String>>(format!("{url}.sha256", url = discovered.url)),
178            self.fetcher
179                .fetch::<Option<String>>(format!("{url}.sha512", url = discovered.url)),
180        )?;
181
182        let sha256 = sha256
183            // take the first "word" from the line
184            .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
185            .map(|expected| RetrievingDigest {
186                expected,
187                current: Sha256::new(),
188            });
189        let sha512 = sha512
190            // take the first "word" from the line
191            .and_then(|expected| expected.split(' ').next().map(ToString::to_string))
192            .map(|expected| RetrievingDigest {
193                expected,
194                current: Sha512::new(),
195            });
196
197        let advisory = self
198            .fetcher
199            .fetch_processed(
200                discovered.url.clone(),
201                FetchingRetrievedAdvisory { sha256, sha512 },
202            )
203            .await?;
204
205        Ok(advisory.into_retrieved(discovered, signature))
206    }
207}
208
209pub struct FetchedRetrievedAdvisory {
210    data: Bytes,
211    sha256: Option<RetrievedDigest<Sha256>>,
212    sha512: Option<RetrievedDigest<Sha512>>,
213    metadata: RetrievalMetadata,
214}
215
216impl FetchedRetrievedAdvisory {
217    fn into_retrieved(
218        self,
219        discovered: DiscoveredAdvisory,
220        signature: Option<String>,
221    ) -> RetrievedAdvisory {
222        RetrievedAdvisory {
223            discovered,
224            data: self.data,
225            signature,
226            sha256: self.sha256,
227            sha512: self.sha512,
228            metadata: self.metadata,
229        }
230    }
231}
232
233pub struct FetchingRetrievedAdvisory {
234    pub sha256: Option<RetrievingDigest<Sha256>>,
235    pub sha512: Option<RetrievingDigest<Sha512>>,
236}
237
238impl DataProcessor for FetchingRetrievedAdvisory {
239    type Type = FetchedRetrievedAdvisory;
240
241    async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
242        let mut response = response.error_for_status()?;
243
244        let mut data = BytesMut::new();
245        let mut sha256 = self.sha256.clone();
246        let mut sha512 = self.sha512.clone();
247
248        while let Some(chunk) = response.chunk().await? {
249            if let Some(d) = &mut sha256 {
250                d.update(&chunk);
251            }
252            if let Some(d) = &mut sha512 {
253                d.update(&chunk);
254            }
255            data.put(chunk);
256        }
257
258        let etag = response
259            .headers()
260            .get(reqwest::header::ETAG)
261            .and_then(|s| s.to_str().ok())
262            .map(ToString::to_string);
263
264        let last_modification = response
265            .headers()
266            .get(reqwest::header::LAST_MODIFIED)
267            .and_then(|s| s.to_str().ok())
268            .and_then(|s| OffsetDateTime::parse(s, &Rfc2822).ok());
269
270        Ok(FetchedRetrievedAdvisory {
271            data: data.freeze(),
272            sha256: sha256.map(|d| d.into()),
273            sha512: sha512.map(|d| d.into()),
274            metadata: RetrievalMetadata {
275                last_modification,
276                etag,
277            },
278        })
279    }
280}
281
282impl KeySource for HttpSource {
283    type Error = fetcher::Error;
284
285    async fn load_public_key(
286        &self,
287        key_source: Key<'_>,
288    ) -> Result<PublicKey, KeySourceError<Self::Error>> {
289        self.fetcher.load_public_key(key_source).await
290    }
291}