Skip to main content

crazyflie_lib/
crtp_utils.rs

1//! Various CRTP utils used by the lib
2//!
3//! These functionalities are currently all private, some might be useful for the user code as well, lets make them
4//! public when needed.
5
6use crate::{Error, Result};
7use async_trait::async_trait;
8use crazyflie_link::Packet;
9use flume as channel;
10use flume::{Receiver, Sender};
11use serde::{Deserialize, Serialize};
12use tokio::task::JoinHandle;
13use std::collections::BTreeMap;
14use std::sync::atomic::AtomicBool;
15use std::sync::atomic::Ordering::Relaxed;
16use std::time::Duration;
17use std::{
18    convert::{TryFrom, TryInto},
19    sync::Arc,
20};
21
22pub struct CrtpDispatch {
23    link: Arc<crazyflie_link::Connection>,
24    // port_callbacks: [Arc<Mutex<Option<Sender<Packet>>>>; 15]
25    port_channels: BTreeMap<u8, Sender<Packet>>,
26    disconnect: Arc<AtomicBool>
27}
28
29impl CrtpDispatch {
30    pub fn new(
31        link: Arc<crazyflie_link::Connection>,
32        disconnect: Arc<AtomicBool>,
33    ) -> Self {
34        CrtpDispatch {
35            link,
36            port_channels: BTreeMap::new(),
37            disconnect
38        }
39    }
40
41    #[allow(clippy::map_entry)]
42    pub fn get_port_receiver(&mut self, port: u8) -> Option<Receiver<Packet>> {
43        if self.port_channels.contains_key(&port) {
44            None
45        } else {
46            let (tx, rx) = channel::unbounded();
47            self.port_channels.insert(port, tx);
48            Some(rx)
49        }
50    }
51
52    pub async fn run(self) -> Result<JoinHandle<()>> {
53        let link = self.link.clone();
54        Ok(tokio::spawn(async move {
55                let _ = &self;
56                while !self.disconnect.load(Relaxed) {                  
57                    match tokio::time::timeout(Duration::from_millis(200), link.recv_packet())
58                        .await
59                    {
60                        Ok(Ok(packet)) => {
61                            if packet.get_port() < 16 {
62                                let channel = self.port_channels.get(&packet.get_port()); // get(packet.get_port()).lock().await;
63                                if let Some(channel) = channel.as_ref() {
64                                    let _ = channel.send_async(packet).await;
65                                }
66                            }
67                        }
68                        Err(_) => continue,
69                        Ok(Err(_)) => return, // Other side of the channel disappeared, link closed
70                    }
71                }
72            })
73          )
74    }
75}
76
77#[async_trait]
78pub(crate) trait WaitForPacket {
79    async fn wait_packet(&self, port: u8, channel: u8, data_prefix: &[u8]) -> Result<Packet>;
80}
81
82#[async_trait]
83impl WaitForPacket for channel::Receiver<Packet> {
84    async fn wait_packet(&self, port: u8, channel: u8, data_prefix: &[u8]) -> Result<Packet> {
85        let mut pk = self.recv_async().await.ok().ok_or(Error::Disconnected)?;
86
87        loop {
88            if pk.get_port() == port
89                && pk.get_channel() == channel
90                && pk.get_data().starts_with(data_prefix)
91            {
92                break;
93            }
94            pk = self.recv_async().await.ok().ok_or(Error::Disconnected)?;
95        }
96
97        Ok(pk)
98    }
99}
100
101const TOC_CHANNEL: u8 = 0;
102const TOC_GET_ITEM: u8 = 2;
103const TOC_INFO: u8 = 3;
104/// Cache format version, included in the cache key.
105/// Bump when ParamItemInfo or LogItemInfo serialization changes.
106const TOC_CACHE_VERSION: u8 = 1;
107
108pub(crate) async fn fetch_toc<C, T, E>(
109    port: u8,
110    uplink: channel::Sender<Packet>,
111    downlink: channel::Receiver<Packet>,
112    toc_cache: C,
113) -> Result<std::collections::BTreeMap<String, (u16, T)>>
114where
115    C: TocCache,
116    T: TryFrom<u8, Error = E> + Serialize + for<'de> Deserialize<'de>,
117    E: Into<Error>,
118{
119    let pk = Packet::new(port, 0, vec![TOC_INFO]);
120    uplink
121        .send_async(pk)
122        .await
123        .map_err(|_| Error::Disconnected)?;
124
125    let pk = downlink.wait_packet(port, TOC_CHANNEL, &[TOC_INFO]).await?;
126
127    let toc_len = u16::from_le_bytes(pk.get_data()[1..3].try_into()?);
128    let toc_crc32 = u32::from_le_bytes(pk.get_data()[3..7].try_into()?);
129
130    let mut toc = std::collections::BTreeMap::new();
131    let crc_bytes = toc_crc32.to_le_bytes();
132    let cache_key: [u8; 5] = [TOC_CACHE_VERSION, crc_bytes[0], crc_bytes[1], crc_bytes[2], crc_bytes[3]];
133
134    // Check cache first
135    if let Some(toc_str) = toc_cache.get_toc(&cache_key) {
136        toc = serde_json::from_str(&toc_str).map_err(|e| Error::InvalidParameter(format!("Failed to deserialize TOC cache: {}", e)))?;
137        return Ok(toc);
138    }
139
140    // Fetch TOC from device
141    for i in 0..toc_len {
142        let pk = Packet::new(
143            port,
144            0,
145            vec![TOC_GET_ITEM, (i & 0x0ff) as u8, (i >> 8) as u8],
146        );
147        uplink
148            .send_async(pk)
149            .await
150            .map_err(|_| Error::Disconnected)?;
151
152        let pk = downlink.wait_packet(port, 0, &[TOC_GET_ITEM]).await?;
153
154        let mut strings = pk.get_data()[4..].split(|b| *b == 0);
155        let group = String::from_utf8_lossy(strings.next().expect("TOC packet format error"));
156        let name = String::from_utf8_lossy(strings.next().expect("TOC packet format error"));
157
158        let id = u16::from_le_bytes(pk.get_data()[1..3].try_into()?);
159        let item_type = pk.get_data()[3].try_into().map_err(|e: E| e.into())?;
160        toc.insert(format!("{}.{}", group, name), (id, item_type));
161    }
162
163    // Store in cache
164    let toc_str = serde_json::to_string(&toc).map_err(|e| Error::InvalidParameter(format!("Failed to serialize TOC: {}", e)))?;
165    toc_cache.store_toc(&cache_key, &toc_str);
166
167    Ok(toc)
168}
169
170pub fn crtp_channel_dispatcher(
171    downlink: channel::Receiver<Packet>,
172) -> (
173    Receiver<Packet>,
174    Receiver<Packet>,
175    Receiver<Packet>,
176    Receiver<Packet>,
177) {
178    let (mut senders, mut receivers) = (Vec::new(), Vec::new());
179
180    for _ in 0..4 {
181        let (tx, rx) = channel::unbounded();
182        senders.push(tx);
183        receivers.insert(0, rx);
184    }
185
186    tokio::spawn(async move {
187        while let Ok(pk) = downlink.recv_async().await {
188            if pk.get_channel() < 4 {
189                let _ = senders[pk.get_channel() as usize].send_async(pk).await;
190            }
191        }
192    });
193
194    // The 4 unwraps are guaranteed to succeed by design (the list is 4 item long)
195    (
196        receivers.pop().unwrap(),
197        receivers.pop().unwrap(),
198        receivers.pop().unwrap(),
199        receivers.pop().unwrap(),
200    )
201}
202
203/// Null implementation of ToC cache to be used when no caching is needed.
204#[derive(Clone)]
205pub struct NoTocCache;
206
207impl TocCache for NoTocCache {
208    fn get_toc(&self, _key: &[u8]) -> Option<String> {
209        None
210    }
211
212    fn store_toc(&self, _key: &[u8], _toc: &str) {
213        // No-op: this cache doesn't store anything
214    }
215}
216
217/// A trait for caching Table of Contents (TOC) data.
218///
219/// This trait provides methods for storing and retrieving TOC information
220/// using an opaque byte key. Implementations can use this to avoid
221/// re-fetching TOC data when the key matches a cached version.
222///
223/// The key is constructed by the library and should be treated as an opaque
224/// identifier. Implementors are free to encode it in whatever way suits their
225/// storage backend (e.g., hex encoding for filenames, raw bytes for in-memory maps).
226///
227/// # Concurrency
228///
229/// Both methods take `&self` to allow concurrent reads during parallel TOC fetching
230/// (Log and Param subsystems fetch their TOCs simultaneously). Implementations should
231/// use interior mutability (e.g., `RwLock`) for thread-safe caching.
232///
233/// # Example
234///
235/// ```rust
236/// use std::sync::{Arc, RwLock};
237/// use std::collections::HashMap;
238/// use crazyflie_lib::TocCache;
239///
240/// #[derive(Clone)]
241/// struct InMemoryCache {
242///     data: Arc<RwLock<HashMap<Vec<u8>, String>>>,
243/// }
244///
245/// impl TocCache for InMemoryCache {
246///     fn get_toc(&self, key: &[u8]) -> Option<String> {
247///         self.data.read().ok()?.get(key).cloned()
248///     }
249///
250///     fn store_toc(&self, key: &[u8], toc: &str) {
251///         if let Ok(mut lock) = self.data.write() {
252///             lock.insert(key.to_vec(), toc.to_string());
253///         }
254///     }
255/// }
256/// ```
257pub trait TocCache: Clone + Send + Sync + 'static
258{
259    /// Retrieves a cached TOC string based on the provided key.
260    ///
261    /// # Arguments
262    ///
263    /// * `key` - An opaque byte key used to identify the TOC.
264    ///
265    /// # Returns
266    ///
267    /// An `Option<String>` containing the cached TOC if it exists, or `None` if not found.
268    fn get_toc(&self, key: &[u8]) -> Option<String>;
269
270    /// Stores a TOC string associated with the provided key.
271    ///
272    /// # Arguments
273    ///
274    /// * `key` - An opaque byte key used to identify the TOC.
275    /// * `toc` - The TOC string to be stored.
276    fn store_toc(&self, key: &[u8], toc: &str);
277}