use anyhow::Context as _;
pub use camino::Utf8PathBuf as PathBuf;
use clap::Parser;
use rayon::prelude::*;
use reqwest::blocking::Client;
use std::time::Instant;
use tame_gcs::{self as gcs, http, objects::Metadata};
use symbolic_debuginfo::{
sourcebundle::SourceBundleWriter, Archive, FileFormat, Object, ObjectKind,
};
pub struct ObjectFile {
pub file: std::fs::File,
pub path: PathBuf,
pub map: memmap2::Mmap,
pub format: FileFormat,
}
pub fn gather_objects(dirs: Vec<PathBuf>) -> Vec<ObjectFile> {
dirs.into_iter()
.flat_map(walkdir::WalkDir::new)
.filter_map(|entry| {
entry
.ok()
.and_then(|entry| entry.file_type().is_file().then_some(entry))
})
.par_bridge()
.filter_map(|entry| {
let path = PathBuf::from_path_buf(entry.into_path()).ok()?;
let file = std::fs::File::open(&path).ok()?;
let map = unsafe { memmap2::Mmap::map(&file).ok()? };
let format = Archive::peek(&map);
if format != FileFormat::Unknown {
Some(ObjectFile {
file,
map,
path,
format,
})
} else {
None
}
})
.collect()
}
#[inline]
fn get_unified_id(obj: &Object<'_>) -> anyhow::Result<String> {
match obj.code_id() {
Some(code_id) if obj.file_format() != FileFormat::Pe => Ok(code_id.to_string()),
_ => {
let debug_id = obj.debug_id();
anyhow::ensure!(!debug_id.is_nil(), "unable to generate debug identifier");
Ok(debug_id.breakpad().to_string().to_lowercase())
}
}
}
#[inline]
fn create_source_bundle(name: &str, obj: &Object<'_>) -> anyhow::Result<Vec<u8>> {
let mut out = Vec::<u8>::new();
let writer = SourceBundleWriter::start(std::io::Cursor::new(&mut out))
.context("failed to create source bundle writer")?;
writer
.write_object(obj, name)
.context("failed to write source bundle")?;
Ok(out)
}
use std::time::Duration;
struct Ctx {
client: Client,
bucket: gcs::BucketName<'static>,
prefix: gcs::ObjectName<'static>,
gcs: gcs::objects::Object,
compression_level: i32,
bundle_sources: bool,
}
impl Ctx {
fn upload(&self, metadata: Metadata, content: Vec<u8>) -> anyhow::Result<()> {
let len = content.len() as u64;
let req = self.gcs.insert_multipart(
&self.bucket,
std::io::Cursor::new(content),
len,
&metadata,
None,
)?;
let (req, mut body) = req.into_parts();
let buf = Vec::with_capacity(len as usize + 1024);
let mut cursor = std::io::Cursor::new(buf);
std::io::copy(&mut body, &mut cursor)?;
let rb = self.client.request(req.method, req.uri.to_string());
let req = rb
.headers(req.headers)
.body(cursor.into_inner())
.build()
.context("failed to build request")?;
let res = {
let res = self.client.execute(req).context("failed to send request")?;
let mut builder = http::Response::builder()
.status(res.status())
.version(res.version());
let headers = builder
.headers_mut()
.context("failed to convert response headers")?;
headers.extend(
res.headers()
.into_iter()
.map(|(k, v)| (k.clone(), v.clone())),
);
let body = res.bytes().context("failed to receive body")?;
builder.body(body)?
};
use gcs::ApiResponse;
if res.status().is_success() {
gcs::objects::InsertResponse::try_from_parts(res).context("API request failed")?;
} else {
match res
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|hv| hv.to_str().ok())
{
Some(ct) if ct.starts_with("text/plain") => {
anyhow::bail!(
"request failed: HTTP status: {} -> {}",
res.status(),
std::str::from_utf8(res.body()).unwrap_or("text/plain body was not utf8")
);
}
_ => {
gcs::objects::InsertResponse::try_from_parts(res)
.context("API request failed")?;
}
}
}
Ok(())
}
#[inline]
fn compress(&self, input: &[u8]) -> anyhow::Result<Vec<u8>> {
zstd::encode_all(input, self.compression_level).context("failed to compress")
}
#[inline]
fn get_gcs_path(&self, obj: &Object<'_>) -> anyhow::Result<(String, PathBuf)> {
let id = get_unified_id(obj)?;
#[allow(clippy::wildcard_enum_match_arm)]
let suffix = match obj.kind() {
ObjectKind::Debug => "debuginfo",
ObjectKind::Sources if obj.file_format() == FileFormat::SourceBundle => "sourcebundle",
ObjectKind::Relocatable | ObjectKind::Library | ObjectKind::Executable => "executable",
_ => anyhow::bail!("unsupported file"),
};
let path = format!("{}/{}/{}/{suffix}", self.prefix, &id[..2], &id[2..]).into();
Ok((id, path))
}
#[inline]
fn compress_and_upload(&self, obj: &Object<'_>) -> anyhow::Result<ObjectStat> {
let (id, path) = self.get_gcs_path(obj)?;
let (compressed_blob, compression_time) = {
let start = Instant::now();
let compressed = self.compress(obj.data())?;
(compressed, start.elapsed())
};
let compressed_size = compressed_blob.len() as u64;
let upload_time = {
let start = Instant::now();
let md = Metadata {
name: Some(path.into_string()),
content_encoding: Some("zstd".to_owned()),
content_type: Some("application/octet-stream".to_owned()),
..Default::default()
};
self.upload(md, compressed_blob)?;
start.elapsed()
};
Ok(ObjectStat {
id,
kind: obj.kind(),
size: obj.data().len() as u64,
compressed_size,
compression_time,
upload_time,
gather_time: None,
})
}
}
pub struct ObjectStat {
pub id: String,
pub kind: ObjectKind,
pub size: u64,
pub compressed_size: u64,
pub compression_time: Duration,
pub upload_time: Duration,
pub gather_time: Option<Duration>,
}
fn process_archive(
archive: &Archive<'_>,
name: &str,
ctx: &Ctx,
) -> Vec<anyhow::Result<ObjectStat>> {
let stats: Vec<_> = archive
.objects()
.par_bridge()
.map(|obj| {
let obj = obj.context("failed to parse object")?;
let mut obj_stat: Option<anyhow::Result<ObjectStat>> = None;
let mut sb_stat: Option<anyhow::Result<ObjectStat>> = None;
rayon::scope(|s| {
s.spawn(|_s| {
obj_stat = Some(ctx.compress_and_upload(&obj));
});
s.spawn(|_s| {
let upload_metadata = || -> anyhow::Result<()> {
let (_id, mut path) = ctx.get_gcs_path(&obj)?;
path.set_file_name("meta");
let json = serde_json::json!({
"name": name,
"arch": obj.arch(),
"file_format": obj.file_format(),
})
.to_string()
.into_bytes();
let md = Metadata {
name: Some(path.into_string()),
content_type: Some("application/json".to_owned()),
..Default::default()
};
ctx.upload(md, json)
};
let _res = upload_metadata();
});
if ctx.bundle_sources && obj.has_debug_info() && !obj.has_sources() {
s.spawn(|_s| {
let create_and_upload = || {
let (sb, gather_time) = {
let start = std::time::Instant::now();
let sb = create_source_bundle(name, &obj)?;
(sb, start.elapsed())
};
let sb_obj = Object::parse(&sb)?;
anyhow::ensure!(
sb_obj.file_format() == FileFormat::SourceBundle,
"expected SourceBundle but found {}",
sb_obj.file_format()
);
let mut sb_stat = ctx.compress_and_upload(&sb_obj)?;
sb_stat.gather_time = Some(gather_time);
Ok(sb_stat)
};
sb_stat = Some(create_and_upload());
});
}
});
let mut v = Vec::with_capacity(2);
v.extend(obj_stat);
v.extend(sb_stat);
Ok(v)
})
.collect();
stats
.into_iter()
.flat_map(|res| match res {
Ok(v) => v,
Err(err) => vec![Err(err)],
})
.collect()
}
pub struct FileStat {
pub path: PathBuf,
pub format: FileFormat,
pub objects: anyhow::Result<Vec<anyhow::Result<ObjectStat>>>,
}
pub fn upload(
client: Client,
bucket: String,
mut path: String,
compression_level: i32,
bundle_sources: bool,
objects: Vec<ObjectFile>,
) -> anyhow::Result<Vec<FileStat>> {
while path.ends_with('/') {
path.pop();
}
let bucket: gcs::BucketName<'static> = bucket.try_into().context("invalid gcs bucket name")?;
let prefix: gcs::ObjectName<'static> = path.try_into().context("invalid gcs path")?;
let ctx = Ctx {
client,
bucket,
prefix,
compression_level,
bundle_sources,
gcs: gcs::objects::Object::default(),
};
Ok(objects
.into_par_iter()
.map(|file| {
let process = || -> anyhow::Result<Vec<anyhow::Result<ObjectStat>>> {
let archive = Archive::parse(&file.map)
.with_context(|| format!("failed to parse {}", file.path))?;
Ok(process_archive(
&archive,
file.path.file_stem().context("no file stem for path")?,
&ctx,
))
};
let objects = process();
FileStat {
path: file.path,
format: file.format,
objects,
}
})
.collect())
}
fn level_in_range(s: &str) -> Result<i32, String> {
let range = zstd::compression_level_range();
let level: i32 = s
.parse()
.map_err(|err| format!("`{s}` isn't a valid integer {err}"))?;
if range.contains(&level) {
Ok(level)
} else {
Err(format!(
"Compression level not in range {}-{}",
range.start(),
range.end()
))
}
}
#[derive(Parser)]
pub struct Args {
#[arg(long, env = "SYMS_BUCKET")]
bucket: String,
#[arg(long, env = "SYMS_PATH")]
path: String,
#[arg(long)]
bundle_sources: bool,
#[arg(long, short, default_value = "5", value_parser = level_in_range)]
compression_level: i32,
#[arg(long)]
strict: bool,
dirs: Vec<PathBuf>,
}
impl crate::Scopes for Args {
fn scopes(&self) -> &'static [&'static str] {
&["https://www.googleapis.com/auth/devstorage.full_control"]
}
}
pub async fn run(args: Args, client: reqwest::blocking::Client) -> anyhow::Result<()> {
let objects = gather_objects(args.dirs);
anyhow::ensure!(
!objects.is_empty(),
"no valid objects were found in the specified directories"
);
let stats = upload(
client,
args.bucket,
args.path,
args.compression_level,
args.bundle_sources,
objects,
)?;
use nu_ansi_term::{Color, Style};
let mut failures = 0;
let mut successes = 0;
for fstat in stats {
match fstat.objects {
Ok(ostats) => {
println!(
"{} {} {}",
Color::Green.paint("OK"),
Style::default()
.dimmed()
.paint(fstat.path.file_name().unwrap_or_default()),
Style::default().dimmed().paint(fstat.format.to_string()),
);
for ostat in ostats {
match ostat {
Ok(ostat) => {
fn bytes_to_human(bytes: u64) -> String {
let mut bytes = bytes as f64;
for unit in ["B", "KB", "MB", "GB", "TB"] {
if bytes > 1024.0 {
bytes /= 1024.0;
} else {
return format!("{bytes:.1}{unit}");
}
}
unreachable!("if we have more than a TB something is wrong");
}
println!(
" {} {} {}",
Color::Green.paint("OK"),
Style::default().dimmed().paint(ostat.id),
Style::default().dimmed().paint(ostat.kind.to_string()),
);
if let Some(gt) = ostat.gather_time {
println!(" source gather: {:?}", gt);
}
println!(
" compression: {} -> {} {}% ({:?})\n upload: {:?}",
Style::default().dimmed().paint(bytes_to_human(ostat.size)),
Style::default()
.dimmed()
.paint(bytes_to_human(ostat.compressed_size)),
Style::default().bold().paint(
((ostat.compressed_size as f64 / ostat.size as f64 * 100f64)
as u32)
.to_string()
),
ostat.compression_time,
ostat.upload_time
);
successes += 1;
}
Err(err) => {
println!(" {} {err:#}", Color::Red.paint("ERR"));
failures += 1;
}
}
}
}
Err(err) => {
println!(
"{} {} {}\n {}",
Color::Red.paint("ERR"),
Style::default()
.dimmed()
.paint(fstat.path.file_name().unwrap_or_default()),
Style::default().dimmed().paint(fstat.format.to_string()),
Color::Red.paint(err.to_string()),
);
failures += 1;
}
}
}
if failures > 0 && args.strict {
anyhow::bail!("detected {failures} failures");
}
if successes == 0 {
anyhow::bail!("no debug objects were successfuly parsed and uploaded");
}
Ok(())
}