use std::time::Duration;
use anyhow::Context;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_modbus::{
Slave,
client::{Client, Writer, rtu},
};
use tokio_serial::{DataBits, Parity, SerialPortBuilderExt, StopBits};
use crate::{
config::{BoardConfig, CustomPowerManagement, PowerManagementConfig, SerialPortKeyKind},
process::run_shell_command,
serial::discovery::resolve_serial_key,
};
const ZHONGSHENG_RELAY_BAUD_RATE: u32 = 38_400;
const ZHONGSHENG_RELAY_SLAVE_ID: u8 = 1;
const ZHONGSHENG_RELAY_COIL_ADDRESS: u16 = 0;
const ZHONGSHENG_RELAY_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PowerAction {
On,
Off,
}
impl PowerAction {
pub fn label(self) -> &'static str {
match self {
Self::On => "power-on",
Self::Off => "power-off",
}
}
}
#[derive(Debug, Error)]
pub enum PowerActionError {
#[error("board has no power management configured")]
NotConfigured,
#[error("{0}")]
InvalidConfig(String),
#[error(transparent)]
Execution(#[from] anyhow::Error),
}
pub async fn execute_power_action_for_board(
board: &BoardConfig,
action: PowerAction,
) -> Result<String, PowerActionError> {
execute_power_action(&board.power_management, action).await
}
pub async fn execute_power_action(
power_management: &PowerManagementConfig,
action: PowerAction,
) -> Result<String, PowerActionError> {
match power_management {
PowerManagementConfig::Custom(CustomPowerManagement {
power_on_cmd,
power_off_cmd,
}) => {
let command = match action {
PowerAction::On => power_on_cmd,
PowerAction::Off => power_off_cmd,
};
if command.trim().is_empty() {
return Err(PowerActionError::InvalidConfig(format!(
"board power management `{}` command is not configured",
action.label()
)));
}
run_shell_command(command).await?;
Ok(format!("executed `{command}`"))
}
PowerManagementConfig::ZhongshengRelay(relay) => {
if relay.key.value.trim().is_empty() {
return Err(PowerActionError::InvalidConfig(
"board power management relay serial key is not configured".to_string(),
));
}
let resolved_serial = resolve_serial_key(&relay.key).map_err(|err| {
PowerActionError::Execution(anyhow::anyhow!(
"failed to resolve relay serial device for {} `{}`: {err}",
relay_serial_key_kind_label(&relay.key.kind),
relay.key.value
))
})?;
run_zhongsheng_relay_action(&resolved_serial.current_device_path, action).await?;
Ok(format!(
"executed Zhongsheng relay {} via {}",
action.label(),
resolved_serial.current_device_path
))
}
PowerManagementConfig::Virtual(_) => Err(PowerActionError::InvalidConfig(
"virtual power management must be executed via application state".to_string(),
)),
}
}
async fn run_zhongsheng_relay_action(serial_port: &str, action: PowerAction) -> anyhow::Result<()> {
let transport = tokio_serial::new(serial_port, ZHONGSHENG_RELAY_BAUD_RATE)
.data_bits(DataBits::Eight)
.exclusive(false)
.parity(Parity::None)
.stop_bits(StopBits::One)
.timeout(ZHONGSHENG_RELAY_TIMEOUT)
.open_native_async()
.with_context(|| format!("failed to open relay serial port {serial_port}"))?;
write_zhongsheng_relay_action(transport, action).await
}
fn relay_serial_key_kind_label(kind: &SerialPortKeyKind) -> &'static str {
match kind {
SerialPortKeyKind::SerialNumber => "serial number",
SerialPortKeyKind::UsbPath => "usb path",
}
}
async fn write_zhongsheng_relay_action<T>(transport: T, action: PowerAction) -> anyhow::Result<()>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let mut context = rtu::attach_slave(transport, Slave(ZHONGSHENG_RELAY_SLAVE_ID));
let coil = matches!(action, PowerAction::On);
tokio::time::timeout(
ZHONGSHENG_RELAY_TIMEOUT,
context.write_single_coil(ZHONGSHENG_RELAY_COIL_ADDRESS, coil),
)
.await
.context("timed out writing Zhongsheng relay coil")?
.context("failed to write Zhongsheng relay coil")?
.context("Zhongsheng relay rejected coil write")?;
if let Err(err) = context.disconnect().await {
log::debug!("failed to close Zhongsheng relay Modbus session: {err}");
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::{future, time::Duration};
use serialport::{SerialPort, TTYPort};
use tokio::sync::{mpsc, oneshot};
use tokio_modbus::{
ExceptionCode, Request, Response, SlaveRequest,
server::{Service, rtu::Server},
};
use super::{
PowerAction, ZHONGSHENG_RELAY_COIL_ADDRESS, ZHONGSHENG_RELAY_SLAVE_ID,
execute_power_action_for_board, write_zhongsheng_relay_action,
};
use crate::config::{
BoardConfig, BootConfig, CustomPowerManagement, PowerManagementConfig, PxeProfile,
SerialPortKey, SerialPortKeyKind, ZhongshengRelayPowerManagement,
};
type RelayServerHandle =
tokio::task::JoinHandle<std::io::Result<tokio_modbus::server::Terminated>>;
type RelayRequestRx = mpsc::UnboundedReceiver<(u8, u16, bool)>;
type RelayTestServer = (
String,
tokio_serial::SerialStream,
RelayServerHandle,
RelayRequestRx,
oneshot::Sender<()>,
);
fn board_with_power_management(power_management: PowerManagementConfig) -> BoardConfig {
BoardConfig {
id: "demo".into(),
board_type: "demo".into(),
tags: vec![],
serial: None,
power_management,
boot: BootConfig::Pxe(PxeProfile::default()),
notes: None,
disabled: false,
}
}
#[tokio::test]
async fn custom_power_management_executes_commands() {
let board =
board_with_power_management(PowerManagementConfig::Custom(CustomPowerManagement {
power_on_cmd: "printf power-on >/dev/null".into(),
power_off_cmd: "printf power-off >/dev/null".into(),
}));
let message = execute_power_action_for_board(&board, PowerAction::On)
.await
.unwrap();
assert_eq!(message, "executed `printf power-on >/dev/null`");
}
#[cfg(unix)]
#[tokio::test]
async fn relay_power_management_writes_single_coil_for_power_off() {
let (_relay_path, client, server, mut requests, stop_tx) = spawn_relay_test_server();
write_zhongsheng_relay_action(client, PowerAction::Off)
.await
.unwrap();
let request = tokio::time::timeout(Duration::from_secs(1), requests.recv())
.await
.unwrap()
.unwrap();
assert_eq!(
request,
(
ZHONGSHENG_RELAY_SLAVE_ID,
ZHONGSHENG_RELAY_COIL_ADDRESS,
false
)
);
let _ = stop_tx.send(());
let _ = server.await.unwrap();
}
#[cfg(unix)]
#[tokio::test]
async fn relay_power_management_writes_single_coil_for_power_on() {
let (_relay_path, client, server, mut requests, stop_tx) = spawn_relay_test_server();
write_zhongsheng_relay_action(client, PowerAction::On)
.await
.unwrap();
let request = tokio::time::timeout(Duration::from_secs(1), requests.recv())
.await
.unwrap()
.unwrap();
assert_eq!(
request,
(
ZHONGSHENG_RELAY_SLAVE_ID,
ZHONGSHENG_RELAY_COIL_ADDRESS,
true
)
);
let _ = stop_tx.send(());
let _ = server.await.unwrap();
}
#[cfg(unix)]
#[tokio::test]
async fn relay_power_management_resolves_current_device_from_key() {
let (relay_port, _client, server, mut requests, stop_tx) = spawn_relay_test_server();
let board = board_with_power_management(PowerManagementConfig::ZhongshengRelay(
ZhongshengRelayPowerManagement {
key: SerialPortKey {
kind: SerialPortKeyKind::UsbPath,
value: relay_port.clone(),
},
},
));
let message = execute_power_action_for_board(&board, PowerAction::Off)
.await
.unwrap();
assert_eq!(
message,
format!("executed Zhongsheng relay power-off via {relay_port}")
);
let request = tokio::time::timeout(Duration::from_secs(1), requests.recv())
.await
.unwrap()
.unwrap();
assert_eq!(
request,
(
ZHONGSHENG_RELAY_SLAVE_ID,
ZHONGSHENG_RELAY_COIL_ADDRESS,
false
)
);
let _ = stop_tx.send(());
let _ = server.await.unwrap();
}
#[tokio::test]
async fn relay_power_management_rejects_empty_key() {
let board = board_with_power_management(PowerManagementConfig::ZhongshengRelay(
ZhongshengRelayPowerManagement {
key: SerialPortKey {
kind: SerialPortKeyKind::UsbPath,
value: String::new(),
},
},
));
let err = execute_power_action_for_board(&board, PowerAction::On)
.await
.unwrap_err();
assert!(matches!(err, super::PowerActionError::InvalidConfig(_)));
assert_eq!(
err.to_string(),
"board power management relay serial key is not configured"
);
}
#[tokio::test]
async fn relay_power_management_reports_resolution_failures() {
let board = board_with_power_management(PowerManagementConfig::ZhongshengRelay(
ZhongshengRelayPowerManagement {
key: SerialPortKey {
kind: SerialPortKeyKind::UsbPath,
value: "/definitely/missing/relay-device".into(),
},
},
));
let err = execute_power_action_for_board(&board, PowerAction::Off)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("failed to resolve relay serial device for usb path")
);
}
#[derive(Clone)]
struct RecordingRelayService {
requests: mpsc::UnboundedSender<(u8, u16, bool)>,
}
impl Service for RecordingRelayService {
type Request = SlaveRequest<'static>;
type Response = Response;
type Exception = ExceptionCode;
type Future = future::Ready<std::result::Result<Self::Response, Self::Exception>>;
fn call(&self, req: Self::Request) -> Self::Future {
match req.request {
Request::WriteSingleCoil(address, coil) => {
self.requests.send((req.slave, address, coil)).unwrap();
future::ready(Ok(Response::WriteSingleCoil(address, coil)))
}
_ => future::ready(Err(ExceptionCode::IllegalFunction)),
}
}
}
#[cfg(unix)]
fn spawn_relay_test_server() -> RelayTestServer {
let (master, mut slave) = TTYPort::pair().unwrap();
slave.set_exclusive(false).unwrap();
let slave_path = slave.name().unwrap();
let client = tokio_serial::SerialStream::try_from(slave).unwrap();
let server = tokio_serial::SerialStream::try_from(master).unwrap();
let (request_tx, request_rx) = mpsc::unbounded_channel();
let (stop_tx, stop_rx) = oneshot::channel();
let task = tokio::spawn(async move {
Server::new(server)
.serve_until(
RecordingRelayService {
requests: request_tx,
},
async move {
let _ = stop_rx.await;
},
)
.await
});
(slave_path, client, task, request_rx, stop_tx)
}
}