async_mavlink/
lib.rs

1//! Async adapter for the [mavlink](https://docs.rs/mavlink/) crate
2//!
3//! The mavlink crate offers a low level API for MAVLink connections. This crate adds a thin
4//! async API on top of that. The most important feature is a subscribe based communication model.
5//! It allows the user to subscribe to a certain type of message. All subsequent messages of that
6//! type then can be received in a async stream. In order for this to function, it is necessary to
7//! execute the event loop of the connector as a task. This crate aims to be executor independent,
8//! e.g. it should work with all async executors.
9//!
10//! This so far is only a proof of concept. While it might serve your usecase well, expect things
11//! to break. Contributions and suggestions are wellcome!
12//!
13//! # Example: Pulling all Parameters from a Vehicle
14//!
15//! In this example the subscribe method is utilized to collect all parameters of the vehicle.
16//!
17//! ```
18//! use std::collections::HashMap;
19//! use async_mavlink::prelude::*;
20//! use mavlink::{MavlinkVersion, common::*};
21//! use smol::prelude::*;
22//!
23//! # fn main()->Result<(),AsyncMavlinkError>{
24//! smol::block_on(async {
25//! #   smol::spawn(async {
26//! #       let mut conn = mavlink::connect("udpout:127.0.0.1:14551").unwrap();
27//! #       conn.set_protocol_version(mavlink::MavlinkVersion::V1);
28//! #       loop {
29//! #           let mut value = PARAM_VALUE_DATA::default();
30//! #           value.param_id = to_char_arr("param_1");
31//! #           value.param_value = 13.0;
32//! #           value.param_count = 1;
33//! #           conn.send_default(&MavMessage::PARAM_VALUE(value));
34//! #           smol::Timer::after(std::time::Duration::from_millis(10)).await;
35//! #       }
36//! #   }).detach();
37//!     // connect
38//!     let (conn, future) = AsyncMavConn::new("udpin:127.0.0.1:14551", MavlinkVersion::V1)?;
39//!     
40//!     // start event loop
41//!     smol::spawn(async move { future.await }).detach();
42//!
43//!     // we want to subscribe to PARAM_VALUE messages
44//!     let msg_type = MavMessageType::new(&MavMessage::PARAM_VALUE(Default::default()));
45//!
46//!     // subscribe to PARAM_VALUE message
47//!     let mut stream = conn.subscribe(msg_type).await?;
48//!
49//!     // and send one PARAM_REQUEST_LIST message
50//!     let msg_request = MavMessage::PARAM_REQUEST_LIST(Default::default());
51//!     conn.send_default(&msg_request).await?;
52//!     let mut parameters = HashMap::new();
53//!
54//!     // receive all parameters and store them in a HashMap
55//!     while let Some(MavMessage::PARAM_VALUE(data)) = (stream.next()).await {
56//!         parameters.insert(to_string(&data.param_id), data.param_value);
57//!         if data.param_count as usize == parameters.len(){
58//!             break;
59//!         }
60//!     }
61//!
62//!     // do something with parameters
63//! # Ok(())    
64//! })
65//! # }
66//! ```
67#![deny(missing_docs)]
68#![deny(unsafe_code)]
69
70use std::collections::HashMap;
71use std::pin::Pin;
72use std::sync::Arc;
73use std::time::Instant;
74
75use arc_swap::ArcSwapOption;
76use blocking::Unblock;
77use futures::{
78    channel::{
79        mpsc::{self, UnboundedSender},
80        oneshot,
81    },
82    future::Either,
83    prelude::*,
84    StreamExt,
85};
86
87use mavlink::{MavHeader, Message};
88
89pub mod prelude;
90mod types;
91mod util;
92
93pub use types::{AsyncMavlinkError, MavMessageType};
94pub use util::{to_char_arr, to_string};
95
96/// An async adapter for a MAVLink connection
97///
98/// Offers high level functionality to interact with a MAVLink vehicle in an async fashion.
99pub struct AsyncMavConn<M: mavlink::Message> {
100    task_dispatcher: UnboundedSender<Task<M>>,
101    last_heartbeat: Arc<ArcSwapOption<Instant>>,
102}
103
104enum Task<M: mavlink::Message> {
105    Emit {
106        header: MavHeader,
107        message: M,
108        backchannel: oneshot::Sender<Result<(), AsyncMavlinkError>>,
109    },
110    Subscribe {
111        message_type: MavMessageType<M>,
112        backchannel: UnboundedSender<M>,
113    },
114}
115
116// TODO make this failable if no heartbeat is received
117impl<M: 'static + mavlink::Message + Clone + Send + Sync> AsyncMavConn<M> {
118    /// Construct a new MavlinkConnectionHandler
119    ///
120    /// # Arguments
121    ///
122    /// * `address` - MAVLink connection `&str`. Equivalent to the `address` argument in
123    /// [mavlink::connect](https://docs.rs/mavlink/*/mavlink/fn.connect.html)
124    ///
125    /// # Examples
126    ///
127    /// ```
128    /// use async_mavlink::prelude::*;
129    /// use mavlink::{MavlinkVersion, common::MavMessage};
130    /// use smol::prelude::*;
131    ///
132    /// # fn main() -> Result<(),AsyncMavlinkError> {
133    /// smol::block_on(async {
134    ///     let (conn, future) = AsyncMavConn::new("udpbcast:127.0.0.2:14551", MavlinkVersion::V1)?;
135    ///     smol::spawn(async move { future.await }).detach();
136    ///     // ...
137    /// #   conn.send_default(&MavMessage::HEARTBEAT(Default::default())).await?;
138    /// # Ok(())
139    /// })
140    /// # }
141    /// ```
142    pub fn new(
143        address: &str,
144        mavlink_version: mavlink::MavlinkVersion,
145    ) -> Result<(Arc<Self>, impl Future<Output = impl Send> + Send), AsyncMavlinkError> {
146        let mut conn = mavlink::connect::<M>(address)?;
147        conn.set_protocol_version(mavlink_version);
148        let (task_dispatcher, incoming_tasks) = mpsc::unbounded();
149        let last_heartbeat = Arc::new(ArcSwapOption::from(None));
150
151        let f = {
152            let last_heartbeat = last_heartbeat.clone();
153            async move {
154                let mut subscriptions: HashMap<_, Vec<UnboundedSender<M>>> = HashMap::new();
155                let conn = Arc::new(conn);
156
157                let messages_iter = std::iter::repeat_with({
158                    let conn = conn.clone();
159                    move || conn.recv()
160                });
161                let messages = Unblock::new(messages_iter).map(Either::Right);
162                let operations = incoming_tasks.map(Either::Left);
163
164                let mut combined = stream::select(operations, messages);
165
166                let heartbeat_id =
167                    mavlink::common::MavMessage::HEARTBEAT(Default::default()).message_id();
168
169                // Event loop
170                //
171                // In here we process everything. By making this not concurrent itself, we can
172                // afford not to use locks at all!
173                loop {
174                    // TODO Evaluate whether this unwrap is fallible
175                    match combined.next().await.unwrap() {
176                        // A new subscription
177                        Either::Left(Task::Subscribe {
178                            message_type,
179                            backchannel,
180                        }) => {
181                            // find existing subscriptions for this message
182                            let message_subs = subscriptions
183                                .entry(message_type)
184                                .or_insert_with(|| Vec::with_capacity(1));
185                            // and add this channel to it.
186                            message_subs.push(backchannel);
187                        }
188                        // A new message to be emitted
189                        Either::Left(Task::Emit {
190                            header,
191                            message,
192                            backchannel,
193                        }) => {
194                            // Send the message and send back any error which occurred on sending
195                            let result = conn.send(&header, &message).map_err(|e| e.into());
196                            let _ = backchannel.send(result); // TODO handle this
197                        }
198                        Either::Right(Ok((_header, msg))) => {
199                            if msg.message_id() == heartbeat_id {
200                                last_heartbeat.rcu(|_| Some(Arc::new(Instant::now())));
201                            }
202                            subscriptions
203                                .entry(MavMessageType::new(&msg))
204                                .or_insert_with(Vec::new)
205                                .retain(|mut backchannel| match backchannel.is_closed() {
206                                    true => false,
207                                    false => {
208                                        futures::executor::block_on(backchannel.send(msg.clone()))
209                                            .expect("unable to do this");
210                                        true
211                                    }
212                                });
213                        }
214                        _ => {}
215                    }
216                }
217            }
218        };
219
220        Ok((
221            Arc::new(Self {
222                task_dispatcher,
223                last_heartbeat,
224            }),
225            Box::pin(f),
226        ))
227    }
228
229    /// Subscribe to all new MavMessages of the given MavMessageType
230    ///
231    /// This returns a never-ending Stream of MavMessages.
232    ///
233    /// # Arguments
234    ///
235    /// * `message_type` - `MavMessageType` of the desired messages
236    ///
237    /// # Examples
238    ///
239    /// ```
240    /// # use std::time::Duration;
241    /// # use futures::prelude::*;
242    /// # use async_mavlink::prelude::*;
243    /// # use mavlink::{MavlinkVersion, common::MavMessage};
244    /// # fn main() -> Result<(),AsyncMavlinkError> {
245    /// # smol::block_on(async {
246    /// # let (conn, future) = AsyncMavConn::new("udpin:127.0.0.3:14551", MavlinkVersion::V1)?;
247    /// # smol::spawn(async move { future.await; }).detach();
248    /// # smol::spawn(async {
249    /// #   let mut conn = mavlink::connect("udpout:127.0.0.3:14551").unwrap();
250    /// #   conn.set_protocol_version(mavlink::MavlinkVersion::V1);
251    /// #   loop {
252    /// #       conn.send_default(&MavMessage::PARAM_VALUE(Default::default()));
253    /// #       smol::Timer::after(Duration::from_millis(10)).await;
254    /// #   }
255    /// # }).detach();
256    /// let message_type = MavMessageType::new(&MavMessage::PARAM_VALUE(Default::default()));
257    /// let mut stream = conn.subscribe(message_type).await?;
258    /// while let Some(MavMessage::PARAM_VALUE(data)) = (stream.next()).await {
259    ///     // do something with `data`
260    /// #   break;
261    /// }
262    /// # Ok(()) })}
263    /// ```
264    pub async fn subscribe(
265        &self,
266        message_type: MavMessageType<M>,
267    ) -> Result<Pin<Box<dyn Stream<Item = M>>>, AsyncMavlinkError> {
268        let (backchannel, rx) = mpsc::unbounded();
269
270        self.task_dispatcher
271            .clone()
272            .send(Task::Subscribe {
273                message_type,
274                backchannel,
275            })
276            .await?; // infailable
277        Ok(Box::pin(rx))
278    }
279
280    /// Awaits the next MavMessage of the given MavMessageType
281    ///
282    /// # Arguments
283    ///
284    /// * `message_type` - `MavMessageType` of the desired messages
285    ///
286    /// # Examples
287    ///
288    /// ```
289    /// # use std::time::Duration;
290    /// # use futures::prelude::*;
291    /// # use async_mavlink::prelude::*;
292    /// # use mavlink::{MavlinkVersion, common::MavMessage};
293    /// # fn main() -> Result<(),AsyncMavlinkError> {
294    /// # smol::block_on(async {
295    /// # let (conn, future) = AsyncMavConn::new("udpin:127.0.0.4:14551", MavlinkVersion::V1)?;
296    /// # smol::spawn(async move { future.await; }).detach();
297    /// # smol::spawn(async {
298    /// #   let mut conn = mavlink::connect("udpout:127.0.0.4:14551").unwrap();
299    /// #   conn.set_protocol_version(mavlink::MavlinkVersion::V1);
300    /// #   loop {
301    /// #       let mut header = mavlink::MavHeader::default();
302    /// #       header.system_id = 0;    
303    /// #       conn.send(&header, &MavMessage::PARAM_VALUE(Default::default()));
304    /// #       smol::Timer::after(Duration::from_millis(10)).await;
305    /// #   }
306    /// # }).detach();
307    /// let message_type = MavMessageType::new(&MavMessage::PARAM_VALUE(Default::default()));
308    ///
309    /// if let MavMessage::PARAM_VALUE(data) = conn.request(message_type).await {
310    ///     // do something with `data`
311    /// }
312    /// # Ok(()) })}
313    /// ```
314    pub async fn request(&self, message_type: MavMessageType<M>) -> M {
315        let (backchannel, mut rx) = mpsc::unbounded();
316        self.task_dispatcher
317            .clone()
318            .send(Task::Subscribe {
319                message_type,
320                backchannel,
321            })
322            .await
323            .unwrap(); //this may never fail
324        rx.next().map(|m| m.expect("Oh no!")).await
325    }
326
327    /// Send a `MavMessage` to the vehicle
328    ///
329    /// # Arguments
330    ///
331    /// * `message` - `MavMessage` to send
332    ///
333    /// # Examples
334    ///
335    /// ```
336    /// # use std::time::Duration;
337    /// # use futures::prelude::*;
338    /// # use async_mavlink::AsyncMavConn;
339    /// # use async_mavlink::prelude::*;
340    /// # use mavlink::{MavHeader, MavlinkVersion, common::*};
341    /// # fn main() -> Result<(),AsyncMavlinkError> {
342    /// # smol::block_on(async {
343    /// # let (conn, future) = AsyncMavConn::new("udpbcast:127.0.0.5:14551", MavlinkVersion::V1)?;
344    /// # smol::spawn(async move { future.await; }).detach();
345    /// let header = MavHeader::default();
346    /// let message = MavMessage::PARAM_REQUEST_LIST(PARAM_REQUEST_LIST_DATA {
347    ///     target_component: 0,
348    ///     target_system: 0,
349    /// });
350    ///
351    /// conn.send(&header, &message).await?;
352    /// # Ok(())
353    /// # })}
354    /// ```
355    pub async fn send(&self, header: &MavHeader, message: &M) -> Result<(), AsyncMavlinkError> {
356        let (backchannel, receiver) = oneshot::channel();
357        self.task_dispatcher
358            .clone() // TODO get rid of clone
359            .send(Task::Emit {
360                header: *header,
361                message: message.clone(),
362                backchannel,
363            })
364            .await
365            .map_err(AsyncMavlinkError::from)?;
366        receiver.await.map_err(AsyncMavlinkError::from)?
367    }
368
369    /// Send a `MavMessage` to the vehicle
370    ///
371    /// # Arguments
372    ///
373    /// * `message` - `MavMessage` to send
374    ///
375    /// # Examples
376    ///
377    /// ```
378    /// # use async_mavlink::prelude::*;
379    /// # use mavlink::{MavlinkVersion,common::*};
380    /// # fn main() -> Result<(), AsyncMavlinkError> {
381    /// # use async_mavlink::prelude::AsyncMavlinkError;
382    /// # smol::block_on(async {
383    /// # let (conn, future) = AsyncMavConn::new("udpbcast:127.0.0.6:14551", MavlinkVersion::V1)?;
384    /// # smol::spawn(async move { future.await; }).detach();
385    /// let message = MavMessage::PARAM_REQUEST_LIST(PARAM_REQUEST_LIST_DATA {
386    ///     target_component: 0,
387    ///     target_system: 0,
388    /// });
389    ///
390    /// conn.send_default(&message).await?;
391    /// # Ok(())
392    /// # })}
393    /// ```
394    pub async fn send_default(&self, message: &M) -> Result<(), AsyncMavlinkError> {
395        Ok(self.send(&MavHeader::default(), message).await?)
396    }
397
398    /// Returns the `Instant` from the last received HEARTBEAT
399    pub async fn last_heartbeat(&self) -> Option<Instant> {
400        self.last_heartbeat.load_full().map(|arc| *arc)
401    }
402}
403
404#[cfg(test)]
405mod test {
406    use super::*;
407    use mavlink::common::*;
408    use std::time::Duration;
409    async fn new_conn<M: 'static + mavlink::Message + Clone + Send + Sync>(
410        arg: &str,
411    ) -> Arc<AsyncMavConn<M>> {
412        let (conn, future) = AsyncMavConn::new(arg, mavlink::MavlinkVersion::V1).unwrap();
413        smol::spawn(async move { future.await }).detach();
414        conn
415    }
416
417    #[test]
418    fn subscribe() -> Result<(), AsyncMavlinkError> {
419        smol::block_on(async {
420            let conn = new_conn("udpin:127.0.0.7:14551").await;
421
422            smol::spawn(async move {
423                let mut conn = mavlink::connect("udpout:127.0.0.7:14551").unwrap();
424                conn.set_protocol_version(mavlink::MavlinkVersion::V1);
425                loop {
426                    conn.send_default(&MavMessage::HEARTBEAT(Default::default()))
427                        .unwrap();
428                    smol::Timer::after(Duration::from_millis(10)).await;
429                }
430            })
431            .detach();
432
433            let message_type = MavMessageType::new(&MavMessage::HEARTBEAT(Default::default()));
434            let mut stream = conn.subscribe(message_type).await?;
435
436            let mut i = 0;
437
438            while let Some(MavMessage::HEARTBEAT(_data)) = (stream.next()).await {
439                i += 1;
440                if i > 5 {
441                    break;
442                }
443            }
444
445            Ok(())
446        })
447    }
448
449    #[test]
450    fn send() -> Result<(), AsyncMavlinkError> {
451        smol::block_on(async {
452            let conn = new_conn("udpout:127.0.0.8:14551").await;
453
454            let mut raw_conn = mavlink::connect("udpin:127.0.0.8:14551").unwrap();
455            raw_conn.set_protocol_version(mavlink::MavlinkVersion::V1);
456            let received = blocking::unblock(move || raw_conn.recv());
457
458            smol::spawn(async move {
459                let header = mavlink::MavHeader::default();
460                let message = MavMessage::HEARTBEAT(HEARTBEAT_DATA::default());
461                for _ in 0usize..100 {
462                    conn.send(&header, &message).await.unwrap();
463                }
464            })
465            .detach();
466
467            let received = received.await;
468            assert!(received.is_ok());
469
470            if let Ok((_, MavMessage::HEARTBEAT(_))) = received {
471            } else {
472                panic!("received wrong message");
473            }
474
475            Ok(())
476        })
477    }
478
479    #[test]
480    fn send_default() -> std::io::Result<()> {
481        smol::block_on(async {
482            let conn = new_conn("udpout:127.0.0.9:14551").await;
483
484            let mut raw_conn = mavlink::connect("udpin:127.0.0.9:14551").unwrap();
485            raw_conn.set_protocol_version(mavlink::MavlinkVersion::V1);
486            let received = blocking::unblock(move || raw_conn.recv());
487
488            smol::spawn(async move {
489                let message = MavMessage::HEARTBEAT(HEARTBEAT_DATA::default());
490                for _ in 0usize..100 {
491                    conn.send_default(&message).await.unwrap();
492                }
493            })
494            .detach();
495
496            let received = received.await;
497            assert!(received.is_ok());
498
499            if let Ok((_, MavMessage::HEARTBEAT(_))) = received {
500            } else {
501                panic!("received wrong message");
502            }
503
504            Ok(())
505        })
506    }
507}