use base64;
use derive_builder::Builder;
use log::{log_enabled, trace, Level::*};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serialport::prelude::*;
use serialport::SerialPortType::*;
use snafu::Snafu;
use std::convert::TryFrom;
use std::io;
use std::str;
use std::time::{Duration as StdDuration, Instant};
/// Protocol version the modem must report; `LoRaSerial::from_serial` rejects
/// any other value with `ErrorKind::VersionMismatch`.
const PROTOCOL_VERSION: u8 = 16;
/// Response timeout assigned to every `LoRaSerial` created by
/// `LoRaSerial::from_serial`; it bounds how long a single request waits for a
/// complete response line.
const TIMEOUT: StdDuration = StdDuration::from_secs(1);
/// Handle to a LoRa modem driven over a serial port with newline-terminated
/// JSON requests and responses.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct LoRaSerial {
/// Open serial connection that every request is written to and read from.
port: Box<dyn SerialPort>,
/// Maximum time a single request waits for a complete response line.
timeout: StdDuration,
/// Label printed next to the debug traffic dump to tell instances apart.
instance_id: String,
}
/// Reasons why `LoRaSerial::from_port_name` can fail.
// NOTE(review): the variants below carry plain `//` comments on purpose. Some
// SNAFU versions use a variant's `///` doc comment as its `Display` text when
// no `display` attribute is present, so doc comments could change messages.
#[derive(Debug, Snafu)]
enum InitLoraWithPortNameErr {
// The serial port could not be opened with the requested settings.
#[snafu(display("Serial port error {:?}", inner_error))]
SerialErr {
inner_error: serialport::Error,
},
// The port opened, but `LoRaSerial::from_serial` failed on it.
LoraErr {
inner_error: ErrorKind,
},
}
impl LoRaSerial {
    /// Opens every USB serial port on the system and returns the modems that
    /// initialised successfully.
    ///
    /// Ports that cannot be opened, report an unexpected protocol version or
    /// fail to reset are silently skipped. Each returned instance uses its
    /// index in the list of USB ports (counted before the skipping) as its
    /// `instance_id`.
    ///
    /// # Panics
    ///
    /// Panics if the operating system's serial ports cannot be enumerated.
    pub fn init_all() -> Vec<LoRaSerial> {
        serialport::available_ports()
            .expect("Error enumerating ports")
            .into_iter()
            .filter(|port_info| match &port_info.port_type {
                UsbPort(usb_info) => {
                    // Diagnostics go to stdout only when debug logging is
                    // enabled, like the traffic dump in `serial_fetch_raw`.
                    if log_enabled!(Debug) {
                        println!("{:?}", usb_info);
                    }
                    true
                }
                _ => false,
            })
            .enumerate()
            .filter_map(|(i, port_info)| {
                let mut lora = LoRaSerial::from_port_name(&port_info.port_name).ok()?;
                lora.set_instance_id(i.to_string());
                Some(lora)
            })
            .collect()
    }

    /// Opens `port_name` at 115200 baud, 8 data bits, no parity, one stop bit
    /// and no flow control, then initialises the modem behind it.
    ///
    /// The port's own read timeout is 1 ms; the overall per-request timeout is
    /// enforced separately in `serial_fetch_raw`.
    fn from_port_name(port_name: &str) -> Result<Self, InitLoraWithPortNameErr> {
        let settings = SerialPortSettings {
            baud_rate: 115200,
            data_bits: DataBits::Eight,
            flow_control: FlowControl::None,
            parity: Parity::None,
            stop_bits: StopBits::One,
            timeout: StdDuration::from_millis(1),
        };
        let port = serialport::open_with_settings(port_name, &settings)
            .map_err(|e| InitLoraWithPortNameErr::SerialErr { inner_error: e })?;
        LoRaSerial::from_serial(port)
            .map_err(|e| InitLoraWithPortNameErr::LoraErr { inner_error: e })
    }

