cu_msp_bridge/
lib.rs

1//! Copper bridge that multiplexes MSP traffic in both directions over a single serial port.
2//! The type exposes a `requests` Tx channel that accepts [`MspRequestBatch`] messages and a
3//! `responses` Rx channel that yields [`MspResponseBatch`] payloads decoded from the line.
4
5#![cfg_attr(not(feature = "std"), no_std)]
6
7#[cfg(not(feature = "std"))]
8extern crate alloc;
9
10// std implementation
11#[cfg(feature = "std")]
12mod std_impl {
13    pub use std::{format, mem, string::String, vec::Vec};
14
15    pub const DEVICE_KEY: &str = "device";
16    pub const BAUD_KEY: &str = "baudrate";
17    pub const TIMEOUT_KEY: &str = "timeout_ms";
18    pub const DEFAULT_DEVICE: &str = "/dev/ttyUSB0";
19    pub const DEFAULT_BAUDRATE: u32 = 115_200;
20    pub const DEFAULT_TIMEOUT_MS: u64 = 50;
21}
22
23// no-std implementation
24#[cfg(not(feature = "std"))]
25mod no_std_impl {
26    pub use alloc::vec::Vec;
27    pub use core::mem;
28}
29
30#[cfg(not(feature = "std"))]
31use no_std_impl::*;
32#[cfg(feature = "std")]
33use std_impl::*;
34
35use bincode::de::Decoder;
36use bincode::enc::Encoder;
37use bincode::error::{DecodeError, EncodeError};
38use bincode::{Decode, Encode};
39use cu_msp_lib::structs::{MspRequest, MspResponse};
40use cu_msp_lib::{MspPacket, MspPacketDirection, MspParser};
41use cu29::cubridge::{
42    BridgeChannel, BridgeChannelConfig, BridgeChannelInfo, BridgeChannelSet, CuBridge,
43};
44use cu29::prelude::*;
45#[cfg(feature = "std")]
46use cu29::resource::ResourceBundle;
47use cu29::resource::{Owned, ResourceBindings, ResourceManager};
48use embedded_io::{ErrorType, Read, Write};
49use serde::{Deserialize, Serialize};
50use smallvec::SmallVec;
51use spin::Mutex;
52
53const READ_BUFFER_SIZE: usize = 512;
54const MAX_REQUESTS_PER_BATCH: usize = 8;
55const MAX_RESPONSES_PER_BATCH: usize = 16;
56
57/// Batch of MSP requests transported over the bridge.
58#[derive(Debug, Clone, Default, Serialize, Deserialize)]
59pub struct MspRequestBatch(pub SmallVec<[MspRequest; MAX_REQUESTS_PER_BATCH]>);
60
61impl MspRequestBatch {
62    pub fn new() -> Self {
63        Self(SmallVec::new())
64    }
65
66    pub fn push(&mut self, req: MspRequest) {
67        self.0.push(req);
68    }
69
70    pub fn iter(&self) -> impl Iterator<Item = &MspRequest> {
71        self.0.iter()
72    }
73}
74
75impl Encode for MspRequestBatch {
76    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
77        Encode::encode(&self.0.as_slice(), encoder)
78    }
79}
80
81impl Decode<()> for MspRequestBatch {
82    fn decode<D: Decoder<Context = ()>>(decoder: &mut D) -> Result<Self, DecodeError> {
83        let values = <Vec<MspRequest> as Decode<()>>::decode(decoder)?;
84        Ok(Self(values.into()))
85    }
86}
87
88/// Batch of MSP responses collected by the bridge.
89#[derive(Debug, Clone, Default, Serialize, Deserialize)]
90pub struct MspResponseBatch(pub SmallVec<[MspResponse; MAX_RESPONSES_PER_BATCH]>);
91
92impl MspResponseBatch {
93    pub fn new() -> Self {
94        Self(SmallVec::new())
95    }
96
97    pub fn push(&mut self, resp: MspResponse) {
98        self.0.push(resp);
99    }
100
101    pub fn clear(&mut self) {
102        self.0.clear();
103    }
104
105    pub fn is_empty(&self) -> bool {
106        self.0.is_empty()
107    }
108}
109
110impl Encode for MspResponseBatch {
111    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
112        Encode::encode(&self.0.as_slice(), encoder)
113    }
114}
115
116impl Decode<()> for MspResponseBatch {
117    fn decode<D: Decoder<Context = ()>>(decoder: &mut D) -> Result<Self, DecodeError> {
118        let values = <Vec<MspResponse> as Decode<()>>::decode(decoder)?;
119        Ok(Self(values.into()))
120    }
121}
122
123tx_channels! {
124    pub struct TxChannels : TxId {
125        requests => MspRequestBatch,
126    }
127}
128
129rx_channels! {
130    pub struct RxChannels : RxId {
131        responses => MspResponseBatch,
132        incoming => MspRequestBatch,
133    }
134}
135
136/// Bridge that multiplexes MSP traffic on a single serial link.
137pub struct CuMspBridge<S, E>
138where
139    S: Write<Error = E> + Read<Error = E>,
140{
141    serial: S,
142    parser: MspParser,
143    read_buffer: [u8; READ_BUFFER_SIZE],
144    pending_responses: MspResponseBatch,
145    pending_requests: MspRequestBatch,
146    tx_buffer: SmallVec<[u8; 256]>,
147}
148
149impl<S, E> CuMspBridge<S, E>
150where
151    S: Write<Error = E> + Read<Error = E>,
152{
153    fn from_serial(serial: S) -> Self {
154        Self {
155            serial,
156            parser: MspParser::new(),
157            read_buffer: [0; READ_BUFFER_SIZE],
158            pending_responses: MspResponseBatch::new(),
159            pending_requests: MspRequestBatch::new(),
160            tx_buffer: SmallVec::new(),
161        }
162    }
163
164    fn send_request(&mut self, request: &MspRequest) -> CuResult<()> {
165        let packet: MspPacket = request.into();
166        let size = packet.packet_size_bytes();
167        self.tx_buffer.resize(size, 0);
168        packet.serialize(&mut self.tx_buffer).map_err(|err| {
169            CuError::new_with_cause("MSP bridge failed to serialize request", err)
170        })?;
171        self.serial
172            .write_all(&self.tx_buffer)
173            .map_err(|_| CuError::from("MSP bridge failed to write serial"))
174    }
175
176    fn poll_serial(&mut self) -> CuResult<()> {
177        loop {
178            if self.pending_responses.0.len() >= MAX_RESPONSES_PER_BATCH
179                || self.pending_requests.0.len() >= MAX_REQUESTS_PER_BATCH
180            {
181                break;
182            }
183            match self.serial.read(&mut self.read_buffer) {
184                Ok(0) => break,
185                Ok(n) => {
186                    for &byte in &self.read_buffer[..n] {
187                        if self.pending_responses.0.len() >= MAX_RESPONSES_PER_BATCH
188                            || self.pending_requests.0.len() >= MAX_REQUESTS_PER_BATCH
189                        {
190                            break;
191                        }
192                        match self.parser.parse(byte) {
193                            Ok(Some(packet)) => {
194                                if packet.direction == MspPacketDirection::ToFlightController {
195                                    // This is an incoming request from the VTX
196                                    if let Some(request) = MspRequest::from_packet(&packet) {
197                                        self.pending_requests.push(request);
198                                    }
199                                } else {
200                                    // This is a response from the VTX
201                                    let response = MspResponse::from(packet);
202                                    self.pending_responses.push(response);
203                                }
204                            }
205                            Ok(None) => {}
206                            Err(_) => {}
207                        }
208                    }
209                }
210                Err(_) => break,
211            }
212        }
213        Ok(())
214    }
215}
216
217impl<S, E> Freezable for CuMspBridge<S, E> where S: Write<Error = E> + Read<Error = E> {}
218
219pub struct MspResources<S> {
220    pub serial: Owned<S>,
221}
222
223#[derive(Copy, Clone, Debug, Eq, PartialEq)]
224pub enum Binding {
225    Serial,
226}
227
228impl<'r, S, E> ResourceBindings<'r> for MspResources<S>
229where
230    S: Write<Error = E> + Read<Error = E> + Send + Sync + 'static,
231{
232    type Binding = Binding;
233
234    fn from_bindings(
235        manager: &'r mut ResourceManager,
236        mapping: Option<&cu29::resource::ResourceBindingMap<Self::Binding>>,
237    ) -> CuResult<Self> {
238        let mapping = mapping.ok_or_else(|| {
239            CuError::from("MSP bridge requires a `serial` resource mapping in copperconfig")
240        })?;
241        let path = mapping.get(Binding::Serial).ok_or_else(|| {
242            CuError::from("MSP bridge resources must include `serial: <bundle.resource>`")
243        })?;
244        let serial = manager
245            .take::<S>(path.typed())
246            .map_err(|e| e.add_cause("Failed to fetch MSP serial resource"))?;
247        Ok(Self { serial })
248    }
249}
250
251impl<S, E> CuBridge for CuMspBridge<S, E>
252where
253    S: Write<Error = E> + Read<Error = E> + Send + Sync + 'static,
254{
255    type Resources<'r> = MspResources<S>;
256    type Tx = TxChannels;
257    type Rx = RxChannels;
258
259    fn new(
260        config: Option<&ComponentConfig>,
261        tx_channels: &[BridgeChannelConfig<<Self::Tx as BridgeChannelSet>::Id>],
262        rx_channels: &[BridgeChannelConfig<<Self::Rx as BridgeChannelSet>::Id>],
263        resources: Self::Resources<'_>,
264    ) -> CuResult<Self>
265    where
266        Self: Sized,
267    {
268        let _ = tx_channels;
269        let _ = rx_channels;
270        let _ = config;
271        let bridge = Self::from_serial(resources.serial.0);
272        Ok(bridge)
273    }
274
275    fn preprocess(&mut self, _clock: &RobotClock) -> CuResult<()> {
276        self.poll_serial()
277    }
278
279    fn send<'a, Payload>(
280        &mut self,
281        _clock: &RobotClock,
282        channel: &'static BridgeChannel<<Self::Tx as BridgeChannelSet>::Id, Payload>,
283        msg: &CuMsg<Payload>,
284    ) -> CuResult<()>
285    where
286        Payload: CuMsgPayload + 'a,
287    {
288        match channel.id() {
289            TxId::Requests => {
290                let request_msg: &CuMsg<MspRequestBatch> = msg.downcast_ref()?;
291                if let Some(batch) = request_msg.payload() {
292                    for request in batch.iter() {
293                        self.send_request(request)?;
294                    }
295                }
296            }
297        }
298        Ok(())
299    }
300
301    fn receive<'a, Payload>(
302        &mut self,
303        clock: &RobotClock,
304        channel: &'static BridgeChannel<<Self::Rx as BridgeChannelSet>::Id, Payload>,
305        msg: &mut CuMsg<Payload>,
306    ) -> CuResult<()>
307    where
308        Payload: CuMsgPayload + 'a,
309    {
310        msg.tov = Tov::Time(clock.now());
311        match channel.id() {
312            RxId::Responses => {
313                let response_msg: &mut CuMsg<MspResponseBatch> = msg.downcast_mut()?;
314                let mut batch = MspResponseBatch::new();
315                mem::swap(&mut batch, &mut self.pending_responses);
316                response_msg.set_payload(batch);
317            }
318            RxId::Incoming => {
319                let request_msg: &mut CuMsg<MspRequestBatch> = msg.downcast_mut()?;
320                let mut batch = MspRequestBatch::new();
321                mem::swap(&mut batch, &mut self.pending_requests);
322                request_msg.set_payload(batch);
323            }
324        }
325        Ok(())
326    }
327}
328
329#[cfg(feature = "std")]
330pub mod std_serial {
331    use embedded_io_adapters::std::FromStd;
332    #[cfg(feature = "std")]
333    use std::boxed::Box;
334    #[cfg(feature = "std")]
335    use std::time::Duration;
336
337    pub type StdSerial = FromStd<Box<dyn serialport::SerialPort>>;
338
339    pub fn open(path: &str, baud: u32, timeout_ms: u64) -> std::io::Result<StdSerial> {
340        let port = serialport::new(path, baud)
341            .timeout(Duration::from_millis(timeout_ms))
342            .open()?;
343        Ok(FromStd::new(port))
344    }
345}
346
347#[cfg(feature = "std")]
348pub struct StdSerialBundle;
349
350#[cfg(feature = "std")]
351bundle_resources!(StdSerialBundle: Serial);
352
353#[cfg(feature = "std")]
354impl ResourceBundle for StdSerialBundle {
355    fn build(
356        bundle: cu29::resource::BundleContext<Self>,
357        config: Option<&ComponentConfig>,
358        manager: &mut ResourceManager,
359    ) -> CuResult<()> {
360        let cfg = config.ok_or_else(|| {
361            CuError::from(format!(
362                "MSP serial bundle `{}` requires configuration",
363                bundle.bundle_id()
364            ))
365        })?;
366        let device = cfg
367            .get::<String>(DEVICE_KEY)
368            .filter(|s| !s.is_empty())
369            .unwrap_or_else(|| DEFAULT_DEVICE.to_string());
370        let baudrate = cfg.get::<u32>(BAUD_KEY).unwrap_or(DEFAULT_BAUDRATE);
371        let timeout_ms = cfg.get::<u64>(TIMEOUT_KEY).unwrap_or(DEFAULT_TIMEOUT_MS);
372
373        let serial = std_serial::open(&device, baudrate, timeout_ms).map_err(|err| {
374            CuError::from(format!(
375                "MSP bridge failed to open serial `{device}` at {baudrate} baud: {err}"
376            ))
377        })?;
378        let key = bundle.key(StdSerialBundleId::Serial);
379        manager.add_owned(key, LockedSerial::new(serial))
380    }
381}
382
383pub struct LockedSerial<T>(pub Mutex<T>);
384
385impl<T> LockedSerial<T> {
386    pub fn new(inner: T) -> Self {
387        Self(Mutex::new(inner))
388    }
389}
390
391impl<T: ErrorType> ErrorType for LockedSerial<T> {
392    type Error = T::Error;
393}
394
395impl<T: Read> Read for LockedSerial<T> {
396    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
397        let mut guard = self.0.lock();
398        guard.read(buf)
399    }
400}
401
402impl<T: Write> Write for LockedSerial<T> {
403    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
404        let mut guard = self.0.lock();
405        guard.write(buf)
406    }
407
408    fn flush(&mut self) -> Result<(), Self::Error> {
409        let mut guard = self.0.lock();
410        guard.flush()
411    }
412}
413
414/// Type alias for MSP bridge using standard I/O (for backward compatibility)
415#[cfg(feature = "std")]
416pub type CuMspBridgeStd = CuMspBridge<LockedSerial<std_serial::StdSerial>, std::io::Error>;