use crate::gpio_cdev::*;
use crate::gpio_pin_data::{ChannelInfo, JetsonInfo, Mode, get_data};
use anyhow::Error;
use std::os::fd::AsRawFd;
use std::{collections::HashMap, fs::OpenOptions, path::Path};
const GPIOHANDLE_REQUEST_INPUT: u32 = 0x1;
const GPIOHANDLE_REQUEST_OUTPUT: u32 = 0x2;
#[derive(PartialEq, Clone, Copy)]
pub enum Level {
LOW = 0,
HIGH = 1,
}
#[derive(PartialEq, Clone, Copy)]
pub enum Direction {
UNKNOWN = -1,
OUT = 0,
IN = 1,
HardPwm = 43,
}
impl Direction {
pub fn is_valid(&self) -> bool {
matches!(self, Direction::OUT | Direction::IN | Direction::HardPwm)
}
fn from_cdev(value: i32) -> Self {
if value == GPIOHANDLE_REQUEST_INPUT as i32 {
Direction::IN
} else if value == GPIOHANDLE_REQUEST_OUTPUT as i32 {
Direction::OUT
} else {
Direction::UNKNOWN
}
}
fn to_cdev(&self) -> u32 {
match self {
Direction::IN => GPIOHANDLE_REQUEST_INPUT,
Direction::OUT => GPIOHANDLE_REQUEST_OUTPUT,
_ => GPIOHANDLE_REQUEST_INPUT,
}
}
}
#[derive(PartialEq, Clone, Copy)]
pub enum Edge {
RISING = 31, FALLING = 32, BOTH = 33, }
impl Edge {
pub fn is_valid(&self) -> bool {
matches!(self, Edge::RISING | Edge::FALLING | Edge::BOTH)
}
}
#[derive(PartialEq, Clone, Copy)]
pub enum PullUpDown {
PudOff = 20, PudDown = 21, PudUp = 22, }
impl PullUpDown {
pub fn is_valid(&self) -> bool {
matches!(
self,
PullUpDown::PudOff | PullUpDown::PudDown | PullUpDown::PudUp
)
}
}
fn check_write_access() -> Result<(), Error> {
let gpiochip_path = "/dev/gpiochip0";
if !Path::new(gpiochip_path).exists() {
return Err(Error::msg(
"GPIO character device not found. This library requires a Jetson platform.",
));
}
if !Path::new(gpiochip_path).metadata().is_ok_and(|_m| {
OpenOptions::new()
.read(true)
.write(true)
.open(gpiochip_path)
.is_ok()
}) {
return Err(Error::msg(
"The current user does not have permissions set to access the library functionalites. Please configure permissions or use the root user to run this. It is also possible that /dev/gpiochip0 does not exist. Please check if that file is present.",
));
}
Ok(())
}
pub struct GPIO {
pub model: String,
pub jetson_info: JetsonInfo,
channel_data_by_mode: HashMap<Mode, HashMap<u32, ChannelInfo>>,
channel_data: HashMap<u32, ChannelInfo>,
gpio_warnings: bool,
gpio_mode: Option<Mode>,
channel_configuration: HashMap<u32, Direction>,
chip_fd_map: HashMap<String, std::fs::File>,
}
impl GPIO {
pub fn new() -> Self {
let (model, jetson_info, channel_data_by_mode) = get_data();
GPIO {
model,
jetson_info,
channel_data_by_mode,
channel_data: HashMap::new(),
gpio_warnings: true,
gpio_mode: None,
channel_configuration: HashMap::new(),
chip_fd_map: HashMap::new(),
}
}
pub fn setwarnings(&mut self, warnings: bool) {
self.gpio_warnings = warnings;
}
pub fn setmode(&mut self, mode: Mode) -> Result<(), Error> {
if let Some(ref current_mode) = self.gpio_mode {
if *current_mode != mode {
return Err(Error::msg("A different mode has already been set!"));
}
}
if !mode.is_valid() {
return Err(Error::msg("An invalid mode was passed to setmode!"));
}
self.channel_data = self.channel_data_by_mode.get(&mode).unwrap().clone();
self.gpio_mode = Some(mode);
Ok(())
}
pub fn getmode(&self) -> Option<Mode> {
self.gpio_mode.clone()
}
fn validate_mode_set(&self) -> Result<(), Error> {
match self.gpio_mode {
Some(_) => Ok(()),
None => Err(Error::msg(
"Please set pin numbering mode using GPIO.setmode(Mode::BOARD), GPIO.setmode(Mode::BCM), GPIO.setmode(Mode::TEGRA_SOC) or GPIO.setmode(Mode::CVM)",
)),
}
}
fn channel_to_info_lookup(
&self,
channel: u32,
need_gpio: bool,
need_pwm: bool,
) -> Result<&ChannelInfo, Error> {
if !self.channel_data.contains_key(&channel) {
return Err(Error::msg(format!(
"The channel sent is invalid: {}",
channel
)));
}
let ch_info = self.channel_data.get(&channel).unwrap();
if need_gpio && ch_info.gpio_chip.is_empty() {
return Err(Error::msg(format!("Channel {} is not a GPIO", channel)));
}
if need_pwm && ch_info.pwm_chip_dir.is_none() {
return Err(Error::msg(format!("Channel {} is not a PWM", channel)));
}
Ok(ch_info)
}
fn channel_to_info(
&self,
channel: u32,
need_gpio: bool,
need_pwm: bool,
) -> Result<&ChannelInfo, Error> {
self.validate_mode_set()?;
self.channel_to_info_lookup(channel, need_gpio, need_pwm)
}
fn channels_to_infos(
&self,
channels: Vec<u32>,
need_gpio: bool,
need_pwm: bool,
) -> Result<Vec<&ChannelInfo>, Error> {
self.validate_mode_set()?;
let mut ret: Vec<&ChannelInfo> = Vec::new();
for channel in channels {
ret.push(self.channel_to_info_lookup(channel, need_gpio, need_pwm)?);
}
Ok(ret)
}
fn app_channel_configuration(&self, ch_info: &ChannelInfo) -> Option<Direction> {
self.channel_configuration.get(&ch_info.channel).copied()
}
fn do_one_channel(
&mut self,
ch_info: ChannelInfo,
direction: u32,
initial: Option<u8>,
consumer: &str,
) {
let chip_name = ch_info.gpio_chip.clone();
let chip_fd = if !self.chip_fd_map.contains_key(&chip_name) {
let fd = chip_open_by_label(&chip_name).expect("Failed to open GPIO chip");
self.chip_fd_map.insert(chip_name.clone(), fd);
self.chip_fd_map
.get(&chip_name)
.unwrap()
.try_clone()
.expect("Failed to clone chip fd")
} else {
self.chip_fd_map
.get(&chip_name)
.unwrap()
.try_clone()
.expect("Failed to clone chip fd")
};
let chip_fd_raw = chip_fd.as_raw_fd();
let mut request = request_handle(ch_info.line_offset, direction, initial, consumer)
.expect("Failed to create request");
let line_handle = open_line(&mut request, &chip_fd).expect("Failed to open GPIO line");
let mut ch_info = ch_info;
ch_info.chip_fd = Some(chip_fd_raw);
ch_info.line_handle = Some(line_handle);
if self.gpio_warnings {
if let Err(e) = check_pinmux(ch_info.reg_addr, direction, ch_info.channel) {
eprintln!("Pinmux check warning: {}", e);
}
}
self.channel_configuration
.insert(ch_info.channel, Direction::from_cdev(direction as i32));
self.channel_data.insert(ch_info.channel, ch_info);
}
fn cleanup_one(&mut self, ch_info: ChannelInfo) {
if let Some(line_handle) = ch_info.line_handle {
let _ = close_line(Some(line_handle));
}
self.channel_configuration.remove(&ch_info.channel);
}
fn cleanup_all(&mut self) -> Result<(), Error> {
for (_chip_name, chip_fd) in self.chip_fd_map.drain() {
let _ = close_chip(Some(chip_fd));
}
let ch_infos_to_cleanup: Vec<ChannelInfo> = self.channel_data.values().cloned().collect();
for ch_info in ch_infos_to_cleanup {
self.cleanup_one(ch_info);
}
self.gpio_mode = None;
Ok(())
}
fn setup_single_out(&mut self, ch_info: ChannelInfo, initial: Option<Level>) {
let initial_value = initial.map(|l| l as u8);
self.do_one_channel(
ch_info,
Direction::OUT.to_cdev(),
initial_value,
"jetsongpio-rs",
);
}
fn setup_single_in(&mut self, ch_info: ChannelInfo) {
self.do_one_channel(ch_info, Direction::IN.to_cdev(), None, "jetsongpio-rs");
}
pub fn setup(
&mut self,
channels: Vec<u32>,
direction: Direction,
initial: Option<Level>,
) -> Result<(), Error> {
check_write_access()?;
let ch_infos = self.channels_to_infos(channels, true, false)?;
if !direction.is_valid() {
return Err(Error::msg("An invalid direction was passed to setup()"));
}
let ch_infos_owned: Vec<ChannelInfo> = ch_infos.iter().map(|&ch| ch.clone()).collect();
for ch_info in ch_infos_owned.iter() {
if self.channel_configuration.contains_key(&ch_info.channel) {
self.cleanup_one(ch_info.clone());
}
}
match direction {
Direction::OUT => {
for ch_info in ch_infos_owned {
self.setup_single_out(ch_info, initial);
}
}
Direction::IN => {
if initial.is_some() {
return Err(Error::msg("initial parameter is not valid for inputs"));
}
for ch_info in ch_infos_owned {
self.setup_single_in(ch_info);
}
}
_ => {
return Err(Error::msg("Unsupported direction for setup()"));
}
}
Ok(())
}
pub fn cleanup(&mut self, channels: Option<Vec<u32>>) -> Result<(), Error> {
if self.gpio_mode.is_none() {
if self.gpio_warnings {
println!(
"No channels have been set up yet - nothing to clean up! Try cleaning up at the end of your program instead!"
);
}
return Ok(());
}
if channels.is_none() {
self.cleanup_all()?;
return Ok(());
}
let ch_infos = self.channels_to_infos(channels.unwrap(), false, false)?;
let channels_to_cleanup: Vec<u32> = ch_infos
.iter()
.filter_map(|ch_info| {
if self.channel_configuration.contains_key(&ch_info.channel) {
Some(ch_info.channel)
} else {
None
}
})
.collect();
for channel in channels_to_cleanup {
if let Some(ch_info) = self.channel_data.get(&channel).cloned() {
self.cleanup_one(ch_info);
}
}
Ok(())
}
pub fn input(&self, channel: u32) -> Result<Level, Error> {
let ch_info = self.channel_to_info(channel, true, false)?;
let app_cfg = self.app_channel_configuration(ch_info);
if app_cfg.is_none() || ![Direction::IN, Direction::OUT].contains(&app_cfg.unwrap()) {
return Err(Error::msg("You must setup() the GPIO channel first"));
}
let line_handle = ch_info
.line_handle
.ok_or_else(|| Error::msg("GPIO line handle not found"))?;
let value = get_value(line_handle)?;
match value {
0 => Ok(Level::LOW),
_ => Ok(Level::HIGH),
}
}
pub fn output(&self, channels: Vec<u32>, values: Vec<Level>) -> Result<(), Error> {
let ch_infos = self.channels_to_infos(channels, true, false)?;
if values.len() != ch_infos.len() {
return Err(Error::msg("Number of values != number of channels"));
}
for ch_info in &ch_infos {
let app_cfg = self.app_channel_configuration(ch_info);
if app_cfg.is_none() || app_cfg.unwrap() != Direction::OUT {
return Err(Error::msg(
"The GPIO channel has not been set up as an OUTPUT",
));
}
}
for (ch_info, value) in ch_infos.iter().zip(values.iter()) {
let line_handle = ch_info
.line_handle
.ok_or_else(|| Error::msg("GPIO line handle not found"))?;
set_value(line_handle, *value as u8)?;
}
Ok(())
}
pub fn gpio_function(&self, channel: u32) -> Result<Direction, Error> {
let ch_info = self.channel_to_info(channel, false, false)?;
let func = self.app_channel_configuration(ch_info);
Ok(func.unwrap_or(Direction::UNKNOWN))
}
}