use anyhow::{anyhow, bail, Context, Result};
use reqwest::{blocking, StatusCode, Url};
use serde::Deserialize;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::thread::sleep;
use std::time::Duration;
use crate::cmdline::*;
use crate::osmet::*;
use crate::util::set_die_on_sigpipe;
/// Timeout (four hours) configured on every client built by `new_http_client`.
const HTTP_COMPLETION_TIMEOUT: Duration = Duration::from_secs(4 * 60 * 60);
/// Base URL that stream metadata is resolved against when the caller supplies none.
const DEFAULT_STREAM_BASE_URL: &str = "https://builds.coreos.fedoraproject.org/streams/";
/// Directory that `OsmetLocation::new` searches for a matching osmet file.
const OSMET_FILES_DIR: &str = "/run/coreos-installer/osmet";
/// A place an image can be obtained from.
///
/// Every implementor is also `Display`; the implementations in this file use
/// that to describe what is about to be copied or downloaded.
pub trait ImageLocation: Display {
/// Opens this location and returns one `ImageSource` per available artifact.
fn sources(&self) -> Result<Vec<ImageSource>>;
/// Whether images from this location must carry a signature.
///
/// Defaults to `true`; implementors may override it.
/// NOTE(review): enforcement happens in callers outside this file — confirm there.
fn require_signature(&self) -> bool {
true
}
}
/// Image source backed by a file on the local filesystem.
#[derive(Debug)]
pub struct FileLocation {
/// Path of the image file that is opened for reading.
image_path: String,
/// Path of the signature file, derived as `<image_path>.sig`.
sig_path: String,
}
/// Image source backed by an osmet file found in the osmet directory.
///
/// Derives `Debug` like the other location types in this file so that it can
/// be logged and inspected the same way.
#[derive(Debug)]
pub struct OsmetLocation {
    /// Path of the osmet file that matched the lookup.
    osmet_path: PathBuf,
    /// Architecture that was requested when the osmet file was looked up.
    architecture: String,
    /// Sector size that was requested when the osmet file was looked up
    /// (displayed as a byte count).
    sector_size: u32,
    /// Description returned by the osmet lookup; shown by the `Display` impl.
    description: String,
}
/// Image source downloaded from a URL.
#[derive(Debug)]
pub struct UrlLocation {
/// URL the image is fetched from.
image_url: Url,
/// URL the signature is fetched from.
sig_url: Url,
/// Artifact type label copied into every `ImageSource` produced from this location.
artifact_type: String,
/// Retry policy handed to `http_get` for both the image and the signature.
retries: FetchRetries,
}
/// Image source resolved through stream metadata.
#[derive(Debug)]
pub struct StreamLocation {
/// Base URL supplied by the caller, if any; `Display` uses its presence to pick the message.
stream_base_url: Option<Url>,
/// Name of the stream.
stream: String,
/// URL of the stream metadata document, built from the stream name and base URL.
stream_url: Url,
/// Architecture key looked up in the stream metadata.
architecture: String,
/// Platform key looked up under the architecture.
platform: String,
/// Format key looked up under the platform.
format: String,
/// Retry policy handed to the metadata and artifact fetches.
retries: FetchRetries,
}
/// An opened image together with the metadata gathered while opening it.
pub struct ImageSource {
/// Reader yielding the image data.
pub reader: Box<dyn Read>,
/// Length of the data when known (file length, HTTP content length, or unpacker length).
pub length_hint: Option<u64>,
/// Raw signature bytes, when a signature could be obtained.
pub signature: Option<Vec<u8>>,
/// File name derived from the source path or URL.
pub filename: String,
/// Artifact type label, e.g. `"disk"`.
pub artifact_type: String,
}
impl FileLocation {
    /// Creates a location for the image at `path`.
    ///
    /// The signature is expected next to the image, at `<path>.sig`.
    pub fn new(path: &str) -> Self {
        let image_path = String::from(path);
        let sig_path = format!("{path}.sig");
        Self {
            image_path,
            sig_path,
        }
    }
}
impl Display for FileLocation {
    /// Describes which image file and which signature file will be read.
    fn fmt(&self, f: &mut Formatter<'_>) -> ::std::fmt::Result {
        let Self {
            image_path,
            sig_path,
        } = self;
        write!(
            f,
            "Copying image from {}\nReading signature from {}",
            image_path, sig_path
        )
    }
}
impl ImageLocation for FileLocation {
    /// Opens the local image and, on a best-effort basis, its signature file.
    ///
    /// Returns a single `"disk"` source whose length hint is the image size.
    /// A signature file that cannot be opened is reported on stderr and the
    /// source is returned without a signature; a signature file that opens but
    /// cannot be read is an error.
    fn sources(&self) -> Result<Vec<ImageSource>> {
        let mut image = OpenOptions::new()
            .read(true)
            .open(&self.image_path)
            .context("opening source image file")?;

        // Determine the size by seeking to the end, then return to the start
        // so the reader yields the whole image.
        let length = image
            .seek(SeekFrom::End(0))
            .context("seeking source image file")?;
        image
            .seek(SeekFrom::Start(0))
            .context("seeking source image file")?;

        let signature = match OpenOptions::new().read(true).open(&self.sig_path) {
            Err(err) => {
                eprintln!("Couldn't read signature file: {err}");
                None
            }
            Ok(mut sig_file) => {
                let mut bytes = Vec::new();
                sig_file
                    .read_to_end(&mut bytes)
                    .context("reading signature file")?;
                Some(bytes)
            }
        };

        let filename = Path::new(&self.image_path)
            .file_name()
            .map(|name| name.to_string_lossy().into_owned())
            .context("extracting filename")?;

        let source = ImageSource {
            reader: Box::new(image),
            length_hint: Some(length),
            signature,
            filename,
            artifact_type: "disk".to_string(),
        };
        Ok(vec![source])
    }
}
impl UrlLocation {
    /// Creates a `"disk"` location for `url`, with the signature expected at
    /// the same URL with `.sig` appended to its path.
    pub fn new(url: &Url, retries: FetchRetries) -> Self {
        let sig_path = format!("{}.sig", url.path());
        let mut sig_url = url.clone();
        sig_url.set_path(&sig_path);
        Self::new_full(url, &sig_url, "disk", retries)
    }

