#![allow(non_camel_case_types)]
use core::{fmt::Write, mem::MaybeUninit};
use heapless::{
box_pool,
pool::boxed::{Box, BoxBlock},
spsc::{Consumer, Producer, Queue},
String,
};
use num_enum::IntoPrimitive;
use serde::Serialize;
use serde_with::DeserializeFromStr;
use smoltcp_nal::embedded_nal::{nb, SocketAddr, UdpClientStack};
use super::NetworkReference;
/// Magic word written, in little-endian byte order, as the first two bytes of
/// every frame header.
const MAGIC: u16 = 0x057B;
/// Size in bytes of the frame header: magic word (2), format id (1),
/// batch count (1) and sequence number (4).
const HEADER_SIZE: usize = 8;
/// Number of frame buffers handed to `FRAME_POOL` by `setup_streaming`.
const FRAME_COUNT: usize = 4;
/// Size in bytes of a single frame buffer, header included.
// NOTE(review): presumably a 1500-byte Ethernet MTU minus a 40-byte IP header
// and an 8-byte UDP header -- confirm against the network configuration.
const FRAME_SIZE: usize = 1500 - 40 - 8;
/// Capacity parameter of the queue that carries finished frames from the
/// `FrameGenerator` to the `DataStream`.
const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2;
/// Raw, possibly uninitialised storage for one frame.
type Frame = [MaybeUninit<u8>; FRAME_SIZE];
// Declares `FRAME_POOL`, the pool that `Frame` buffers are allocated from.
box_pool!(FRAME_POOL: Frame);
/// The socket address that streamed frames are sent to.
///
/// Wraps a [`SocketAddr`] so that it can be parsed from, displayed as and
/// (de)serialized through its textual form (e.g. `"0.0.0.0:0"`).
#[derive(Copy, Clone, Debug, DeserializeFromStr, PartialEq, Eq)]
pub struct StreamTarget(pub SocketAddr);
impl Default for StreamTarget {
fn default() -> Self {
Self("0.0.0.0:0".parse().unwrap())
}
}
impl Serialize for StreamTarget {
fn serialize<S: serde::Serializer>(
&self,
serializer: S,
) -> Result<S::Ok, S::Error> {
let mut display: String<30> = String::new();
write!(&mut display, "{}", self.0).unwrap();
serializer.serialize_str(&display)
}
}
impl core::str::FromStr for StreamTarget {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let addr = SocketAddr::from_str(s)
.map_err(|_| "Invalid socket address format")?;
Ok(Self(addr))
}
}
impl core::fmt::Display for StreamTarget {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.0.fmt(f)
}
}
/// Identifier of the payload format carried by a stream frame.
///
/// The value converts into a `u8` (via `IntoPrimitive`) and is written into
/// the format byte of every frame header.
// NOTE(review): the meaning of each payload format is defined by the producers
// and consumers of the stream, which are not visible here -- confirm.
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoPrimitive)]
pub enum StreamFormat {
/// No format configured; a freshly created `FrameGenerator` starts with this.
Unknown = 0,
/// Format id 1 (presumably ADC/DAC sample data).
AdcDacData = 1,
/// Format id 2.
Fls = 2,
/// Format id 3.
ThermostatEem = 3,
}
/// Creates the two halves of the data stream.
///
/// Statically allocates the frame queue and the frame storage, hands the
/// storage to `FRAME_POOL`, and returns the [`FrameGenerator`] (producer side)
/// together with the [`DataStream`] (consumer side, which sends on `stack`).
///
/// # Panics
///
/// Panics if either static singleton has already been taken, i.e. if this
/// function is called more than once.
pub fn setup_streaming(
    stack: NetworkReference,
) -> (FrameGenerator, DataStream) {
    // Queue that hands finished frames from the generator to the stream.
    let frame_queue = cortex_m::singleton!(
        : Queue<StreamFrame, FRAME_QUEUE_SIZE> = Queue::new()
    )
    .unwrap();
    let (producer, consumer) = frame_queue.split();

    // A `const` is required to repeat the non-`Copy` block in the array
    // initialiser below.
    #[allow(clippy::declare_interior_mutable_const)]
    const FRAME: BoxBlock<Frame> = BoxBlock::new();

    // Backing storage for every frame the pool can hand out.
    let blocks = cortex_m::singleton!(
        FRAME_DATA: [BoxBlock<Frame>; FRAME_COUNT] = [FRAME; FRAME_COUNT]
    )
    .unwrap();
    blocks
        .iter_mut()
        .for_each(|block| FRAME_POOL.manage(block));

    (
        FrameGenerator::new(producer),
        DataStream::new(stack, consumer),
    )
}
/// A frame under construction: a pool-allocated buffer plus bookkeeping.
#[derive(Debug)]
struct StreamFrame {
/// Backing storage allocated from `FRAME_POOL`.
buffer: Box<FRAME_POOL>,
/// Number of bytes used so far (header included); the next batch is written here.
offset: usize,
/// Number of batches added so far; written into the header by `finish`.
batches: u8,
}
impl StreamFrame {
    /// Starts a new frame in `buffer` by writing the header.
    ///
    /// Header layout: magic word (little-endian, 2 bytes), `format_id`
    /// (1 byte), batch count placeholder (1 byte, filled in by
    /// [`finish`](Self::finish)) and `sequence_number` (little-endian, 4 bytes).
    pub fn new(
        mut buffer: Box<FRAME_POOL>,
        format_id: u8,
        sequence_number: u32,
    ) -> Self {
        let [magic_lo, magic_hi] = MAGIC.to_le_bytes();
        let [seq0, seq1, seq2, seq3] = sequence_number.to_le_bytes();
        let header: [u8; HEADER_SIZE] = [
            magic_lo, magic_hi, format_id, 0, seq0, seq1, seq2, seq3,
        ];

        for (slot, byte) in buffer.iter_mut().zip(header.iter()) {
            slot.write(*byte);
        }

        Self {
            buffer,
            offset: HEADER_SIZE,
            batches: 0,
        }
    }

