#![allow(clippy::unwrap_used, clippy::panic)]
use donglora_protocol::{
Command, DeviceMessage, ErrorCode, FlrcBitrate, FlrcBt, FlrcCodingRate, FlrcConfig,
FlrcPreambleLen, FrameDecoder, FrameResult, FskConfig, Info, LoRaBandwidth, LoRaCodingRate,
LoRaConfig, LoRaHeaderMode, LrFhssBandwidth, LrFhssCodingRate, LrFhssConfig, LrFhssGrid,
MAX_MCU_UID_LEN, MAX_OTA_PAYLOAD, MAX_PAYLOAD_FIELD, MAX_RADIO_UID_LEN, MAX_SYNC_WORD_LEN,
MAX_WIRE_FRAME, Modulation, RadioChipId, RxOrigin, RxPayload, TxDonePayload, TxFlags, TxResult,
commands, encode_frame, events,
};
use heapless::Vec as HVec;
use proptest::prelude::*;
fn arb_lora() -> impl Strategy<Value = LoRaConfig> {
(
150_000_000u32..=960_000_000u32,
5u8..=12u8,
0u8..=13u8,
0u8..=3u8,
0u16..=1024u16,
any::<u16>(),
-20i8..=22i8,
0u8..=1u8,
any::<bool>(),
any::<bool>(),
)
.prop_map(
|(
freq_hz,
sf,
bw_raw,
cr_raw,
preamble_len,
sync_word,
tx_power_dbm,
header_raw,
payload_crc,
iq_invert,
)| {
LoRaConfig {
freq_hz,
sf,
bw: LoRaBandwidth::from_u8(bw_raw).unwrap(),
cr: LoRaCodingRate::from_u8(cr_raw).unwrap(),
preamble_len,
sync_word,
tx_power_dbm,
header_mode: LoRaHeaderMode::from_u8(header_raw).unwrap(),
payload_crc,
iq_invert,
}
},
)
}
fn arb_fsk() -> impl Strategy<Value = FskConfig> {
(
any::<u32>(),
any::<u32>(),
any::<u32>(),
any::<u8>(),
any::<u16>(),
0u8..=MAX_SYNC_WORD_LEN as u8,
prop::array::uniform8(any::<u8>()),
)
.prop_map(
|(freq_hz, bitrate_bps, freq_dev_hz, rx_bw, preamble_len, sync_word_len, sync)| {
let mut sync_word = [0u8; MAX_SYNC_WORD_LEN];
sync_word[..sync_word_len as usize]
.copy_from_slice(&sync[..sync_word_len as usize]);
FskConfig {
freq_hz,
bitrate_bps,
freq_dev_hz,
rx_bw,
preamble_len,
sync_word_len,
sync_word,
}
},
)
}
fn arb_lr_fhss() -> impl Strategy<Value = LrFhssConfig> {
(
any::<u32>(),
0u8..=7u8,
0u8..=3u8,
0u8..=1u8,
any::<bool>(),
any::<i8>(),
)
.prop_map(
|(freq_hz, bw, cr, grid, hopping, tx_power_dbm)| LrFhssConfig {
freq_hz,
bw: LrFhssBandwidth::from_u8(bw).unwrap(),
cr: LrFhssCodingRate::from_u8(cr).unwrap(),
grid: LrFhssGrid::from_u8(grid).unwrap(),
hopping,
tx_power_dbm,
},
)
}
fn arb_flrc() -> impl Strategy<Value = FlrcConfig> {
(
any::<u32>(),
0u8..=7u8,
0u8..=2u8,
0u8..=2u8,
0u8..=6u8,
any::<u32>(),
any::<i8>(),
)
.prop_map(
|(freq_hz, br, cr, bt, pre, sync_word, tx_power_dbm)| FlrcConfig {
freq_hz,
bitrate: FlrcBitrate::from_u8(br).unwrap(),
cr: FlrcCodingRate::from_u8(cr).unwrap(),
bt: FlrcBt::from_u8(bt).unwrap(),
preamble_len: FlrcPreambleLen::from_u8(pre).unwrap(),
sync_word,
tx_power_dbm,
},
)
}
fn arb_modulation() -> impl Strategy<Value = Modulation> {
prop_oneof![
arb_lora().prop_map(Modulation::LoRa),
arb_fsk().prop_map(Modulation::FskGfsk),
arb_lr_fhss().prop_map(Modulation::LrFhss),
arb_flrc().prop_map(Modulation::Flrc),
]
}
fn arb_ota_payload() -> impl Strategy<Value = HVec<u8, MAX_OTA_PAYLOAD>> {
prop::collection::vec(any::<u8>(), 1..=MAX_OTA_PAYLOAD).prop_map(|v| {
let mut h = HVec::new();
h.extend_from_slice(&v).unwrap();
h
})
}
fn arb_command() -> impl Strategy<Value = Command> {
prop_oneof![
Just(Command::Ping),
Just(Command::GetInfo),
Just(Command::RxStart),
Just(Command::RxStop),
arb_modulation().prop_map(Command::SetConfig),
(any::<bool>(), arb_ota_payload()).prop_map(|(skip_cad, data)| Command::Tx {
flags: TxFlags { skip_cad },
data,
}),
]
}
fn arb_tag() -> impl Strategy<Value = u16> {
1u16..=u16::MAX
}
fn arb_rx_payload() -> impl Strategy<Value = RxPayload> {
(
any::<i16>(),
any::<i16>(),
any::<i32>(),
any::<u64>(),
any::<bool>(),
any::<u16>(),
0u8..=1u8,
prop::collection::vec(any::<u8>(), 0..=MAX_OTA_PAYLOAD),
)
.prop_map(
|(rssi, snr, freq_err, ts, crc_valid, drops, origin, data)| {
let mut d = HVec::new();
d.extend_from_slice(&data).unwrap();
RxPayload {
rssi_tenths_dbm: rssi,
snr_tenths_db: snr,
freq_err_hz: freq_err,
timestamp_us: ts,
crc_valid,
packets_dropped: drops,
origin: RxOrigin::from_u8(origin).unwrap(),
data: d,
}
},
)
}
fn arb_info() -> impl Strategy<Value = Info> {
let head = (
any::<u8>(),
any::<u8>(),
any::<u8>(),
any::<u8>(),
any::<u8>(),
any::<u16>(),
any::<u64>(),
any::<u16>(),
any::<u16>(),
any::<u16>(),
any::<u16>(),
any::<u16>(),
);
let tail = (
any::<u32>(),
any::<u32>(),
any::<i8>(),
any::<i8>(),
prop::collection::vec(any::<u8>(), 0..=MAX_MCU_UID_LEN),
prop::collection::vec(any::<u8>(), 0..=MAX_RADIO_UID_LEN),
);
(head, tail).prop_map(|(h, t)| {
let (pma, pmi, fa, fi, fp, chip, capb, sf, bw, mpb, rxq, txq) = h;
let (fmin, fmax, pmin, pmax, mcu_bytes, radio_bytes) = t;
let mut mcu_uid = [0u8; MAX_MCU_UID_LEN];
mcu_uid[..mcu_bytes.len()].copy_from_slice(&mcu_bytes);
let mut radio_uid = [0u8; MAX_RADIO_UID_LEN];
radio_uid[..radio_bytes.len()].copy_from_slice(&radio_bytes);
Info {
proto_major: pma,
proto_minor: pmi,
fw_major: fa,
fw_minor: fi,
fw_patch: fp,
radio_chip_id: chip,
capability_bitmap: capb,
supported_sf_bitmap: sf,
supported_bw_bitmap: bw,
max_payload_bytes: mpb,
rx_queue_capacity: rxq,
tx_queue_capacity: txq,
freq_min_hz: fmin,
freq_max_hz: fmax,
tx_power_min_dbm: pmin,
tx_power_max_dbm: pmax,
mcu_uid_len: mcu_bytes.len() as u8,
mcu_uid,
radio_uid_len: radio_bytes.len() as u8,
radio_uid,
}
})
}
proptest! {
#[test]
fn command_roundtrip(cmd in arb_command(), tag in arb_tag()) {
let mut payload_buf = [0u8; 320];
let payload_len = cmd.encode_payload(&mut payload_buf).unwrap();
let mut wire = [0u8; MAX_WIRE_FRAME];
let n = encode_frame(cmd.type_id(), tag, &payload_buf[..payload_len], &mut wire).unwrap();
let mut decoder = FrameDecoder::new();
let mut got: Option<(u8, u16, HVec<u8, 320>)> = None;
decoder.feed(&wire[..n], |res| match res {
FrameResult::Ok { type_id, tag, payload } => {
let mut p = HVec::new();
p.extend_from_slice(payload).unwrap();
got = Some((type_id, tag, p));
}
FrameResult::Err(e) => panic!("decode error: {:?}", e),
});
let (type_id, decoded_tag, payload) = got.unwrap();
prop_assert_eq!(type_id, cmd.type_id());
prop_assert_eq!(decoded_tag, tag);
let parsed = Command::parse(type_id, &payload).unwrap();
prop_assert_eq!(parsed, cmd);
}
#[test]
fn lora_config_roundtrip(cfg in arb_lora()) {
let mut buf = [0u8; 16];
let n = cfg.encode(&mut buf).unwrap();
let decoded = LoRaConfig::decode(&buf[..n]).unwrap();
prop_assert_eq!(decoded, cfg);
}
#[test]
fn fsk_config_roundtrip(cfg in arb_fsk()) {
let mut buf = [0u8; 32];
let n = cfg.encode(&mut buf).unwrap();
let decoded = FskConfig::decode(&buf[..n]).unwrap();
prop_assert_eq!(decoded, cfg);
}
#[test]
fn lr_fhss_roundtrip(cfg in arb_lr_fhss()) {
let mut buf = [0u8; 16];
let n = cfg.encode(&mut buf).unwrap();
let decoded = LrFhssConfig::decode(&buf[..n]).unwrap();
prop_assert_eq!(decoded, cfg);
}
#[test]
fn flrc_roundtrip(cfg in arb_flrc()) {
let mut buf = [0u8; 16];
let n = cfg.encode(&mut buf).unwrap();
let decoded = FlrcConfig::decode(&buf[..n]).unwrap();
prop_assert_eq!(decoded, cfg);
}
#[test]
fn rx_payload_roundtrip(rx in arb_rx_payload()) {
let mut buf = [0u8; MAX_PAYLOAD_FIELD];
let n = rx.encode(&mut buf).unwrap();
let decoded = RxPayload::decode(&buf[..n]).unwrap();
prop_assert_eq!(decoded, rx);
}
#[test]
fn tx_done_roundtrip(result_raw in 0u8..=2u8, airtime in any::<u32>()) {
let td = TxDonePayload {
result: TxResult::from_u8(result_raw).unwrap(),
airtime_us: airtime,
};
let mut buf = [0u8; 8];
let n = td.encode(&mut buf).unwrap();
prop_assert_eq!(TxDonePayload::decode(&buf[..n]).unwrap(), td);
}
#[test]
fn err_roundtrip(code_raw in any::<u16>()) {
let code = ErrorCode::from_u16(code_raw);
let mut buf = [0u8; 2];
let n = events::encode_err_payload(code, &mut buf).unwrap();
prop_assert_eq!(n, 2);
let decoded = events::decode_err_payload(&buf).unwrap();
prop_assert_eq!(decoded.as_u16(), code_raw);
}
#[test]
fn info_roundtrip(info in arb_info()) {
let mut buf = [0u8; 128];
let n = info.encode(&mut buf).unwrap();
let decoded = Info::decode(&buf[..n]).unwrap();
prop_assert_eq!(decoded, info);
}
#[test]
fn decoder_never_panics_on_arbitrary_bytes(bytes in prop::collection::vec(any::<u8>(), 0..1024)) {
let mut decoder = FrameDecoder::new();
decoder.feed(&bytes, |_res| {});
}
#[test]
fn command_parse_never_panics(type_id in any::<u8>(), payload in prop::collection::vec(any::<u8>(), 0..320)) {
let _ = Command::parse(type_id, &payload);
}
#[test]
fn device_message_parse_never_panics(
type_id in any::<u8>(),
payload in prop::collection::vec(any::<u8>(), 0..320),
cmd_type in prop::option::of(any::<u8>()),
) {
let _ = DeviceMessage::parse(type_id, &payload, cmd_type);
}
#[test]
fn tag_zero_still_encodes(payload in prop::collection::vec(any::<u8>(), 0..=MAX_PAYLOAD_FIELD)) {
let mut wire = [0u8; MAX_WIRE_FRAME];
let _ = encode_frame(0xC0, 0, &payload, &mut wire);
}
#[test]
fn random_modulation_via_set_config_roundtrip(m in arb_modulation(), tag in arb_tag()) {
let cmd = Command::SetConfig(m);
let mut payload_buf = [0u8; 320];
let payload_len = cmd.encode_payload(&mut payload_buf).unwrap();
let mut wire = [0u8; MAX_WIRE_FRAME];
let n = encode_frame(cmd.type_id(), tag, &payload_buf[..payload_len], &mut wire).unwrap();
let mut decoder = FrameDecoder::new();
let mut got: Option<(u8, u16, HVec<u8, 320>)> = None;
decoder.feed(&wire[..n], |res| {
if let FrameResult::Ok { type_id, tag, payload } = res {
let mut p = HVec::new();
p.extend_from_slice(payload).unwrap();
got = Some((type_id, tag, p));
}
});
let (type_id, decoded_tag, payload) = got.unwrap();
prop_assert_eq!(type_id, commands::TYPE_SET_CONFIG);
prop_assert_eq!(decoded_tag, tag);
prop_assert_eq!(Command::parse(type_id, &payload).unwrap(), cmd);
}
#[test]
fn radio_chip_id_from_u16_matches_as_u16(v in any::<u16>()) {
if let Some(id) = RadioChipId::from_u16(v) {
prop_assert_eq!(id.as_u16(), v);
}
}
}