omicron_zone_package/blob.rs

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Tools for downloading blobs

use anyhow::{anyhow, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, FixedOffset, Utc};
use futures_util::StreamExt;
use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::str::FromStr;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};

use crate::progress::{NoProgress, Progress};

// URL of the S3 bucket from which blobs are downloaded.
const S3_BUCKET: &str = "https://oxide-omicron-build.s3.amazonaws.com";
// Name for the directory component where downloaded blobs are stored.
pub(crate) const BLOB: &str = "blob";

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Source {
    S3(Utf8PathBuf),
    Buildomat(crate::package::PrebuiltBlob),
}

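// For illustration, `get_url` below resolves each variant to a URL along these
// lines (the concrete blob name in the S3 example is hypothetical):
//
//     Source::S3(Utf8PathBuf::from("OVMF_CODE.fd"))
//       -> https://oxide-omicron-build.s3.amazonaws.com/OVMF_CODE.fd
//
//     Source::Buildomat(PrebuiltBlob { repo, series, commit, artifact, .. })
//       -> https://buildomat.eng.oxide.computer/public/file/oxidecomputer/<repo>/<series>/<commit>/<artifact>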
impl Source {
    pub(crate) fn get_url(&self) -> String {
        match self {
            Self::S3(s) => format!("{}/{}", S3_BUCKET, s),
            Self::Buildomat(spec) => {
                format!(
                    "https://buildomat.eng.oxide.computer/public/file/oxidecomputer/{}/{}/{}/{}",
                    spec.repo, spec.series, spec.commit, spec.artifact
                )
            }
        }
    }

    async fn download_required(
        &self,
        url: &str,
        client: &reqwest::Client,
        destination: &Utf8Path,
    ) -> Result<bool> {
        if !destination.exists() {
            return Ok(true);
        }

        match self {
            Self::S3(_) => {
                // Issue a HEAD request to get the blob's size and last modified
                // time. If these match what's on disk, assume the blob is
                // current and don't re-download it.
                let head_response = client
                    .head(url)
                    .send()
                    .await?
                    .error_for_status()
                    .with_context(|| format!("HEAD failed for {}", url))?;
                let headers = head_response.headers();
                let content_length = headers
                    .get(CONTENT_LENGTH)
                    .ok_or_else(|| anyhow!("no content length on {} HEAD response!", url))?;
                let content_length: u64 = u64::from_str(content_length.to_str()?)?;

                // From S3, header looks like:
                //
                //    "Last-Modified: Fri, 27 May 2022 20:50:17 GMT"
                let last_modified = headers
                    .get(LAST_MODIFIED)
                    .ok_or_else(|| anyhow!("no last modified on {} HEAD response!", url))?;
                let last_modified: DateTime<FixedOffset> =
                    chrono::DateTime::parse_from_rfc2822(last_modified.to_str()?)?;

                let metadata = tokio::fs::metadata(&destination).await?;
                let metadata_modified: DateTime<Utc> = metadata.modified()?.into();

                Ok(metadata.len() != content_length || metadata_modified != last_modified)
            }
            Self::Buildomat(blob_spec) => {
                let digest = get_sha256_digest(destination).await?;
                let expected_digest = hex::decode(&blob_spec.sha256)?;
                Ok(digest.as_ref() != expected_digest)
            }
        }
    }
}

// Downloads "source" to "destination", skipping the download if an up-to-date
// copy already exists on disk.
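//
// A rough usage sketch (illustrative only; the blob name, destination path,
// and the use of `NoProgress` as the reporter are assumptions, not taken from
// a real manifest):
//
//     let source = Source::S3(Utf8PathBuf::from("OVMF_CODE.fd"));
//     let destination = Utf8Path::new("out/blob/OVMF_CODE.fd");
//     download(&NoProgress::new(), &source, destination).await?;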
pub async fn download(
    progress: &dyn Progress,
    source: &Source,
    destination: &Utf8Path,
) -> Result<()> {
    let blob = destination
        .file_name()
        .as_ref()
        .ok_or_else(|| anyhow!("missing blob filename"))?
        .to_string();

    let url = source.get_url();
    let client = reqwest::Client::new();
    if !source.download_required(&url, &client, destination).await? {
        return Ok(());
    }

    let response = client.get(url).send().await?.error_for_status()?;
    let response_headers = response.headers();

    // Grab the Content-Length from the response headers, if present. We only
    // use it as a hint for progress reporting, so there's no need to fail if
    // it's missing.
    let content_length = if let Some(Ok(Ok(resp_len))) = response_headers
        .get(CONTENT_LENGTH)
        .map(|c| c.to_str().map(u64::from_str))
    {
        Some(resp_len)
    } else {
        None
    };

    // If the server advertised a last-modified time for the blob, save it here
    // so that the downloaded blob's last-modified time can be set to it.
    let last_modified = if let Some(time) = response_headers.get(LAST_MODIFIED) {
        Some(chrono::DateTime::parse_from_rfc2822(time.to_str()?)?)
    } else {
        None
    };

    // Write file bytes to destination
    let mut file = tokio::fs::File::create(destination).await?;

    // Create a sub-progress for the blob download
    let blob_progress = if let Some(length) = content_length {
        progress.sub_progress(length)
    } else {
        Box::new(NoProgress::new())
    };
    blob_progress.set_message(blob.into());

    let mut stream = response.bytes_stream();
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        file.write_all(&chunk).await?;
        blob_progress.increment_completed(chunk.len() as u64);
    }
    drop(blob_progress);

    // tokio performs async file I/O via thread pools in the background
    // and so just completing the `write_all` futures and dropping the
    // file here is not necessarily enough to ensure the blob has been
    // written out to the filesystem. This unfortunately can cause a race
    // condition as `tar-rs` will read the file metadata to write out the
    // tar Header and then subsequently read the file itself to write to
    // the archive. This can cause us to create a corrupted archive if the
    // file content size does not match the header size from the metadata.
    // All this to say we need to explicitly sync here before returning
    // and trying to add the blob to the archive.
    file.sync_all().await?;
    drop(file);

    // Set destination file's modified time based on HTTPS response
    if let Some(last_modified) = last_modified {
        filetime::set_file_mtime(
            destination,
            filetime::FileTime::from_system_time(last_modified.into()),
        )?;
    }

    Ok(())
}

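// Computes the SHA-256 digest of the file at `path`, streaming it through a
// fixed 1 KiB buffer so the whole blob never needs to be held in memory.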
async fn get_sha256_digest(path: &Utf8Path) -> Result<[u8; 32]> {
    let mut reader = BufReader::new(
        tokio::fs::File::open(path)
            .await
            .with_context(|| format!("could not open {path:?}"))?,
    );
    let mut hasher = Sha256::new();
    let mut buffer = [0; 1024];

    loop {
        let count = reader
            .read(&mut buffer)
            .await
            .with_context(|| format!("failed to read {path:?}"))?;
        if count == 0 {
            break;
        } else {
            hasher.update(&buffer[..count]);
        }
    }
    Ok(hasher.finalize().into())
}

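// Sanity-check the header conversions used by `download_required`: a
// Content-Length value parses as a u64 and an S3-style Last-Modified value
// parses as RFC 2822.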
#[test]
fn test_converts() {
    let content_length = "1966080";
    let last_modified = "Fri, 30 Apr 2021 22:37:39 GMT";

    let content_length: u64 = u64::from_str(content_length).unwrap();
    assert_eq!(1966080, content_length);

    let _last_modified: DateTime<FixedOffset> =
        chrono::DateTime::parse_from_rfc2822(last_modified).unwrap();
}