    /// Appends one batch to the frame.
    ///
    /// `f` receives the unused tail of the buffer and returns the number of
    /// bytes it wrote. That count advances the write offset and is returned
    /// to the caller.
    pub fn add_batch<F>(&mut self, mut f: F) -> usize
    where
        F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
    {
        let free = &mut self.buffer[self.offset..];
        let written = f(free);
        self.offset += written;
        self.batches += 1;
        written
    }

    /// Returns `true` when `len` more bytes would not fit into the buffer.
    pub fn is_full(&self, len: usize) -> bool {
        self.buffer.len() < self.offset + len
    }

    /// Writes the batch count into the header and returns the used part of
    /// the buffer.
    pub fn finish(&mut self) -> &[MaybeUninit<u8>] {
        // Byte 3 is the placeholder that `new` left for the batch count.
        let count = self.batches;
        self.buffer[3].write(count);

        let used = self.offset;
        &self.buffer[..used]
    }
}
/// Producer side of the data stream: packs batches into frames and queues
/// full frames for transmission.
pub struct FrameGenerator {
/// Producer end of the queue of finished frames.
queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
/// Frame currently being filled, if any.
current_frame: Option<StreamFrame>,
/// Sequence number used for the header of the next frame started; it is
/// advanced on every call to `add`.
sequence_number: u32,
/// Format id written into the header of newly started frames.
format: u8,
}
impl FrameGenerator {
    /// Creates a generator that pushes finished frames into `queue`.
    ///
    /// The format starts out as [`StreamFormat::Unknown`] and the sequence
    /// number at zero.
    fn new(queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>) -> Self {
        let format: u8 = StreamFormat::Unknown.into();
        Self {
            queue,
            current_frame: None,
            sequence_number: 0,
            format,
        }
    }

