async_mavlink/lib.rs
1//! Async adapter for the [mavlink](https://docs.rs/mavlink/) crate
2//!
3//! The mavlink crate offers a low level API for MAVLink connections. This crate adds a thin
4//! async API on top of that. The most important feature is a subscribe based communication model.
5//! It allows the user to subscribe to a certain type of message. All subsequent messages of that
6//! type then can be received in a async stream. In order for this to function, it is necessary to
7//! execute the event loop of the connector as a task. This crate aims to be executor independent,
8//! e.g. it should work with all async executors.
9//!
10//! This so far is only a proof of concept. While it might serve your usecase well, expect things
11//! to break. Contributions and suggestions are wellcome!
12//!
13//! # Example: Pulling all Parameters from a Vehicle
14//!
15//! In this example the subscribe method is utilized to collect all parameters of the vehicle.
16//!
17//! ```
18//! use std::collections::HashMap;
19//! use async_mavlink::prelude::*;
20//! use mavlink::{MavlinkVersion, common::*};
21//! use smol::prelude::*;
22//!
23//! # fn main()->Result<(),AsyncMavlinkError>{
24//! smol::block_on(async {
25//! # smol::spawn(async {
26//! # let mut conn = mavlink::connect("udpout:127.0.0.1:14551").unwrap();
27//! # conn.set_protocol_version(mavlink::MavlinkVersion::V1);
28//! # loop {
29//! # let mut value = PARAM_VALUE_DATA::default();
30//! # value.param_id = to_char_arr("param_1");
31//! # value.param_value = 13.0;
32//! # value.param_count = 1;
33//! # conn.send_default(&MavMessage::PARAM_VALUE(value));
34//! # smol::Timer::after(std::time::Duration::from_millis(10)).await;
35//! # }
36//! # }).detach();
37//! // connect
38//! let (conn, future) = AsyncMavConn::new("udpin:127.0.0.1:14551", MavlinkVersion::V1)?;
39//!
40//! // start event loop
41//! smol::spawn(async move { future.await }).detach();
42//!
43//! // we want to subscribe to PARAM_VALUE messages
44//! let msg_type = MavMessageType::new(&MavMessage::PARAM_VALUE(Default::default()));
45//!
46//! // subscribe to PARAM_VALUE message
47//! let mut stream = conn.subscribe(msg_type).await?;
48//!
49//! // and send one PARAM_REQUEST_LIST message
50//! let msg_request = MavMessage::PARAM_REQUEST_LIST(Default::default());
51//! conn.send_default(&msg_request).await?;
52//! let mut parameters = HashMap::new();
53//!
54//! // receive all parameters and store them in a HashMap
55//! while let Some(MavMessage::PARAM_VALUE(data)) = (stream.next()).await {
56//! parameters.insert(to_string(&data.param_id), data.param_value);
57//! if data.param_count as usize == parameters.len(){
58//! break;
59//! }
60//! }
61//!
62//! // do something with parameters
63//! # Ok(())
64//! })
65//! # }
66//! ```
67#![deny(missing_docs)]
68#![deny(unsafe_code)]
69
70use std::collections::HashMap;
71use std::pin::Pin;
72use std::sync::Arc;
73use std::time::Instant;
74
75use arc_swap::ArcSwapOption;
76use blocking::Unblock;
77use futures::{
78 channel::{
79 mpsc::{self, UnboundedSender},
80 oneshot,
81 },
82 future::Either,
83 prelude::*,
84 StreamExt,
85};
86
87use mavlink::{MavHeader, Message};
88
89pub mod prelude;
90mod types;
91mod util;
92
93pub use types::{AsyncMavlinkError, MavMessageType};
94pub use util::{to_char_arr, to_string};
95
96/// An async adapter for a MAVLink connection
97///
98/// Offers high level functionality to interact with a MAVLink vehicle in an async fashion.
99pub struct AsyncMavConn<M: mavlink::Message> {
100 task_dispatcher: UnboundedSender<Task<M>>,
101 last_heartbeat: Arc<ArcSwapOption<Instant>>,
102}
103
104enum Task<M: mavlink::Message> {
105 Emit {
106 header: MavHeader,
107 message: M,
108 backchannel: oneshot::Sender<Result<(), AsyncMavlinkError>>,
109 },
110 Subscribe {
111 message_type: MavMessageType<M>,
112 backchannel: UnboundedSender<M>,
113 },
114}
115
116// TODO make this failable if no heartbeat is received
117impl<M: 'static + mavlink::Message + Clone + Send + Sync> AsyncMavConn<M> {
118 /// Construct a new MavlinkConnectionHandler
119 ///
120 /// # Arguments
121 ///
122 /// * `address` - MAVLink connection `&str`. Equivalent to the `address` argument in
123 /// [mavlink::connect](https://docs.rs/mavlink/*/mavlink/fn.connect.html)
124 ///
125 /// # Examples
126 ///
127 /// ```
128 /// use async_mavlink::prelude::*;
129 /// use mavlink::{MavlinkVersion, common::MavMessage};
130 /// use smol::prelude::*;
131 ///
132 /// # fn main() -> Result<(),AsyncMavlinkError> {
133 /// smol::block_on(async {
134 /// let (conn, future) = AsyncMavConn::new("udpbcast:127.0.0.2:14551", MavlinkVersion::V1)?;
135 /// smol::spawn(async move { future.await }).detach();
136 /// // ...
137 /// # conn.send_default(&MavMessage::HEARTBEAT(Default::default())).await?;
138 /// # Ok(())
139 /// })
140 /// # }
141 /// ```
142 pub fn new(
143 address: &str,
144 mavlink_version: mavlink::MavlinkVersion,
145 ) -> Result<(Arc<Self>, impl Future<Output = impl Send> + Send), AsyncMavlinkError> {
146 let mut conn = mavlink::connect::<M>(address)?;
147 conn.set_protocol_version(mavlink_version);
148 let (task_dispatcher, incoming_tasks) = mpsc::unbounded();
149 let last_heartbeat = Arc::new(ArcSwapOption::from(None));
150
151 let f = {
152 let last_heartbeat = last_heartbeat.clone();
153 async move {
154 let mut subscriptions: HashMap<_, Vec<UnboundedSender<M>>> = HashMap::new();
155 let conn = Arc::new(conn);
156
157 let messages_iter = std::iter::repeat_with({
158 let conn = conn.clone();
159 move || conn.recv()
160 });
161 let messages = Unblock::new(messages_iter).map(Either::Right);
162 let operations = incoming_tasks.map(Either::Left);
163
164 let mut combined = stream::select(operations, messages);
165
166 let heartbeat_id =
167 mavlink::common::MavMessage::HEARTBEAT(Default::default()).message_id();
168
169 // Event loop
170 //
171 // In here we process everything. By making this not concurrent itself, we can
172 // afford not to use locks at all!
173 loop {
174 // TODO Evaluate whether this unwrap is fallible
175 match combined.next().await.unwrap() {
176 // A new subscription
177 Either::Left(Task::Subscribe {
178 message_type,
179 backchannel,
180 }) => {
181 // find existing subscriptions for this message
182 let message_subs = subscriptions
183 .entry(message_type)
184 .or_insert_with(|| Vec::with_capacity(1));
185 // and add this channel to it.
186 message_subs.push(backchannel);
187 }
188 // A new message to be emitted
189 Either::Left(Task::Emit {
190 header,
191 message,
192 backchannel,
193 }) => {
194 // Send the message and send back any error which occurred on sending
195 let result = conn.send(&header, &message).map_err(|e| e.into());
196 let _ = backchannel.send(result); // TODO handle this
197 }
198 Either::Right(Ok((_header, msg))) => {
199 if msg.message_id() == heartbeat_id {
200 last_heartbeat.rcu(|_| Some(Arc::new(Instant::now())));
201 }
202 subscriptions
203 .entry(MavMessageType::new(&msg))
204 .or_insert_with(Vec::new)
205 .retain(|mut backchannel| match backchannel.is_closed() {
206 true => false,
207 false => {
208 futures::executor::block_on(backchannel.send(msg.clone()))
209 .expect("unable to do this");
210 true
211 }
212 });
213 }
214 _ => {}
215 }
216 }
217 }
218 };
219
220 Ok((
221 Arc::new(Self {
222 task_dispatcher,
223 last_heartbeat,
224 }),
225 Box::pin(f),
226 ))
227 }
228
229 /// Subscribe to all new MavMessages of the given MavMessageType
230 ///
231 /// This returns a never-ending Stream of MavMessages.
232 ///
233 /// # Arguments
234 ///
235 /// * `message_type` - `MavMessageType` of the desired messages
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// # use std::time::Duration;
241 /// # use futures::prelude::*;
242 /// # use async_mavlink::prelude::*;
243 /// # use mavlink::{MavlinkVersion, common::MavMessage};
244 /// # fn main() -> Result<(),AsyncMavlinkError> {
245 /// # smol::block_on(async {
246 /// # let (conn, future) = AsyncMavConn::new("udpin:127.0.0.3:14551", MavlinkVersion::V1)?;
247 /// # smol::spawn(async move { future.await; }).detach();
248 /// # smol::spawn(async {
249 /// # let mut conn = mavlink::connect("udpout:127.0.0.3:14551").unwrap();
250 /// # conn.set_protocol_version(mavlink::MavlinkVersion::V1);
251 /// # loop {
252 /// # conn.send_default(&MavMessage::PARAM_VALUE(Default::default()));
253 /// # smol::Timer::after(Duration::from_millis(10)).await;
254 /// # }
255 /// # }).detach();
256 /// let message_type = MavMessageType::new(&MavMessage::PARAM_VALUE(Default::default()));
257 /// let mut stream = conn.subscribe(message_type).await?;
258 /// while let Some(MavMessage::PARAM_VALUE(data)) = (stream.next()).await {
259 /// // do something with `data`
260 /// # break;
261 /// }
262 /// # Ok(()) })}
263 /// ```
264 pub async fn subscribe(
265 &self,
266 message_type: MavMessageType<M>,
267 ) -> Result<Pin<Box<dyn Stream<Item = M>>>, AsyncMavlinkError> {
268 let (backchannel, rx) = mpsc::unbounded();
269
270 self.task_dispatcher
271 .clone()
272 .send(Task::Subscribe {
273 message_type,
274 backchannel,
275 })
276 .await?; // infailable
277 Ok(Box::pin(rx))
278 }
279
280 /// Awaits the next MavMessage of the given MavMessageType
281 ///
282 /// # Arguments
283 ///
284 /// * `message_type` - `MavMessageType` of the desired messages
285 ///
286 /// # Examples
287 ///
288 /// ```
289 /// # use std::time::Duration;
290 /// # use futures::prelude::*;
291 /// # use async_mavlink::prelude::*;
292 /// # use mavlink::{MavlinkVersion, common::MavMessage};
293 /// # fn main() -> Result<(),AsyncMavlinkError> {
294 /// # smol::block_on(async {
295 /// # let (conn, future) = AsyncMavConn::new("udpin:127.0.0.4:14551", MavlinkVersion::V1)?;
296 /// # smol::spawn(async move { future.await; }).detach();
297 /// # smol::spawn(async {
298 /// # let mut conn = mavlink::connect("udpout:127.0.0.4:14551").unwrap();
299 /// # conn.set_protocol_version(mavlink::MavlinkVersion::V1);
300 /// # loop {
301 /// # let mut header = mavlink::MavHeader::default();
302 /// # header.system_id = 0;
303 /// # conn.send(&header, &MavMessage::PARAM_VALUE(Default::default()));
304 /// # smol::Timer::after(Duration::from_millis(10)).await;
305 /// # }
306 /// # }).detach();
307 /// let message_type = MavMessageType::new(&MavMessage::PARAM_VALUE(Default::default()));
308 ///
309 /// if let MavMessage::PARAM_VALUE(data) = conn.request(message_type).await {
310 /// // do something with `data`
311 /// }
312 /// # Ok(()) })}
313 /// ```
314 pub async fn request(&self, message_type: MavMessageType<M>) -> M {
315 let (backchannel, mut rx) = mpsc::unbounded();
316 self.task_dispatcher
317 .clone()
318 .send(Task::Subscribe {
319 message_type,
320 backchannel,
321 })
322 .await
323 .unwrap(); //this may never fail
324 rx.next().map(|m| m.expect("Oh no!")).await
325 }
326
327 /// Send a `MavMessage` to the vehicle
328 ///
329 /// # Arguments
330 ///
331 /// * `message` - `MavMessage` to send
332 ///
333 /// # Examples
334 ///
335 /// ```
336 /// # use std::time::Duration;
337 /// # use futures::prelude::*;
338 /// # use async_mavlink::AsyncMavConn;
339 /// # use async_mavlink::prelude::*;
340 /// # use mavlink::{MavHeader, MavlinkVersion, common::*};
341 /// # fn main() -> Result<(),AsyncMavlinkError> {
342 /// # smol::block_on(async {
343 /// # let (conn, future) = AsyncMavConn::new("udpbcast:127.0.0.5:14551", MavlinkVersion::V1)?;
344 /// # smol::spawn(async move { future.await; }).detach();
345 /// let header = MavHeader::default();
346 /// let message = MavMessage::PARAM_REQUEST_LIST(PARAM_REQUEST_LIST_DATA {
347 /// target_component: 0,
348 /// target_system: 0,
349 /// });
350 ///
351 /// conn.send(&header, &message).await?;
352 /// # Ok(())
353 /// # })}
354 /// ```
355 pub async fn send(&self, header: &MavHeader, message: &M) -> Result<(), AsyncMavlinkError> {
356 let (backchannel, receiver) = oneshot::channel();
357 self.task_dispatcher
358 .clone() // TODO get rid of clone
359 .send(Task::Emit {
360 header: *header,
361 message: message.clone(),
362 backchannel,
363 })
364 .await
365 .map_err(AsyncMavlinkError::from)?;
366 receiver.await.map_err(AsyncMavlinkError::from)?
367 }
368
369 /// Send a `MavMessage` to the vehicle
370 ///
371 /// # Arguments
372 ///
373 /// * `message` - `MavMessage` to send
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// # use async_mavlink::prelude::*;
379 /// # use mavlink::{MavlinkVersion,common::*};
380 /// # fn main() -> Result<(), AsyncMavlinkError> {
381 /// # use async_mavlink::prelude::AsyncMavlinkError;
382 /// # smol::block_on(async {
383 /// # let (conn, future) = AsyncMavConn::new("udpbcast:127.0.0.6:14551", MavlinkVersion::V1)?;
384 /// # smol::spawn(async move { future.await; }).detach();
385 /// let message = MavMessage::PARAM_REQUEST_LIST(PARAM_REQUEST_LIST_DATA {
386 /// target_component: 0,
387 /// target_system: 0,
388 /// });
389 ///
390 /// conn.send_default(&message).await?;
391 /// # Ok(())
392 /// # })}
393 /// ```
394 pub async fn send_default(&self, message: &M) -> Result<(), AsyncMavlinkError> {
395 Ok(self.send(&MavHeader::default(), message).await?)
396 }
397
398 /// Returns the `Instant` from the last received HEARTBEAT
399 pub async fn last_heartbeat(&self) -> Option<Instant> {
400 self.last_heartbeat.load_full().map(|arc| *arc)
401 }
402}
403
404#[cfg(test)]
405mod test {
406 use super::*;
407 use mavlink::common::*;
408 use std::time::Duration;
409 async fn new_conn<M: 'static + mavlink::Message + Clone + Send + Sync>(
410 arg: &str,
411 ) -> Arc<AsyncMavConn<M>> {
412 let (conn, future) = AsyncMavConn::new(arg, mavlink::MavlinkVersion::V1).unwrap();
413 smol::spawn(async move { future.await }).detach();
414 conn
415 }
416
417 #[test]
418 fn subscribe() -> Result<(), AsyncMavlinkError> {
419 smol::block_on(async {
420 let conn = new_conn("udpin:127.0.0.7:14551").await;
421
422 smol::spawn(async move {
423 let mut conn = mavlink::connect("udpout:127.0.0.7:14551").unwrap();
424 conn.set_protocol_version(mavlink::MavlinkVersion::V1);
425 loop {
426 conn.send_default(&MavMessage::HEARTBEAT(Default::default()))
427 .unwrap();
428 smol::Timer::after(Duration::from_millis(10)).await;
429 }
430 })
431 .detach();
432
433 let message_type = MavMessageType::new(&MavMessage::HEARTBEAT(Default::default()));
434 let mut stream = conn.subscribe(message_type).await?;
435
436 let mut i = 0;
437
438 while let Some(MavMessage::HEARTBEAT(_data)) = (stream.next()).await {
439 i += 1;
440 if i > 5 {
441 break;
442 }
443 }
444
445 Ok(())
446 })
447 }
448
449 #[test]
450 fn send() -> Result<(), AsyncMavlinkError> {
451 smol::block_on(async {
452 let conn = new_conn("udpout:127.0.0.8:14551").await;
453
454 let mut raw_conn = mavlink::connect("udpin:127.0.0.8:14551").unwrap();
455 raw_conn.set_protocol_version(mavlink::MavlinkVersion::V1);
456 let received = blocking::unblock(move || raw_conn.recv());
457
458 smol::spawn(async move {
459 let header = mavlink::MavHeader::default();
460 let message = MavMessage::HEARTBEAT(HEARTBEAT_DATA::default());
461 for _ in 0usize..100 {
462 conn.send(&header, &message).await.unwrap();
463 }
464 })
465 .detach();
466
467 let received = received.await;
468 assert!(received.is_ok());
469
470 if let Ok((_, MavMessage::HEARTBEAT(_))) = received {
471 } else {
472 panic!("received wrong message");
473 }
474
475 Ok(())
476 })
477 }
478
479 #[test]
480 fn send_default() -> std::io::Result<()> {
481 smol::block_on(async {
482 let conn = new_conn("udpout:127.0.0.9:14551").await;
483
484 let mut raw_conn = mavlink::connect("udpin:127.0.0.9:14551").unwrap();
485 raw_conn.set_protocol_version(mavlink::MavlinkVersion::V1);
486 let received = blocking::unblock(move || raw_conn.recv());
487
488 smol::spawn(async move {
489 let message = MavMessage::HEARTBEAT(HEARTBEAT_DATA::default());
490 for _ in 0usize..100 {
491 conn.send_default(&message).await.unwrap();
492 }
493 })
494 .detach();
495
496 let received = received.await;
497 assert!(received.is_ok());
498
499 if let Ok((_, MavMessage::HEARTBEAT(_))) = received {
500 } else {
501 panic!("received wrong message");
502 }
503
504 Ok(())
505 })
506 }
507}