crazyflie_lib/
crtp_utils.rs1use crate::{Error, Result};
7use async_trait::async_trait;
8use crazyflie_link::Packet;
9use flume as channel;
10use flume::{Receiver, Sender};
11use serde::{Deserialize, Serialize};
12use tokio::task::JoinHandle;
13use std::collections::BTreeMap;
14use std::sync::atomic::AtomicBool;
15use std::sync::atomic::Ordering::Relaxed;
16use std::time::Duration;
17use std::{
18 convert::{TryFrom, TryInto},
19 sync::Arc,
20};
21
22pub struct CrtpDispatch {
23 link: Arc<crazyflie_link::Connection>,
24 port_channels: BTreeMap<u8, Sender<Packet>>,
26 disconnect: Arc<AtomicBool>
27}
28
29impl CrtpDispatch {
30 pub fn new(
31 link: Arc<crazyflie_link::Connection>,
32 disconnect: Arc<AtomicBool>,
33 ) -> Self {
34 CrtpDispatch {
35 link,
36 port_channels: BTreeMap::new(),
37 disconnect
38 }
39 }
40
41 #[allow(clippy::map_entry)]
42 pub fn get_port_receiver(&mut self, port: u8) -> Option<Receiver<Packet>> {
43 if self.port_channels.contains_key(&port) {
44 None
45 } else {
46 let (tx, rx) = channel::unbounded();
47 self.port_channels.insert(port, tx);
48 Some(rx)
49 }
50 }
51
52 pub async fn run(self) -> Result<JoinHandle<()>> {
53 let link = self.link.clone();
54 Ok(tokio::spawn(async move {
55 let _ = &self;
56 while !self.disconnect.load(Relaxed) {
57 match tokio::time::timeout(Duration::from_millis(200), link.recv_packet())
58 .await
59 {
60 Ok(Ok(packet)) => {
61 if packet.get_port() < 16 {
62 let channel = self.port_channels.get(&packet.get_port()); if let Some(channel) = channel.as_ref() {
64 let _ = channel.send_async(packet).await;
65 }
66 }
67 }
68 Err(_) => continue,
69 Ok(Err(_)) => return, }
71 }
72 })
73 )
74 }
75}
76
77#[async_trait]
78pub(crate) trait WaitForPacket {
79 async fn wait_packet(&self, port: u8, channel: u8, data_prefix: &[u8]) -> Result<Packet>;
80}
81
82#[async_trait]
83impl WaitForPacket for channel::Receiver<Packet> {
84 async fn wait_packet(&self, port: u8, channel: u8, data_prefix: &[u8]) -> Result<Packet> {
85 let mut pk = self.recv_async().await.ok().ok_or(Error::Disconnected)?;
86
87 loop {
88 if pk.get_port() == port
89 && pk.get_channel() == channel
90 && pk.get_data().starts_with(data_prefix)
91 {
92 break;
93 }
94 pk = self.recv_async().await.ok().ok_or(Error::Disconnected)?;
95 }
96
97 Ok(pk)
98 }
99}
100
/// Channel on which `fetch_toc` waits for the TOC info reply.
const TOC_CHANNEL: u8 = 0;
/// First data byte of a request for (and reply with) a single TOC item.
const TOC_GET_ITEM: u8 = 2;
/// First data byte of a request for (and reply with) the TOC length and CRC32.
const TOC_INFO: u8 = 3;
104
105pub(crate) async fn fetch_toc<C, T, E>(
106 port: u8,
107 uplink: channel::Sender<Packet>,
108 downlink: channel::Receiver<Packet>,
109 toc_cache: C,
110) -> Result<std::collections::BTreeMap<String, (u16, T)>>
111where
112 C: TocCache,
113 T: TryFrom<u8, Error = E> + Serialize + for<'de> Deserialize<'de>,
114 E: Into<Error>,
115{
116 let pk = Packet::new(port, 0, vec![TOC_INFO]);
117 uplink
118 .send_async(pk)
119 .await
120 .map_err(|_| Error::Disconnected)?;
121
122 let pk = downlink.wait_packet(port, TOC_CHANNEL, &[TOC_INFO]).await?;
123
124 let toc_len = u16::from_le_bytes(pk.get_data()[1..3].try_into()?);
125 let toc_crc32 = u32::from_le_bytes(pk.get_data()[3..7].try_into()?);
126
127 let mut toc = std::collections::BTreeMap::new();
128
129 if let Some(toc_str) = toc_cache.get_toc(toc_crc32) {
131 toc = serde_json::from_str(&toc_str).map_err(|e| Error::InvalidParameter(format!("Failed to deserialize TOC cache: {}", e)))?;
132 return Ok(toc);
133 }
134
135 for i in 0..toc_len {
137 let pk = Packet::new(
138 port,
139 0,
140 vec![TOC_GET_ITEM, (i & 0x0ff) as u8, (i >> 8) as u8],
141 );
142 uplink
143 .send_async(pk)
144 .await
145 .map_err(|_| Error::Disconnected)?;
146
147 let pk = downlink.wait_packet(port, 0, &[TOC_GET_ITEM]).await?;
148
149 let mut strings = pk.get_data()[4..].split(|b| *b == 0);
150 let group = String::from_utf8_lossy(strings.next().expect("TOC packet format error"));
151 let name = String::from_utf8_lossy(strings.next().expect("TOC packet format error"));
152
153 let id = u16::from_le_bytes(pk.get_data()[1..3].try_into()?);
154 let item_type = pk.get_data()[3].try_into().map_err(|e: E| e.into())?;
155 toc.insert(format!("{}.{}", group, name), (id, item_type));
156 }
157
158 let toc_str = serde_json::to_string(&toc).map_err(|e| Error::InvalidParameter(format!("Failed to serialize TOC: {}", e)))?;
160 toc_cache.store_toc(toc_crc32, &toc_str);
161
162 Ok(toc)
163}
164
165pub fn crtp_channel_dispatcher(
166 downlink: channel::Receiver<Packet>,
167) -> (
168 Receiver<Packet>,
169 Receiver<Packet>,
170 Receiver<Packet>,
171 Receiver<Packet>,
172) {
173 let (mut senders, mut receivers) = (Vec::new(), Vec::new());
174
175 for _ in 0..4 {
176 let (tx, rx) = channel::unbounded();
177 senders.push(tx);
178 receivers.insert(0, rx);
179 }
180
181 tokio::spawn(async move {
182 while let Ok(pk) = downlink.recv_async().await {
183 if pk.get_channel() < 4 {
184 let _ = senders[pk.get_channel() as usize].send_async(pk).await;
185 }
186 }
187 });
188
189 (
191 receivers.pop().unwrap(),
192 receivers.pop().unwrap(),
193 receivers.pop().unwrap(),
194 receivers.pop().unwrap(),
195 )
196}
197
/// TOC cache that stores nothing: every lookup is a miss and every store is
/// discarded.
#[derive(Clone, Debug, Default)]
pub struct NoTocCache;
201
202impl TocCache for NoTocCache {
203 fn get_toc(&self, _crc32: u32) -> Option<String> {
204 None
205 }
206
207 fn store_toc(&self, _crc32: u32, _toc: &str) {
208 }
210}
211
/// Storage for tables of contents, keyed by the CRC32 of the TOC.
///
/// `fetch_toc` looks up the CRC32 from the TOC info reply before it requests
/// the individual items, and stores the TOC it fetched as a JSON string
/// under that CRC32.
pub trait TocCache: Clone + Send + Sync + 'static {
    /// Returns the TOC string stored for `crc32`, or `None` when there is none.
    fn get_toc(&self, crc32: u32) -> Option<String>;

    /// Stores the TOC string `toc` under the key `crc32`.
    fn store_toc(&self, crc32: u32, toc: &str);
}