use std::time::{SystemTime, UNIX_EPOCH};
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_serial::SerialPortBuilderExt;
use tracing::{debug, error, info};
use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;
/// Returns the current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock reports a moment earlier than the epoch, the result is
/// `0.0` rather than an error.
fn unix_ts() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // Clock is set before 1970: fall back to a zero-length duration.
        Err(_) => 0.0,
    }
}
/// Connector that reads newline-delimited text from a serial port and buffers
/// each line as an event on a `WireBandClient`.
///
/// Build one with `new`, optionally adjust it with the builder-style setters,
/// then hand it to `run`, which consumes the connector.
pub struct SerialConnector {
    /// Serial device identifier passed to `tokio_serial::new` when opening the port.
    port: String,
    /// Baud rate the port is opened with.
    baud: u32,
    /// Topic prefix; every line is published under `"<topic_prefix>/data"`.
    topic_prefix: String,
    /// Maps each `(topic, payload)` pair to the symbol used when encoding the frame.
    classifier: TopicClassifier,
    /// Consulted for every payload; a suppressed payload is dropped before buffering.
    delta_filter: DeltaFilter,
}
impl SerialConnector {
    /// Creates a connector for the serial device `port` at `baud`.
    ///
    /// Defaults: topic prefix `"serial"`, a fresh `TopicClassifier`, and a
    /// `DeltaFilter` constructed with a threshold of `0.0`.
    pub fn new(port: impl Into<String>, baud: u32) -> Self {
        Self {
            port: port.into(),
            baud,
            topic_prefix: "serial".to_string(),
            classifier: TopicClassifier::new(),
            delta_filter: DeltaFilter::new(0.0),
        }
    }

    /// Builder-style setter for the topic prefix.
    ///
    /// Lines are published under `"<prefix>/data"`.
    #[must_use]
    pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.topic_prefix = prefix.into();
        self
    }

    /// Builder-style setter that replaces the delta filter with a new one
    /// constructed from `threshold`.
    ///
    /// Any state held by the previous filter is discarded.
    #[must_use]
    pub fn delta_threshold(mut self, threshold: f64) -> Self {
        self.delta_filter = DeltaFilter::new(threshold);
        self
    }

    /// Gives mutable access to the classifier so callers can configure it
    /// before the connector is started.
    pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
        &mut self.classifier
    }

    /// Opens the serial port and processes incoming lines until EOF or a read error.
    ///
    /// Each `\n`-terminated chunk is decoded as UTF-8 and passed to the line
    /// processor. Bytes that are not valid UTF-8 are replaced with U+FFFD
    /// instead of aborting the connector, so a single corrupted byte on the
    /// wire does not end the session.
    ///
    /// # Errors
    ///
    /// Returns `WireBandError::Serial` if the port cannot be opened or if
    /// reading from it fails with an I/O error.
    pub async fn run(mut self, client: WireBandClient) -> Result<()> {
        let port = tokio_serial::new(&self.port, self.baud)
            .open_native_async()
            .map_err(|e| WireBandError::Serial(e.to_string()))?;
        info!(port = %self.port, baud = self.baud, "Serial connector started");

        let mut reader = BufReader::new(port);
        // One buffer reused for every line; cleared at the top of each iteration.
        let mut raw = Vec::new();
        loop {
            raw.clear();
            match reader.read_until(b'\n', &mut raw).await {
                // Zero bytes read means the stream is exhausted.
                Ok(0) => {
                    info!(port = %self.port, "Serial port EOF");
                    break;
                }
                Ok(_) => {
                    // Lossy decoding: `Owned` is only produced when some bytes
                    // had to be replaced, i.e. the line was not valid UTF-8.
                    let line = String::from_utf8_lossy(&raw);
                    if matches!(line, std::borrow::Cow::Owned(_)) {
                        debug!(port = %self.port, "Serial line contained invalid UTF-8; decoded lossily");
                    }
                    self.process_line(&client, &line).await;
                }
                Err(e) => {
                    error!(port = %self.port, err = %e, "Serial read error");
                    return Err(WireBandError::Serial(e.to_string()));
                }
            }
        }
        Ok(())
    }

    /// Turns one raw line into a buffered event on `client`.
    ///
    /// Steps: trim whitespace (blank lines are ignored), parse as JSON with a
    /// fallback to a plain JSON string holding the trimmed text, run the delta
    /// filter, classify, encode the frame, and buffer it with the current
    /// Unix timestamp.
    async fn process_line(&mut self, client: &WireBandClient, line: &str) {
        let line = line.trim();
        if line.is_empty() {
            return;
        }

        // Non-JSON input is forwarded verbatim as a string payload.
        let payload: Value =
            serde_json::from_str(line).unwrap_or_else(|_| Value::String(line.to_string()));
        let topic = format!("{}/data", self.topic_prefix);

        if self.delta_filter.should_suppress(&topic, &payload) {
            debug!(topic = %topic, "Serial line delta-filtered");
            return;
        }

        let symbol = self.classifier.classify(&topic, &payload);
        let encoded = frame::encode(symbol, &topic, &payload);
        let ts = unix_ts();
        client.buffer_event(topic, symbol, encoded, ts).await;
        debug!(port = %self.port, symbol = format!("{symbol:#06x}"), "Serial line buffered");
    }
}