use clap::{Parser, Subcommand, ValueEnum};
use color_eyre::{Result, eyre::eyre};
use pico_de_gallo_lib::{AdcChannel, GpioEdge, I2cFrequency, PicoDeGallo, SpiPhase, SpiPolarity, list_devices};
use pico_de_gallo_lib::{GpioDirection, GpioPull, GpioState};
use std::num::ParseIntError;
use tabled::builder::Builder;
use tabled::settings::object::Rows;
use tabled::settings::{Alignment, Style};
#[derive(ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
pub enum I2cFrequencyArg {
Standard,
Fast,
FastPlus,
}
impl From<I2cFrequencyArg> for I2cFrequency {
fn from(arg: I2cFrequencyArg) -> Self {
match arg {
I2cFrequencyArg::Standard => I2cFrequency::Standard,
I2cFrequencyArg::Fast => I2cFrequency::Fast,
I2cFrequencyArg::FastPlus => I2cFrequency::FastPlus,
}
}
}
#[derive(ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
pub enum GpioDirectionArg {
Input,
Output,
}
impl From<GpioDirectionArg> for GpioDirection {
fn from(arg: GpioDirectionArg) -> Self {
match arg {
GpioDirectionArg::Input => GpioDirection::Input,
GpioDirectionArg::Output => GpioDirection::Output,
}
}
}
#[derive(ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
pub enum GpioPullArg {
None,
Up,
Down,
}
impl From<GpioPullArg> for GpioPull {
fn from(arg: GpioPullArg) -> Self {
match arg {
GpioPullArg::None => GpioPull::None,
GpioPullArg::Up => GpioPull::Up,
GpioPullArg::Down => GpioPull::Down,
}
}
}
#[derive(ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
pub enum GpioEdgeArg {
Rising,
Falling,
Any,
}
impl From<GpioEdgeArg> for GpioEdge {
fn from(arg: GpioEdgeArg) -> Self {
match arg {
GpioEdgeArg::Rising => GpioEdge::Rising,
GpioEdgeArg::Falling => GpioEdge::Falling,
GpioEdgeArg::Any => GpioEdge::Any,
}
}
}
#[derive(clap::ValueEnum, Clone, Debug, Default)]
pub enum OutputFormat {
#[default]
Hex,
Binary,
Ascii,
}
#[derive(Parser, Debug)]
#[command(
name = "Pico De Gallo",
author = "Felipe Balbi <febalbi@microsoft.com>",
about = "Access I2C/SPI devices through Pico De Gallo",
arg_required_else_help = true,
version
)]
pub struct Cli {
#[arg(short, long)]
serial_number: Option<String>,
#[arg(short, long, value_enum, default_value_t)]
format: OutputFormat,
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand, Debug)]
enum Commands {
List,
Version,
I2c {
#[command(subcommand)]
command: I2cCommands,
},
Spi {
#[command(subcommand)]
command: SpiCommands,
},
Gpio {
#[command(subcommand)]
command: GpioCommands,
},
Uart {
#[command(subcommand)]
command: UartCommands,
},
Pwm {
#[command(subcommand)]
command: PwmCommands,
},
Adc {
#[command(subcommand)]
command: AdcCommands,
},
#[command(name = "onewire")]
OneWire {
#[command(subcommand)]
command: OneWireCommands,
},
}
#[derive(Subcommand, Debug)]
enum I2cCommands {
Scan {
#[arg(short, long, default_value_t = false)]
reserved: bool,
},
Read {
#[arg(short, long, value_parser(parse_i2c_address))]
address: u8,
#[arg(short, long)]
count: usize,
},
Write {
#[arg(short, long, value_parser(parse_i2c_address))]
address: u8,
#[arg(short, long, num_args(1..), value_parser(parse_byte))]
bytes: Vec<u8>,
},
WriteRead {
#[arg(short, long, value_parser(parse_i2c_address))]
address: u8,
#[arg(short, long, num_args(1..), value_parser(parse_byte))]
bytes: Vec<u8>,
#[arg(short, long)]
count: usize,
},
SetConfig {
#[arg(long)]
frequency: I2cFrequencyArg,
},
GetConfig,
Batch {
#[arg(short, long, value_parser(parse_i2c_address))]
address: u8,
#[arg(long, num_args(1..), required = true)]
op: Vec<String>,
},
}
#[derive(Subcommand, Debug)]
enum SpiCommands {
Read {
#[arg(short, long)]
count: usize,
},
Write {
#[arg(short, long, num_args(1..), value_parser(parse_byte))]
bytes: Vec<u8>,
},
Transfer {
#[arg(short, long, num_args(1..), value_parser(parse_byte))]
bytes: Vec<u8>,
},
WriteRead {
#[arg(short, long)]
count: usize,
#[arg(short, long, num_args(1..), value_parser(parse_byte))]
bytes: Vec<u8>,
},
SetConfig {
#[arg(long)]
frequency: u32,
#[arg(long, default_value_t)]
first_transition: bool,
#[arg(long, default_value_t)]
idle_low: bool,
},
GetConfig,
Batch {
#[arg(long)]
cs: u8,
#[arg(long, num_args(1..), required = true)]
op: Vec<String>,
},
}
#[derive(Subcommand, Debug)]
enum GpioCommands {
Get {
#[arg(short, long)]
pin: u8,
},
Put {
#[arg(short, long)]
pin: u8,
#[arg(short, long)]
high: bool,
},
SetConfig {
#[arg(short, long)]
pin: u8,
#[arg(short, long)]
direction: GpioDirectionArg,
#[arg(long, default_value = "none")]
pull: GpioPullArg,
},
Monitor {
#[arg(short, long)]
pin: u8,
#[arg(short, long, default_value = "any")]
edge: GpioEdgeArg,
},
}
#[derive(Subcommand, Debug)]
enum UartCommands {
Read {
#[arg(short, long)]
count: u16,
#[arg(short, long, default_value_t = 1000)]
timeout: u32,
},
Write {
#[arg(short, long, num_args(1..), value_parser(parse_byte))]
bytes: Vec<u8>,
},
Flush,
SetConfig {
#[arg(long)]
baud_rate: u32,
},
GetConfig,
}
#[derive(Subcommand, Debug)]
enum PwmCommands {
SetDuty {
#[arg(short, long)]
channel: u8,
#[arg(short, long)]
duty: u16,
},
GetDuty {
#[arg(short, long)]
channel: u8,
},
Enable {
#[arg(short, long)]
channel: u8,
},
Disable {
#[arg(short, long)]
channel: u8,
},
SetConfig {
#[arg(short, long)]
channel: u8,
#[arg(short, long)]
frequency: u32,
#[arg(short, long, default_value_t = false)]
phase_correct: bool,
},
GetConfig {
#[arg(short, long)]
channel: u8,
},
}
#[derive(Subcommand, Debug)]
enum AdcCommands {
Read {
#[arg(short, long)]
channel: u8,
},
Info,
}
#[derive(Subcommand, Debug)]
enum OneWireCommands {
Reset,
Read {
#[arg(short, long)]
len: u16,
},
Write {
#[arg(short, long, value_parser(parse_hex_string))]
data: Vec<u8>,
},
WritePullup {
#[arg(short, long, value_parser(parse_hex_string))]
data: Vec<u8>,
#[arg(short = 't', long, default_value_t = 750)]
duration: u16,
},
Search,
}
fn print_data(data: &[u8], format: &OutputFormat) {
match format {
OutputFormat::Hex => {
for (i, b) in data.iter().enumerate() {
if i > 0 && i % 16 == 0 {
println!();
}
print!("{:02x} ", b);
}
println!();
}
OutputFormat::Binary => {
use std::io::Write;
std::io::stdout().write_all(data).unwrap();
}
OutputFormat::Ascii => {
for (i, b) in data.iter().enumerate() {
if i > 0 && i % 16 == 0 {
println!();
}
let ch = if b.is_ascii_graphic() || *b == b' ' {
*b as char
} else {
'.'
};
print!("{ch}");
}
println!();
}
}
}
impl Cli {
fn connect(&self) -> PicoDeGallo {
if let Some(serial_number) = &self.serial_number {
PicoDeGallo::new_with_serial_number(serial_number)
} else {
PicoDeGallo::new()
}
}
pub async fn run(&self) -> Result<()> {
match &self.command {
Commands::List => Self::list_devices(),
Commands::Version => self.version().await,
Commands::I2c { command } => match command {
I2cCommands::Scan { reserved } => self.i2c_scan(*reserved).await,
I2cCommands::Read { address, count } => self.i2c_read(address, count).await,
I2cCommands::Write { address, bytes } => self.i2c_write(address, bytes).await,
I2cCommands::WriteRead { address, bytes, count } => {
self.i2c_write_then_read(address, bytes, count).await
}
I2cCommands::SetConfig { frequency } => self.i2c_set_config((*frequency).into()).await,
I2cCommands::GetConfig => self.i2c_get_config().await,
I2cCommands::Batch { address, op } => self.i2c_batch(*address, op).await,
},
Commands::Spi { command } => match command {
SpiCommands::Read { count } => self.spi_read(count).await,
SpiCommands::Write { bytes } => self.spi_write(bytes).await,
SpiCommands::Transfer { bytes } => self.spi_transfer(bytes).await,
SpiCommands::WriteRead { count, bytes } => self.spi_write_then_read(bytes, count).await,
SpiCommands::SetConfig {
frequency,
first_transition,
idle_low,
} => self.spi_set_config(*frequency, *first_transition, *idle_low).await,
SpiCommands::GetConfig => self.spi_get_config().await,
SpiCommands::Batch { cs, op } => self.spi_batch(*cs, op).await,
},
Commands::Gpio { command } => match command {
GpioCommands::Get { pin } => self.gpio_get(*pin).await,
GpioCommands::Put { pin, high } => self.gpio_put(*pin, *high).await,
GpioCommands::SetConfig { pin, direction, pull } => self.gpio_set_config(*pin, *direction, *pull).await,
GpioCommands::Monitor { pin, edge } => self.gpio_monitor(*pin, *edge).await,
},
Commands::Uart { command } => match command {
UartCommands::Read { count, timeout } => self.uart_read(*count, *timeout).await,
UartCommands::Write { bytes } => self.uart_write(bytes).await,
UartCommands::Flush => self.uart_flush().await,
UartCommands::SetConfig { baud_rate } => self.uart_set_config(*baud_rate).await,
UartCommands::GetConfig => self.uart_get_config().await,
},
Commands::Pwm { command } => match command {
PwmCommands::SetDuty { channel, duty } => self.pwm_set_duty(*channel, *duty).await,
PwmCommands::GetDuty { channel } => self.pwm_get_duty(*channel).await,
PwmCommands::Enable { channel } => self.pwm_enable(*channel).await,
PwmCommands::Disable { channel } => self.pwm_disable(*channel).await,
PwmCommands::SetConfig {
channel,
frequency,
phase_correct,
} => self.pwm_set_config(*channel, *frequency, *phase_correct).await,
PwmCommands::GetConfig { channel } => self.pwm_get_config(*channel).await,
},
Commands::Adc { command } => match command {
AdcCommands::Read { channel } => self.adc_read(*channel).await,
AdcCommands::Info => self.adc_get_info().await,
},
Commands::OneWire { command } => match command {
OneWireCommands::Reset => self.onewire_reset().await,
OneWireCommands::Read { len } => self.onewire_read(*len).await,
OneWireCommands::Write { data } => self.onewire_write(data).await,
OneWireCommands::WritePullup { data, duration } => self.onewire_write_pullup(data, *duration).await,
OneWireCommands::Search => self.onewire_search().await,
},
}
}
fn list_devices() -> Result<()> {
let devices = list_devices();
if devices.is_empty() {
println!("No Pico de Gallo devices found.");
return Ok(());
}
for dev in &devices {
let product = dev.product.as_deref().unwrap_or("(unknown product)");
let serial = dev.serial_number.as_deref().unwrap_or("(unknown)");
println!(" - {product} - {serial}");
}
Ok(())
}
async fn version(&self) -> Result<()> {
let pg = self.connect();
match pg.device_info().await {
Ok(info) => {
println!(
"Pico de Gallo FW v{}.{}.{}",
info.fw_major, info.fw_minor, info.fw_patch
);
println!(
"Schema v{}.{}.{}",
info.schema_major, info.schema_minor, info.schema_patch
);
println!("HW revision {}", info.hw_version);
let cap = info.capabilities;
let status = |flag: pico_de_gallo_lib::Capabilities| {
if cap.contains(flag) { "✓" } else { "✗" }
};
println!(
"Capabilities: I2C {} | SPI {} | UART {} | GPIO {} | PWM {} | ADC {} | 1-Wire {}",
status(pico_de_gallo_lib::Capabilities::I2C),
status(pico_de_gallo_lib::Capabilities::SPI),
status(pico_de_gallo_lib::Capabilities::UART),
status(pico_de_gallo_lib::Capabilities::GPIO),
status(pico_de_gallo_lib::Capabilities::PWM),
status(pico_de_gallo_lib::Capabilities::ADC),
status(pico_de_gallo_lib::Capabilities::ONEWIRE),
);
Ok(())
}
Err(_) => {
match pg.version().await {
Ok(version) => {
println!(
"Pico de Gallo FW v{}.{}.{}",
version.major, version.minor, version.patch
);
println!("(legacy firmware — no schema/hw/capabilities info)");
Ok(())
}
Err(_) => Err(eyre!("Failed to get version")),
}
}
}
}
async fn i2c_scan(&self, reserved: bool) -> Result<()> {
let pg = self.connect();
let addresses = pg
.i2c_scan(reserved)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("i2c_scan failed"))?;
let mut builder = Builder::with_capacity(17, 8);
builder.push_record(
(0..=16)
.map(|i| if i == 0 { String::new() } else { format!("{:x}", i - 1) })
.collect::<Vec<_>>(),
);
for hi in 0u8..=7 {
let mut row = vec![format!("{:x} ", hi)];
for lo in 0u8..=15 {
let address = hi << 4 | lo;
let stat = match address {
0x00..=0x07 | 0x78..=0x7f if !reserved => "RR".to_string(),
_ => {
if addresses.contains(&address) {
format!("{:02x}", address)
} else {
"--".to_string()
}
}
};
row.push(stat);
}
builder.push_record(row);
}
let mut table = builder.build();
table.modify(Rows::first(), Alignment::right());
table.with(Style::rounded());
println!("{}", table);
Ok(())
}
async fn i2c_read(&self, address: &u8, count: &usize) -> Result<()> {
let pg = self.connect();
let buf = match pg.i2c_read(*address, *count as u16).await {
Ok(data) => data,
Err(e) => return Err(eyre!("{:?}", e).wrap_err("i2c_read failed")),
};
print_data(&buf, &self.format);
Ok(())
}
async fn i2c_write(&self, address: &u8, bytes: &[u8]) -> Result<()> {
let pg = self.connect();
pg.i2c_write(*address, bytes)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("i2c_write failed"))
}
async fn i2c_write_then_read(&self, address: &u8, bytes: &[u8], count: &usize) -> Result<()> {
let pg = self.connect();
let buf = match pg.i2c_write_read(*address, bytes, *count as u16).await {
Ok(data) => data,
Err(e) => return Err(eyre!("{:?}", e).wrap_err("i2c_write_read failed")),
};
print_data(&buf, &self.format);
Ok(())
}
async fn spi_read(&self, count: &usize) -> Result<()> {
let pg = self.connect();
let buf = match pg.spi_read(*count as u16).await {
Ok(data) => data,
Err(e) => return Err(eyre!("{:?}", e).wrap_err("spi_read failed")),
};
print_data(&buf, &self.format);
Ok(())
}
async fn spi_write(&self, bytes: &[u8]) -> Result<()> {
let pg = self.connect();
pg.spi_write(bytes)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("spi_write failed"))
}
async fn spi_transfer(&self, bytes: &[u8]) -> Result<()> {
let pg = self.connect();
let buf = pg
.spi_transfer(bytes)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("spi_transfer failed"))?;
print_data(&buf, &self.format);
Ok(())
}
async fn spi_write_then_read(&self, bytes: &[u8], count: &usize) -> Result<()> {
self.spi_write(bytes).await?;
self.spi_read(count).await
}
async fn i2c_set_config(&self, frequency: I2cFrequency) -> Result<()> {
let pg = self.connect();
pg.i2c_set_config(frequency)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("i2c set-config failed"))
}
async fn i2c_get_config(&self) -> Result<()> {
let pg = self.connect();
let freq = pg
.i2c_get_config()
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("i2c get-config failed"))?;
let label = match freq {
I2cFrequency::Standard => "Standard (100 kHz)",
I2cFrequency::Fast => "Fast (400 kHz)",
I2cFrequency::FastPlus => "Fast+ (1 MHz)",
};
println!("I2C frequency: {label}");
Ok(())
}
async fn spi_set_config(&self, frequency: u32, first_transition: bool, idle_low: bool) -> Result<()> {
let pg = self.connect();
let spi_polarity = if idle_low {
SpiPolarity::IdleLow
} else {
SpiPolarity::IdleHigh
};
let spi_phase = if first_transition {
SpiPhase::CaptureOnFirstTransition
} else {
SpiPhase::CaptureOnSecondTransition
};
pg.spi_set_config(frequency, spi_phase, spi_polarity)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("spi set-config failed"))
}
async fn spi_get_config(&self) -> Result<()> {
let pg = self.connect();
let info = pg
.spi_get_config()
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("spi get-config failed"))?;
let phase = match info.spi_phase {
SpiPhase::CaptureOnFirstTransition => "CaptureOnFirstTransition (CPHA=0)",
SpiPhase::CaptureOnSecondTransition => "CaptureOnSecondTransition (CPHA=1)",
};
let polarity = match info.spi_polarity {
SpiPolarity::IdleLow => "IdleLow (CPOL=0)",
SpiPolarity::IdleHigh => "IdleHigh (CPOL=1)",
};
println!("SPI frequency: {} Hz", info.spi_frequency);
println!("SPI phase: {phase}");
println!("SPI polarity: {polarity}");
Ok(())
}
async fn i2c_batch(&self, address: u8, ops: &[String]) -> Result<()> {
use pico_de_gallo_lib::I2cBatchOp;
let pg = self.connect();
let batch_ops = parse_i2c_batch_ops(ops)?;
let refs: Vec<I2cBatchOp<'_>> = batch_ops
.iter()
.map(|(kind, data)| match kind {
I2cBatchKind::Read(len) => I2cBatchOp::Read { len: *len },
I2cBatchKind::Write => I2cBatchOp::Write { data },
})
.collect();
let result = pg
.i2c_batch(address, &refs)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("i2c batch failed"))?;
if result.is_empty() {
println!("Batch complete (no read data)");
} else {
println!("Read data ({} bytes):", result.len());
print_hex_dump(&result);
}
Ok(())
}
async fn spi_batch(&self, cs: u8, ops: &[String]) -> Result<()> {
use pico_de_gallo_lib::SpiBatchOp;
let pg = self.connect();
let batch_ops = parse_spi_batch_ops(ops)?;
let refs: Vec<SpiBatchOp<'_>> = batch_ops
.iter()
.map(|(kind, data)| match kind {
SpiBatchKind::Read(len) => SpiBatchOp::Read { len: *len },
SpiBatchKind::Write => SpiBatchOp::Write { data },
SpiBatchKind::Transfer => SpiBatchOp::Transfer { data },
SpiBatchKind::DelayNs(ns) => SpiBatchOp::DelayNs { ns: *ns },
})
.collect();
let result = pg
.spi_batch(cs, &refs)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("spi batch failed"))?;
if result.is_empty() {
println!("Batch complete (no read data)");
} else {
println!("Read data ({} bytes):", result.len());
print_hex_dump(&result);
}
Ok(())
}
async fn gpio_get(&self, pin: u8) -> Result<()> {
let pg = self.connect();
let level = pg
.gpio_get(pin)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("gpio get failed"))?;
let label = match level {
GpioState::High => "HIGH",
GpioState::Low => "LOW",
};
println!("GPIO pin {pin}: {label}");
Ok(())
}
async fn gpio_put(&self, pin: u8, high: bool) -> Result<()> {
let pg = self.connect();
let state = if high { GpioState::High } else { GpioState::Low };
pg.gpio_put(pin, state)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("gpio put failed"))?;
println!("GPIO pin {pin} set to {}", if high { "HIGH" } else { "LOW" });
Ok(())
}
async fn gpio_set_config(&self, pin: u8, direction: GpioDirectionArg, pull: GpioPullArg) -> Result<()> {
let pg = self.connect();
pg.gpio_set_config(pin, direction.into(), pull.into())
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("gpio set-config failed"))?;
println!("GPIO pin {pin} configured as {direction:?} with pull {pull:?}");
Ok(())
}
async fn gpio_monitor(&self, pin: u8, edge: GpioEdgeArg) -> Result<()> {
let pg = self.connect();
pg.gpio_subscribe(pin, edge.into())
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("gpio subscribe failed"))?;
println!("Monitoring GPIO pin {pin} for {edge:?} edges (Ctrl+C to stop)...");
let mut sub = pg
.subscribe_gpio_events(4)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("failed to open event subscription"))?;
let result = loop {
tokio::select! {
event = sub.recv() => {
match event {
Ok(event) => {
println!(
"[{:>12} µs] pin={} edge={:?}",
event.timestamp_us, event.pin, event.edge,
);
}
Err(_) => {
break Err(eyre!("event subscription closed"));
}
}
}
_ = tokio::signal::ctrl_c() => {
break Ok(());
}
}
};
pg.gpio_unsubscribe(pin)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("gpio unsubscribe failed"))?;
println!("Stopped monitoring GPIO pin {pin}");
result
}
async fn uart_read(&self, count: u16, timeout_ms: u32) -> Result<()> {
let pg = self.connect();
let data = pg
.uart_read(count, timeout_ms)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("uart read failed"))?;
if data.is_empty() {
println!("(no data received within timeout)");
} else {
print_data(&data, &self.format);
}
Ok(())
}
async fn uart_write(&self, bytes: &[u8]) -> Result<()> {
let pg = self.connect();
pg.uart_write(bytes)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("uart write failed"))?;
println!("Wrote {} byte(s)", bytes.len());
Ok(())
}
async fn uart_flush(&self) -> Result<()> {
let pg = self.connect();
pg.uart_flush()
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("uart flush failed"))?;
println!("UART TX buffer flushed");
Ok(())
}
async fn uart_set_config(&self, baud_rate: u32) -> Result<()> {
let pg = self.connect();
pg.uart_set_config(baud_rate)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("uart set-config failed"))?;
println!("UART baud rate set to {baud_rate}");
Ok(())
}
async fn uart_get_config(&self) -> Result<()> {
let pg = self.connect();
let info = pg
.uart_get_config()
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("uart get-config failed"))?;
println!("UART baud rate: {} bps", info.baud_rate);
Ok(())
}
async fn pwm_set_duty(&self, channel: u8, duty: u16) -> Result<()> {
let pg = self.connect();
pg.pwm_set_duty_cycle(channel, duty)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("pwm set-duty failed"))?;
println!("PWM channel {channel}: duty set to {duty}");
Ok(())
}
async fn pwm_get_duty(&self, channel: u8) -> Result<()> {
let pg = self.connect();
let info = pg
.pwm_get_duty_cycle(channel)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("pwm get-duty failed"))?;
println!(
"PWM channel {channel}: duty={} / max={}",
info.current_duty, info.max_duty
);
Ok(())
}
async fn pwm_enable(&self, channel: u8) -> Result<()> {
let pg = self.connect();
pg.pwm_enable(channel)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("pwm enable failed"))?;
println!("PWM channel {channel}: slice enabled");
Ok(())
}
async fn pwm_disable(&self, channel: u8) -> Result<()> {
let pg = self.connect();
pg.pwm_disable(channel)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("pwm disable failed"))?;
println!("PWM channel {channel}: slice disabled");
Ok(())
}
async fn pwm_set_config(&self, channel: u8, frequency_hz: u32, phase_correct: bool) -> Result<()> {
let pg = self.connect();
pg.pwm_set_config(channel, frequency_hz, phase_correct)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("pwm set-config failed"))?;
println!("PWM channel {channel}: frequency={frequency_hz} Hz, phase_correct={phase_correct}");
Ok(())
}
async fn pwm_get_config(&self, channel: u8) -> Result<()> {
let pg = self.connect();
let info = pg
.pwm_get_config(channel)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("pwm get-config failed"))?;
println!(
"PWM channel {channel}: frequency={} Hz, phase_correct={}, enabled={}",
info.frequency_hz, info.phase_correct, info.enabled
);
Ok(())
}
async fn adc_read(&self, channel: u8) -> Result<()> {
let adc_channel = match channel {
0 => AdcChannel::Adc0,
1 => AdcChannel::Adc1,
2 => AdcChannel::Adc2,
3 => AdcChannel::Adc3,
_ => return Err(eyre!("invalid ADC channel {channel}: expected 0–3")),
};
let pg = self.connect();
let raw = pg
.adc_read(adc_channel)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("adc read failed"))?;
let voltage_mv = (raw as u32) * 3300 / 4096;
println!("ADC channel {channel} ({adc_channel}): raw={raw}, ~{voltage_mv} mV");
Ok(())
}
async fn adc_get_info(&self) -> Result<()> {
let pg = self.connect();
let info = pg
.adc_get_config()
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("adc get-config failed"))?;
println!("ADC configuration:");
println!(" Resolution: {} bits", info.resolution_bits);
println!(" Nominal ref: {} mV", info.nominal_reference_mv);
println!(" GPIO channels: {}", info.num_gpio_channels);
Ok(())
}
async fn onewire_reset(&self) -> Result<()> {
let pg = self.connect();
let present = pg
.onewire_reset()
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("1-Wire reset failed"))?;
if present {
println!("Device(s) present on the 1-Wire bus");
} else {
println!("No device detected on the 1-Wire bus");
}
Ok(())
}
async fn onewire_read(&self, len: u16) -> Result<()> {
let pg = self.connect();
let data = pg
.onewire_read(len)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("1-Wire read failed"))?;
print_data(&data, &self.format);
Ok(())
}
async fn onewire_write(&self, data: &[u8]) -> Result<()> {
let pg = self.connect();
pg.onewire_write(data)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("1-Wire write failed"))?;
println!("Wrote {} byte(s)", data.len());
Ok(())
}
async fn onewire_write_pullup(&self, data: &[u8], duration_ms: u16) -> Result<()> {
let pg = self.connect();
pg.onewire_write_pullup(data, duration_ms)
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("1-Wire write-pullup failed"))?;
println!("Wrote {} byte(s) with {}ms strong pullup", data.len(), duration_ms);
Ok(())
}
async fn onewire_search(&self) -> Result<()> {
let pg = self.connect();
let mut rom_ids = Vec::new();
match pg
.onewire_search()
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("1-Wire search failed"))?
{
Some(id) => rom_ids.push(id),
None => {
println!("No devices found on the 1-Wire bus");
return Ok(());
}
}
while let Some(id) = pg
.onewire_search_next()
.await
.map_err(|e| eyre!("{:?}", e).wrap_err("1-Wire search-next failed"))?
{
rom_ids.push(id);
}
println!("Found {} device(s):", rom_ids.len());
for (i, id) in rom_ids.iter().enumerate() {
let family = (*id & 0xFF) as u8;
println!(" {}: ROM ID 0x{:016X} (family 0x{:02X})", i + 1, id, family);
}
Ok(())
}
}
fn parse_byte(s: &str) -> Result<u8, ParseIntError> {
if let Some(hex) = s.strip_prefix("0x") {
u8::from_str_radix(hex, 16)
} else if let Some(bin) = s.strip_prefix("0b") {
u8::from_str_radix(bin, 2)
} else {
s.parse::<u8>()
}
}
fn parse_i2c_address(s: &str) -> Result<u8, String> {
let byte = parse_byte(s).map_err(|e| e.to_string())?;
if byte > 0x7F {
return Err(format!("I2C address {s} exceeds 7-bit range (max 0x7F)"));
}
Ok(byte)
}
fn parse_hex_string(s: &str) -> Result<Vec<u8>, String> {
let s = s.strip_prefix("0x").or_else(|| s.strip_prefix("0X")).unwrap_or(s);
if !s.len().is_multiple_of(2) {
return Err(format!("hex string must have even length, got {}", s.len()));
}
(0..s.len())
.step_by(2)
.map(|i| u8::from_str_radix(&s[i..i + 2], 16).map_err(|e| format!("invalid hex at position {i}: {e}")))
.collect()
}
fn parse_byte_list(s: &str) -> Result<Vec<u8>> {
s.split(',')
.map(|b| {
let b = b.trim();
parse_byte(b).map_err(|e| eyre!("invalid byte '{b}': {e}"))
})
.collect()
}
enum I2cBatchKind {
Read(u16),
Write,
}
enum SpiBatchKind {
Read(u16),
Write,
Transfer,
DelayNs(u32),
}
fn parse_i2c_batch_ops(ops: &[String]) -> Result<Vec<(I2cBatchKind, Vec<u8>)>> {
ops.iter()
.map(|op| {
if let Some(n) = op.strip_prefix("read:") {
let len: u16 = n.trim().parse().map_err(|e| eyre!("invalid read length '{n}': {e}"))?;
Ok((I2cBatchKind::Read(len), Vec::new()))
} else if let Some(data) = op.strip_prefix("write:") {
let bytes = parse_byte_list(data)?;
Ok((I2cBatchKind::Write, bytes))
} else {
Err(eyre!("unknown I2C batch op '{op}'. Use 'read:N' or 'write:B1,B2,...'"))
}
})
.collect()
}
fn parse_spi_batch_ops(ops: &[String]) -> Result<Vec<(SpiBatchKind, Vec<u8>)>> {
ops.iter()
.map(|op| {
if let Some(n) = op.strip_prefix("read:") {
let len: u16 = n.trim().parse().map_err(|e| eyre!("invalid read length '{n}': {e}"))?;
Ok((SpiBatchKind::Read(len), Vec::new()))
} else if let Some(data) = op.strip_prefix("write:") {
let bytes = parse_byte_list(data)?;
Ok((SpiBatchKind::Write, bytes))
} else if let Some(data) = op.strip_prefix("transfer:") {
let bytes = parse_byte_list(data)?;
Ok((SpiBatchKind::Transfer, bytes))
} else if let Some(ns) = op.strip_prefix("delay:") {
let nanos: u32 = ns
.trim()
.parse()
.map_err(|e| eyre!("invalid delay nanoseconds '{ns}': {e}"))?;
Ok((SpiBatchKind::DelayNs(nanos), Vec::new()))
} else {
Err(eyre!(
"unknown SPI batch op '{op}'. Use 'read:N', 'write:B1,B2,...', 'transfer:B1,B2,...', or 'delay:NS'"
))
}
})
.collect()
}
fn print_hex_dump(data: &[u8]) {
for (i, chunk) in data.chunks(16).enumerate() {
let offset = i * 16;
let hex: Vec<String> = chunk.iter().map(|b| format!("{b:02x}")).collect();
let ascii: String = chunk
.iter()
.map(|&b| {
if b.is_ascii_graphic() || b == b' ' {
b as char
} else {
'.'
}
})
.collect();
println!(" {offset:04x}: {:<48} {ascii}", hex.join(" "));
}
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
#[test]
fn parse_byte_decimal() {
assert_eq!(parse_byte("0").unwrap(), 0);
assert_eq!(parse_byte("255").unwrap(), 255);
assert_eq!(parse_byte("42").unwrap(), 42);
}
#[test]
fn parse_byte_hex() {
assert_eq!(parse_byte("0x00").unwrap(), 0x00);
assert_eq!(parse_byte("0xFF").unwrap(), 0xFF);
assert_eq!(parse_byte("0x48").unwrap(), 0x48);
assert_eq!(parse_byte("0xab").unwrap(), 0xAB);
}
#[test]
fn parse_byte_binary() {
assert_eq!(parse_byte("0b00000000").unwrap(), 0);
assert_eq!(parse_byte("0b11111111").unwrap(), 255);
assert_eq!(parse_byte("0b10101010").unwrap(), 0xAA);
}
#[test]
fn parse_byte_overflow_fails() {
assert!(parse_byte("256").is_err());
assert!(parse_byte("0x100").is_err());
assert!(parse_byte("0b100000000").is_err());
}
#[test]
fn parse_byte_invalid_fails() {
assert!(parse_byte("xyz").is_err());
assert!(parse_byte("0xGG").is_err());
assert!(parse_byte("0b2").is_err());
assert!(parse_byte("").is_err());
}
#[test]
fn cli_no_args_requires_help() {
let result = Cli::try_parse_from(["gallo"]);
assert!(result.is_err());
}
#[test]
fn cli_version_subcommand() {
let cli = Cli::try_parse_from(["gallo", "version"]).unwrap();
assert!(matches!(cli.command, Commands::Version));
assert!(cli.serial_number.is_none());
}
#[test]
fn cli_list_subcommand() {
let cli = Cli::try_parse_from(["gallo", "list"]).unwrap();
assert!(matches!(cli.command, Commands::List));
}
#[test]
fn cli_version_with_serial() {
let cli = Cli::try_parse_from(["gallo", "-s", "ABCD1234", "version"]).unwrap();
assert_eq!(cli.serial_number.as_deref(), Some("ABCD1234"));
assert!(matches!(cli.command, Commands::Version));
}
#[test]
fn cli_i2c_read() {
let cli = Cli::try_parse_from(["gallo", "i2c", "read", "-a", "0x48", "-c", "4"]).unwrap();
match cli.command {
Commands::I2c {
command: I2cCommands::Read { address, count },
} => {
assert_eq!(address, 0x48);
assert_eq!(count, 4);
}
_ => panic!("expected I2c Read command"),
}
}
#[test]
fn cli_i2c_write() {
let cli = Cli::try_parse_from(["gallo", "i2c", "write", "-a", "0x50", "-b", "0xDE", "0xAD"]).unwrap();
match cli.command {
Commands::I2c {
command: I2cCommands::Write { address, bytes },
} => {
assert_eq!(address, 0x50);
assert_eq!(bytes, vec![0xDE, 0xAD]);
}
_ => panic!("expected I2c Write command"),
}
}
#[test]
fn cli_i2c_write_read() {
let cli = Cli::try_parse_from(["gallo", "i2c", "write-read", "-a", "0x68", "-b", "0x01", "-c", "6"]).unwrap();
match cli.command {
Commands::I2c {
command: I2cCommands::WriteRead { address, bytes, count },
} => {
assert_eq!(address, 0x68);
assert_eq!(bytes, vec![0x01]);
assert_eq!(count, 6);
}
_ => panic!("expected I2c WriteRead command"),
}
}
#[test]
fn cli_i2c_scan() {
let cli = Cli::try_parse_from(["gallo", "i2c", "scan"]).unwrap();
match cli.command {
Commands::I2c {
command: I2cCommands::Scan { reserved },
} => {
assert!(!reserved);
}
_ => panic!("expected I2c Scan command"),
}
}
#[test]
fn cli_i2c_scan_reserved() {
let cli = Cli::try_parse_from(["gallo", "i2c", "scan", "-r"]).unwrap();
match cli.command {
Commands::I2c {
command: I2cCommands::Scan { reserved },
} => {
assert!(reserved);
}
_ => panic!("expected I2c Scan command"),
}
}
#[test]
fn cli_spi_read() {
let cli = Cli::try_parse_from(["gallo", "spi", "read", "-c", "16"]).unwrap();
match cli.command {
Commands::Spi {
command: SpiCommands::Read { count },
} => {
assert_eq!(count, 16);
}
_ => panic!("expected Spi Read command"),
}
}
#[test]
fn cli_spi_write() {
let cli = Cli::try_parse_from(["gallo", "spi", "write", "-b", "0xCA", "0xFE"]).unwrap();
match cli.command {
Commands::Spi {
command: SpiCommands::Write { bytes },
} => {
assert_eq!(bytes, vec![0xCA, 0xFE]);
}
_ => panic!("expected Spi Write command"),
}
}
#[test]
fn cli_spi_transfer() {
let cli = Cli::try_parse_from(["gallo", "spi", "transfer", "-b", "0x01", "0x02", "0x03"]).unwrap();
match cli.command {
Commands::Spi {
command: SpiCommands::Transfer { bytes },
} => {
assert_eq!(bytes, vec![0x01, 0x02, 0x03]);
}
_ => panic!("expected Spi Transfer command"),
}
}
#[test]
fn cli_i2c_set_config() {
let cli = Cli::try_parse_from(["gallo", "i2c", "set-config", "--frequency", "fast"]).unwrap();
match cli.command {
Commands::I2c {
command: I2cCommands::SetConfig { frequency },
} => {
assert_eq!(frequency, I2cFrequencyArg::Fast);
}
_ => panic!("expected I2c SetConfig command"),
}
}
#[test]
fn cli_i2c_set_config_fast_plus() {
let cli = Cli::try_parse_from(["gallo", "i2c", "set-config", "--frequency", "fast-plus"]).unwrap();
match cli.command {
Commands::I2c {
command: I2cCommands::SetConfig { frequency },
} => {
assert_eq!(frequency, I2cFrequencyArg::FastPlus);
}
_ => panic!("expected I2c SetConfig command"),
}
}
#[test]
fn cli_i2c_set_config_invalid_frequency_fails() {
let result = Cli::try_parse_from(["gallo", "i2c", "set-config", "--frequency", "ultra-fast"]);
assert!(result.is_err());
}
#[test]
fn cli_spi_set_config() {
let cli = Cli::try_parse_from([
"gallo",
"spi",
"set-config",
"--frequency",
"1000000",
"--first-transition",
"--idle-low",
])
.unwrap();
match cli.command {
Commands::Spi {
command:
SpiCommands::SetConfig {
frequency,
first_transition,
idle_low,
},
} => {
assert_eq!(frequency, 1_000_000);
assert!(first_transition);
assert!(idle_low);
}
_ => panic!("expected Spi SetConfig command"),
}
}
#[test]
fn cli_spi_set_config_defaults() {
let cli = Cli::try_parse_from(["gallo", "spi", "set-config", "--frequency", "500000"]).unwrap();
match cli.command {
Commands::Spi {
command:
SpiCommands::SetConfig {
first_transition,
idle_low,
..
},
} => {
assert!(!first_transition);
assert!(!idle_low);
}
_ => panic!("expected Spi SetConfig command"),
}
}
#[test]
fn cli_i2c_set_config_missing_frequency_fails() {
let result = Cli::try_parse_from(["gallo", "i2c", "set-config"]);
assert!(result.is_err());
}
#[test]
fn cli_spi_set_config_missing_frequency_fails() {
let result = Cli::try_parse_from(["gallo", "spi", "set-config"]);
assert!(result.is_err());
}
#[test]
fn cli_i2c_read_missing_address_fails() {
let result = Cli::try_parse_from(["gallo", "i2c", "read", "-c", "4"]);
assert!(result.is_err());
}
#[test]
fn cli_unknown_subcommand_fails() {
let result = Cli::try_parse_from(["gallo", "uart"]);
assert!(result.is_err());
}
#[test]
fn cli_i2c_without_subcommand_fails() {
let result = Cli::try_parse_from(["gallo", "i2c"]);
assert!(result.is_err());
}
#[test]
fn cli_spi_without_subcommand_fails() {
let result = Cli::try_parse_from(["gallo", "spi"]);
assert!(result.is_err());
}
#[test]
fn cli_i2c_batch() {
let cli = Cli::try_parse_from([
"gallo",
"i2c",
"batch",
"-a",
"0x50",
"--op",
"write:0x00,0x10",
"--op",
"read:16",
])
.unwrap();
match cli.command {
Commands::I2c {
command: I2cCommands::Batch { address, op },
} => {
assert_eq!(address, 0x50);
assert_eq!(op, vec!["write:0x00,0x10", "read:16"]);
}
_ => panic!("expected I2c Batch command"),
}
}
#[test]
fn cli_i2c_batch_requires_ops() {
let result = Cli::try_parse_from(["gallo", "i2c", "batch", "-a", "0x50"]);
assert!(result.is_err());
}
#[test]
fn cli_spi_batch() {
let cli = Cli::try_parse_from([
"gallo",
"spi",
"batch",
"--cs",
"0",
"--op",
"write:0x9F",
"--op",
"read:3",
])
.unwrap();
match cli.command {
Commands::Spi {
command: SpiCommands::Batch { cs, op },
} => {
assert_eq!(cs, 0);
assert_eq!(op, vec!["write:0x9F", "read:3"]);
}
_ => panic!("expected Spi Batch command"),
}
}
#[test]
fn cli_spi_batch_with_transfer_and_delay() {
let cli = Cli::try_parse_from([
"gallo",
"spi",
"batch",
"--cs",
"1",
"--op",
"transfer:0x01,0x02",
"--op",
"delay:1000",
])
.unwrap();
match cli.command {
Commands::Spi {
command: SpiCommands::Batch { cs, op },
} => {
assert_eq!(cs, 1);
assert_eq!(op, vec!["transfer:0x01,0x02", "delay:1000"]);
}
_ => panic!("expected Spi Batch command"),
}
}
#[test]
fn cli_spi_batch_requires_ops() {
let result = Cli::try_parse_from(["gallo", "spi", "batch", "--cs", "0"]);
assert!(result.is_err());
}
#[test]
fn parse_i2c_batch_ops_read() {
let ops = vec!["read:16".to_string()];
let parsed = parse_i2c_batch_ops(&ops).unwrap();
assert_eq!(parsed.len(), 1);
assert!(matches!(parsed[0].0, I2cBatchKind::Read(16)));
}
#[test]
fn parse_i2c_batch_ops_write() {
let ops = vec!["write:0xDE,0xAD".to_string()];
let parsed = parse_i2c_batch_ops(&ops).unwrap();
assert_eq!(parsed.len(), 1);
assert!(matches!(parsed[0].0, I2cBatchKind::Write));
assert_eq!(parsed[0].1, vec![0xDE, 0xAD]);
}
#[test]
fn parse_i2c_batch_ops_mixed() {
let ops = vec!["write:0x00,0x10".to_string(), "read:32".to_string()];
let parsed = parse_i2c_batch_ops(&ops).unwrap();
assert_eq!(parsed.len(), 2);
}
#[test]
fn parse_i2c_batch_ops_invalid() {
let ops = vec!["transfer:0x01".to_string()];
assert!(parse_i2c_batch_ops(&ops).is_err());
}
#[test]
fn parse_spi_batch_ops_all_types() {
let ops = vec![
"read:4".to_string(),
"write:0x9F".to_string(),
"transfer:0x01,0x02".to_string(),
"delay:1000".to_string(),
];
let parsed = parse_spi_batch_ops(&ops).unwrap();
assert_eq!(parsed.len(), 4);
}
#[test]
fn parse_spi_batch_ops_invalid() {
let ops = vec!["nope:1".to_string()];
assert!(parse_spi_batch_ops(&ops).is_err());
}
#[test]
fn parse_byte_list_hex_and_decimal() {
let result = parse_byte_list("0x0A,20,0xFF").unwrap();
assert_eq!(result, vec![0x0A, 20, 0xFF]);
}
#[test]
fn print_hex_dump_does_not_panic() {
print_hex_dump(&[0x00, 0x41, 0x42, 0x7F, 0x80, 0xFF]);
}
}