use std::collections::VecDeque;
use std::io::Write;
use std::net::SocketAddr;
use anyhow::anyhow;
use log::debug;
use rustradio::block::{Block, BlockRet};
use rustradio::blocks::{
AddConst, BinarySlicer, FftFilter, FileSource, QuadratureDemod, RationalResampler,
RtlSdrDecode, RtlSdrSource, TcpSource, ZeroCrossing,
};
use rustradio::graph::GraphRunner;
use rustradio::stream::ReadStream;
use rustradio::window::WindowType;
use rustradio::{Complex, Result, blockchain};
// Command line options for the decoder.
//
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments into help text, which would change the program's output.
#[derive(clap::Parser, Debug)]
#[command(version, about)]
pub struct Opt {
// Sensor identifier; `parsepacket` derives the XOR key for the payload from it.
#[arg(short, long = "serial")]
sensor_id: u32,
// Path of the file that decoded lines are appended to.
#[arg(short, long = "output", default_value = "sparslog.csv")]
output: String,
// Socket address (parsed as `SocketAddr`) to read complex samples from over TCP.
// Cannot be combined with `read`.
#[arg(short, long = "connect")]
connect: Option<String>,
// File to read samples from. Read as complex samples unless `rtlsdr` is set,
// in which case it is read as bytes and passed through `RtlSdrDecode`.
#[arg(short, long = "read")]
read: Option<String>,
// With `read`: treat the file as raw RTL-SDR bytes.
// Without `read` or `connect`: capture from an RTL-SDR device.
#[arg(long = "rtlsdr")]
rtlsdr: bool,
// Verbosity level; not read in this part of the file.
#[arg(short, default_value = "0")]
pub verbose: usize,
// Gain handed to `RtlSdrSource` after conversion to `i32`.
#[arg(long = "gain", default_value = "30")]
gain: f32,
// Input sample rate; used for the low-pass filter design and the resampler.
#[arg(long = "sample_rate", default_value_t = 1_024_000)]
sample_rate: u32,
// Frequency handed to `RtlSdrSource` (presumably in Hz — confirm against rustradio).
#[arg(long = "freq", default_value_t = 868_000_000)]
freq: u64,
// Constant added to the quadrature demodulator output before zero-crossing detection.
#[arg(long = "offset", default_value = "0.4")]
offset: f32,
// Not read in this part of the file; presumably selects a multithreaded graph runner.
#[arg(long)]
pub multithread: bool,
}
// Sink block that searches a stream of bits for packets, decodes them and
// appends the result to a file.
//
// The `rustradio` derive generates the constructor (`new`) used by
// `create_graph`; `custom_name` refers to `Decode::custom_name`.
#[derive(rustradio::rustradio_macros::Block)]
#[rustradio(new, custom_name)]
struct Decode {
// Input stream, one element per bit (`work` compares the elements against a 0/1 pattern).
#[rustradio(in)]
src: ReadStream<u8>,
// Sensor identifier forwarded to `parsepacket`.
sensor_id: u32,
// Path of the file that decoded lines are appended to.
output: String,
// Bits received so far that have not yet been scanned and discarded by `work`.
#[rustradio(default)]
history: VecDeque<u8>,
}
impl Decode {
    /// Returns the display name of this block.
    ///
    /// Presumably picked up by the derived `Block` implementation through the
    /// `custom_name` option on the struct's `#[rustradio(...)]` attribute.
    fn custom_name(&self) -> &'static str {
        const NAME: &str = "Sparsnäs decoder";
        // The name does not depend on instance state; `self` is only touched
        // so the required `&self` signature does not look unused.
        let _ = self;
        NAME
    }
}
/// Converts an `f32` to an `i32`, truncating any fractional part toward zero.
///
/// # Errors
///
/// Returns an error if `value` is NaN, infinite, or outside the `i32` range.
#[allow(clippy::cast_precision_loss)]
fn f32_to_i32(value: f32) -> anyhow::Result<i32> {
    // Widen first: every `i32` bound is exactly representable as `f64`.
    let value = f64::from(value);
    let representable = f64::from(i32::MIN)..=f64::from(i32::MAX);
    if !value.is_finite() || !representable.contains(&value) {
        return Err(anyhow!("invalid conversion from {value} to i32"));
    }
    // Range was checked above, so the cast only drops the fraction.
    #[allow(clippy::cast_possible_truncation)]
    let truncated = value as i32;
    Ok(truncated)
}
/// Converts an `f32` to a `usize`, truncating any fractional part toward zero.
///
/// # Errors
///
/// Returns an error if `value` is NaN, infinite, negative, or too large to be
/// represented as a `usize`.
#[allow(clippy::cast_precision_loss)]
fn f32_to_usize(value: f32) -> anyhow::Result<usize> {
    let value = f64::from(value);
    // The upper bound is strict on purpose: on 64-bit targets
    // `usize::MAX as f64` rounds up to 2^64, which is *not* a valid `usize`,
    // and an inclusive check would let the cast below silently saturate.
    // No valid input is lost on 32-bit targets either, because no `f32` is
    // exactly equal to 2^32 - 1.
    if value.is_finite() && value >= 0.0 && value < usize::MAX as f64 {
        // Range was checked above, so the cast only drops the fraction.
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let truncated = value as usize;
        Ok(truncated)
    } else {
        Err(anyhow!("invalid conversion from {value} to usize"))
    }
}
/// Packs eight bit values into one byte, most significant bit first.
///
/// Each element of `data` is expected to be 0 or 1; `data[0]` ends up in bit 7
/// and `data[7]` in bit 0.
///
/// # Panics
///
/// Panics if `data` does not contain exactly eight elements.
fn bits2byte(data: &[u8]) -> u8 {
    assert!(data.len() == 8);
    // Shift the accumulator left and OR in the next bit, MSB first.
    data.iter().fold(0u8, |byte, &bit| (byte << 1) | bit)
}
/// Feeds one byte into a 16-bit CRC register and returns the updated register.
///
/// The byte `s` is processed most significant bit first using the polynomial
/// `0x8005`; `reg` is the register value before this byte.
fn calc_crc(s: u8, reg: u16) -> u16 {
    const POLY: u16 = 0x8005;
    // Walk the data bits from bit 7 down to bit 0.
    (0u32..8).rev().fold(reg, |acc, bit| {
        let data_bit = (s >> bit) & 1 != 0;
        let top_bit = acc & 0x8000 != 0;
        let shifted = acc << 1;
        // Apply the polynomial whenever the outgoing register bit and the
        // incoming data bit differ.
        if data_bit != top_bit {
            shifted ^ POLY
        } else {
            shifted
        }
    })
}
/// Checks whether the CRC of `input` equals `expected`.
///
/// The register starts at `0xffff` and every byte is folded in with
/// `calc_crc`, in order.
fn crc16(input: &[u8], expected: u16) -> bool {
    let checksum = input
        .iter()
        .fold(0xffff_u16, |reg, &byte| calc_crc(byte, reg));
    checksum == expected
}
/// Attempts to repair a single flipped bit in `packet`.
///
/// The last two bytes of `packet` are a big-endian CRC over all preceding
/// bytes. If the CRC already matches, the packet is returned unchanged.
/// Otherwise every bit of the packet — payload and CRC trailer alike — is
/// flipped in turn, and the first variant whose trailer matches its payload
/// is returned. If no single-bit flip helps, or the packet is too short to
/// contain a CRC at all, an unmodified copy is returned.
fn fix_packet(packet: &[u8]) -> Vec<u8> {
    // Too short to hold a CRC trailer: nothing to verify or repair.
    if packet.len() < 2 {
        return packet.to_vec();
    }
    let data_len = packet.len() - 2;
    // Reads the CRC trailer out of whichever buffer is being tested, so that
    // a flipped bit inside the trailer itself is taken into account.
    let trailer_crc = |buf: &[u8]| (u16::from(buf[data_len]) << 8) | u16::from(buf[data_len + 1]);

    let mut candidate = packet.to_vec();
    if crc16(&candidate[..data_len], trailer_crc(&candidate)) {
        return candidate;
    }
    for i in 0..(packet.len() * 8) {
        let mask = 1u8 << (i % 8);
        candidate[i / 8] ^= mask;
        if crc16(&candidate[..data_len], trailer_crc(&candidate)) {
            return candidate;
        }
        // Undo the flip so each attempt differs from the input by one bit only.
        candidate[i / 8] ^= mask;
    }
    // Every flip was undone, so this is identical to the input.
    candidate
}
/// Decodes one 20-byte packet and formats it as a comma-separated line.
///
/// The packet is first passed through `fix_packet`, then bytes 5..18 are
/// XOR-decoded with a five-byte key derived from `sensor_id`. The returned
/// line has the fields: current Unix time in seconds, sequence number, power
/// (three decimals), energy counter printed as `thousands.remainder`, the
/// battery byte, and `OK` or `BAD` depending on the CRC check.
///
/// The (possibly repaired) packet is also printed to stdout.
///
/// # Panics
///
/// Panics if `packet` is not exactly 20 bytes long, or if the system clock is
/// set before the Unix epoch.
fn parsepacket(packet: &[u8], sensor_id: u32) -> String {
    assert!(packet.len() == 20);
    let packet = fix_packet(packet);
    println!("Packet: {packet:02x?}");

    // Key material: the sensor id minus a fixed constant, modulo 2^32.
    let sensor_id_sub = sensor_id.wrapping_sub(0x5D38_E8CB);
    let [b3, b2, b1, b0] = sensor_id_sub.to_be_bytes();
    let enc_key = [b3, b0, b1, 0x47u8, b2];

    // XOR the 13 payload bytes with the key, repeating the key as needed.
    let dec: Vec<u8> = packet[5..18]
        .iter()
        .zip(enc_key.iter().cycle())
        .map(|(byte, key)| byte ^ key)
        .collect();

    // The last two bytes are a big-endian CRC over everything before them.
    let (body, trailer) = packet.split_at(packet.len() - 2);
    let crc = u16::from_be_bytes([trailer[0], trailer[1]]);
    let crc_ok = crc16(body, crc);

    let seq = u16::from_be_bytes([dec[4], dec[5]]);
    let effect = u16::from_be_bytes([dec[6], dec[7]]);
    let wh = u32::from_be_bytes([dec[8], dec[9], dec[10], dec[11]]);
    let battery = dec[12];

    let kwh = format!("{}.{:03}", wh / 1000, wh % 1000);
    let watt = 3600.0 * 1024.0 / f32::from(effect);

    let now = std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
        .expect("Time went backwards")
        .as_secs();
    let status = if crc_ok { "OK" } else { "BAD" };
    format!("{},{seq},{watt:.3},{kwh},{battery},{}", now, status)
}
impl Block for Decode {
/// Moves all available input bits into `history`, scans `history` for the
/// start-of-packet bit pattern, and for every match decodes the packet,
/// appends the decoded line to the output file and prints it to stdout.
///
/// Returns an error if reading the stream or writing the output file fails.
fn work(&mut self) -> Result<BlockRet<'_>> {
// 40-bit pattern that marks the start of a packet (logged as "CAC" below).
let cac = [
1u8, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0,
0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1,
];
let (input, _) = self.src.read_buf()?;
if input.is_empty() {
// Nothing to read; presumably asks the scheduler to wait for at least one more sample.
return Ok(BlockRet::WaitForStream(&self.src, 1));
}
// Copy everything into our own buffer and mark all of it as consumed.
self.history.extend(input.iter());
{
let n = input.len();
input.consume(n);
}
// A full match is the 40-bit pattern followed by 19 bytes of bits.
let packet_bits_len = cac.len() + 19 * 8;
let n = self.history.len();
if n < packet_bits_len {
// Not enough buffered bits for even one packet yet.
return Ok(BlockRet::WaitForStream(&self.src, packet_bits_len - n));
}
let input = &self.history;
// Only offsets with a complete packet strictly before the last
// `packet_bits_len` bits are scanned; that tail is kept below and
// becomes offset 0 on the next call.
for i in 0..(n - packet_bits_len) {
let equal = cac
.iter()
.zip(input.range(i..(i + cac.len())))
.all(|(a, b)| a == b);
if equal {
debug!("Found CAC");
let bits = &input
.range(i..(i + cac.len() + 19 * 8))
.copied()
.collect::<Vec<u8>>();
// Pack the bits into bytes, eight at a time.
let mut bytes = Vec::new();
for j in (0..bits.len()).step_by(8) {
bytes.push(bits2byte(&bits[j..j + 8]));
}
// Skip the first four pattern bytes; the fifth pattern byte is kept,
// giving the 20 bytes `parsepacket` requires.
let packet = &bytes[4..];
let parsed = parsepacket(packet, self.sensor_id);
// The output file is opened in append mode (and created if missing)
// for every packet.
std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(&self.output)
.map_err(|e| -> rustradio::Error { e.into() })?
.write_all(format!("{parsed}\n").as_bytes())
.map_err(|e| -> rustradio::Error { e.into() })?;
println!("{parsed}");
}
}
// Drop everything that was scanned, keeping the last `packet_bits_len` bits.
self.history
.drain(0..(self.history.len() - packet_bits_len));
Ok(BlockRet::Again)
}
}
pub fn create_graph(graph: &mut (impl GraphRunner + ?Sized), opt: &Opt) -> anyhow::Result<()> {
let src = {
if let Some(connect) = &opt.connect {
if opt.read.is_some() {
return Err(anyhow::Error::msg("-c and -r can't be combined"));
}
let sa: SocketAddr = connect.parse()?;
let host = format!("{}", sa.ip());
let port = sa.port();
println!("Connecting to host {host} port {port}");
blockchain![graph, prev, TcpSource::<Complex>::new(&host, port)?]
} else if let Some(read) = &opt.read {
if opt.rtlsdr {
blockchain![
graph,
prev,
FileSource::<u8>::new(read)?,
RtlSdrDecode::new(prev),
]
} else {
blockchain![graph, prev, FileSource::<Complex>::new(read)?]
}
} else if opt.rtlsdr {
blockchain![
graph,
prev,
RtlSdrSource::new(opt.freq, opt.sample_rate, f32_to_i32(opt.gain)?)?,
RtlSdrDecode::new(prev),
]
} else {
return Err(anyhow::Error::msg(
"Need to provide either -r, -c, or --rtlsdr",
));
}
};
#[allow(clippy::cast_precision_loss)]
let samp_rate = opt.sample_rate as f32;
let samp_rate_2 = 200_000.0;
let baud = 38383.5;
let prev = src;
let prev = blockchain![
graph,
prev,
FftFilter::new(
prev,
rustradio::fir::low_pass_complex(samp_rate, 50000.0, 10000.0, &WindowType::Hamming)
),
RationalResampler::new(prev, f32_to_usize(samp_rate_2)?, f32_to_usize(samp_rate)?)?,
QuadratureDemod::new(prev, 1.0),
AddConst::new(prev, opt.offset),
ZeroCrossing::new(prev, samp_rate_2 / baud, 0.1),
BinarySlicer::new(prev),
];
let decode = Box::new(Decode::new(prev, opt.sensor_id, opt.output.clone()));
graph.add(decode);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// In-range values are truncated toward zero.
    #[test]
    fn convert_i32() -> anyhow::Result<()> {
        let cases: [(f32, i32); 7] = [
            (0.0, 0),
            (-1.0, -1),
            (-1.1, -1),
            (-1.9, -1),
            (1.0, 1),
            (1.1, 1),
            (1.9, 1),
        ];
        for (input, expected) in cases {
            assert_eq!(f32_to_i32(input)?, expected);
        }
        Ok(())
    }

    /// Negative values are rejected; non-negative values are truncated.
    #[test]
    fn convert_usize() -> anyhow::Result<()> {
        let cases: [(f32, Option<usize>); 6] = [
            (0.0, Some(0)),
            (-1.0, None),
            (-1.1, None),
            (1.0, Some(1)),
            (1.1, Some(1)),
            (1.9, Some(1)),
        ];
        for (input, expected) in cases {
            if let Some(v) = expected {
                assert_eq!(f32_to_usize(input)?, v);
            } else {
                assert!(f32_to_usize(input).is_err());
            }
        }
        Ok(())
    }

    /// A clean packet, a packet with one flipped payload bit, and a packet
    /// with two flipped bits (which cannot be repaired).
    #[test]
    fn decode() {
        const SENSOR_ID: u32 = 576_929;
        let cases: [([u8; 20], &str); 3] = [
            (
                [
                    0x11, 0xa1, 0x38, 0x07, 0x0e, 0xa2, 0xde, 0x29, 0xe6, 0x8b, 0x1a, 0xfd, 0x74,
                    0x47, 0xcf, 0xf2, 0x14, 0x80, 0x23, 0x7b,
                ],
                ",17592,330.560,20.674,100,OK",
            ),
            (
                [
                    0x11, 0xa1, 0x38, 0x07, 0x0e, 0xa2, 0xde, 0x29, 0xe7, 0x8b, 0x1a, 0xfd, 0x74,
                    0x47, 0xcf, 0xf2, 0x14, 0x80, 0x23, 0x7b,
                ],
                ",17592,330.560,20.674,100,OK",
            ),
            (
                [
                    0x11, 0xa1, 0x38, 0x07, 0x0e, 0xa2, 0xdf, 0x29, 0xe6, 0x8b, 0x1a, 0xfd, 0x74,
                    0x47, 0xcf, 0xf2, 0x14, 0x80, 0x23, 0x7a,
                ],
                ",17592,330.560,20.674,100,BAD",
            ),
        ];
        for (packet, want) in cases {
            let got = parsepacket(&packet, SENSOR_ID);
            assert!(got.ends_with(want), "got: {got}, want {want}");
        }
    }
}