mavlink_core/connection/
direct_serial.rs

1//! Serial MAVLINK connection
2
3use crate::connection::MavConnection;
4use crate::error::{MessageReadError, MessageWriteError};
5use crate::peek_reader::PeekReader;
6use crate::Connectable;
7use crate::{MAVLinkMessageRaw, MavHeader, MavlinkVersion, Message, ReadVersion};
8use core::ops::DerefMut;
9use core::sync::atomic::{self, AtomicU8};
10use std::io::{self, BufReader};
11use std::sync::Mutex;
12
13use serialport::{DataBits, FlowControl, Parity, SerialPort, StopBits};
14
15#[cfg(not(feature = "signing"))]
16use crate::{read_versioned_msg, read_versioned_raw_message, write_versioned_msg};
17#[cfg(feature = "signing")]
18use crate::{
19    read_versioned_msg_signed, read_versioned_raw_message_signed, write_versioned_msg_signed,
20    SigningConfig, SigningData,
21};
22
23pub mod config;
24
25use config::SerialConfig;
26
27pub struct SerialConnection {
28    // Separate ports for reading and writing as it's safe to use concurrently.
29    // See the official ref: https://github.com/serialport/serialport-rs/blob/321f85e1886eaa1302aef8a600a631bc1c88703a/examples/duplex.rs
30    read_port: Mutex<PeekReader<BufReader<Box<dyn SerialPort>>>>,
31    write_port: Mutex<Box<dyn SerialPort>>,
32    sequence: AtomicU8,
33    protocol_version: MavlinkVersion,
34    recv_any_version: bool,
35    #[cfg(feature = "signing")]
36    signing_data: Option<SigningData>,
37}
38
39impl<M: Message> MavConnection<M> for SerialConnection {
40    fn recv(&self) -> Result<(MavHeader, M), MessageReadError> {
41        let mut port = self.read_port.lock().unwrap();
42        let version = ReadVersion::from_conn_cfg::<_, M>(self);
43
44        loop {
45            #[cfg(not(feature = "signing"))]
46            let result = read_versioned_msg(port.deref_mut(), version);
47            #[cfg(feature = "signing")]
48            let result =
49                read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
50            match result {
51                ok @ Ok(..) => {
52                    return ok;
53                }
54                Err(MessageReadError::Io(e)) => {
55                    if e.kind() == io::ErrorKind::UnexpectedEof {
56                        return Err(MessageReadError::Io(e));
57                    }
58                }
59                _ => {}
60            }
61        }
62    }
63
64    fn recv_raw(&self) -> Result<MAVLinkMessageRaw, MessageReadError> {
65        let mut port = self.read_port.lock().unwrap();
66        let version = ReadVersion::from_conn_cfg::<_, M>(self);
67
68        loop {
69            #[cfg(not(feature = "signing"))]
70            let result = read_versioned_raw_message::<M, _>(port.deref_mut(), version);
71            #[cfg(feature = "signing")]
72            let result = read_versioned_raw_message_signed::<M, _>(
73                port.deref_mut(),
74                version,
75                self.signing_data.as_ref(),
76            );
77            match result {
78                ok @ Ok(..) => {
79                    return ok;
80                }
81                Err(MessageReadError::Io(e)) => {
82                    if e.kind() == io::ErrorKind::UnexpectedEof {
83                        return Err(MessageReadError::Io(e));
84                    }
85                }
86                _ => {}
87            }
88        }
89    }
90
91    fn try_recv(&self) -> Result<(MavHeader, M), MessageReadError> {
92        let mut port = self.read_port.lock().unwrap();
93        let version = ReadVersion::from_conn_cfg::<_, M>(self);
94
95        #[cfg(not(feature = "signing"))]
96        let result = read_versioned_msg(port.deref_mut(), version);
97
98        #[cfg(feature = "signing")]
99        let result =
100            read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
101
102        result
103    }
104
105    fn send(&self, header: &MavHeader, data: &M) -> Result<usize, MessageWriteError> {
106        let mut port = self.write_port.lock().unwrap();
107
108        let sequence = self.sequence.fetch_add(
109            1,
110            // Safety:
111            //
112            // We are using `Ordering::Relaxed` here because:
113            // - We only need a unique sequence number per message
114            // - `Mutex` on `self.write_port` already makes sure the rest of the code is synchronized
115            // - No other thread reads or writes `self.sequence` without going through this `Mutex`
116            //
117            // Warning:
118            //
119            // If we later change this code to access `self.sequence` without locking `self.write_port` with the `Mutex`,
120            // then we should upgrade this ordering to `Ordering::SeqCst`.
121            atomic::Ordering::Relaxed,
122        );
123
124        let header = MavHeader {
125            sequence,
126            system_id: header.system_id,
127            component_id: header.component_id,
128        };
129
130        #[cfg(not(feature = "signing"))]
131        let result = write_versioned_msg(port.deref_mut(), self.protocol_version, header, data);
132        #[cfg(feature = "signing")]
133        let result = write_versioned_msg_signed(
134            port.deref_mut(),
135            self.protocol_version,
136            header,
137            data,
138            self.signing_data.as_ref(),
139        );
140        result
141    }
142
143    fn set_protocol_version(&mut self, version: MavlinkVersion) {
144        self.protocol_version = version;
145    }
146
147    fn protocol_version(&self) -> MavlinkVersion {
148        self.protocol_version
149    }
150
151    fn set_allow_recv_any_version(&mut self, allow: bool) {
152        self.recv_any_version = allow;
153    }
154
155    fn allow_recv_any_version(&self) -> bool {
156        self.recv_any_version
157    }
158
159    #[cfg(feature = "signing")]
160    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
161        self.signing_data = signing_data.map(SigningData::from_config);
162    }
163}
164
165impl Connectable for SerialConfig {
166    fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
167        let read_port = serialport::new(&self.port_name, self.baud_rate)
168            .data_bits(DataBits::Eight)
169            .parity(Parity::None)
170            .stop_bits(StopBits::One)
171            .flow_control(FlowControl::None)
172            .open()?;
173
174        let write_port = read_port.try_clone()?;
175
176        let read_buffer_capacity = self.buffer_capacity();
177        let buf_reader = BufReader::with_capacity(read_buffer_capacity, read_port);
178
179        Ok(Box::new(SerialConnection {
180            read_port: Mutex::new(PeekReader::new(buf_reader)),
181            write_port: Mutex::new(write_port),
182            sequence: AtomicU8::new(0),
183            protocol_version: MavlinkVersion::V2,
184            #[cfg(feature = "signing")]
185            signing_data: None,
186            recv_any_version: false,
187        }))
188    }
189}