#[cfg(feature = "bluetooth-le")]
use crate::connections::ble_handler::{BleDevice, BleHandler};
use crate::errors_internal::Error;
#[cfg(feature = "bluetooth-le")]
use futures::stream::StreamExt;
use std::time::Duration;
use std::time::UNIX_EPOCH;
use rand::{distr::StandardUniform, prelude::Distribution, Rng};
#[cfg(feature = "bluetooth-le")]
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
use tokio_serial::{available_ports, SerialPort, SerialStream};
use crate::connections::stream_api::StreamHandle;
use crate::connections::wrappers::encoded_data::{
EncodedToRadioPacket, EncodedToRadioPacketWithHeader,
};
pub const DEFAULT_SERIAL_BAUD: u32 = 115_200;
pub const DEFAULT_DTR_PIN_STATE: bool = true;
pub const DEFAULT_RTS_PIN_STATE: bool = false;
pub fn available_serial_ports() -> Result<Vec<String>, tokio_serial::Error> {
let ports = available_ports()?
.into_iter()
.map(|port| port.port_name)
.collect();
Ok(ports)
}
pub fn build_serial_stream(
port_name: String,
baud_rate: Option<u32>,
dtr: Option<bool>,
rts: Option<bool>,
) -> Result<StreamHandle<SerialStream>, Error> {
let builder = tokio_serial::new(port_name.clone(), baud_rate.unwrap_or(DEFAULT_SERIAL_BAUD))
.flow_control(tokio_serial::FlowControl::None)
.timeout(Duration::from_millis(10));
let mut serial_stream =
tokio_serial::SerialStream::open(&builder).map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: format!("Error opening serial port \"{port_name}\"").to_string(),
})?;
serial_stream
.write_data_terminal_ready(dtr.unwrap_or(DEFAULT_DTR_PIN_STATE))
.map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: "Failed to set DTR line".to_string(),
})?;
serial_stream
.write_request_to_send(rts.unwrap_or(DEFAULT_RTS_PIN_STATE))
.map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: "Failed to set RTS line".to_string(),
})?;
Ok(StreamHandle::from_stream(serial_stream))
}
pub async fn build_tcp_stream(
address: String,
) -> Result<StreamHandle<tokio::net::TcpStream>, Error> {
let connection_future = tokio::net::TcpStream::connect(address.clone());
let timeout_duration = Duration::from_millis(3000);
let stream = match tokio::time::timeout(timeout_duration, connection_future).await {
Ok(stream) => stream.map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: format!("Failed to connect to {address}"),
})?,
Err(e) => {
return Err(Error::StreamBuildError{
source: Box::new(e),
description: format!(
"Timed out connecting to {address}. Check that the radio is on, network is enabled, and the address is correct."
)});
}
};
Ok(StreamHandle::from_stream(stream))
}
#[cfg(feature = "bluetooth-le")]
pub async fn available_ble_devices(scan_duration: Duration) -> Result<Vec<BleDevice>, Error> {
BleHandler::available_ble_devices(scan_duration).await
}
#[cfg(feature = "bluetooth-le")]
pub async fn build_ble_stream<'a, B>(
device: B,
scan_duration: Duration,
) -> Result<StreamHandle<DuplexStream>, Error>
where
B: Into<std::borrow::Cow<'a, crate::connections::ble_handler::BleId>>,
{
use crate::{
connections::ble_handler::{AdapterEvent, RadioMessage},
errors_internal::InternalStreamError,
};
let ble_id: std::borrow::Cow<_> = device.into();
let ble_handler = BleHandler::new(&ble_id, scan_duration).await?;
let (client, mut server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move {
let duplex_write_error_fn = |e| {
Error::InternalStreamError(InternalStreamError::StreamWriteError {
source: Box::new(e),
})
};
let mut read_messages_count = ble_handler.read_fromnum().await?;
let mut buf = [0u8; 1024];
if let Ok(len) = server.read(&mut buf).await {
ble_handler.write_to_radio(&buf[..len]).await?
}
loop {
match ble_handler.read_from_radio().await? {
RadioMessage::Eof => break,
RadioMessage::Packet(packet) => {
server
.write(packet.data())
.await
.map_err(duplex_write_error_fn)?;
}
}
}
let mut notification_stream = ble_handler.notifications().await?;
let mut adapter_events = ble_handler.adapter_events().await?;
loop {
tokio::select!(
notification = notification_stream.next() => {
let avail_msg_count = notification.ok_or(InternalStreamError::Eof)?;
for _ in read_messages_count..avail_msg_count {
if let RadioMessage::Packet(packet) = ble_handler.read_from_radio().await? {
server.write(packet.data()).await.map_err(duplex_write_error_fn)?;
}
}
read_messages_count = avail_msg_count;
},
from_server = server.read(&mut buf) => {
let len = from_server.map_err(duplex_write_error_fn)?;
if len != 0 {
ble_handler.write_to_radio(&buf[..len]).await?;
}
},
event = adapter_events.next() => {
if Some(AdapterEvent::Disconnected) == event {
log::error!("BLE disconnected");
Err(InternalStreamError::ConnectionLost)?
}
}
);
}
});
Ok(StreamHandle {
stream: client,
join_handle: Some(handle),
})
}
pub fn generate_rand_id<T>() -> T
where
StandardUniform: Distribution<T>,
{
let mut rng = rand::rng();
rng.random()
}
pub fn format_data_packet(
packet: EncodedToRadioPacket,
) -> Result<EncodedToRadioPacketWithHeader, Error> {
let data = packet.data();
if data.len() >= 1 << 16 {
return Err(Error::InvalidaDataSize {
data_length: data.len(),
});
}
let [lsb, msb, ..] = data.len().to_le_bytes();
let magic_buffer = [0x94, 0xc3, msb, lsb];
Ok([&magic_buffer, data].concat().into())
}
pub fn strip_data_packet_header(
packet: EncodedToRadioPacketWithHeader,
) -> Result<EncodedToRadioPacket, Error> {
let data = packet.data_vec();
let stripped_data = match data.get(4..) {
Some(data) => data,
None => return Err(Error::InsufficientPacketBufferLength { packet }),
};
Ok(stripped_data.into())
}
pub fn current_epoch_secs_u32() -> u32 {
std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Could not get time since unix epoch")
.as_secs()
.try_into()
.expect("Could not convert u128 to u32")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn valid_empty_packet() {
let data = vec![];
let serial_data = format_data_packet(data.into());
assert_eq!(serial_data.unwrap().data(), vec![0x94, 0xc3, 0x00, 0x00]);
}
#[test]
fn valid_non_empty_packet() {
let data = vec![0x00, 0xff, 0x88];
let serial_data = format_data_packet(data.into());
assert_eq!(
serial_data.unwrap().data(),
vec![0x94, 0xc3, 0x00, 0x03, 0x00, 0xff, 0x88]
);
}
#[test]
fn valid_large_packet() {
let data = vec![0x00; 0x100];
let serial_data = format_data_packet(data.into());
assert_eq!(
serial_data.unwrap().data()[..4],
vec![0x94, 0xc3, 0x01, 0x00]
);
}
#[test]
fn invalid_too_large_packet() {
let data = vec![0x00; 0x10000];
let serial_data = format_data_packet(data.into());
assert!(serial_data.is_err());
}
}