    /// Creates a location from explicit image and signature URLs and an
    /// artifact type label.
    fn new_full(url: &Url, sig_url: &Url, artifact_type: &str, retries: FetchRetries) -> Self {
        let image_url = url.clone();
        let sig_url = sig_url.clone();
        let artifact_type = artifact_type.to_owned();
        Self {
            image_url,
            sig_url,
            artifact_type,
            retries,
        }
    }

    /// Downloads the complete signature into memory.
    fn fetch_signature(&self) -> Result<Vec<u8>> {
        let client = new_http_client()?;
        let mut response =
            http_get(client, &self.sig_url, self.retries).context("fetching signature URL")?;
        let mut signature = Vec::new();
        response
            .read_to_end(&mut signature)
            .context("reading signature content")?;
        Ok(signature)
    }
}
impl Display for UrlLocation {
    /// Describes which image URL and which signature URL will be downloaded.
    fn fmt(&self, f: &mut Formatter<'_>) -> ::std::fmt::Result {
        let image = &self.image_url;
        let signature = &self.sig_url;
        write!(
            f,
            "Downloading image from {}\nDownloading signature from {}",
            image, signature
        )
    }
}
impl ImageLocation for UrlLocation {
    /// Fetches the signature (best effort) and starts the image download.
    ///
    /// A signature that cannot be fetched is reported on stderr and the source
    /// is returned without one. The image response must have status 200 OK.
    /// The file name is the last path segment of the response's URL.
    fn sources(&self) -> Result<Vec<ImageSource>> {
        let signature = match self.fetch_signature() {
            Ok(bytes) => Some(bytes),
            Err(e) => {
                eprintln!("Failed to fetch signature: {e}");
                None
            }
        };

        let client = new_http_client()?;
        let response =
            http_get(client, &self.image_url, self.retries).context("fetching image URL")?;
        let status = response.status();
        if status != StatusCode::OK {
            bail!("image fetch failed: {}", status);
        }

        let length_hint = response.content_length();
        let mut segments = response
            .url()
            .path_segments()
            .context("splitting image URL")?;
        let filename = segments
            .next_back()
            .context("walking image URL")?
            .to_string();

        let source = ImageSource {
            reader: Box::new(response),
            length_hint,
            signature,
            filename,
            artifact_type: self.artifact_type.clone(),
        };
        Ok(vec![source])
    }
}
impl StreamLocation {
    /// Creates a location for the given stream, architecture, platform and format.
    ///
    /// Fails if the stream metadata URL cannot be built from `stream` and
    /// `base_url`.
    pub fn new(
        stream: &str,
        architecture: &str,
        platform: &str,
        format: &str,
        base_url: Option<&Url>,
        retries: FetchRetries,
    ) -> Result<Self> {
        let stream_url = build_stream_url(stream, base_url)?;
        let location = Self {
            stream_base_url: base_url.cloned(),
            stream: stream.to_owned(),
            stream_url,
            architecture: architecture.to_owned(),
            platform: platform.to_owned(),
            format: format.to_owned(),
            retries,
        };
        Ok(location)
    }
}
impl Display for StreamLocation {
    /// Describes the download; the wording depends on whether a custom stream
    /// base URL was supplied.
    fn fmt(&self, f: &mut Formatter<'_>) -> ::std::fmt::Result {
        match &self.stream_base_url {
            Some(_) => write!(
                f,
                "Downloading {} {} image ({}) and signature referenced from {}",
                self.architecture, self.platform, self.format, self.stream_url
            ),
            None => write!(
                f,
                "Downloading Fedora CoreOS {} {} {} image ({}) and signature",
                self.stream, self.architecture, self.platform, self.format
            ),
        }
    }
}
impl ImageLocation for StreamLocation {
    /// Fetches the stream metadata and opens every artifact listed for the
    /// configured architecture, platform and format.
    ///
    /// Returns one source per artifact, sorted by artifact type. Fails if the
    /// metadata cannot be fetched, if the requested combination is missing
    /// from it, if an artifact or signature URL does not parse, or if any
    /// artifact fails to open.
    fn sources(&self) -> Result<Vec<ImageSource>> {
        let client = new_http_client()?;
        let stream = fetch_stream(client, &self.stream_url, self.retries)?;

        // Walk architecture -> platform -> format; a miss at any level yields
        // the same "couldn't find" error.
        let artifacts = stream
            .architectures
            .get(&self.architecture)
            .and_then(|arch| arch.artifacts.get(&self.platform))
            .and_then(|platform| platform.formats.get(&self.format))
            .with_context(|| {
                format!(
                    "couldn't find architecture {}, platform {}, format {} in stream metadata",
                    self.architecture, self.platform, self.format
                )
            })?;

        let mut sources: Vec<ImageSource> = Vec::with_capacity(artifacts.len());
        for (artifact_type, artifact) in artifacts {
            let artifact_url = Url::parse(&artifact.location)
                .context("parsing artifact URL from stream metadata")?;
            let signature_url = Url::parse(&artifact.signature)
                .context("parsing signature URL from stream metadata")?;
            let location =
                UrlLocation::new_full(&artifact_url, &signature_url, artifact_type, self.retries);
            sources.extend(location.sources()?);
        }

        // Map iteration order is unspecified, so sort for a deterministic
        // result. Comparing the strings in place avoids allocating a key per
        // comparison.
        sources.sort_by(|a, b| a.artifact_type.cmp(&b.artifact_type));
        Ok(sources)
    }
}
impl OsmetLocation {
    /// Looks for an osmet file matching `architecture` and `sector_size`.
    ///
    /// Returns `Ok(None)` when the osmet directory does not exist or holds no
    /// matching file.
    pub fn new(architecture: &str, sector_size: u32) -> Result<Option<Self>> {
        let osmet_dir = Path::new(OSMET_FILES_DIR);
        if !osmet_dir.exists() {
            return Ok(None);
        }

        let found = find_matching_osmet_in_dir(osmet_dir, architecture, sector_size)?;
        Ok(found.map(|(osmet_path, description)| Self {
            osmet_path,
            architecture: architecture.into(),
            sector_size,
            description,
        }))
    }
}
impl Display for OsmetLocation {
    /// Describes the image being installed, its architecture and sector size.
    fn fmt(&self, f: &mut Formatter<'_>) -> ::std::fmt::Result {
        let Self {
            description,
            architecture,
            sector_size,
            ..
        } = self;
        write!(
            f,
            "Installing {} {} ({}-byte sectors)",
            description, architecture, sector_size
        )
    }
}
impl ImageLocation for OsmetLocation {
fn sources(&self) -> Result<Vec<ImageSource>> {
let unpacker = OsmetUnpacker::new_from_sysroot(Path::new(&self.osmet_path))?;
let filename = {
let stem = self.osmet_path.file_stem().with_context(|| {
format!(
"can't create new .raw filename from osmet path {:?}",
&self.osmet_path
)
})?;
let mut filename: String = stem
.to_str()
.with_context(|| format!("non-UTF-8 osmet file stem: {stem:?}"))?
.into();
filename.push_str(".raw");
filename
};
let length = unpacker.length();
Ok(vec![ImageSource {
reader: Box::new(unpacker),
length_hint: Some(length),
signature: None,
filename,
artifact_type: "disk".to_string(),
}])
}
fn require_signature(&self) -> bool {
false
}
}
/// Prints a table of every architecture/platform/format combination offered
/// by the configured stream.
///
/// Rows are sorted and preceded by a header line; the first two columns are
/// padded to their widest entry. The metadata is fetched without retries.
pub fn list_stream(config: ListStreamConfig) -> Result<()> {
    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    struct Row<'a> {
        architecture: &'a str,
        platform: &'a str,
        format: &'a str,
    }

    let client = new_http_client()?;
    let stream_url = build_stream_url(&config.stream, config.stream_base_url.as_ref())?;
    let stream = fetch_stream(client, &stream_url, FetchRetries::None)?;

    // Flatten the nested metadata maps into one row per combination.
    let mut rows: Vec<Row> = stream
        .architectures
        .iter()
        .flat_map(|(arch_name, arch)| {
            arch.artifacts
                .iter()
                .flat_map(move |(platform_name, platform)| {
                    platform.formats.keys().map(move |format_name| Row {
                        architecture: arch_name.as_str(),
                        platform: platform_name.as_str(),
                        format: format_name.as_str(),
                    })
                })
        })
        .collect();
    rows.sort();

    // The header goes in after sorting so that it stays on top, but before
    // the widths are measured so that it takes part in column sizing.
    let header = Row {
        architecture: "Architecture",
        platform: "Platform",
        format: "Format",
    };
    rows.insert(0, header);

    let arch_width = rows
        .iter()
        .map(|row| row.architecture.len())
        .max()
        .unwrap_or(0);
    let platform_width = rows
        .iter()
        .map(|row| row.platform.len())
        .max()
        .unwrap_or(0);

    set_die_on_sigpipe()?;
    for row in &rows {
        println!(
            "{:3$} {:4$} {}",
            row.architecture, row.platform, row.format, arch_width, platform_width
        );
    }
    Ok(())
}
/// Builds the URL of the metadata document for `stream`.
///
/// `<stream>.json` is joined onto `base_url`, or onto
/// `DEFAULT_STREAM_BASE_URL` when no base URL is given. The default is parsed
/// only when it is actually needed.
///
/// # Errors
///
/// Fails if the join does not produce a valid URL.
fn build_stream_url(stream: &str, base_url: Option<&Url>) -> Result<Url> {
    let file_name = format!("{stream}.json");
    let joined = match base_url {
        Some(base) => base.join(&file_name),
        None => {
            // Invariant: the built-in default is a hard-coded, well-formed URL.
            let default_base = Url::parse(DEFAULT_STREAM_BASE_URL)
                .expect("DEFAULT_STREAM_BASE_URL must be a valid URL");
            default_base.join(&file_name)
        }
    };
    joined.context("building stream URL")
}
/// Downloads and decodes the stream metadata document at `url`.
///
/// Fails if the request fails, if the response status is anything other than
/// 200 OK, or if the body cannot be decoded as stream metadata.
fn fetch_stream(client: blocking::Client, url: &Url, retries: FetchRetries) -> Result<Stream> {
    let response = http_get(client, url, retries).context("fetching stream metadata")?;
    let status = response.status();
    if status != StatusCode::OK {
        bail!("stream metadata fetch from {} failed: {}", url, status);
    }
    serde_json::from_reader::<_, Stream>(response).context("decoding stream metadata")
}
/// Builds a blocking HTTP client configured with `HTTP_COMPLETION_TIMEOUT`.
pub fn new_http_client() -> Result<blocking::Client> {
    let builder = blocking::ClientBuilder::new().timeout(HTTP_COMPLETION_TIMEOUT);
    let client = builder.build();
    client.context("building HTTP client")
}
/// Performs a GET of `url` with `client`, retrying according to `retries`.
///
/// An attempt is retried when the request cannot be sent or when the response
/// status is one of `RETRY_STATUS_CODES`. Any other response ends the loop and
/// is passed through `error_for_status`, so remaining error statuses are
/// returned as errors rather than retried.
///
/// `FetchRetries::None` allows a single attempt, `FetchRetries::Finite(n)`
/// allows `n + 1` attempts, and `FetchRetries::Infinite` never gives up.
/// Between attempts the function logs to stderr and sleeps, starting at one
/// second and doubling up to a cap of ten minutes.
///
/// # Errors
///
/// Returns the last failure, with the URL as context, once the attempts are
/// exhausted, or the error produced by `error_for_status`.
pub fn http_get(
client: blocking::Client,
url: &Url,
retries: FetchRetries,
) -> Result<blocking::Response> {
// Statuses treated as transient and therefore retried.
// NOTE(review): this list looks like the set `curl --retry` uses — confirm.
const RETRY_STATUS_CODES: [u16; 6] = [408, 429, 500, 502, 503, 504];
// Seconds to sleep before the next attempt.
let mut delay = 1;
// `tries` counts the remaining attempts; it is ignored when `infinite` is set.
let (infinite, mut tries) = match retries {
FetchRetries::Infinite => (true, 0),
FetchRetries::Finite(n) => (false, n.get() + 1),
FetchRetries::None => (false, 1),
};
loop {
// Each iteration either returns from the function or yields the error
// describing why this attempt should be retried.
let err: anyhow::Error = match client.get(url.clone()).send() {
Err(err) => err.into(),
Ok(resp) => match resp.status().as_u16() {
code if RETRY_STATUS_CODES.contains(&code) => anyhow!(
"HTTP {} {}",
code,
resp.status().canonical_reason().unwrap_or("")
),
_ => {
return resp
.error_for_status()
.with_context(|| format!("fetching '{url}'"));
}
},
};
if !infinite {
tries -= 1;
// Out of attempts: surface the most recent failure.
if tries == 0 {
return Err(err).with_context(|| format!("fetching '{url}'"));
}
}
eprintln!("Error fetching '{url}': {err}");
eprintln!("Sleeping {delay}s and retrying...");
sleep(Duration::from_secs(delay));
// Exponential backoff, capped at ten minutes.
delay = std::cmp::min(delay * 2, 10 * 60); }
}
/// The parts of a stream metadata document that this file reads.
#[derive(Debug, Deserialize)]
struct Stream {
/// Per-architecture metadata, keyed by architecture name.
architectures: HashMap<String, Arch>,
}
/// Stream metadata for a single architecture.
#[derive(Debug, Deserialize)]
struct Arch {
/// Per-platform metadata, keyed by platform name.
artifacts: HashMap<String, Platform>,
}
/// Stream metadata for a single platform.
#[derive(Debug, Deserialize)]
struct Platform {
/// Artifacts keyed first by format name, then by artifact type.
formats: HashMap<String, HashMap<String, Artifact>>,
}
/// A single downloadable artifact listed in the stream metadata.
#[derive(Debug, Deserialize)]
struct Artifact {
/// URL of the artifact, kept as a string until it is parsed on use.
location: String,
/// URL of the artifact's signature, kept as a string until it is parsed on use.
signature: String,
}
#[cfg(test)]
mod tests {
use super::*;
// Smoke test: building the HTTP client with the configured timeout must succeed.
#[test]
fn test_new_http_client() {
let _ = new_http_client().unwrap();
}
}