    /// Wraps an already opened serial port, checks the modem's protocol
    /// version and resets the modem to the default configuration.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::CommunicationError` if the version cannot be read.
    /// * `ErrorKind::VersionMismatch` if the modem does not report
    ///   `PROTOCOL_VERSION`.
    /// * Any error returned by [`LoRaSerial::reset`].
    pub fn from_serial(port: Box<dyn SerialPort>) -> Result<Self, ErrorKind> {
        let mut new_self = Self {
            port,
            timeout: TIMEOUT,
            instance_id: "".into(),
        };
        let version = new_self
            .read_version()
            .map_err(|e| ErrorKind::CommunicationError {
                msg: format!("Error reading version {:?}", e),
            })?;
        if version != PROTOCOL_VERSION {
            return Err(ErrorKind::VersionMismatch {
                actual: version,
                expected: PROTOCOL_VERSION,
            });
        }
        new_self.reset()?;
        Ok(new_self)
    }

    /// Sets the label shown next to this instance's debug traffic dump.
    // The lint override is kept from the original code; within this file the
    // method is only called from `init_all`.
    #[allow(dead_code)]
    fn set_instance_id(&mut self, id: String) {
        self.instance_id = id;
    }

    /// Asks the modem for the protocol version it speaks.
    fn read_version(&mut self) -> Result<u8, ErrorKind> {
        self.serial_fetch_json(&SerialRequest::ReadVersion)
    }

    /// Re-initialises the modem and applies the default configuration:
    /// frequency `915.0`, preset `Bw125Cr45Sf128`, TX power `0` and modem
    /// address `0`.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by any of the steps.
    pub fn reset(&mut self) -> Result<(), ErrorKind> {
        self.init_modem()?;
        self.set_frequency(915.0)?;
        self.set_radio_preset(Bw125Cr45Sf128)?;
        self.set_tx_power(0)?;
        self.set_modem_address(0)?;
        Ok(())
    }

    /// Sends the `Init` request to the modem.
    fn init_modem(&mut self) -> Result<(), ErrorKind> {
        self.serial_fetch_json(&SerialRequest::Init)
    }

    /// Sets the radio frequency to `new_freq`.
    // NOTE(review): the unit is presumably MHz (`reset` uses 915.0) - confirm
    // against the modem firmware.
    pub fn set_frequency(&mut self, new_freq: f32) -> Result<(), ErrorKind> {
        self.serial_fetch_json(&SerialRequest::SetFrequency { freq: new_freq })
    }

    /// Selects one of the predefined radio configurations.
    pub fn set_radio_preset(&mut self, preset: RadioPreset) -> Result<(), ErrorKind> {
        self.serial_fetch_json(&SerialRequest::SetRadioPreset { preset })
    }

    /// Sets the transmit power to `power`.
    // NOTE(review): unit and valid range are not visible here (presumably
    // dBm) - confirm against the modem firmware.
    pub fn set_tx_power(&mut self, power: i8) -> Result<(), ErrorKind> {
        self.serial_fetch_json(&SerialRequest::SetTxPower { power })
    }

    /// Sets this modem's address and verifies that the modem echoes it back.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::RoundtripIntegrityError` if the echoed address
    /// differs from `address`, or any error from the serial exchange.
    pub fn set_modem_address(&mut self, address: u8) -> Result<(), ErrorKind> {
        let echo: u8 = self.serial_fetch_json(&SerialRequest::SetModemAddress { address })?;
        if echo == address {
            Ok(())
        } else {
            Err(ErrorKind::RoundtripIntegrityError {
                actual: echo.to_string(),
                expected: address.to_string(),
            })
        }
    }

    /// Transmits the UTF-8 bytes of `msg` to the modem with address `dst`.
    pub fn tx_string(&mut self, msg: &str, dst: u8) -> Result<(), ErrorKind> {
        self.tx(msg.as_bytes(), dst)
    }

    /// Transmits `msg` to the modem with address `dst`.
    ///
    /// The payload is base64-encoded before it is put into the JSON request.
    pub fn tx(&mut self, msg: &[u8], dst: u8) -> Result<(), ErrorKind> {
        self.serial_fetch_json(&SerialRequest::Tx {
            msg: base64::encode(msg),
            dst,
        })
    }

