csaf_walker/source/
file.rs

1use crate::{
2    discover::DiscoveredAdvisory,
3    discover::DistributionContext,
4    model::{
5        metadata::{self, ProviderMetadata},
6        store::distribution_base,
7    },
8    retrieve::RetrievedAdvisory,
9    source::Source,
10    visitors::store::DIR_METADATA,
11};
12use anyhow::{Context, anyhow};
13use bytes::Bytes;
14use std::fs;
15use std::io::ErrorKind;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::SystemTime;
19use time::OffsetDateTime;
20use tokio::sync::mpsc;
21use url::Url;
22use walkdir::WalkDir;
23use walker_common::{
24    retrieve::RetrievalMetadata,
25    source::file::{read_sig_and_digests, to_path},
26    utils::{self, openpgp::PublicKey},
27    validate::source::{Key, KeySource, KeySourceError},
28};
29
30#[non_exhaustive]
31#[derive(Clone, Debug, Default, PartialEq, Eq)]
32pub struct FileOptions {
33    pub since: Option<SystemTime>,
34}
35
36impl FileOptions {
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    pub fn since(mut self, since: impl Into<Option<SystemTime>>) -> Self {
42        self.since = since.into();
43        self
44    }
45}
46
47/// A file based source, possibly created by the [`crate::visitors::store::StoreVisitor`].
48#[derive(Clone, Debug)]
49pub struct FileSource {
50    /// the path to the storage base, an absolute path
51    base: PathBuf,
52    options: FileOptions,
53}
54
55impl FileSource {
56    pub fn new(
57        base: impl AsRef<Path>,
58        options: impl Into<Option<FileOptions>>,
59    ) -> anyhow::Result<Self> {
60        Ok(Self {
61            base: fs::canonicalize(base)?,
62            options: options.into().unwrap_or_default(),
63        })
64    }
65
66    async fn scan_keys(&self) -> Result<Vec<metadata::Key>, anyhow::Error> {
67        let dir = self.base.join(DIR_METADATA).join("keys");
68
69        let mut result = Vec::new();
70
71        let mut entries = match tokio::fs::read_dir(&dir).await {
72            Err(err) if err.kind() == ErrorKind::NotFound => {
73                return Ok(result);
74            }
75            Err(err) => {
76                return Err(err)
77                    .with_context(|| format!("Failed scanning for keys: {}", dir.display()));
78            }
79            Ok(entries) => entries,
80        };
81
82        while let Some(entry) = entries.next_entry().await? {
83            let path = entry.path();
84            if !path.is_file() {
85                continue;
86            }
87
88            #[allow(clippy::single_match)]
89            match path
90                .file_name()
91                .and_then(|s| s.to_str())
92                .and_then(|s| s.rsplit_once('.'))
93            {
94                Some((name, "txt")) => result.push(metadata::Key {
95                    fingerprint: Some(name.to_string()),
96                    url: Url::from_file_path(&path).map_err(|()| {
97                        anyhow!("Failed to build file URL for: {}", path.display())
98                    })?,
99                }),
100                Some((_, _)) | None => {}
101            }
102        }
103
104        Ok(result)
105    }
106
107    /// walk a distribution directory
108    fn walk_distribution(
109        &self,
110        context: Arc<DistributionContext>,
111    ) -> Result<mpsc::Receiver<walkdir::Result<walkdir::DirEntry>>, anyhow::Error> {
112        let (tx, rx) = mpsc::channel(8);
113
114        let path = context
115            .url()
116            .clone()
117            .to_file_path()
118            .map_err(|()| anyhow!("Failed to convert into path: {:?}", &context.url()))?;
119
120        tokio::task::spawn_blocking(move || {
121            for entry in WalkDir::new(path).into_iter().filter_entry(|entry| {
122                // if it's a file but doesn't end with .json -> skip it
123                !entry.file_type().is_file()
124                    || entry.file_name().to_string_lossy().ends_with(".json")
125            }) {
126                if let Err(err) = tx.blocking_send(entry) {
127                    // channel closed, abort
128                    log::debug!("Send error: {err}");
129                    return;
130                }
131            }
132            log::debug!("Finished walking files");
133        });
134
135        Ok(rx)
136    }
137}
138
139impl walker_common::source::Source for FileSource {
140    type Error = anyhow::Error;
141    type Retrieved = RetrievedAdvisory;
142}
143
144impl Source for FileSource {
145    async fn load_metadata(&self) -> Result<ProviderMetadata, Self::Error> {
146        let metadata = self.base.join(DIR_METADATA).join("provider-metadata.json");
147        let file = fs::File::open(&metadata)
148            .with_context(|| format!("Failed to open file: {}", metadata.display()))?;
149
150        let mut metadata: ProviderMetadata =
151            serde_json::from_reader(&file).context("Failed to read stored provider metadata")?;
152
153        metadata.public_openpgp_keys = self.scan_keys().await?;
154
155        for dist in &mut metadata.distributions {
156            if let Some(directory_url) = &dist.directory_url {
157                let distribution_base = distribution_base(&self.base, directory_url.as_str());
158                let directory_url = Url::from_directory_path(&distribution_base).map_err(|()| {
159                    anyhow!(
160                        "Failed to convert directory into URL: {}",
161                        self.base.display(),
162                    )
163                })?;
164
165                dist.directory_url = Some(directory_url);
166            }
167
168            if let Some(rolie) = &mut dist.rolie {
169                for feed in &mut rolie.feeds {
170                    let distribution_base = distribution_base(&self.base, feed.url.as_str());
171                    let feed_url = Url::from_directory_path(&distribution_base).map_err(|()| {
172                        anyhow!(
173                            "Failed to convert directory into URL: {}",
174                            self.base.display(),
175                        )
176                    })?;
177                    feed.url = feed_url;
178                }
179            }
180        }
181
182        Ok(metadata)
183    }
184
185    async fn load_index(
186        &self,
187        context: DistributionContext,
188    ) -> Result<Vec<DiscoveredAdvisory>, Self::Error> {
189        log::info!("Loading index - since: {:?}", self.options.since);
190
191        let context = Arc::new(context);
192
193        let mut entries = self.walk_distribution(context.clone())?;
194        let mut result = vec![];
195
196        while let Some(entry) = entries.recv().await {
197            let entry = entry?;
198            let path = entry.path();
199            if !path.is_file() {
200                continue;
201            }
202            let name = match path.file_name().and_then(|s| s.to_str()) {
203                Some(name) => name,
204                None => continue,
205            };
206
207            if !name.ends_with(".json") {
208                continue;
209            }
210
211            if let Some(since) = self.options.since {
212                let modified = path.metadata()?.modified()?;
213                if modified < since {
214                    log::debug!("Skipping file due to modification constraint: {modified:?}");
215                    continue;
216                }
217            }
218
219            let url = Url::from_file_path(path)
220                .map_err(|()| anyhow!("Failed to convert to URL: {}", path.display()))?;
221
222            let modified = path.metadata()?.modified()?;
223
224            result.push(DiscoveredAdvisory {
225                url,
226                modified,
227                digest: None,
228                signature: None,
229                context: context.clone(),
230            })
231        }
232
233        Ok(result)
234    }
235
236    async fn load_advisory(
237        &self,
238        discovered: DiscoveredAdvisory,
239    ) -> Result<RetrievedAdvisory, Self::Error> {
240        let path = discovered
241            .url
242            .to_file_path()
243            .map_err(|()| anyhow!("Unable to convert URL into path: {}", discovered.url))?;
244
245        let data = Bytes::from(tokio::fs::read(&path).await?);
246
247        let (signature, sha256, sha512) = read_sig_and_digests(&path, &data).await?;
248
249        let last_modification = path
250            .metadata()
251            .ok()
252            .and_then(|md| md.modified().ok())
253            .map(OffsetDateTime::from);
254
255        let etag = fsquirrel::get(&path, walker_common::store::ATTR_ETAG)
256            .transpose()
257            .and_then(|r| r.ok())
258            .and_then(|s| String::from_utf8(s).ok());
259
260        Ok(RetrievedAdvisory {
261            discovered,
262            data,
263            signature,
264            sha256,
265            sha512,
266            metadata: RetrievalMetadata {
267                last_modification,
268                etag,
269            },
270        })
271    }
272}
273
274impl KeySource for FileSource {
275    type Error = anyhow::Error;
276
277    async fn load_public_key(
278        &self,
279        key: Key<'_>,
280    ) -> Result<PublicKey, KeySourceError<Self::Error>> {
281        let bytes = tokio::fs::read(to_path(key.url).map_err(KeySourceError::Source)?)
282            .await
283            .map_err(|err| KeySourceError::Source(err.into()))?;
284        utils::openpgp::validate_keys(bytes.into(), key.fingerprint)
285            .map_err(KeySourceError::OpenPgp)
286    }
287}