// secure_serial/sender.rs
//! Application-facing [`SecureSerialSender`]: splits packets into chunks and cooperates with
//! [`crate::run_write`] and [`crate::run_read`] via shared pools and channels.

use embassy_futures::select::{Either, select};
use embassy_sync::{blocking_mutex::raw::RawMutex, channel};
use embassy_time::{Duration, Instant, Timer};
use embedded_buffer_pool::{BufferGuard, BufferPool};
use heapless::Vec;

use crate::protocol::{Ack, CHUNK_LEN_MAX, CHUNK_PAYLOAD_MAX, MAGIC, PACKET_DATA};

/// Sends logical packets over the link by chunking, queueing wire buffers, and waiting for ACKs.
pub struct SecureSerialSender<'a, M: RawMutex + 'static, const N_INFLIGHT: usize> {
    // Id assigned to outgoing packets; incremented at the start of each `write_packet`,
    // so the first packet is sent with id 1. ACKs are matched against this value.
    write_packet_id: u16,
    // Total number of times each chunk may be transmitted before `write_packet` gives up.
    allowed_retransmits: usize,

    // Pool from which wire buffers are taken before encoding (see `wait_then_send`);
    // shared with the rest of the stack so at most N_INFLIGHT buffers exist at once.
    tx_pool: &'static BufferPool<M, Vec<u8, CHUNK_LEN_MAX>, N_INFLIGHT>,
    // Queue of encoded chunk buffers handed off for transmission (consumed by the write task,
    // per the module docs presumably `crate::run_write`).
    tx_queue: channel::Sender<'a, M, BufferGuard<M, Vec<u8, CHUNK_LEN_MAX>>, N_INFLIGHT>,
    // Incoming ACKs; each is matched on (packet_id, chunk_offset) in `write_packet`.
    rx_confirm: channel::Receiver<'a, M, Ack, N_INFLIGHT>,
    // Minimum spacing between successive transmissions of the same chunk.
    retransmit_delay: Duration,
}
22
23impl<'a, M, const N_INFLIGHT: usize> SecureSerialSender<'a, M, N_INFLIGHT>
24where
25    M: RawMutex + 'static,
26{
27    /// Creates a sender using the shared TX pool and channels from [`crate::SecureSerialResources`].
28    ///
29    /// `retransmit_delay` spaces chunk transmissions; `allowed_retransmits` limits how often each
30    /// chunk may be sent before [`write_packet`](Self::write_packet) fails.
31    pub fn new(
32        tx_pool: &'static BufferPool<M, Vec<u8, CHUNK_LEN_MAX>, N_INFLIGHT>,
33        tx_queue: channel::Sender<'a, M, BufferGuard<M, Vec<u8, CHUNK_LEN_MAX>>, N_INFLIGHT>,
34        rx_confirm: channel::Receiver<'a, M, Ack, N_INFLIGHT>,
35        retransmit_delay: Duration,
36        allowed_retransmits: usize,
37    ) -> Self {
38        Self {
39            write_packet_id: 0,
40            allowed_retransmits,
41            tx_pool,
42            tx_queue,
43            rx_confirm,
44            retransmit_delay,
45        }
46    }
47
48    /// Encodes `data` as one packet id, splits it into chunks, and returns when all chunks are ACKed
49    /// or retries are exhausted (`Err(())`).
50    pub async fn write_packet(&mut self, data: &[u8]) -> Result<(), ()> {
51        self.write_packet_id += 1;
52
53        let chunks_total = data.len().div_ceil(CHUNK_PAYLOAD_MAX);
54        let mut chunk_next_queue = 0;
55
56        struct ChunkInfo {
57            chunk_offset: u32,
58            last_sent_at: Instant,
59            allowed_retransmits: usize,
60        }
61
62        let mut chunks: Vec<ChunkInfo, N_INFLIGHT> = Vec::new();
63
64        loop {
65            // sort buffer by next-to-send, b.cmp(a)
66            // the oldest element is placed at the end, this one gets ack'ed or retransmitted first
67            chunks.sort_unstable_by(|a, b| b.last_sent_at.cmp(&a.last_sent_at));
68
69            // fill up chunk buffer, these ones should get transmitted first -> place at the end
70            while !chunks.is_full() && chunk_next_queue < chunks_total {
71                let info = ChunkInfo {
72                    chunk_offset: chunk_next_queue as u32,
73                    last_sent_at: Instant::MIN,
74                    allowed_retransmits: self.allowed_retransmits,
75                };
76                chunks.push(info).ok();
77                chunk_next_queue += 1;
78            }
79
80            // which chunk will get transmitted next
81            let Some(next_chunk) = chunks.last_mut() else {
82                // no chunk remaining? --> finished transmitting the packet
83                break Ok(());
84            };
85
86            // wait for next chunk transmission delay, allow possible acks to be processed
87            let next_chunk_tx_time = next_chunk.last_sent_at + self.retransmit_delay;
88            let fut_tx = wait_then_send(next_chunk_tx_time, self.tx_pool);
89            let fut_ack = self.rx_confirm.receive();
90
91            // wait for slot in tx buffer
92            let mut tx_buffer = match select(fut_tx, fut_ack).await {
93                // tx buffer available
94                Either::First(tx_buffer) => tx_buffer,
95                // ack received -> update chunks
96                Either::Second(ack) => {
97                    // remove the matching chunk from the deque
98                    if ack.packet_id == self.write_packet_id {
99                        for (i, info) in chunks.iter().enumerate() {
100                            if info.chunk_offset == ack.chunk_offset {
101                                chunks.remove(i);
102                                break;
103                            }
104                        }
105                    }
106                    // sort chunks again before sending
107                    continue;
108                }
109            };
110
111            if next_chunk.allowed_retransmits == 0 {
112                break Err(());
113            }
114
115            // encode packet
116            let offset_bytes = next_chunk.chunk_offset as usize * CHUNK_PAYLOAD_MAX;
117            let data_chunk = &data[offset_bytes..];
118            let data_chunk = &data_chunk[..data_chunk.len().min(CHUNK_PAYLOAD_MAX)];
119
120            tx_buffer.clear();
121            tx_buffer.extend_from_slice(&MAGIC).ok();
122            tx_buffer
123                .push(
124                    (MAGIC.len() // magic
125                    + 1 // len
126                    + 1 // packet type
127                    + 2 // packet_id
128                    + 4 // packet_len
129                    + 4 // chunk_offset
130                    + data_chunk.len()) as u8,
131                )
132                .ok();
133            tx_buffer.push(PACKET_DATA).ok();
134
135            let packet_id_encoded = self.write_packet_id.to_le_bytes();
136            tx_buffer.extend_from_slice(&packet_id_encoded).ok();
137
138            let packet_length_encoded = (data.len() as u32).to_le_bytes();
139            tx_buffer.extend_from_slice(&packet_length_encoded).ok();
140
141            let offset_encoded = next_chunk.chunk_offset.to_le_bytes();
142            tx_buffer.extend_from_slice(&offset_encoded).ok();
143
144            tx_buffer.extend_from_slice(data_chunk).ok();
145            // crc is appended later
146
147            next_chunk.last_sent_at = Instant::now();
148            next_chunk.allowed_retransmits -= 1;
149
150            self.tx_queue.send(tx_buffer).await;
151        }
152    }
153}
154
/// Waits until `at`, then takes a wire buffer from `tx_pool` (awaiting until one is free).
///
/// Combining the delay and the pool take into one future lets the caller `select` it against
/// the ACK channel: ACKs can be processed both while the retransmit delay elapses and while
/// waiting for a free buffer.
async fn wait_then_send<M: RawMutex + 'static, const N: usize>(
    at: Instant,
    tx_pool: &'static BufferPool<M, Vec<u8, CHUNK_LEN_MAX>, N>,
) -> BufferGuard<M, Vec<u8, CHUNK_LEN_MAX>> {
    // `Timer::at` with a past deadline (e.g. Instant::MIN + delay) completes immediately.
    Timer::at(at).await;
    tx_pool.take().await
}