1use embassy_futures::select::{Either, select};
5use embassy_sync::{blocking_mutex::raw::RawMutex, channel};
6use embassy_time::{Duration, Instant, Timer};
7use embedded_buffer_pool::{BufferGuard, BufferPool};
8use heapless::Vec;
9
10use crate::protocol::{Ack, CHUNK_LEN_MAX, CHUNK_PAYLOAD_MAX, MAGIC, PACKET_DATA};
11
/// Reliable, chunked packet sender for a serial link.
///
/// A packet handed to [`Self::write_packet`] is split into chunks of at
/// most `CHUNK_PAYLOAD_MAX` bytes; up to `N_INFLIGHT` chunks are kept
/// in flight at once, each retransmitted on a timer until a matching
/// [`Ack`] arrives on `rx_confirm`.
pub struct SecureSerialSender<'a, M: RawMutex + 'static, const N_INFLIGHT: usize> {
    // Id of the packet most recently written; incremented once per
    // `write_packet` call and echoed back in acks.
    write_packet_id: u16,
    // Per-chunk transmission budget applied to every chunk of a packet.
    allowed_retransmits: usize,

    // Pool supplying frame buffers for outgoing chunks.
    tx_pool: &'static BufferPool<M, Vec<u8, CHUNK_LEN_MAX>, N_INFLIGHT>,
    // Framed chunks are queued here; presumably drained by a separate
    // serial-writer task — confirm against the task wiring.
    tx_queue: channel::Sender<'a, M, BufferGuard<M, Vec<u8, CHUNK_LEN_MAX>>, N_INFLIGHT>,
    // Acks from the peer, delivered by whatever task parses inbound frames.
    rx_confirm: channel::Receiver<'a, M, Ack, N_INFLIGHT>,
    // Minimum delay between successive transmissions of the same chunk.
    retransmit_delay: Duration,
}
22
23impl<'a, M, const N_INFLIGHT: usize> SecureSerialSender<'a, M, N_INFLIGHT>
24where
25 M: RawMutex + 'static,
26{
27 pub fn new(
32 tx_pool: &'static BufferPool<M, Vec<u8, CHUNK_LEN_MAX>, N_INFLIGHT>,
33 tx_queue: channel::Sender<'a, M, BufferGuard<M, Vec<u8, CHUNK_LEN_MAX>>, N_INFLIGHT>,
34 rx_confirm: channel::Receiver<'a, M, Ack, N_INFLIGHT>,
35 retransmit_delay: Duration,
36 allowed_retransmits: usize,
37 ) -> Self {
38 Self {
39 write_packet_id: 0,
40 allowed_retransmits,
41 tx_pool,
42 tx_queue,
43 rx_confirm,
44 retransmit_delay,
45 }
46 }
47
48 pub async fn write_packet(&mut self, data: &[u8]) -> Result<(), ()> {
51 self.write_packet_id += 1;
52
53 let chunks_total = data.len().div_ceil(CHUNK_PAYLOAD_MAX);
54 let mut chunk_next_queue = 0;
55
56 struct ChunkInfo {
57 chunk_offset: u32,
58 last_sent_at: Instant,
59 allowed_retransmits: usize,
60 }
61
62 let mut chunks: Vec<ChunkInfo, N_INFLIGHT> = Vec::new();
63
64 loop {
65 chunks.sort_unstable_by(|a, b| b.last_sent_at.cmp(&a.last_sent_at));
68
69 while !chunks.is_full() && chunk_next_queue < chunks_total {
71 let info = ChunkInfo {
72 chunk_offset: chunk_next_queue as u32,
73 last_sent_at: Instant::MIN,
74 allowed_retransmits: self.allowed_retransmits,
75 };
76 chunks.push(info).ok();
77 chunk_next_queue += 1;
78 }
79
80 let Some(next_chunk) = chunks.last_mut() else {
82 break Ok(());
84 };
85
86 let next_chunk_tx_time = next_chunk.last_sent_at + self.retransmit_delay;
88 let fut_tx = wait_then_send(next_chunk_tx_time, self.tx_pool);
89 let fut_ack = self.rx_confirm.receive();
90
91 let mut tx_buffer = match select(fut_tx, fut_ack).await {
93 Either::First(tx_buffer) => tx_buffer,
95 Either::Second(ack) => {
97 if ack.packet_id == self.write_packet_id {
99 for (i, info) in chunks.iter().enumerate() {
100 if info.chunk_offset == ack.chunk_offset {
101 chunks.remove(i);
102 break;
103 }
104 }
105 }
106 continue;
108 }
109 };
110
111 if next_chunk.allowed_retransmits == 0 {
112 break Err(());
113 }
114
115 let offset_bytes = next_chunk.chunk_offset as usize * CHUNK_PAYLOAD_MAX;
117 let data_chunk = &data[offset_bytes..];
118 let data_chunk = &data_chunk[..data_chunk.len().min(CHUNK_PAYLOAD_MAX)];
119
120 tx_buffer.clear();
121 tx_buffer.extend_from_slice(&MAGIC).ok();
122 tx_buffer
123 .push(
124 (MAGIC.len() + 1 + 1 + 2 + 4 + 4 + data_chunk.len()) as u8,
131 )
132 .ok();
133 tx_buffer.push(PACKET_DATA).ok();
134
135 let packet_id_encoded = self.write_packet_id.to_le_bytes();
136 tx_buffer.extend_from_slice(&packet_id_encoded).ok();
137
138 let packet_length_encoded = (data.len() as u32).to_le_bytes();
139 tx_buffer.extend_from_slice(&packet_length_encoded).ok();
140
141 let offset_encoded = next_chunk.chunk_offset.to_le_bytes();
142 tx_buffer.extend_from_slice(&offset_encoded).ok();
143
144 tx_buffer.extend_from_slice(data_chunk).ok();
145 next_chunk.last_sent_at = Instant::now();
148 next_chunk.allowed_retransmits -= 1;
149
150 self.tx_queue.send(tx_buffer).await;
151 }
152 }
153}
154
155async fn wait_then_send<M: RawMutex + 'static, const N: usize>(
156 at: Instant,
157 tx_pool: &'static BufferPool<M, Vec<u8, CHUNK_LEN_MAX>, N>,
158) -> BufferGuard<M, Vec<u8, CHUNK_LEN_MAX>> {
159 Timer::at(at).await;
160 tx_pool.take().await
161}