    /// Polls the modem for a received packet.
    ///
    /// Returns `Ok(None)` when the modem reports that no packet is waiting.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::CommunicationError` if the packet payload is not
    /// valid base64, or any error from the serial exchange.
    pub fn rx(&mut self) -> Result<Option<LoraMsg>, ErrorKind> {
        let response: RxResult = self.serial_fetch_json(&SerialRequest::Rx)?;
        match response {
            RxResult::NoPacket => Ok(None),
            RxResult::Packet(incoming_msg) => LoraMsg::try_from(incoming_msg)
                .map(Some)
                .map_err(|e| ErrorKind::CommunicationError {
                    msg: format!("Error decoding message {:?}", e),
                }),
        }
    }

    /// Like [`LoRaSerial::rx`], but additionally decodes the payload as UTF-8.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::CommunicationError` if the payload is not valid
    /// UTF-8, or any error returned by [`LoRaSerial::rx`].
    pub fn rx_string(&mut self) -> Result<Option<LoraStringMsg>, ErrorKind> {
        self.rx()?
            .map(|raw_msg| {
                LoraStringMsg::try_from(raw_msg).map_err(|e| ErrorKind::CommunicationError {
                    msg: format!("Error reading message as string! {:?}", e),
                })
            })
            .transpose()
    }

    /// Sends `request` and deserializes the modem's JSON answer.
    ///
    /// The answer is parsed as a `SerialResponse<T>`; an `Ok` status yields
    /// its body, an `Err` status becomes `ErrorKind::RemoteError`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::ParseError` if the answer is not valid JSON of the
    /// expected shape, or any error from `serial_fetch_raw`.
    fn serial_fetch_json<T: DeserializeOwned>(
        &mut self,
        request: &SerialRequest,
    ) -> Result<T, ErrorKind> {
        trace!("serial_fetch_json");
        let response = self.serial_fetch_raw(request)?;
        let response: SerialResponse<T> = serde_json::from_slice(&response).map_err(|e| {
            trace!("Error deserializing serial response from slice, {:?}", e);
            ErrorKind::ParseError { inner: e }
        })?;
        response.into_result()
    }

