crazyflie_lib/
crtp_utils.rs

//! Various CRTP utilities used by the lib.
//!
//! These utilities are currently all private. Some of them might be useful for user code as well; let's make them
//! public when needed.
5
6use crate::{Error, Result};
7use async_trait::async_trait;
8use crazyflie_link::Packet;
9use flume as channel;
10use flume::{Receiver, Sender};
11use serde::{Deserialize, Serialize};
12use tokio::task::JoinHandle;
13use std::collections::BTreeMap;
14use std::sync::atomic::AtomicBool;
15use std::sync::atomic::Ordering::Relaxed;
16use std::time::Duration;
17use std::{
18    convert::{TryFrom, TryInto},
19    sync::Arc,
20};
21
22pub struct CrtpDispatch {
23    link: Arc<crazyflie_link::Connection>,
24    // port_callbacks: [Arc<Mutex<Option<Sender<Packet>>>>; 15]
25    port_channels: BTreeMap<u8, Sender<Packet>>,
26    disconnect: Arc<AtomicBool>
27}
28
29impl CrtpDispatch {
30    pub fn new(
31        link: Arc<crazyflie_link::Connection>,
32        disconnect: Arc<AtomicBool>,
33    ) -> Self {
34        CrtpDispatch {
35            link,
36            port_channels: BTreeMap::new(),
37            disconnect
38        }
39    }
40
41    #[allow(clippy::map_entry)]
42    pub fn get_port_receiver(&mut self, port: u8) -> Option<Receiver<Packet>> {
43        if self.port_channels.contains_key(&port) {
44            None
45        } else {
46            let (tx, rx) = channel::unbounded();
47            self.port_channels.insert(port, tx);
48            Some(rx)
49        }
50    }
51
52    pub async fn run(self) -> Result<JoinHandle<()>> {
53        let link = self.link.clone();
54        Ok(tokio::spawn(async move {
55                let _ = &self;
56                while !self.disconnect.load(Relaxed) {                  
57                    match tokio::time::timeout(Duration::from_millis(200), link.recv_packet())
58                        .await
59                    {
60                        Ok(Ok(packet)) => {
61                            if packet.get_port() < 16 {
62                                let channel = self.port_channels.get(&packet.get_port()); // get(packet.get_port()).lock().await;
63                                if let Some(channel) = channel.as_ref() {
64                                    let _ = channel.send_async(packet).await;
65                                }
66                            }
67                        }
68                        Err(_) => continue,
69                        Ok(Err(_)) => return, // Other side of the channel disappeared, link closed
70                    }
71                }
72            })
73          )
74    }
75}
76
77#[async_trait]
78pub(crate) trait WaitForPacket {
79    async fn wait_packet(&self, port: u8, channel: u8, data_prefix: &[u8]) -> Result<Packet>;
80}
81
82#[async_trait]
83impl WaitForPacket for channel::Receiver<Packet> {
84    async fn wait_packet(&self, port: u8, channel: u8, data_prefix: &[u8]) -> Result<Packet> {
85        let mut pk = self.recv_async().await.ok().ok_or(Error::Disconnected)?;
86
87        loop {
88            if pk.get_port() == port
89                && pk.get_channel() == channel
90                && pk.get_data().starts_with(data_prefix)
91            {
92                break;
93            }
94            pk = self.recv_async().await.ok().ok_or(Error::Disconnected)?;
95        }
96
97        Ok(pk)
98    }
99}
100
101const TOC_CHANNEL: u8 = 0;
102const TOC_GET_ITEM: u8 = 2;
103const TOC_INFO: u8 = 3;
104
105pub(crate) async fn fetch_toc<C, T, E>(
106    port: u8,
107    uplink: channel::Sender<Packet>,
108    downlink: channel::Receiver<Packet>,
109    toc_cache: C,
110) -> Result<std::collections::BTreeMap<String, (u16, T)>>
111where
112    C: TocCache,
113    T: TryFrom<u8, Error = E> + Serialize + for<'de> Deserialize<'de>,
114    E: Into<Error>,
115{
116    let pk = Packet::new(port, 0, vec![TOC_INFO]);
117    uplink
118        .send_async(pk)
119        .await
120        .map_err(|_| Error::Disconnected)?;
121
122    let pk = downlink.wait_packet(port, TOC_CHANNEL, &[TOC_INFO]).await?;
123
124    let toc_len = u16::from_le_bytes(pk.get_data()[1..3].try_into()?);
125    let toc_crc32 = u32::from_le_bytes(pk.get_data()[3..7].try_into()?);
126
127    let mut toc = std::collections::BTreeMap::new();
128
129    // Check cache first
130    if let Some(toc_str) = toc_cache.get_toc(toc_crc32) {
131        toc = serde_json::from_str(&toc_str).map_err(|e| Error::InvalidParameter(format!("Failed to deserialize TOC cache: {}", e)))?;
132        return Ok(toc);
133    }
134
135    // Fetch TOC from device
136    for i in 0..toc_len {
137        let pk = Packet::new(
138            port,
139            0,
140            vec![TOC_GET_ITEM, (i & 0x0ff) as u8, (i >> 8) as u8],
141        );
142        uplink
143            .send_async(pk)
144            .await
145            .map_err(|_| Error::Disconnected)?;
146
147        let pk = downlink.wait_packet(port, 0, &[TOC_GET_ITEM]).await?;
148
149        let mut strings = pk.get_data()[4..].split(|b| *b == 0);
150        let group = String::from_utf8_lossy(strings.next().expect("TOC packet format error"));
151        let name = String::from_utf8_lossy(strings.next().expect("TOC packet format error"));
152
153        let id = u16::from_le_bytes(pk.get_data()[1..3].try_into()?);
154        let item_type = pk.get_data()[3].try_into().map_err(|e: E| e.into())?;
155        toc.insert(format!("{}.{}", group, name), (id, item_type));
156    }
157
158    // Store in cache
159    let toc_str = serde_json::to_string(&toc).map_err(|e| Error::InvalidParameter(format!("Failed to serialize TOC: {}", e)))?;
160    toc_cache.store_toc(toc_crc32, &toc_str);
161
162    Ok(toc)
163}
164
165pub fn crtp_channel_dispatcher(
166    downlink: channel::Receiver<Packet>,
167) -> (
168    Receiver<Packet>,
169    Receiver<Packet>,
170    Receiver<Packet>,
171    Receiver<Packet>,
172) {
173    let (mut senders, mut receivers) = (Vec::new(), Vec::new());
174
175    for _ in 0..4 {
176        let (tx, rx) = channel::unbounded();
177        senders.push(tx);
178        receivers.insert(0, rx);
179    }
180
181    tokio::spawn(async move {
182        while let Ok(pk) = downlink.recv_async().await {
183            if pk.get_channel() < 4 {
184                let _ = senders[pk.get_channel() as usize].send_async(pk).await;
185            }
186        }
187    });
188
189    // The 4 unwraps are guaranteed to succeed by design (the list is 4 item long)
190    (
191        receivers.pop().unwrap(),
192        receivers.pop().unwrap(),
193        receivers.pop().unwrap(),
194        receivers.pop().unwrap(),
195    )
196}
197
198/// Null implementation of ToC cache to be used when no caching is needed.
199#[derive(Clone)]
200pub struct NoTocCache;
201
202impl TocCache for NoTocCache {
203    fn get_toc(&self, _crc32: u32) -> Option<String> {
204        None
205    }
206
207    fn store_toc(&self, _crc32: u32, _toc: &str) {
208        // No-op: this cache doesn't store anything
209    }
210}
211
/// A cache for Table of Contents (TOC) data.
///
/// TOCs are stored and looked up by their CRC32 checksum, which makes it
/// possible to skip re-fetching a TOC whose checksum matches a cached version.
///
/// # Concurrency
///
/// Both methods take `&self` to allow concurrent reads during parallel TOC fetching
/// (Log and Param subsystems fetch their TOCs simultaneously). Implementations should
/// use interior mutability (e.g., `RwLock`) for thread-safe caching.
///
/// # Example
///
/// ```rust
/// use std::sync::{Arc, RwLock};
/// use std::collections::HashMap;
/// use crazyflie_lib::TocCache;
///
/// #[derive(Clone)]
/// struct InMemoryCache {
///     data: Arc<RwLock<HashMap<u32, String>>>,
/// }
///
/// impl TocCache for InMemoryCache {
///     fn get_toc(&self, crc32: u32) -> Option<String> {
///         self.data.read().ok()?.get(&crc32).cloned()
///     }
///
///     fn store_toc(&self, crc32: u32, toc: &str) {
///         if let Ok(mut lock) = self.data.write() {
///             lock.insert(crc32, toc.to_string());
///         }
///     }
/// }
/// ```
pub trait TocCache: Clone + Send + Sync + 'static {
    /// Looks up the cached TOC string stored under the given CRC32 checksum.
    ///
    /// # Arguments
    ///
    /// * `crc32` - The CRC32 checksum identifying the TOC.
    ///
    /// # Returns
    ///
    /// `Some` with the cached TOC string if there is one, `None` otherwise.
    fn get_toc(&self, crc32: u32) -> Option<String>;

    /// Stores a TOC string under the given CRC32 checksum.
    ///
    /// # Arguments
    ///
    /// * `crc32` - The CRC32 checksum identifying the TOC.
    /// * `toc` - The TOC string to store.
    fn store_toc(&self, crc32: u32, toc: &str);
}