omicron_zone_package/
blob.rsuse anyhow::{anyhow, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, FixedOffset, Utc};
use futures_util::StreamExt;
use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
use ring::digest::{Context as DigestContext, Digest, SHA256};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use crate::progress::{NoProgress, Progress};
const S3_BUCKET: &str = "https://oxide-omicron-build.s3.amazonaws.com";
pub(crate) const BLOB: &str = "blob";
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Source {
S3(Utf8PathBuf),
Buildomat(crate::package::PrebuiltBlob),
}
impl Source {
pub(crate) fn get_url(&self) -> String {
match self {
Self::S3(s) => format!("{}/{}", S3_BUCKET, s),
Self::Buildomat(spec) => {
format!(
"https://buildomat.eng.oxide.computer/public/file/oxidecomputer/{}/{}/{}/{}",
spec.repo, spec.series, spec.commit, spec.artifact
)
}
}
}
async fn download_required(
&self,
url: &str,
client: &reqwest::Client,
destination: &Utf8Path,
) -> Result<bool> {
if !destination.exists() {
return Ok(true);
}
match self {
Self::S3(_) => {
let head_response = client
.head(url)
.send()
.await?
.error_for_status()
.with_context(|| format!("HEAD failed for {}", url))?;
let headers = head_response.headers();
let content_length = headers
.get(CONTENT_LENGTH)
.ok_or_else(|| anyhow!("no content length on {} HEAD response!", url))?;
let content_length: u64 = u64::from_str(content_length.to_str()?)?;
let last_modified = headers
.get(LAST_MODIFIED)
.ok_or_else(|| anyhow!("no last modified on {} HEAD response!", url))?;
let last_modified: DateTime<FixedOffset> =
chrono::DateTime::parse_from_rfc2822(last_modified.to_str()?)?;
let metadata = tokio::fs::metadata(&destination).await?;
let metadata_modified: DateTime<Utc> = metadata.modified()?.into();
Ok(metadata.len() != content_length || metadata_modified != last_modified)
}
Self::Buildomat(blob_spec) => {
let digest = get_sha256_digest(destination).await?;
let expected_digest = hex::decode(&blob_spec.sha256)?;
Ok(digest.as_ref() != expected_digest)
}
}
}
}
pub async fn download(
progress: &dyn Progress,
source: &Source,
destination: &Utf8Path,
) -> Result<()> {
let blob = destination
.file_name()
.as_ref()
.ok_or_else(|| anyhow!("missing blob filename"))?
.to_string();
let url = source.get_url();
let client = reqwest::Client::new();
if !source.download_required(&url, &client, destination).await? {
return Ok(());
}
let response = client.get(url).send().await?.error_for_status()?;
let response_headers = response.headers();
let content_length = if let Some(Ok(Ok(resp_len))) = response_headers
.get(CONTENT_LENGTH)
.map(|c| c.to_str().map(u64::from_str))
{
Some(resp_len)
} else {
None
};
let last_modified = if let Some(time) = response_headers.get(LAST_MODIFIED) {
Some(chrono::DateTime::parse_from_rfc2822(time.to_str()?)?)
} else {
None
};
let mut file = tokio::fs::File::create(destination).await?;
let blob_progress = if let Some(length) = content_length {
progress.sub_progress(length)
} else {
Box::new(NoProgress::new())
};
blob_progress.set_message(blob.into());
let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk).await?;
blob_progress.increment_completed(chunk.len() as u64);
}
drop(blob_progress);
file.sync_all().await?;
drop(file);
if let Some(last_modified) = last_modified {
filetime::set_file_mtime(
destination,
filetime::FileTime::from_system_time(last_modified.into()),
)?;
}
Ok(())
}
async fn get_sha256_digest(path: &Utf8Path) -> Result<Digest> {
let mut reader = BufReader::new(
tokio::fs::File::open(path)
.await
.with_context(|| format!("could not open {path:?}"))?,
);
let mut context = DigestContext::new(&SHA256);
let mut buffer = [0; 1024];
loop {
let count = reader
.read(&mut buffer)
.await
.with_context(|| format!("failed to read {path:?}"))?;
if count == 0 {
break;
} else {
context.update(&buffer[..count]);
}
}
Ok(context.finish())
}
#[test]
fn test_converts() {
let content_length = "1966080";
let last_modified = "Fri, 30 Apr 2021 22:37:39 GMT";
let content_length: u64 = u64::from_str(content_length).unwrap();
assert_eq!(1966080, content_length);
let _last_modified: DateTime<FixedOffset> =
chrono::DateTime::parse_from_rfc2822(last_modified).unwrap();
}