    /// Writes `request` as one JSON line and reads the answer up to and
    /// including the terminating newline.
    ///
    /// When debug logging is enabled the outgoing line and every incoming
    /// byte are echoed to stdout, prefixed with this instance's id.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` if the request cannot be serialized.
    /// * `ErrorKind::CommunicationError` if writing to or reading from the
    ///   port fails.
    /// * `ErrorKind::Timeout` if no complete line arrives within
    ///   `self.timeout`.
    fn serial_fetch_raw(&mut self, request: &SerialRequest) -> Result<Vec<u8>, ErrorKind> {
        trace!("serial fetch");
        let request = serde_json::to_string(request).map_err(|_e| ErrorKind::InvalidInput)?;
        if log_enabled!(Debug) {
            println!("->{} {}", self.instance_id, request);
        }
        self.port
            .write_all(request.as_bytes())
            .map_err(|e| ErrorKind::CommunicationError {
                msg: format!("Error writing message to port\n{:?}", e),
            })?;
        self.port
            .write_all(b"\n")
            .map_err(|e| ErrorKind::CommunicationError {
                msg: format!("Error writing newline to port\n{:?}", e),
            })?;

        let mut response_buffer: Vec<u8> = Vec::new();
        let start_time = Instant::now();
        let mut printed_leading_arrow = false;
        loop {
            if start_time.elapsed() > self.timeout {
                return Err(ErrorKind::Timeout);
            }
            let mut char_buf = [0u8; 1];
            match self.port.read(&mut char_buf) {
                // Nothing available yet (empty read or the port's own short
                // read timeout): keep polling until the deadline above.
                Ok(0) => (),
                Err(ref e) if e.kind() == io::ErrorKind::TimedOut => (),
                Ok(_) => {
                    if log_enabled!(Debug) {
                        if !printed_leading_arrow {
                            print!("<-{} ", self.instance_id);
                            printed_leading_arrow = true;
                        }
                        // A lone byte that is not valid UTF-8 is shown as `?`.
                        print!("{}", str::from_utf8(&char_buf).unwrap_or("?"));
                    }
                    response_buffer.push(char_buf[0]);
                    // A line feed terminates the response.
                    if char_buf[0] == b'\n' {
                        return Ok(response_buffer);
                    }
                }
                Err(e) => {
                    return Err(ErrorKind::CommunicationError {
                        msg: format!("Error reading from port! {:?}", e),
                    });
                }
            }
        }
    }
}
/// Errors reported by `LoRaSerial` operations.
// NOTE(review): the variants below carry plain `//` comments on purpose. Some
// SNAFU versions use a variant's `///` doc comment as its `Display` text when
// no `display` attribute is present, so doc comments could change messages.
#[derive(Debug, Snafu)]
pub enum ErrorKind {
// The modem reported a protocol version other than the expected one.
VersionMismatch { actual: u8, expected: u8 },
// Writing to or reading from the serial port failed, or a higher-level step
// (reading the version, decoding a received message) failed; `msg` has the
// details.
CommunicationError { msg: String },
// The modem's answer could not be deserialized from JSON.
ParseError { inner: serde_json::Error },
// The modem answered with an `Err` status; `msg` is its error message.
RemoteError { msg: String },
// The request could not be serialized to JSON.
InvalidInput,
// No complete response line arrived within the configured timeout.
Timeout,
// A value echoed back by the modem differs from the value that was sent.
RoundtripIntegrityError { actual: String, expected: String },
}
/// Predefined radio configurations accepted by `LoRaSerial::set_radio_preset`.
///
/// NOTE(review): the variant names presumably encode bandwidth, coding rate
/// and spreading factor - confirm against the modem firmware.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum RadioPreset {
/// Preset applied by `LoRaSerial::reset`.
Bw125Cr45Sf128,
Bw500Cr45Sf128,
Bw31_25Cr48Sf512,
Bw125Cr48Sf4096,
}
use RadioPreset::*;
/// Requests that can be sent to the modem.
///
/// Serialized as an internally tagged JSON object whose `type` field holds the
/// variant name, with the variant's fields alongside it.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
enum SerialRequest {
/// Ask for the protocol version; answered with a `u8`.
ReadVersion,
/// Initialise the modem.
Init,
/// Set the radio frequency.
SetFrequency { freq: f32 },
/// Select a predefined radio configuration.
SetRadioPreset { preset: RadioPreset },
/// Set the transmit power.
SetTxPower { power: i8 },
/// Transmit `msg` (base64-encoded payload) to the modem addressed `dst`.
Tx { msg: String, dst: u8 },
/// Set the modem's own address; the modem echoes the address back.
SetModemAddress { address: u8 },
/// Poll for a received packet; answered with an `RxResult`.
Rx,
}
/// Envelope of every answer from the modem.
///
/// Deserialized from an internally tagged JSON object whose `status` field
/// selects the variant.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "status")]
enum SerialResponse<T> {
/// The request succeeded; `body` carries the result.
Ok { body: T },
/// The modem rejected the request; `err_msg` explains why.
Err { err_msg: String },
}
impl<T> SerialResponse<T> {
fn into_result(self) -> Result<T, ErrorKind> {
match self {
Self::Ok { body } => Ok(body),
Self::Err { err_msg } => Err(ErrorKind::RemoteError { msg: err_msg }),
}
}
}
/// A received packet exactly as the modem reports it over the serial link.
#[derive(Deserialize, Debug, Clone, PartialEq)]
pub struct IncomingLoraMsg {
/// Address of the sending modem.
src: u8,
/// Address the packet was sent to.
dst: u8,
/// Length reported by the modem (presumably the payload size in bytes -
/// confirm against the firmware).
len: u8,
/// Payload, base64-encoded.
data: String,
/// Packet id as reported by the modem (semantics not visible here).
id: u8,
/// Packet flags as reported by the modem (semantics not visible here).
flags: u8,
}
/// A received packet with its payload decoded into raw bytes.
#[derive(Clone, Deserialize, Debug)]
pub struct LoraMsg {
/// Address of the sending modem.
pub src: u8,
/// Address the packet was sent to.
pub dst: u8,
/// Length reported by the modem (presumably the payload size in bytes -
/// confirm against the firmware).
pub len: u8,
/// Payload bytes.
pub data: Vec<u8>,
/// Packet id as reported by the modem (semantics not visible here).
pub id: u8,
/// Packet flags as reported by the modem (semantics not visible here).
pub flags: u8,
}
impl TryFrom<IncomingLoraMsg> for LoraMsg {
type Error = String;
fn try_from(msg: IncomingLoraMsg) -> Result<Self, Self::Error> {
Ok(Self {
src: msg.src,
dst: msg.dst,
len: msg.len,
id: msg.id,
flags: msg.flags,
data: base64::decode(&msg.data)
.map_err(|e| format!("Error decoding base64, {:?}", e))?,
})
}
}
/// A received packet with its payload decoded as UTF-8 text.
#[derive(Clone, Deserialize, Debug)]
pub struct LoraStringMsg {
/// Address of the sending modem.
pub src: u8,
/// Address the packet was sent to.
pub dst: u8,
/// Length reported by the modem (presumably the payload size in bytes -
/// confirm against the firmware).
pub len: u8,
/// Payload text.
pub data: String,
/// Packet id as reported by the modem (semantics not visible here).
pub id: u8,
/// Packet flags as reported by the modem (semantics not visible here).
pub flags: u8,
}
impl TryFrom<LoraMsg> for LoraStringMsg {
type Error = String;
fn try_from(msg: LoraMsg) -> Result<Self, Self::Error> {
Ok(Self {
src: msg.src,
dst: msg.dst,
len: msg.len,
id: msg.id,
flags: msg.flags,
data: String::from_utf8(msg.data)
.map_err(|e| format!("Error reading bytes as utf-8\n{:?}", e))?,
})
}
}
impl TryFrom<IncomingLoraMsg> for LoraStringMsg {
type Error = String;
fn try_from(msg: IncomingLoraMsg) -> Result<Self, Self::Error> {
Self::try_from(LoraMsg::try_from(msg)?)
}
}
/// Body of the modem's answer to an `Rx` request.
///
/// Deserialized from an internally tagged JSON object whose `type` field
/// selects the variant.
#[derive(Clone, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum RxResult {
/// A packet was waiting; its fields sit next to the `type` tag.
Packet(IncomingLoraMsg),
/// No packet was waiting.
NoPacket,
}
// Hardware-in-the-loop tests: every test needs at least one modem attached via
// USB, and the pair tests need two.
#[cfg(test)]
mod tests {
    use super::*;