    /// Sets the format id written into the header of frames started from now on.
    #[doc(hidden)]
    pub(crate) fn configure(&mut self, format: impl Into<u8>) {
        let format_id: u8 = format.into();
        self.format = format_id;
    }

    /// Adds one batch to the stream.
    ///
    /// `func` receives the unused part of the current frame buffer and must
    /// return the number of bytes it wrote. If no frame is in progress, a new
    /// one is allocated from the pool. If the pool is exhausted the batch is
    /// dropped silently and `func` is never called.
    ///
    /// Once the frame could not hold another batch of the same size it is
    /// handed to the transmit queue.
    ///
    /// # Panics
    ///
    /// Panics if the finished frame cannot be enqueued.
    pub fn add<F>(&mut self, func: F)
    where
        F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
    {
        // Every call consumes a sequence number, including calls whose batch
        // ends up being dropped below.
        let sequence_number = self.sequence_number;
        self.sequence_number = sequence_number.wrapping_add(1);

        if self.current_frame.is_none() {
            let Ok(buffer) =
                FRAME_POOL.alloc([MaybeUninit::uninit(); FRAME_SIZE])
            else {
                // No free buffer: drop this batch.
                return;
            };
            self.current_frame = Some(StreamFrame::new(
                buffer,
                self.format,
                sequence_number,
            ));
        }

        let frame = self.current_frame.as_mut().unwrap();
        let written = frame.add_batch(func);
        if !frame.is_full(written) {
            return;
        }

        // The next batch of this size would not fit: ship the frame.
        let finished = self.current_frame.take().unwrap();
        self.queue.enqueue(finished).unwrap();
    }
}
/// Consumer side of the data stream: sends queued frames over UDP to the
/// configured remote.
pub struct DataStream {
/// Network stack used to open, connect, send on and close the socket.
stack: NetworkReference,
/// The open UDP socket, or `None` while the stream is closed.
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
/// Consumer end of the queue of finished frames.
queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
/// Destination that frames are sent to.
remote: StreamTarget,
}
impl DataStream {
fn new(
stack: NetworkReference,
consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
) -> Self {
Self {
stack,
socket: None,
remote: StreamTarget::default(),
queue: consumer,
}
}
fn close(&mut self) {
if let Some(socket) = self.socket.take() {
log::info!("Closing stream");
self.stack.close(socket).unwrap();
}
}
fn open(&mut self) -> Result<(), ()> {
if self.socket.is_some() || self.remote.0.ip().is_unspecified() {
return Err(());
}
let mut socket = self.stack.socket().or(Err(()))?;
if self.stack.connect(&mut socket, self.remote.0).is_err() {
self.stack.close(socket).unwrap();
return Err(());
}
self.socket.replace(socket);
log::info!("Opening stream");
Ok(())
}
pub fn set_remote(&mut self, remote: StreamTarget) {
if remote != self.remote {
self.close();
}
self.remote = remote;
}
pub fn process(&mut self) {
match self.socket.as_mut() {
None => {
if self.open().is_ok() {
while let Some(frame) = self.queue.dequeue() {
drop(frame.buffer);
}
}
}
Some(handle) => {
if let Some(mut frame) = self.queue.dequeue() {
let buf = frame.finish();
let data = unsafe {
core::slice::from_raw_parts(
buf.as_ptr() as *const u8,
size_of_val(buf),
)
};
match self.stack.send(handle, data) {
Ok(_) => {},
Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::Unaddressable))) => {
log::warn!( "IP address updated during stream. Reopening socket");
let socket = self.socket.take().unwrap();
self.stack.close(socket).unwrap();
}
Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::BufferFull))) => {}
Err(other) => {
log::warn!("Unexpected UDP error during data stream: {other:?}");
}
}
drop(frame.buffer)
}
}
}
}
}