omicron_zone_package/
blob.rs1use anyhow::{anyhow, Context, Result};
8use camino::{Utf8Path, Utf8PathBuf};
9use chrono::{DateTime, FixedOffset, Utc};
10use futures_util::StreamExt;
11use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
12use serde::{Deserialize, Serialize};
13use sha2::{Digest, Sha256};
14use std::str::FromStr;
15use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
16
17use crate::progress::{NoProgress, Progress};
18
/// Base URL that [`Source::S3`] paths are appended to when building a
/// download URL.
const S3_BUCKET: &str = "https://oxide-omicron-build.s3.amazonaws.com";

/// The literal name `"blob"`.
///
/// NOTE(review): presumably the directory/path component under which
/// downloaded blobs are stored — the use site is not in this file; confirm
/// against callers.
pub(crate) const BLOB: &str = "blob";
23
24#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
25pub enum Source {
26 S3(Utf8PathBuf),
27 Buildomat(crate::package::PrebuiltBlob),
28}
29
30impl Source {
31 pub(crate) fn get_url(&self) -> String {
32 match self {
33 Self::S3(s) => format!("{}/{}", S3_BUCKET, s),
34 Self::Buildomat(spec) => {
35 format!(
36 "https://buildomat.eng.oxide.computer/public/file/oxidecomputer/{}/{}/{}/{}",
37 spec.repo, spec.series, spec.commit, spec.artifact
38 )
39 }
40 }
41 }
42
43 async fn download_required(
44 &self,
45 url: &str,
46 client: &reqwest::Client,
47 destination: &Utf8Path,
48 ) -> Result<bool> {
49 if !destination.exists() {
50 return Ok(true);
51 }
52
53 match self {
54 Self::S3(_) => {
55 let head_response = client
59 .head(url)
60 .send()
61 .await?
62 .error_for_status()
63 .with_context(|| format!("HEAD failed for {}", url))?;
64 let headers = head_response.headers();
65 let content_length = headers
66 .get(CONTENT_LENGTH)
67 .ok_or_else(|| anyhow!("no content length on {} HEAD response!", url))?;
68 let content_length: u64 = u64::from_str(content_length.to_str()?)?;
69
70 let last_modified = headers
74 .get(LAST_MODIFIED)
75 .ok_or_else(|| anyhow!("no last modified on {} HEAD response!", url))?;
76 let last_modified: DateTime<FixedOffset> =
77 chrono::DateTime::parse_from_rfc2822(last_modified.to_str()?)?;
78
79 let metadata = tokio::fs::metadata(&destination).await?;
80 let metadata_modified: DateTime<Utc> = metadata.modified()?.into();
81
82 Ok(metadata.len() != content_length || metadata_modified != last_modified)
83 }
84 Self::Buildomat(blob_spec) => {
85 let digest = get_sha256_digest(destination).await?;
86 let expected_digest = hex::decode(&blob_spec.sha256)?;
87 Ok(digest.as_ref() != expected_digest)
88 }
89 }
90 }
91}
92
93pub async fn download(
95 progress: &dyn Progress,
96 source: &Source,
97 destination: &Utf8Path,
98) -> Result<()> {
99 let blob = destination
100 .file_name()
101 .as_ref()
102 .ok_or_else(|| anyhow!("missing blob filename"))?
103 .to_string();
104
105 let url = source.get_url();
106 let client = reqwest::Client::new();
107 if !source.download_required(&url, &client, destination).await? {
108 return Ok(());
109 }
110
111 let response = client.get(url).send().await?.error_for_status()?;
112 let response_headers = response.headers();
113
114 let content_length = if let Some(Ok(Ok(resp_len))) = response_headers
117 .get(CONTENT_LENGTH)
118 .map(|c| c.to_str().map(u64::from_str))
119 {
120 Some(resp_len)
121 } else {
122 None
123 };
124
125 let last_modified = if let Some(time) = response_headers.get(LAST_MODIFIED) {
128 Some(chrono::DateTime::parse_from_rfc2822(time.to_str()?)?)
129 } else {
130 None
131 };
132
133 let mut file = tokio::fs::File::create(destination).await?;
135
136 let blob_progress = if let Some(length) = content_length {
138 progress.sub_progress(length)
139 } else {
140 Box::new(NoProgress::new())
141 };
142 blob_progress.set_message(blob.into());
143
144 let mut stream = response.bytes_stream();
145 while let Some(chunk) = stream.next().await {
146 let chunk = chunk?;
147 file.write_all(&chunk).await?;
148 blob_progress.increment_completed(chunk.len() as u64);
149 }
150 drop(blob_progress);
151
152 file.sync_all().await?;
163 drop(file);
164
165 if let Some(last_modified) = last_modified {
167 filetime::set_file_mtime(
168 destination,
169 filetime::FileTime::from_system_time(last_modified.into()),
170 )?;
171 }
172
173 Ok(())
174}
175
176async fn get_sha256_digest(path: &Utf8Path) -> Result<[u8; 32]> {
177 let mut reader = BufReader::new(
178 tokio::fs::File::open(path)
179 .await
180 .with_context(|| format!("could not open {path:?}"))?,
181 );
182 let mut hasher = Sha256::new();
183 let mut buffer = [0; 1024];
184
185 loop {
186 let count = reader
187 .read(&mut buffer)
188 .await
189 .with_context(|| format!("failed to read {path:?}"))?;
190 if count == 0 {
191 break;
192 } else {
193 hasher.update(&buffer[..count]);
194 }
195 }
196 Ok(hasher.finalize().into())
197}
198
/// Checks that sample `Content-Length` and `Last-Modified` header values parse
/// with the same conversions the download code uses.
#[test]
fn test_converts() {
    let raw_length = "1966080";
    let raw_modified = "Fri, 30 Apr 2021 22:37:39 GMT";

    let parsed_length: u64 = u64::from_str(raw_length).unwrap();
    assert_eq!(1966080, parsed_length);

    // Only successful parsing matters here; the value itself is unused.
    let _parsed_modified: DateTime<FixedOffset> =
        DateTime::parse_from_rfc2822(raw_modified).unwrap();
}