use std::ffi::OsStr;
use std::fs;
use std::io::{self, BufRead, BufReader, Write, stdout};
use std::process::exit;
use std::str::FromStr;
use bmputil::bmp::{BmpDevice, BmpMatcher, FirmwareType};
use bmputil::metadata::download_metadata;
#[cfg(windows)]
use bmputil::windows;
use bmputil::{AllowDangerous, BmpParams, FlashParams};
use clap::builder::TypedValueParser;
use clap::builder::styling::Styles;
use clap::{Arg, ArgAction, Args, Command, CommandFactory, Parser, Subcommand, crate_description, crate_version};
use clap_complete::{Shell, generate};
use color_eyre::config::HookBuilder;
use color_eyre::eyre::{Context, EyreHandler, InstallError, OptionExt, Result, eyre};
use directories::ProjectDirs;
use log::{debug, error, info, warn};
use owo_colors::OwoColorize;
// Top-level command line arguments of bmputil-cli, parsed through clap's derive API.
//
// NOTE: plain `//` comments are used inside the clap-derived types on purpose - clap turns
// `///` doc comments into help text, so adding those would change the `--help` output.
//
// Every option declared here is `global`, so it is accepted after any subcommand too.
#[derive(Parser)]
#[command(
version,
about = format!("{} v{}", crate_description!(), crate_version!()),
styles(style()),
disable_colored_help(false),
arg_required_else_help(true)
)]
struct CliArguments
{
// `-s`/`--serial` (also accepted as `--serial-number`); handed to the probe matcher through
// `BmpParams::serial_number`.
#[arg(global = true, short = 's', long = "serial", alias = "serial-number")]
serial_number: Option<String>,
// `--index`, parsed with `usize::from_str`; handed to the probe matcher through
// `BmpParams::index`.
#[arg(global = true, long = "index", value_parser = usize::from_str)]
index: Option<usize>,
// `-p`/`--port`; not read by any of the code visible here.
#[arg(global = true, short = 'p', long = "port")]
port: Option<String>,
// Windows-only hidden option, passed as the first argument to `windows::ensure_access`.
#[cfg(windows)]
#[arg(global = true, long = "windows-wdi-install-mode", value_parser = u32::from_str, hide = true)]
windows_wdi_install_mode: Option<u32>,
// The selected top-level subcommand; `main` dispatches on it.
#[command(subcommand)]
pub subcommand: ToplevelCommmands,
}
// Top-level subcommands of the tool. `main` matches on this enum to pick the action to run.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Subcommand)]
enum ToplevelCommmands
{
// Probe-related actions; carries its own nested subcommand (see `ProbeCommmands`).
Probe(ProbeArguments),
// Stream logs from the probe's auxiliary serial port (handled by `serial_logging`).
Logs(LogsArguments),
// Target-related actions (see `TargetCommmands`).
#[command(subcommand)]
Target(TargetCommmands),
// Placeholder: `main` only warns that this command space is reserved.
Server,
// Placeholder: `main` only warns that this command space is reserved.
Debug,
// Generate shell completions for the shell named in the arguments.
Complete(CompletionArguments),
}
// Arguments shared by every `probe` subcommand.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Args)]
struct ProbeArguments
{
// Hidden `--allow-dangerous-options` switch, defaulting to `AllowDangerous::Never`.
// Surfaced to the flashing code through `FlashParams::allow_dangerous_options`.
#[arg(global = true, long = "allow-dangerous-options", hide = true, default_value_t = AllowDangerous::Never)]
#[arg(value_enum)]
allow_dangerous_options: AllowDangerous,
// Which probe action to perform.
#[command(subcommand)]
subcommand: ProbeCommmands,
}
// Arguments of the `logs` subcommand.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Args)]
struct LogsArguments
{
// `--defmt <path>`: file that `serial_logging` reads and parses for a defmt table; when a
// table is found the incoming stream is decoded as defmt frames instead of plain lines.
#[arg(long = "defmt")]
defmt: Option<String>,
}
// Subcommands of `target`; help is shown when none is given (`arg_required_else_help`).
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Subcommand)]
#[command(arg_required_else_help(true))]
enum TargetCommmands
{
// Report the target power state as seen by the probe (handled by `power_command`).
Power,
}
// Subcommands of `probe`; help is shown when none is given (`arg_required_else_help`).
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Subcommand)]
#[command(arg_required_else_help(true))]
enum ProbeCommmands
{
// List the matching probes, or the targets one probe supports (handled by `info_command`).
Info(InfoArguments),
// Flash firmware onto a probe, or list the available releases (`update_probe` /
// `display_releases`).
Update(UpdateArguments),
// Dispatched to `bmputil::switcher::switch_firmware`.
Switch(SwitchArguments),
// Reboot the probe (handled by `reboot_command`).
Reboot(RebootArguments),
// Windows only: dispatched to `windows::ensure_access` with its second argument set to `true`.
#[cfg(windows)]
InstallDrivers(DriversArguments),
}
// Arguments of `probe info`.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Args)]
struct InfoArguments
{
// `--list-targets`: instead of listing probes, ask a single probe which target
// architectures and families it supports (see `list_targets`).
#[arg(long = "list-targets", default_value_t = false)]
list_targets: bool,
}
// Arguments of `probe update`.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Args)]
struct UpdateArguments
{
// Optional positional path of a firmware file to flash. When absent, `update_probe` picks
// and downloads a firmware from the latest release instead.
firmware_binary: Option<String>,
// `--override-firmware-type`; surfaced through `FlashParams::override_firmware_type`.
#[arg(long = "override-firmware-type", hide_short_help = true, value_enum)]
override_firmware_type: Option<FirmwareType>,
// Hidden `--force-override-flash <value>`. Because of `ArgAction::Set` it takes a value,
// which `ConfirmedBoolParser` maps to `true` only when it is exactly "really".
#[arg(long = "force-override-flash", hide = true, default_value_t = false, value_parser = ConfirmedBoolParser {})]
#[arg(action = ArgAction::Set)]
force_override_flash: bool,
// `-f`/`--force`: flash even when the latest release is not newer than what is on the probe.
#[arg(short = 'f', long = "force", default_value_t = false)]
force: bool,
// `--use-rc`: forwarded to `metadata.latest(..)` when choosing the latest release.
#[arg(long = "use-rc", default_value_t = false)]
use_rc: bool,
// Optional nested subcommand; when present it replaces the flashing action (see `main`).
#[command(subcommand)]
subcommand: Option<UpdateCommands>,
}
// Optional subcommands of `probe update`.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Subcommand)]
enum UpdateCommands
{
// Print the details of every release found in the metadata (handled by `display_releases`).
List,
}
// Arguments of `probe switch`.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Args)]
struct SwitchArguments
{
// `--override-firmware-type`; surfaced through `FlashParams::override_firmware_type`.
#[arg(long = "override-firmware-type", hide_short_help = true, value_enum)]
override_firmware_type: Option<FirmwareType>,
// Hidden `--force-override-flash <value>`. Because of `ArgAction::Set` it takes a value,
// which `ConfirmedBoolParser` maps to `true` only when it is exactly "really".
#[arg(long = "force-override-flash", hide = true, default_value_t = false, value_parser = ConfirmedBoolParser {})]
#[arg(action = ArgAction::Set)]
force_override_flash: bool,
}
// Arguments of `probe reboot`. The two flags form a clap group with `multiple = false`,
// so at most one of them may be given.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Args)]
#[group(multiple = false)]
struct RebootArguments
{
// `--dfu`: reboot a probe running its firmware into the bootloader; a probe that is
// already in the bootloader is left alone (see `reboot_command`).
#[arg(long = "dfu", default_value_t = false)]
dfu: bool,
// `--repeat`: perform a single detach regardless of the current mode, which
// `reboot_command` reports as switching between bootloader and firmware.
#[arg(long = "repeat", default_value_t = false)]
repeat: bool,
}
// Arguments of the Windows-only `probe install-drivers` subcommand.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[cfg(windows)]
#[derive(Args)]
struct DriversArguments
{
// `--force`: forwarded as the last argument of `windows::ensure_access`.
#[arg(long = "force", default_value_t = false)]
force: bool,
}
// Arguments of the `complete` subcommand.
// (`//` rather than `///` so that clap's generated help text is left untouched.)
#[derive(Args)]
struct CompletionArguments
{
// Positional: the shell to generate a completion script for.
shell: Shell,
}
/// Exposes the probe-selection options from the command line to code that takes a
/// [`BmpParams`] (for example `BmpMatcher::from_params`).
impl BmpParams for CliArguments
{
/// The value given to `--index`, if any.
fn index(&self) -> Option<usize>
{
self.index
}
/// The value given to `--serial`, if any, borrowed from the parsed arguments.
fn serial_number(&self) -> Option<&str>
{
self.serial_number.as_deref()
}
}
/// Exposes the flashing-related options to code that takes a [`FlashParams`]. The values
/// live on different subcommands, so each accessor digs them out of whichever subcommand
/// was actually selected.
impl FlashParams for CliArguments
{
    /// Returns the `--allow-dangerous-options` setting when a `probe` subcommand is in use,
    /// and [`AllowDangerous::Never`] for every other top-level command.
    fn allow_dangerous_options(&self) -> AllowDangerous
    {
        if let ToplevelCommmands::Probe(probe_args) = &self.subcommand {
            probe_args.allow_dangerous_options
        } else {
            AllowDangerous::Never
        }
    }

