Skip to main content

cu_msp_bridge/
lib.rs

//! Copper bridge that multiplexes MSP traffic in both directions over a single serial port.
//! The type exposes a `requests` Tx channel that accepts [`MspRequestBatch`] messages, a
//! `responses` Rx channel that yields [`MspResponseBatch`] payloads decoded from the line, and an
//! `incoming` Rx channel that yields [`MspRequestBatch`] payloads for requests decoded from the line.
4
5#![cfg_attr(not(feature = "std"), no_std)]
6
7#[cfg(not(feature = "std"))]
8extern crate alloc;
9
10// std implementation
11#[cfg(feature = "std")]
12mod std_impl {
13    pub use std::{mem, vec::Vec};
14}
15
16// no-std implementation
17#[cfg(not(feature = "std"))]
18mod no_std_impl {
19    pub use alloc::vec::Vec;
20    pub use core::mem;
21}
22
23#[cfg(not(feature = "std"))]
24use no_std_impl::*;
25#[cfg(feature = "std")]
26use std_impl::*;
27
28use bincode::de::Decoder;
29use bincode::enc::Encoder;
30use bincode::error::{DecodeError, EncodeError};
31use bincode::{Decode, Encode};
32use cu_msp_lib::structs::{MspRequest, MspResponse};
33use cu_msp_lib::{MSP_MAX_PAYLOAD_LEN, MspPacket, MspPacketDirection, MspParser};
34use cu29::cubridge::{
35    BridgeChannel, BridgeChannelConfig, BridgeChannelInfo, BridgeChannelSet, CuBridge,
36};
37use cu29::prelude::*;
38use cu29::resource::{Owned, ResourceBindings, ResourceManager};
39use embedded_io::{Read, Write};
40use heapless::Vec as HeaplessVec;
41use serde::{Deserialize, Serialize};
42
/// Size in bytes of the scratch buffer handed to each serial `read` call.
const READ_BUFFER_SIZE: usize = 512;
/// Maximum number of requests an [`MspRequestBatch`] can hold.
const MAX_REQUESTS_PER_BATCH: usize = 8;
/// Maximum number of responses an [`MspResponseBatch`] can hold.
const MAX_RESPONSES_PER_BATCH: usize = 16;
/// Capacity of the reusable transmit buffer: the largest payload plus 12 extra bytes.
// NOTE(review): the 12 bytes are presumably MSP framing overhead (header, size, command,
// checksum) — confirm against `MspPacket::packet_size_bytes`.
const TX_BUFFER_CAPACITY: usize = MSP_MAX_PAYLOAD_LEN + 12;
47
48fn decode_bounded_vec<T, const N: usize, D>(
49    decoder: &mut D,
50) -> Result<HeaplessVec<T, N>, DecodeError>
51where
52    T: Decode<()>,
53    D: Decoder<Context = ()>,
54{
55    let values = <Vec<T> as Decode<()>>::decode(decoder)?;
56    let count = values.len();
57    if count > N {
58        return Err(DecodeError::ArrayLengthMismatch {
59            required: N,
60            found: count,
61        });
62    }
63    let mut batch = HeaplessVec::new();
64    for value in values {
65        batch
66            .push(value)
67            .map_err(|_| DecodeError::ArrayLengthMismatch {
68                required: N,
69                found: count,
70            })?;
71    }
72    Ok(batch)
73}
74
75/// Batch of MSP requests transported over the bridge.
76#[derive(Debug, Clone, Default, Serialize, Deserialize, Reflect)]
77#[reflect(opaque, from_reflect = false)]
78pub struct MspRequestBatch(pub HeaplessVec<MspRequest, MAX_REQUESTS_PER_BATCH>);
79
80impl MspRequestBatch {
81    pub fn new() -> Self {
82        Self(HeaplessVec::new())
83    }
84
85    pub fn push(&mut self, req: MspRequest) -> CuResult<()> {
86        self.0
87            .push(req)
88            .map_err(|_| CuError::from("MSP request batch overflow"))
89    }
90
91    pub fn iter(&self) -> impl Iterator<Item = &MspRequest> {
92        self.0.iter()
93    }
94}
95
96impl Encode for MspRequestBatch {
97    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
98        Encode::encode(&self.0.as_slice(), encoder)
99    }
100}
101
102impl Decode<()> for MspRequestBatch {
103    fn decode<D: Decoder<Context = ()>>(decoder: &mut D) -> Result<Self, DecodeError> {
104        decode_bounded_vec::<MspRequest, MAX_REQUESTS_PER_BATCH, _>(decoder).map(MspRequestBatch)
105    }
106}
107
108/// Batch of MSP responses collected by the bridge.
109#[derive(Debug, Clone, Default, Serialize, Deserialize, Reflect)]
110#[reflect(opaque, from_reflect = false)]
111pub struct MspResponseBatch(pub HeaplessVec<MspResponse, MAX_RESPONSES_PER_BATCH>);
112
113impl MspResponseBatch {
114    pub fn new() -> Self {
115        Self(HeaplessVec::new())
116    }
117
118    pub fn push(&mut self, resp: MspResponse) -> CuResult<()> {
119        self.0
120            .push(resp)
121            .map_err(|_| CuError::from("MSP response batch overflow"))
122    }
123
124    pub fn clear(&mut self) {
125        self.0.clear();
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.0.is_empty()
130    }
131}
132
133impl Encode for MspResponseBatch {
134    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
135        Encode::encode(&self.0.as_slice(), encoder)
136    }
137}
138
139impl Decode<()> for MspResponseBatch {
140    fn decode<D: Decoder<Context = ()>>(decoder: &mut D) -> Result<Self, DecodeError> {
141        decode_bounded_vec::<MspResponse, MAX_RESPONSES_PER_BATCH, _>(decoder).map(MspResponseBatch)
142    }
143}
144
// Outbound channel set of the bridge: a single `requests` channel whose payload is an
// `MspRequestBatch`. The generated id enum is `TxId`.
tx_channels! {
    pub struct TxChannels : TxId {
        requests => MspRequestBatch,
    }
}
150
// Inbound channel set of the bridge, with generated id enum `RxId`:
// - `responses` carries an `MspResponseBatch`;
// - `incoming` carries an `MspRequestBatch`.
rx_channels! {
    pub struct RxChannels : RxId {
        responses => MspResponseBatch,
        incoming => MspRequestBatch,
    }
}
157
/// Bridge that multiplexes MSP traffic on a single serial link.
///
/// `S` is the serial port type; it must support both reading and writing with the same error
/// type `E`. All fields are excluded from reflection.
#[derive(Reflect)]
#[reflect(from_reflect = false, no_field_bounds, type_path = false)]
pub struct CuMspBridge<S, E>
where
    S: Write<Error = E> + Read<Error = E> + Send + Sync + 'static,
    E: 'static,
{
    // Serial port used for both writing requests and reading incoming bytes.
    #[reflect(ignore)]
    serial: S,
    // Incremental MSP parser, fed one received byte at a time.
    #[reflect(ignore)]
    parser: MspParser,
    // Scratch buffer that each serial read fills.
    #[reflect(ignore)]
    read_buffer: [u8; READ_BUFFER_SIZE],
    // Responses decoded from the line, waiting to be handed out on the `responses` channel.
    #[reflect(ignore)]
    pending_responses: MspResponseBatch,
    // Requests decoded from the line, waiting to be handed out on the `incoming` channel.
    #[reflect(ignore)]
    pending_requests: MspRequestBatch,
    // Reusable buffer into which each outgoing request is serialized before being written.
    #[reflect(ignore)]
    tx_buffer: HeaplessVec<u8, TX_BUFFER_CAPACITY>,
}
179
180impl<S, E> CuMspBridge<S, E>
181where
182    S: Write<Error = E> + Read<Error = E> + Send + Sync + 'static,
183    E: 'static,
184{
185    fn from_serial(serial: S) -> Self {
186        Self {
187            serial,
188            parser: MspParser::new(),
189            read_buffer: [0; READ_BUFFER_SIZE],
190            pending_responses: MspResponseBatch::new(),
191            pending_requests: MspRequestBatch::new(),
192            tx_buffer: HeaplessVec::new(),
193        }
194    }
195
196    fn send_request(&mut self, request: &MspRequest) -> CuResult<()> {
197        let packet: MspPacket = request.into();
198        let size = packet.packet_size_bytes();
199        self.tx_buffer
200            .resize(size, 0)
201            .map_err(|_| CuError::from("MSP bridge tx buffer too small"))?;
202        packet
203            .serialize(self.tx_buffer.as_mut_slice())
204            .map_err(|err| {
205                CuError::new_with_cause("MSP bridge failed to serialize request", err)
206            })?;
207        self.serial
208            .write_all(self.tx_buffer.as_slice())
209            .map_err(|_| CuError::from("MSP bridge failed to write serial"))
210    }
211
212    fn poll_serial(&mut self) -> CuResult<()> {
213        loop {
214            if self.pending_responses.0.len() >= MAX_RESPONSES_PER_BATCH
215                || self.pending_requests.0.len() >= MAX_REQUESTS_PER_BATCH
216            {
217                break;
218            }
219            let Ok(n) = self.serial.read(&mut self.read_buffer) else {
220                break;
221            };
222            if n == 0 {
223                break;
224            }
225            for &byte in &self.read_buffer[..n] {
226                if self.pending_responses.0.len() >= MAX_RESPONSES_PER_BATCH
227                    || self.pending_requests.0.len() >= MAX_REQUESTS_PER_BATCH
228                {
229                    break;
230                }
231                if let Ok(Some(packet)) = self.parser.parse(byte) {
232                    if packet.direction == MspPacketDirection::ToFlightController {
233                        // This is an incoming request from the VTX
234                        if let Some(request) = MspRequest::from_packet(&packet) {
235                            self.pending_requests.push(request)?;
236                        }
237                    } else {
238                        // This is a response from the VTX
239                        let response = MspResponse::from(packet);
240                        self.pending_responses.push(response)?;
241                    }
242                }
243            }
244        }
245        Ok(())
246    }
247}
248
// Empty impl: the bridge relies entirely on the default methods of `Freezable`.
// NOTE(review): presumably this means no bridge state is captured when freezing — confirm
// against the `Freezable` trait defaults.
impl<S, E> Freezable for CuMspBridge<S, E>
where
    S: Write<Error = E> + Read<Error = E> + Send + Sync + 'static,
    E: 'static,
{
}
255
// Hand-written `TypePath` (the derive on the struct sets `type_path = false`). Every
// instantiation reports the same fixed strings; the generic parameters `S` and `E` do not
// appear in the reported paths.
impl<S, E> cu29::reflect::TypePath for CuMspBridge<S, E>
where
    S: Write<Error = E> + Read<Error = E> + Send + Sync + 'static,
    E: 'static,
{
    /// Full path of the type, without generic arguments.
    fn type_path() -> &'static str {
        "cu_msp_bridge::CuMspBridge"
    }

    /// Type name without the module path.
    fn short_type_path() -> &'static str {
        "CuMspBridge"
    }

    /// Bare identifier of the type.
    fn type_ident() -> Option<&'static str> {
        Some("CuMspBridge")
    }

    /// Name of the crate that defines the type.
    fn crate_name() -> Option<&'static str> {
        Some("cu_msp_bridge")
    }

    /// Module path of the type; identical to the crate name here.
    fn module_path() -> Option<&'static str> {
        Some("cu_msp_bridge")
    }
}
281
/// Resources the MSP bridge needs at construction time.
pub struct MspResources<S> {
    /// Serial port of type `S`, taken from the resource manager as an owned resource.
    pub serial: Owned<S>,
}
285
/// Keys used to look up the bridge's resources in a resource binding map.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum Binding {
    /// The serial port resource.
    Serial,
}
290
291impl<'r, S, E> ResourceBindings<'r> for MspResources<S>
292where
293    S: Write<Error = E> + Read<Error = E> + Send + Sync + 'static,
294{
295    type Binding = Binding;
296
297    fn from_bindings(
298        manager: &'r mut ResourceManager,
299        mapping: Option<&cu29::resource::ResourceBindingMap<Self::Binding>>,
300    ) -> CuResult<Self> {
301        let mapping = mapping.ok_or_else(|| {
302            CuError::from("MSP bridge requires a `serial` resource mapping in copperconfig")
303        })?;
304        let path = mapping.get(Binding::Serial).ok_or_else(|| {
305            CuError::from("MSP bridge resources must include `serial: <bundle.resource>`")
306        })?;
307        let serial = manager
308            .take::<S>(path.typed())
309            .map_err(|e| e.add_cause("Failed to fetch MSP serial resource"))?;
310        Ok(Self { serial })
311    }
312}
313
314impl<S, E> CuBridge for CuMspBridge<S, E>
315where
316    S: Write<Error = E> + Read<Error = E> + Send + Sync + 'static,
317    E: 'static,
318{
319    type Resources<'r> = MspResources<S>;
320    type Tx = TxChannels;
321    type Rx = RxChannels;
322
323    fn new(
324        config: Option<&ComponentConfig>,
325        tx_channels: &[BridgeChannelConfig<<Self::Tx as BridgeChannelSet>::Id>],
326        rx_channels: &[BridgeChannelConfig<<Self::Rx as BridgeChannelSet>::Id>],
327        resources: Self::Resources<'_>,
328    ) -> CuResult<Self>
329    where
330        Self: Sized,
331    {
332        let _ = tx_channels;
333        let _ = rx_channels;
334        let _ = config;
335        let bridge = Self::from_serial(resources.serial.0);
336        Ok(bridge)
337    }
338
339    fn preprocess(&mut self, _clock: &RobotClock) -> CuResult<()> {
340        self.poll_serial()
341    }
342
343    fn send<'a, Payload>(
344        &mut self,
345        _clock: &RobotClock,
346        channel: &'static BridgeChannel<<Self::Tx as BridgeChannelSet>::Id, Payload>,
347        msg: &CuMsg<Payload>,
348    ) -> CuResult<()>
349    where
350        Payload: CuMsgPayload + 'a,
351    {
352        match channel.id() {
353            TxId::Requests => {
354                let request_msg: &CuMsg<MspRequestBatch> = msg.downcast_ref()?;
355                if let Some(batch) = request_msg.payload() {
356                    for request in batch.iter() {
357                        self.send_request(request)?;
358                    }
359                }
360            }
361        }
362        Ok(())
363    }
364
365    fn receive<'a, Payload>(
366        &mut self,
367        clock: &RobotClock,
368        channel: &'static BridgeChannel<<Self::Rx as BridgeChannelSet>::Id, Payload>,
369        msg: &mut CuMsg<Payload>,
370    ) -> CuResult<()>
371    where
372        Payload: CuMsgPayload + 'a,
373    {
374        msg.tov = Tov::Time(clock.now());
375        match channel.id() {
376            RxId::Responses => {
377                let response_msg: &mut CuMsg<MspResponseBatch> = msg.downcast_mut()?;
378                let mut batch = MspResponseBatch::new();
379                mem::swap(&mut batch, &mut self.pending_responses);
380                response_msg.set_payload(batch);
381            }
382            RxId::Incoming => {
383                let request_msg: &mut CuMsg<MspRequestBatch> = msg.downcast_mut()?;
384                let mut batch = MspRequestBatch::new();
385                mem::swap(&mut batch, &mut self.pending_requests);
386                request_msg.set_payload(batch);
387            }
388        }
389        Ok(())
390    }
391}
392
/// Type alias for the MSP bridge over a Linux serial port with `std::io::Error` as the I/O error
/// type (kept for backward compatibility). Only available with the `std` feature.
#[cfg(feature = "std")]
pub type CuMspBridgeStd = CuMspBridge<cu_linux_resources::LinuxSerialPort, std::io::Error>;