    /// Initialises logging (debug level unless overridden through the
    /// environment) and opens every attached modem, asserting that at least
    /// one was found.
    fn pre_test() -> Vec<LoRaSerial> {
        let _ = env_logger::from_env(env_logger::Env::default().default_filter_or("debug"))
            .is_test(true)
            .try_init();
        let instances = LoRaSerial::init_all();
        assert!(!instances.is_empty());
        instances
    }

    /// Returns two modems, the first with address 1 and the second with
    /// address 2.
    fn pre_test_pair() -> (LoRaSerial, LoRaSerial) {
        let mut ports = pre_test();
        assert!(
            ports.len() >= 2,
            "At least two ports are required for this test"
        );
        let mut a = ports.pop().unwrap();
        let mut b = ports.pop().unwrap();
        a.set_modem_address(1).unwrap();
        b.set_modem_address(2).unwrap();
        (a, b)
    }

    #[test]
    fn a_smoke_init_ok() {
        pre_test();
    }

    #[test]
    fn a_smoke_set_frequency() {
        let loras = pre_test();
        for mut lora in loras {
            lora.set_frequency(915.0).unwrap();
        }
    }

    #[test]
    fn a_smoke_set_radio_preset() {
        let loras = pre_test();
        for mut lora in loras {
            lora.set_radio_preset(Bw125Cr45Sf128).unwrap();
        }
    }

    #[test]
    fn a_smoke_set_modem_address() {
        let loras = pre_test();
        for mut lora in loras {
            lora.set_modem_address(0).unwrap();
            lora.set_modem_address(1).unwrap();
            lora.set_modem_address(2).unwrap();
            lora.set_modem_address(3).unwrap();
            lora.set_modem_address(4).unwrap();
        }
    }

    #[test]
    fn a_smoke_set_tx_power() {
        let loras = pre_test();
        for mut lora in loras {
            lora.set_tx_power(-1).unwrap();
            lora.set_tx_power(0).unwrap();
            lora.set_tx_power(5).unwrap();
        }
    }

