crazyflie_lib/
crtp_utils.rs1use crate::{Error, Result};
7use async_trait::async_trait;
8use crazyflie_link::Packet;
9use flume as channel;
10use flume::{Receiver, Sender};
11use serde::{Deserialize, Serialize};
12use tokio::task::JoinHandle;
13use std::collections::BTreeMap;
14use std::sync::atomic::AtomicBool;
15use std::sync::atomic::Ordering::Relaxed;
16use std::time::Duration;
17use std::{
18 convert::{TryFrom, TryInto},
19 sync::Arc,
20};
21
22pub struct CrtpDispatch {
23 link: Arc<crazyflie_link::Connection>,
24 port_channels: BTreeMap<u8, Sender<Packet>>,
26 disconnect: Arc<AtomicBool>
27}
28
29impl CrtpDispatch {
30 pub fn new(
31 link: Arc<crazyflie_link::Connection>,
32 disconnect: Arc<AtomicBool>,
33 ) -> Self {
34 CrtpDispatch {
35 link,
36 port_channels: BTreeMap::new(),
37 disconnect
38 }
39 }
40
41 #[allow(clippy::map_entry)]
42 pub fn get_port_receiver(&mut self, port: u8) -> Option<Receiver<Packet>> {
43 if self.port_channels.contains_key(&port) {
44 None
45 } else {
46 let (tx, rx) = channel::unbounded();
47 self.port_channels.insert(port, tx);
48 Some(rx)
49 }
50 }
51
52 pub async fn run(self) -> Result<JoinHandle<()>> {
53 let link = self.link.clone();
54 Ok(tokio::spawn(async move {
55 let _ = &self;
56 while !self.disconnect.load(Relaxed) {
57 match tokio::time::timeout(Duration::from_millis(200), link.recv_packet())
58 .await
59 {
60 Ok(Ok(packet)) => {
61 if packet.get_port() < 16 {
62 let channel = self.port_channels.get(&packet.get_port()); if let Some(channel) = channel.as_ref() {
64 let _ = channel.send_async(packet).await;
65 }
66 }
67 }
68 Err(_) => continue,
69 Ok(Err(_)) => return, }
71 }
72 })
73 )
74 }
75}
76
77#[async_trait]
78pub(crate) trait WaitForPacket {
79 async fn wait_packet(&self, port: u8, channel: u8, data_prefix: &[u8]) -> Result<Packet>;
80}
81
82#[async_trait]
83impl WaitForPacket for channel::Receiver<Packet> {
84 async fn wait_packet(&self, port: u8, channel: u8, data_prefix: &[u8]) -> Result<Packet> {
85 let mut pk = self.recv_async().await.ok().ok_or(Error::Disconnected)?;
86
87 loop {
88 if pk.get_port() == port
89 && pk.get_channel() == channel
90 && pk.get_data().starts_with(data_prefix)
91 {
92 break;
93 }
94 pk = self.recv_async().await.ok().ok_or(Error::Disconnected)?;
95 }
96
97 Ok(pk)
98 }
99}
100
/// Channel number on which TOC replies are awaited.
const TOC_CHANNEL: u8 = 0;
/// First data byte of a request for a single TOC item, and of its reply.
const TOC_GET_ITEM: u8 = 2;
/// First data byte of a request for the TOC length and CRC32, and of its reply.
const TOC_INFO: u8 = 3;
/// First byte of the key used to look up and store a TOC in the cache.
const TOC_CACHE_VERSION: u8 = 1;
107
108pub(crate) async fn fetch_toc<C, T, E>(
109 port: u8,
110 uplink: channel::Sender<Packet>,
111 downlink: channel::Receiver<Packet>,
112 toc_cache: C,
113) -> Result<std::collections::BTreeMap<String, (u16, T)>>
114where
115 C: TocCache,
116 T: TryFrom<u8, Error = E> + Serialize + for<'de> Deserialize<'de>,
117 E: Into<Error>,
118{
119 let pk = Packet::new(port, 0, vec![TOC_INFO]);
120 uplink
121 .send_async(pk)
122 .await
123 .map_err(|_| Error::Disconnected)?;
124
125 let pk = downlink.wait_packet(port, TOC_CHANNEL, &[TOC_INFO]).await?;
126
127 let toc_len = u16::from_le_bytes(pk.get_data()[1..3].try_into()?);
128 let toc_crc32 = u32::from_le_bytes(pk.get_data()[3..7].try_into()?);
129
130 let mut toc = std::collections::BTreeMap::new();
131 let crc_bytes = toc_crc32.to_le_bytes();
132 let cache_key: [u8; 5] = [TOC_CACHE_VERSION, crc_bytes[0], crc_bytes[1], crc_bytes[2], crc_bytes[3]];
133
134 if let Some(toc_str) = toc_cache.get_toc(&cache_key) {
136 toc = serde_json::from_str(&toc_str).map_err(|e| Error::InvalidParameter(format!("Failed to deserialize TOC cache: {}", e)))?;
137 return Ok(toc);
138 }
139
140 for i in 0..toc_len {
142 let pk = Packet::new(
143 port,
144 0,
145 vec![TOC_GET_ITEM, (i & 0x0ff) as u8, (i >> 8) as u8],
146 );
147 uplink
148 .send_async(pk)
149 .await
150 .map_err(|_| Error::Disconnected)?;
151
152 let pk = downlink.wait_packet(port, 0, &[TOC_GET_ITEM]).await?;
153
154 let mut strings = pk.get_data()[4..].split(|b| *b == 0);
155 let group = String::from_utf8_lossy(strings.next().expect("TOC packet format error"));
156 let name = String::from_utf8_lossy(strings.next().expect("TOC packet format error"));
157
158 let id = u16::from_le_bytes(pk.get_data()[1..3].try_into()?);
159 let item_type = pk.get_data()[3].try_into().map_err(|e: E| e.into())?;
160 toc.insert(format!("{}.{}", group, name), (id, item_type));
161 }
162
163 let toc_str = serde_json::to_string(&toc).map_err(|e| Error::InvalidParameter(format!("Failed to serialize TOC: {}", e)))?;
165 toc_cache.store_toc(&cache_key, &toc_str);
166
167 Ok(toc)
168}
169
170pub fn crtp_channel_dispatcher(
171 downlink: channel::Receiver<Packet>,
172) -> (
173 Receiver<Packet>,
174 Receiver<Packet>,
175 Receiver<Packet>,
176 Receiver<Packet>,
177) {
178 let (mut senders, mut receivers) = (Vec::new(), Vec::new());
179
180 for _ in 0..4 {
181 let (tx, rx) = channel::unbounded();
182 senders.push(tx);
183 receivers.insert(0, rx);
184 }
185
186 tokio::spawn(async move {
187 while let Ok(pk) = downlink.recv_async().await {
188 if pk.get_channel() < 4 {
189 let _ = senders[pk.get_channel() as usize].send_async(pk).await;
190 }
191 }
192 });
193
194 (
196 receivers.pop().unwrap(),
197 receivers.pop().unwrap(),
198 receivers.pop().unwrap(),
199 receivers.pop().unwrap(),
200 )
201}
202
203#[derive(Clone)]
205pub struct NoTocCache;
206
207impl TocCache for NoTocCache {
208 fn get_toc(&self, _key: &[u8]) -> Option<String> {
209 None
210 }
211
212 fn store_toc(&self, _key: &[u8], _toc: &str) {
213 }
215}
216
/// Storage for serialized tables of contents (TOCs), looked up by key.
///
/// Implementations must be `Clone + Send + Sync + 'static`.
pub trait TocCache: Clone + Send + Sync + 'static {
    /// Returns the TOC string previously stored under `key`, or `None` when
    /// the cache has no entry for it.
    fn get_toc(&self, key: &[u8]) -> Option<String>;

    /// Stores the TOC string `toc` under `key`.
    fn store_toc(&self, key: &[u8], toc: &str);
}