    /// Returns the `--override-firmware-type` value of `probe update` or `probe switch`,
    /// and `None` for any other command.
    fn override_firmware_type(&self) -> Option<FirmwareType>
    {
        let ToplevelCommmands::Probe(probe_args) = &self.subcommand else {
            return None;
        };

        match &probe_args.subcommand {
            ProbeCommmands::Update(update_args) => update_args.override_firmware_type,
            ProbeCommmands::Switch(switch_args) => switch_args.override_firmware_type,
            _ => None,
        }
    }
}
#[derive(Clone)]
struct ConfirmedBoolParser {}
impl TypedValueParser for ConfirmedBoolParser
{
type Value = bool;
fn parse_ref(&self, cmd: &Command, _arg: Option<&Arg>, value: &OsStr) -> Result<Self::Value, clap::Error>
{
let value = value
.to_str()
.ok_or_else(|| clap::Error::new(clap::error::ErrorKind::InvalidUtf8).with_cmd(cmd))?;
Ok(value == "really")
}
}
/// Implements `probe reboot`.
///
/// Exactly one probe must match the selection options; otherwise the process exits with
/// status 1. What happens next depends on the flags:
///
/// * `--dfu`: a probe running its firmware is detached into the bootloader; a probe that is
///   already in the bootloader is left untouched.
/// * `--repeat`: a single detach is performed whatever the current mode is.
/// * neither: a probe running its firmware is first detached and re-enumerated, and then
///   (in both modes) a final detach is performed.
///
/// # Errors
/// Any failure from the detach operations, wrapped with the context "detaching device".
fn reboot_command(cli_args: &CliArguments, reboot_args: &RebootArguments) -> Result<()>
{
    use bmputil::usb::DfuOperatingMode::*;

    let matcher = BmpMatcher::from_params(cli_args);
    let mut found = matcher.find_matching_probes();
    let mut probe = found.pop_single("detach").map_err(|_| exit(1))?;

    if reboot_args.dfu {
        match probe.operating_mode() {
            Runtime => {
                println!("Rebooting probe into bootloader...");
                probe.detach_and_destroy().wrap_err("detaching device")
            },
            FirmwareUpgrade => {
                println!("Probe already in bootloader, nothing to do.");
                Ok(())
            },
        }
    } else if reboot_args.repeat {
        println!("Switching probe between bootloader and firmware...");
        probe.detach_and_destroy().wrap_err("detaching device")
    } else {
        match probe.operating_mode() {
            Runtime => {
                println!("Rebooting probe...");
                // First hop; the detach after this match performs the second one.
                probe.detach_and_enumerate().wrap_err("detaching device")?;
            },
            FirmwareUpgrade => println!("Rebooting probe into firmware..."),
        }
        probe.detach_and_destroy().wrap_err("detaching device")
    }
}
/// Implements `probe update` (without the `list` subcommand).
///
/// Exactly one probe must match the selection options; otherwise the process exits with
/// status 1. If a firmware file was named on the command line it is flashed as-is.
/// Otherwise the release metadata is downloaded into the cache directory, the latest release
/// (optionally including release candidates) is looked up, and the firmware matching the
/// probe is downloaded and flashed.
///
/// The function returns `Ok(())` without flashing when:
/// * the latest release has no firmware for this probe (an error is logged),
/// * the latest release is not newer than the probe's firmware and `--force` is not set,
/// * the user cancels the firmware variant selection.
///
/// # Errors
/// Propagates failures from querying the probe, downloading metadata or firmware, choosing
/// a variant and flashing. Also fails when the latest release cannot be determined or when
/// the probe does not report a variant.
fn update_probe(cli_args: &CliArguments, flash_args: &UpdateArguments, paths: &ProjectDirs) -> Result<()>
{
    use bmputil::switcher::{download_firmware, pick_firmware};

    let matcher = BmpMatcher::from_params(cli_args);
    let mut results = matcher.find_matching_probes();
    let probe = results.pop_single("flash").map_err(|_| exit(1))?;

    let file_name = match &flash_args.firmware_binary {
        // The user supplied a file, so there is nothing to look up.
        Some(file_path) => file_path.into(),
        None => {
            let identity = probe.firmware_identity()?;
            let metadata = download_metadata(paths.cache_dir())?;
            let (latest_version, latest_release) = metadata
                .latest(flash_args.use_rc)
                .ok_or_eyre("Could not determine the latest release of the firmware")?;

            let probe_variant = identity
                .variant()
                .ok_or_eyre("Device appears to be in bootloader, so cannot determine probe type")?;
            let Some(latest_firmware) = latest_release.firmware.get(&probe_variant) else {
                error!("Cannot find suitable firmware for your probe from the pre-built releases");
                return Ok(());
            };

            // Nothing to do unless the release is newer or the user insists.
            if !flash_args.force && identity.version >= latest_version {
                info!(
                    "Latest release {} is not newer than firmware version {}, not updating",
                    latest_version, identity.version
                );
                return Ok(());
            }

            let latest_version_str = latest_version.to_string();
            if identity.version < latest_version {
                info!("Upgrading probe firmware from {} to {}", identity.version, latest_version_str);
            } else if flash_args.force {
                warn!(
                    "Forcibly downgrading firmware from {} to {}",
                    identity.version, latest_version_str
                );
            }

            // Only ask the user to choose when there is not exactly one candidate.
            let variants = &latest_firmware.variants;
            let firmware_variant = if variants.len() == 1 {
                // The length check above guarantees that one entry exists.
                variants.values().next().unwrap()
            } else {
                let Some(chosen) = pick_firmware(latest_version_str.as_str(), latest_firmware)? else {
                    println!("firmware variant selection cancelled, stopping operation");
                    return Ok(());
                };
                chosen
            };

            download_firmware(firmware_variant, paths.cache_dir())?
        },
    };

    bmputil::flasher::flash_probe(cli_args, probe, file_name)
}
/// Implements `probe update list`: downloads the release metadata into the cache directory
/// and logs, at info level, every release together with its per-probe firmware variants and
/// (when present) its BMDA binaries per OS and architecture.
///
/// # Errors
/// Propagates any failure from downloading the metadata.
fn display_releases(paths: &ProjectDirs) -> Result<()>
{
    let metadata = download_metadata(paths.cache_dir())?;

    for (version, release) in metadata.releases {
        info!("Details of release {version}:");
        info!("-> Release includes BMDA builds? {}", release.includes_bmda);

        let probe_names: Vec<_> = release.firmware.keys().map(|p| p.to_string()).collect();
        info!("-> Release done for probes: {}", probe_names.join(", "));

        for (probe, firmware) in release.firmware {
            info!(
                "-> probe {} has {} firmware variants",
                probe.to_string(),
                firmware.variants.len()
            );
            for (variant, download) in firmware.variants {
                info!(" -> Firmware variant {}", variant);
                info!(
                    " -> {} will be downloaded as {}",
                    download.friendly_name,
                    download.file_name.display()
                );
                info!(" -> Variant will be downloaded from {}", download.uri);
            }
        }

        // Releases without BMDA builds have nothing further to report.
        let Some(bmda) = release.bmda else {
            continue;
        };
        info!("-> Release contains BMDA for {} OSes", bmda.len());
        for (os, bmda_arch) in bmda {
            info!(
                " -> {} release is for {} architectures",
                os.to_string(),
                bmda_arch.binaries.len()
            );
            for (arch, binary) in bmda_arch.binaries {
                info!(" -> BMDA binary for {}", arch.to_string());
                info!(" -> Name of executable in archive: {}", binary.file_name.display());
                info!(" -> Archive will be downloaded from {}", binary.uri);
            }
        }
    }

    Ok(())
}
fn serial_logging(cli_args: &CliArguments, logs_args: &LogsArguments) -> Result<()>
{
let matcher = BmpMatcher::from_params(cli_args);
let mut results = matcher.find_matching_probes();
let device = results.pop_single("logging").map_err(|_| exit(1))?;
let aux_port = device.aux_serial_interface()?;
println!("Found probe auxilliary serial port: {}", aux_port.serial_port().display());
let serial_port = aux_port.open(115_200)?;
let mut reader = BufReader::new(serial_port);
let mut delimiter = b'\n';
let mut defmt_table_locs = if let Some(binary_path) = &logs_args.defmt {
let buffer: Vec<u8> = fs::read(binary_path)?;
match defmt_decoder::Table::parse(&buffer).map_err(|_| eyre!("failed to decode defmt table from ELF"))? {
Some(table) => {
let locs = table
.get_locations(&buffer)
.map_err(|_| eyre!("failed to load location table from ELF"))?;
let locs = if !table.is_empty() && locs.is_empty() {
warn!("Insufficient DWARF info; compile your program with `debug = 2` to enable location info.");
None
} else if table.indices().all(|idx| locs.contains_key(&(idx as u64))) {
Some(locs)
} else {
warn!("Location info is incomplete; it will be omitted from the output.");
None
};
delimiter = 0b0;
println!("Decoding logs as defmt using supplied ELF binary");
Some((table, locs))
},
None => {
warn!("No Defmt table found in provide binary, logs lines will not be decoded.");
None
},
}
} else {
None
};
let mut buffer = Vec::new();
loop {
buffer.clear();
match reader.read_until(delimiter, &mut buffer) {
Ok(read_bytes) => {
if let Some((table, _locs)) = &mut defmt_table_locs {
let mut stream_decoder = table.new_stream_decoder();
stream_decoder.received(&buffer[..read_bytes]);
match stream_decoder.decode() {
Ok(frame) => {
io::stdout()
.write_all(frame.display_message().to_string().as_bytes())
.unwrap();
io::stdout().write_all("\r\n".as_bytes()).unwrap();
io::stdout().flush().unwrap();
},
Err(defmt_decoder::DecodeError::UnexpectedEof) => {},
Err(defmt_decoder::DecodeError::Malformed) => match table.encoding().can_recover() {
false => return Err(defmt_decoder::DecodeError::Malformed.into()),
true => {
eprintln!("malformed log frame skipped");
continue;
},
},
}
} else {
io::stdout().write_all(&buffer[..read_bytes]).unwrap();
io::stdout().flush().unwrap();
}
},
Err(ref error) if error.kind() == io::ErrorKind::TimedOut => (),
Err(error) => eprintln!("Error reading from serial port: {:?}", error),
}
}
}
/// Implements `target power`: asks the single matching probe for its target power state and
/// logs it at info level.
///
/// Exits the process with status 1 unless exactly one probe matches.
///
/// # Errors
/// Propagates failures from opening the probe's serial interface, setting up the remote
/// protocol, or querying the power state.
fn power_command(cli_args: &CliArguments) -> Result<()>
{
    let matcher = BmpMatcher::from_params(cli_args);
    let mut found = matcher.find_matching_probes();
    let probe = found.pop_single("power").map_err(|_| exit(1))?;

    let remote = probe.bmd_serial_interface()?.remote()?;
    let power_state = remote.get_target_power_state()?;
    info!("Device target power state: {}", power_state);

    Ok(())
}
/// Logs, at info level, which target architectures and target families `probe` supports.
///
/// For either query, a `None` answer from the probe is reported with a hint to upgrade the
/// firmware rather than treated as an error.
///
/// # Errors
/// Propagates failures from opening the probe's serial interface, setting up the remote
/// protocol, or running either query.
fn list_targets(probe: BmpDevice) -> Result<()>
{
    let remote = probe.bmd_serial_interface()?.remote()?;

    match remote.supported_architectures()? {
        Some(archs) => info!("Probe supports the following target architectures: {archs}"),
        None => {
            info!("Could not determine what target architectures your probe supports - please upgrade your firmware.")
        },
    }

    match remote.supported_families()? {
        Some(families) => info!("Probe supports the following target families: {families}"),
        None => {
            info!("Could not determine what target families your probe supports - please upgrade your firmware.")
        },
    }

    Ok(())
}
/// Implements `probe info`.
///
/// With `--list-targets`, exactly one probe must match and its supported targets are listed
/// (see `list_targets`). Otherwise every matching probe is printed, along with its position
/// in the list when more than one was found. In both modes the process exits with status 1
/// when the match set cannot be taken.
///
/// # Errors
/// Propagates failures from `list_targets`. The firmware identity is only queried as part of
/// a debug log statement, so a failure there can only surface when debug logging is enabled.
fn info_command(cli_args: &CliArguments, info_args: &InfoArguments) -> Result<()>
{
    let matcher = BmpMatcher::from_params(cli_args);
    let mut results = matcher.find_matching_probes();

    if info_args.list_targets {
        let probe = results.pop_single("list targets").map_err(|_| exit(1))?;
        return list_targets(probe);
    }

    let probes = results.pop_all().map_err(|_| exit(1))?;
    // An index is only worth showing when there is something to choose between.
    let show_index = probes.len() > 1;
    for (index, dev) in probes.iter().enumerate() {
        debug!("Probe identity: {}", dev.firmware_identity()?);
        println!("Found: {dev}");
        if show_index {
            println!(" Index: {index}\n");
        }
    }

    Ok(())
}
/// Boxed eyre hook: given an error, builds the [`EyreHandler`] that will format its report.
/// This is the type accepted by `color_eyre::eyre::set_hook` in `BmputilHook::install`.
type EyreHookFunc = Box<dyn Fn(&(dyn std::error::Error + 'static)) -> Box<dyn EyreHandler> + Send + Sync + 'static>;
/// Boxed panic hook, as accepted by `std::panic::set_hook` in `BmputilPanic::install`.
type PanicHookFunc = Box<dyn Fn(&std::panic::PanicHookInfo<'_>) + Send + Sync + 'static>;
/// Wrapper around another eyre hook. Handlers built through it are [`BmputilHandler`]s,
/// which surround the wrapped handler's report with a "cut here" banner and reporting hint.
struct BmputilHook
{
/// The wrapped hook that produces the handler doing the actual report formatting.
inner_hook: EyreHookFunc,
}
/// Wrapper around another panic hook that prints a "cut here" banner before, and a
/// reporting hint after, the wrapped hook's own output.
struct BmputilPanic
{
/// The wrapped hook that prints the actual panic report.
inner_hook: PanicHookFunc,
}
/// Eyre handler that decorates the report of another handler (see its [`EyreHandler`] impl).
struct BmputilHandler
{
/// The wrapped handler that formats the error report itself.
inner_handler: Box<dyn EyreHandler>,
}
impl BmputilHook
{
    /// Asks the wrapped hook for a handler for `error` and wraps it in a [`BmputilHandler`].
    fn build_handler(&self, error: &(dyn std::error::Error + 'static)) -> BmputilHandler
    {
        let inner_handler = (self.inner_hook)(error);
        BmputilHandler { inner_handler }
    }

    /// Installs this hook as the eyre hook.
    ///
    /// # Errors
    /// Returns whatever `color_eyre::eyre::set_hook` reports on failure.
    pub fn install(self) -> Result<(), InstallError>
    {
        let hook = self.into_eyre_hook();
        color_eyre::eyre::set_hook(hook)
    }

    /// Converts this wrapper into a boxed hook function that owns it.
    pub fn into_eyre_hook(self) -> EyreHookFunc
    {
        Box::new(move |error| Box::new(self.build_handler(error)))
    }
}
impl BmputilPanic
{
pub fn install(self)
{
std::panic::set_hook(self.into_panic_hook());
}
pub fn into_panic_hook(self) -> PanicHookFunc
{
Box::new(move |panic_info| {
self.print_header();
(*self.inner_hook)(panic_info);
self.print_footer();
})
}
fn print_header(&self)
{
eprintln!("------------[ ✂ cut here ✂ ]------------");
eprintln!(
"Unhandled crash in bmputil-cli v{} ({})",
crate_version!(),
std::env::consts::OS
);
eprintln!();
}
fn print_footer(&self)
{
eprintln!();
eprintln!("{}", "Please include all lines down to this one from the cut here".yellow());
eprintln!("{}", "marker, and report this issue to our issue tracker at".yellow());
eprintln!("https://codeberg.org/blackmagic-debug/bmputil/issues");
}
}
impl EyreHandler for BmputilHandler
{
    /// Formats the error report: a "cut here" marker and a header naming the tool version
    /// and operating system, then the wrapped handler's report, then a request to file an
    /// issue.
    ///
    /// The header and footer use the same wording as the panic hook (`BmputilPanic`), so
    /// both kinds of crash report look alike.
    fn debug(&self, error: &(dyn std::error::Error + 'static), fmt: &mut core::fmt::Formatter<'_>)
    -> core::fmt::Result
    {
        writeln!(fmt, "------------[ ✂ cut here ✂ ]------------")?;
        // Terminate the header line so the wrapped report does not run on from the version.
        writeln!(
            fmt,
            "Unhandled crash in bmputil-cli v{} ({})",
            crate_version!(),
            std::env::consts::OS
        )?;

        self.inner_handler.debug(error, fmt)?;

        // Two newlines: presumably the first ends the wrapped report's last line (TODO
        // confirm it has no trailing newline), the second leaves a blank line before the
        // footer.
        writeln!(fmt)?;
        writeln!(fmt)?;
        writeln!(
            fmt,
            "{}",
            "Please include all lines down to this one from the cut here".yellow()
        )?;
        writeln!(fmt, "{}", "marker, and report this issue to our issue tracker at".yellow())?;
        write!(fmt, "https://codeberg.org/blackmagic-debug/bmputil/issues")
    }

    /// Forwards the caller location to the wrapped handler.
    fn track_caller(&mut self, location: &'static std::panic::Location<'static>)
    {
        self.inner_handler.track_caller(location);
    }
}
/// Installs the crash reporting hooks: takes color-eyre's default panic and eyre hooks and
/// installs each one wrapped in the bmputil banner (`BmputilPanic` / `BmputilHook`).
///
/// # Errors
/// Fails when color-eyre cannot build its hooks or when the eyre hook cannot be installed.
fn install_error_handler() -> Result<()>
{
    let (panic_hook, eyre_hook) = HookBuilder::default().try_into_hooks()?;

    let panic_wrapper = BmputilPanic {
        inner_hook: panic_hook.into_panic_hook(),
    };
    panic_wrapper.install();

    let eyre_wrapper = BmputilHook {
        inner_hook: eyre_hook.into_eyre_hook(),
    };
    eyre_wrapper.install()?;

    Ok(())
}
fn style() -> clap::builder::Styles
{
Styles::styled()
.usage(
anstyle::Style::new()
.fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::Yellow)))
.bold(),
)
.header(
anstyle::Style::new()
.bold()
.fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::Yellow))),
)
.literal(anstyle::Style::new().fg_color(Some(anstyle::Color::Ansi(anstyle::AnsiColor::Green))))
}
/// Program entry point: installs the crash handlers and logger, parses the command line and
/// dispatches to the function implementing the selected subcommand.
///
/// The logger defaults to the info level and can be overridden through env_logger's default
/// environment variables. The process exits with status 2 when the project directories
/// cannot be determined.
fn main() -> Result<()>
{
    install_error_handler()?;
    env_logger::Builder::new()
        .filter_level(log::LevelFilter::Info)
        .parse_default_env()
        .init();

    let cli_args = CliArguments::parse();

    // On Windows every command except `probe install-drivers` first goes through the
    // access check; that command runs it itself further down, with different arguments.
    #[cfg(windows)]
    {
        let installing_drivers = matches!(
            cli_args.subcommand,
            ToplevelCommmands::Probe(ProbeArguments {
                subcommand: ProbeCommmands::InstallDrivers(_),
                ..
            })
        );
        if !installing_drivers {
            windows::ensure_access(cli_args.windows_wdi_install_mode, false, false);
        }
    }

    let Some(paths) = ProjectDirs::from("org", "black-magic", "bmputil") else {
        error!("Failed to get program working paths");
        std::process::exit(2);
    };

    match &cli_args.subcommand {
        ToplevelCommmands::Probe(probe_args) => match &probe_args.subcommand {
            ProbeCommmands::Info(info_args) => info_command(&cli_args, info_args),
            ProbeCommmands::Update(update_args) => match &update_args.subcommand {
                Some(UpdateCommands::List) => display_releases(&paths),
                None => update_probe(&cli_args, update_args, &paths),
            },
            ProbeCommmands::Switch(_) => bmputil::switcher::switch_firmware(&cli_args, &paths),
            ProbeCommmands::Reboot(reboot_args) => reboot_command(&cli_args, reboot_args),
            #[cfg(windows)]
            ProbeCommmands::InstallDrivers(driver_args) => {
                windows::ensure_access(cli_args.windows_wdi_install_mode, true, driver_args.force);
                Ok(())
            },
        },
        ToplevelCommmands::Logs(log_args) => serial_logging(&cli_args, log_args),
        ToplevelCommmands::Target(TargetCommmands::Power) => power_command(&cli_args),
        // Neither command does anything yet; the names are only reserved.
        ToplevelCommmands::Server | ToplevelCommmands::Debug => {
            warn!("Command space reserved for future tool version");
            Ok(())
        },
        ToplevelCommmands::Complete(comp_args) => {
            let mut cmd = CliArguments::command();
            generate(comp_args.shell, &mut cmd, "bmputil-cli", &mut stdout());
            Ok(())
        },
    }
}