use anyhow::Error;
use std::{
collections::HashMap,
fs,
io::{Read, Seek, Write},
path::Path,
thread,
time::Duration,
};
use crate::gpio_pin_data::{get_data, ChannelInfo, JetsonInfo, Mode};
static SYSFS_ROOT: &str = "/sys/class/gpio";
#[derive(PartialEq, Clone, Copy)]
pub enum Level {
LOW = 0,
HIGH = 1,
}
#[derive(PartialEq, Clone, Copy)]
pub enum Direction {
UNKNOWN = -1,
OUT = 0,
IN = 1,
HARD_PWM = 43,
}
impl Direction {
pub fn is_valid(&self) -> bool {
matches!(self, Direction::OUT | Direction::IN | Direction::HARD_PWM)
}
}
fn check_write_access() -> Result<(), Error> {
let export_path = format!("{}/export", SYSFS_ROOT);
let unexport_path = format!("{}/unexport", SYSFS_ROOT);
let export_metadata = fs::metadata(&export_path).unwrap();
let unexport_metadata = fs::metadata(&unexport_path).unwrap();
let export_permissions = export_metadata.permissions();
let unexport_permissions = unexport_metadata.permissions();
if !export_permissions.readonly() && !unexport_permissions.readonly() {
Ok(())
} else {
Err(Error::msg("You do not have write access to the GPIO sysfs interface."))
}
}
fn sysfs_channel_configuration(ch_info: ChannelInfo) -> Option<Direction> {
if let Some(pwm_chip_dir) = &ch_info.pwm_chip_dir {
let pwm_dir = format!("{}/pwm{}", pwm_chip_dir, ch_info.pwm_id?);
if Path::new(&pwm_dir).exists() {
return Some(Direction::HARD_PWM);
}
}
let gpio_dir = format!("{}/{}", SYSFS_ROOT, ch_info.global_gpio_name);
if !Path::new(&gpio_dir).exists() {
return None;
}
let direction_path = format!("{}/direction", gpio_dir);
let gpio_direction = fs::read_to_string(&direction_path).unwrap_or_default();
match gpio_direction.as_str() {
"in" => Some(Direction::IN),
"out" => Some(Direction::OUT),
_ => None,
}
}
fn export_gpio(ch_info: &ChannelInfo) {
let gpio_dir = format!("{}/{}", SYSFS_ROOT, ch_info.global_gpio_name);
if !Path::new(&gpio_dir).exists() {
let export_path = format!("{}/export", SYSFS_ROOT);
let mut f_export = fs::OpenOptions::new()
.write(true)
.open(&export_path)
.unwrap();
f_export
.write_all(ch_info.global_gpio.to_string().as_bytes())
.unwrap();
}
let value_path = format!("{}/value", gpio_dir);
while !Path::new(&value_path).exists() {
thread::sleep(Duration::from_millis(10));
}
}
fn unexport_gpio(ch_info: &ChannelInfo) {
let gpio_dir = format!("{}/{}", SYSFS_ROOT, ch_info.global_gpio_name);
if Path::new(&gpio_dir).exists() {
let unexport_path = format!("{}/unexport", SYSFS_ROOT);
let mut f_unexport = fs::OpenOptions::new()
.write(true)
.open(&unexport_path)
.unwrap();
f_unexport
.write_all(ch_info.global_gpio.to_string().as_bytes())
.unwrap();
}
}
fn write_sysfs_file(path: &str, data: &[u8]) {
let mut f = fs::OpenOptions::new().write(true).open(path).unwrap();
f.write_all(data).unwrap();
}
fn write_direction(ch_info: ChannelInfo, direction: &str) {
let gpio_dir = format!("{}/{}", SYSFS_ROOT, ch_info.global_gpio_name);
write_sysfs_file(&format!("{}/direction", gpio_dir), direction.as_bytes());
}
fn write_value(ch_info: &ChannelInfo, value: &str) {
let gpio_dir = format!("{}/{}", SYSFS_ROOT, ch_info.global_gpio_name);
write_sysfs_file(&format!("{}/value", gpio_dir), value.as_bytes());
}
fn read_value(ch_info: ChannelInfo) -> String {
let gpio_dir = format!("{}/{}", SYSFS_ROOT, ch_info.global_gpio_name);
let value_path = format!("{}/value", gpio_dir);
fs::read_to_string(&value_path).unwrap_or_default()
}
fn output_one(ch_info: &ChannelInfo, value: Level) {
let value_str = match value {
Level::HIGH => "1",
Level::LOW => "0",
};
write_value(ch_info, value_str);
}
pub struct GPIO {
pub model: String,
pub jetson_info: JetsonInfo,
channel_data_by_mode: HashMap<Mode, HashMap<u32, ChannelInfo>>,
channel_data: HashMap<u32, ChannelInfo>,
gpio_warnings: bool,
gpio_mode: Option<Mode>,
channel_configuration: HashMap<u32, Direction>,
}
impl GPIO {
pub fn new() -> Self {
let (model, jetson_info, channel_data_by_mode) = get_data();
GPIO {
model,
jetson_info,
channel_data_by_mode,
channel_data: HashMap::new(),
gpio_warnings: true,
gpio_mode: None,
channel_configuration: HashMap::new(),
}
}
pub fn setwarnings(&mut self, warnings: bool) {
self.gpio_warnings = warnings;
}
pub fn setmode(&mut self, mode: Mode) -> Result<(), Error> {
if let Some(current_mode) = self.gpio_mode {
if current_mode != mode {
return Err(Error::msg("A different mode has already been set!"));
}
}
if !mode.is_valid() {
return Err(Error::msg("An invalid mode was passed to setmode!"));
}
self.channel_data = self.channel_data_by_mode.get(&mode).unwrap().clone();
self.gpio_mode = Some(mode);
Ok(())
}
pub fn getmode(&self) -> Option<String> {
match self.gpio_mode {
Some(mode) => Some(String::from(mode.to_str())),
None => None,
}
}
fn validate_mode_set(&self) -> Result<(), Error> {
match self.gpio_mode {
Some(_) => Ok(()),
None => Err(Error::msg("Please set pin numbering mode using GPIO.setmode(Mode::BOARD), GPIO.setmode(Mode::BCM), GPIO.setmode(Mode::TEGRA_SOC) or GPIO.setmode(Mode::CVM)")),
}
}
fn channel_to_info_lookup(
&self,
channel: u32,
need_gpio: bool,
need_pwm: bool,
) -> Result<ChannelInfo, Error> {
if !self.channel_data.contains_key(&channel) {
return Err(Error::msg(format!(
"The channel sent is invalid: {}",
channel
)));
}
let ch_info = self.channel_data.get(&channel).unwrap().clone();
if need_gpio && ch_info.gpio_chip_dir == "" {
return Err(Error::msg(format!("Channel {} is not a GPIO", channel)));
}
if need_pwm && ch_info.pwm_chip_dir.is_none() {
return Err(Error::msg(format!("Channel {} is not a PWM", channel)));
}
Ok(ch_info)
}
fn channel_to_info(
&self,
channel: u32,
need_gpio: bool,
need_pwm: bool,
) -> Result<ChannelInfo, Error> {
self.validate_mode_set()?;
self.channel_to_info_lookup(channel, need_gpio, need_pwm)
}
fn channels_to_infos(
&self,
channels: Vec<u32>,
need_gpio: bool,
need_pwm: bool,
) -> Result<Vec<ChannelInfo>, Error> {
self.validate_mode_set()?;
let mut ret: Vec<ChannelInfo> = Vec::new();
for channel in channels {
ret.push(self.channel_to_info_lookup(channel, need_gpio, need_pwm)?);
}
Ok(ret)
}
fn app_channel_configuration(&self, ch_info: &ChannelInfo) -> Option<Direction> {
self.channel_configuration.get(&ch_info.channel).copied()
}
fn cleanup_one(&mut self, ch_info: ChannelInfo) {
match self.channel_configuration.get(&ch_info.channel) {
Some(direction) => {
if direction == &Direction::HARD_PWM {
} else {
unexport_gpio(&ch_info);
}
}
None => {}
}
self.channel_configuration.remove(&ch_info.channel);
}
fn cleanup_all(&mut self) -> Result<(), Error> {
for (channel, _) in self.channel_configuration.clone().iter() {
let ch_info = self.channel_to_info(*channel, false, false)?;
self.cleanup_one(ch_info);
}
self.gpio_mode = None;
Ok(())
}
fn setup_single_out(&mut self, ch_info: ChannelInfo, initial: Option<Level>) {
export_gpio(&ch_info);
write_direction(ch_info.clone(), "out");
if let Some(level) = initial {
output_one(&ch_info, level);
}
self.channel_configuration
.insert(ch_info.channel, Direction::OUT);
}
fn setup_single_in(&mut self, ch_info: ChannelInfo) {
export_gpio(&ch_info);
write_direction(ch_info.clone(), "in");
self.channel_configuration
.insert(ch_info.channel, Direction::IN);
}
pub fn setup(&mut self, channels: Vec<u32>, direction: Direction, initial: Option<Level>) -> Result<(), Error> {
check_write_access()?;
let ch_infos = self.channels_to_infos(channels, true, false)?;
if !direction.is_valid() {
return Err(Error::msg("An invalid direction was passed to setup()"));
}
if self.gpio_warnings {
for ch_info in ch_infos.clone() {
let sysfs_cfg = sysfs_channel_configuration(ch_info.clone());
let app_cfg = self.app_channel_configuration(&ch_info);
if app_cfg.is_none() && sysfs_cfg.is_some() {
println!("This channel is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings");
}
}
}
for ch_info in ch_infos.clone() {
if self.channel_configuration.contains_key(&ch_info.channel) {
self.cleanup_one(ch_info.clone());
}
}
match direction {
Direction::OUT => {
for ch_info in &ch_infos {
self.setup_single_out(ch_info.clone(), initial.clone());
}
}
_ => {
if initial.is_some() {
return Err(Error::msg("initial parameter is not valid for inputs"));
}
for ch_info in &ch_infos {
self.setup_single_in(ch_info.clone());
}
}
}
Ok(())
}
pub fn cleanup(&mut self, channels: Option<Vec<u32>>) -> Result<(), Error> {
if self.gpio_mode.is_none() {
if self.gpio_warnings {
println!("No channels have been set up yet - nothing to clean up! Try cleaning up at the end of your program instead!");
}
return Ok(());
}
if channels.is_none() {
self.cleanup_all()?;
return Ok(());
}
let ch_infos = self.channels_to_infos(channels.unwrap(), false, false)?;
for ch_info in ch_infos {
if self.channel_configuration.contains_key(&ch_info.channel) {
self.cleanup_one(ch_info);
}
}
Ok(())
}
pub fn input(&self, channel: u32) -> Result<Level, Error> {
let ch_info = self.channel_to_info(channel, true, false)?;
let app_cfg = self.app_channel_configuration(&ch_info);
if app_cfg.is_none() || ![Direction::IN, Direction::OUT].contains(&app_cfg.unwrap()) {
return Err(Error::msg("You must setup() the GPIO channel first"));
}
match read_value(ch_info.clone()).as_str() {
"0" => Ok(Level::LOW),
_ => Ok(Level::HIGH),
}
}
pub fn output(&self, channels: Vec<u32>, values: Vec<Level>) -> Result<(), Error> {
let ch_infos = self.channels_to_infos(channels.clone(), true, false)?;
if values.len() != ch_infos.len() {
return Err(Error::msg("Number of values != number of channels"));
}
for ch_info in &ch_infos {
let app_cfg = self.app_channel_configuration(ch_info);
if app_cfg.is_none() || app_cfg.unwrap() != Direction::OUT {
return Err(Error::msg("The GPIO channel has not been set up as an OUTPUT"));
}
}
for (ch_info, value) in ch_infos.iter().zip(values.iter()) {
output_one(ch_info, value.clone());
}
Ok(())
}
}