    #[test]
    fn a_smoke_tx() {
        let loras = pre_test();
        for mut lora in loras {
            lora.tx_string("Hello, World!", 2).unwrap();
        }
    }

    #[test]
    fn a_smoke_rx() {
        let loras = pre_test();
        for mut lora in loras {
            lora.rx_string().unwrap();
        }
    }

    #[test]
    fn a_smoke_reset() {
        let loras = pre_test();
        for mut lora in loras {
            lora.tx_string("Hello, World!", 2).unwrap();
            lora.reset().unwrap();
        }
    }

    /// Sends a message from modem B to modem A, retrying up to five times, and
    /// checks the first packet that arrives.
    #[test]
    fn integration_tx_rx() {
        let (mut lora_a, mut lora_b) = pre_test_pair();
        for n in 1..6 {
            println!("Attempt {}/5", n);
            // Poll once before sending, as the original test does (presumably
            // to discard a stale packet).
            lora_a.rx_string().unwrap();
            lora_b.tx_string("Hello, World!", 1).unwrap();
            let timeout = Instant::now();
            while timeout.elapsed() <= StdDuration::from_secs(1) {
                if let Some(packet) = lora_a.rx_string().unwrap() {
                    println!("{:?}", packet);
                    assert_eq!(packet.src, 2, "packet.src");
                    assert_eq!(packet.dst, 1, "packet.dst");
                    assert_eq!(packet.len, 13, "packet.len");
                    assert_eq!(packet.data, String::from("Hello, World!"), "packet.data");
                    return;
                }
            }
        }
        panic!("Didn't receive anything!");
    }

    /// The modem must still answer after the link has been idle for longer
    /// than the request timeout.
    #[test]
    fn a_stress_delayed_action() {
        let mut lora = pre_test().pop().unwrap();
        std::thread::sleep(StdDuration::from_millis(1500));
        lora.read_version().unwrap();
    }

    #[test]
    fn stress_tx() {
        let mut loras = pre_test();
        let timeout = Instant::now();
        while timeout.elapsed() <= StdDuration::from_secs(5) {
            for lora in &mut loras {
                lora.tx_string("Hello, World!", 1).unwrap();
            }
        }
    }

    #[test]
    fn stress_init() {
        let timeout = Instant::now();
        while timeout.elapsed() <= StdDuration::from_secs(5) {
            let _lora = pre_test();
        }
    }

    #[test]
    fn stress_rx() {
        let mut lora = pre_test().pop().unwrap();
        let timeout = Instant::now();
        while timeout.elapsed() <= StdDuration::from_secs(5) {
            lora.rx_string().unwrap();
        }
    }

    /// Sends `iterations` messages from modem B to modem A and fails if more
    /// than a tenth of them do not arrive within one second each.
    #[test]
    fn stress_packet_loss() {
        let iterations = 10;
        let (mut lora_a, mut lora_b) = pre_test_pair();
        let mut success = 0;
        'outer: for n in 1..=iterations {
            println!("test {}/{}", n, iterations);
            // Poll once before sending, as the original test does (presumably
            // to discard a stale packet).
            lora_a.rx_string().unwrap();
            lora_b.tx_string("Hello, World!", 1).unwrap();
            let timeout = Instant::now();
            while timeout.elapsed() <= StdDuration::from_secs(1) {
                if let Some(packet) = lora_a.rx_string().unwrap() {
                    println!("{:?}", packet);
                    assert_eq!(packet.src, 2, "packet.src");
                    assert_eq!(packet.dst, 1, "packet.dst");
                    assert_eq!(packet.len, 13, "packet.len");
                    assert_eq!(packet.data, String::from("Hello, World!"), "packet.data");
                    success += 1;
                    continue 'outer;
                }
            }
        }
        // Lost packets are the ones that never arrived. Subtracting (instead
        // of dividing) also avoids a division by zero when nothing arrives.
        let loss = iterations - success;
        println!("{}/{} packet loss", loss, iterations);
        assert!(
            loss <= iterations / 10,
            "{}/{} packet loss is unacceptably high",
            loss,
            iterations
        );
    }
}