crazyflie_lib/subsystems/
log.rs

1//! # Data logging subsystem
2//!
//! The Crazyflie log subsystem allows the values of exposed Crazyflie variables to be logged asynchronously from the ground.
4//!
//! At connection time, a Table Of Content (TOC) of the log variables is fetched from the Crazyflie, which allows
//! variables to be logged by name. To log variables, a [LogBlock] needs to be created. The variables to be logged are
//! added to the LogBlock, and the LogBlock can then be started, returning a [LogStream] that will yield the log data.
8//!
9//! ```no_run
10//! # use crazyflie_lib::{Crazyflie, Value, Error, subsystems::log::LogPeriod};
11//! # use crazyflie_link::LinkContext;
12//! # async fn example() -> Result<(), Error> {
13//! # let context = LinkContext::new();
14//! # let cf = Crazyflie::connect_from_uri(
15//! #   &context,
16//! #   "radio://0/60/2M/E7E7E7E7E7",
17//! #   crazyflie_lib::NoTocCache
18//! # ).await?;
19//! // Create the log block
20//! let mut block = cf.log.create_block().await?;
21//!
22//! // Append Variables
23//! block.add_variable("stateEstimate.roll").await?;
24//! block.add_variable("stateEstimate.pitch").await?;
25//! block.add_variable("stateEstimate.yaw").await?;
26//!
27//! // Start the block
28//! let period = LogPeriod::from_millis(100)?;
29//! let stream = block.start(period).await?;
30//!
31//! // Get Data!
32//! while let Ok(data) = stream.next().await {
33//!     println!("Yaw is {:?}", data.data["stateEstimate.yaw"]);
34//! }
35//! # Ok(())
36//! # };
37//! ```
38
39use crate::crtp_utils::{TocCache, WaitForPacket};
40use crate::{Error, Result, Value, ValueType};
41use crazyflie_link::Packet;
42use flume as channel;
43use futures::lock::Mutex;
44use serde::{Deserialize, Serialize};
45use std::collections::HashMap;
46use std::convert::TryInto;
47use std::sync::Weak;
48use std::{collections::BTreeMap, convert::TryFrom, sync::Arc, time::Duration};
49
50use crate::crazyflie::LOG_PORT;
51
/// # Access to the Crazyflie Log Subsystem
///
/// This struct provides functions to interact with the Crazyflie Log subsystem.
///
/// See the [log module documentation](crate::subsystems::log) for more context and information.
#[derive(Debug)]
pub struct Log {
    // Channel on which packets are sent to the Crazyflie
    uplink: channel::Sender<Packet>,
    // Receiver for the answers to control commands. It is locked for the duration of each
    // request/answer exchange and shared (weakly) with the LogBlocks created by this object.
    control_downlink: Arc<Mutex<channel::Receiver<Packet>>>,
    // Log TOC: variable name ("group.name") -> (variable ID, type information)
    toc: Arc<BTreeMap<String, (u16, LogItemInfo)>>,
    // ID that will be given to the next created log block
    next_block_id: Mutex<u8>,
    // Per-block senders used by the data dispatcher task, indexed by block ID
    data_channels: Arc<Mutex<BTreeMap<u8, flume::Sender<Packet>>>>,
    // Weak "canary" of each created LogBlock, indexed by block ID. A canary that cannot be
    // upgraded anymore means that the LogBlock has been dropped.
    active_blocks: Mutex<BTreeMap<u8, Weak<()>>>,
}
66
67fn not_found(name: &str) -> Error {
68    Error::ParamError(format!("Log variable {} not found", name))
69}
70
// Channel of the log port on which control commands and their answers are exchanged
const CONTROL_CHANNEL: u8 = 1;

// Control commands. Each one is sent as the first payload byte of a control packet.
const DELETE_BLOCK: u8 = 2;
const START_BLOCK: u8 = 3;
const STOP_BLOCK: u8 = 4;
const RESET: u8 = 5;
const CREATE_BLOCK_V2: u8 = 6;
const APPEND_BLOCK_V2: u8 = 7;
79
80impl Log {
81    pub(crate) async fn new<T>(
82        downlink: channel::Receiver<Packet>,
83        uplink: channel::Sender<Packet>,
84        toc_cache: T,
85    ) -> Result<Self>
86    where
87        T: TocCache,
88    {
89        let (toc_downlink, control_downlink, data_downlink, _) =
90            crate::crtp_utils::crtp_channel_dispatcher(downlink);
91
92        let toc = crate::crtp_utils::fetch_toc(LOG_PORT, uplink.clone(), toc_downlink, toc_cache).await?;
93        let toc = Arc::new(toc);
94
95        let control_downlink = Arc::new(Mutex::new(control_downlink));
96
97        let next_block_id = Mutex::new(0);
98
99        let data_channels = Arc::new(Mutex::new(BTreeMap::new()));
100
101        let active_blocks = Mutex::new(BTreeMap::new());
102
103        let log = Self {
104            uplink,
105            control_downlink,
106            toc,
107            next_block_id,
108            data_channels,
109            active_blocks,
110        };
111        log.reset().await?;
112        log.spawn_data_dispatcher(data_downlink).await;
113
114        Ok(log)
115    }
116
117    async fn reset(&self) -> Result<()> {
118        let downlink = self.control_downlink.lock().await;
119
120        let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![RESET]);
121        self.uplink
122            .send_async(pk)
123            .await
124            .map_err(|_| Error::Disconnected)?;
125
126        let pk = downlink
127            .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[RESET])
128            .await?;
129        assert_eq!(pk.get_data()[2], 0);
130
131        Ok(())
132    }
133
134    async fn spawn_data_dispatcher(&self, data_downlink: flume::Receiver<Packet>) {
135        let data_channels = self.data_channels.clone();
136        tokio::spawn(async move {
137            while let Ok(packet) = data_downlink.recv_async().await {
138                if packet.get_data().len() > 1 {
139                    let block_id = packet.get_data()[0];
140                    let data_channels = data_channels.lock().await;
141                    if data_channels.contains_key(&block_id)
142                        && data_channels
143                            .get(&block_id)
144                            .unwrap()
145                            .send_async(packet)
146                            .await
147                            .is_err()
148                    {
149                        break;
150                    }
151                }
152            }
153        });
154    }
155
156    /// Get the names of all the log variables
157    ///
158    /// The names contain group and name of the log variable formatted as
159    /// "group.name".
160    pub fn names(&self) -> Vec<String> {
161        self.toc.keys().cloned().collect()
162    }
163
164    /// Return the type of a log variable or an Error if the parameter does not exist.
165    pub fn get_type(&self, name: &str) -> Result<ValueType> {
166        Ok(self
167            .toc
168            .get(name)
169            .ok_or_else(|| not_found(name))?
170            .1
171            .item_type)
172    }
173
174    async fn generate_next_block_id(&self) -> Result<u8> {
175        let mut next_block_id = self.next_block_id.lock().await;
176        if *next_block_id == u8::MAX {
177            return Err(Error::LogError("No more block ID available!".into()));
178        }
179        let id = *next_block_id;
180        *next_block_id += 1;
181        Ok(id)
182    }
183
184    /// Cleanup dropped LogBlocks
185    async fn cleanup_blocks(&self) -> Result<()> {
186        let mut active_blocks = self.active_blocks.lock().await;
187
188        for (block_id, canary) in active_blocks.clone().into_iter() {
189            if canary.upgrade() == None {
190                // Delete the block!
191                let control_downlink = self.control_downlink.lock().await;
192
193                let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![DELETE_BLOCK, block_id]);
194                self.uplink
195                    .send_async(pk)
196                    .await
197                    .map_err(|_| Error::Disconnected)?;
198
199                let pk = control_downlink
200                    .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[DELETE_BLOCK, block_id])
201                    .await?;
202                let error = pk.get_data()[2];
203
204                if error != 0 {
205                    return Err(Error::LogError(format!(
206                        "Protocol error when deleting block: {}",
207                        error
208                    )));
209                }
210
211                active_blocks.remove_entry(&block_id);
212            }
213        }
214
215        Ok(())
216    }
217
218    /// Create a Log block
219    ///
220    /// This will create a log block in the Crazyflie firmware and return a
221    /// [LogBlock] object that can be used to add variable to the block and start
222    /// logging
223    ///
224    /// This function can fail if there is no more log block ID available: each
225    /// log block is assigned a 8 bit ID by the lib and so far they are not
226    /// re-used. So during a Crazyflie connection lifetime, up to 256 log
227    /// blocks can be created. If this becomes a problem for any use-case, it
228    /// can be solved by a more clever ID generation algorithm.
229    ///
230    /// The Crazyflie firmware also has a limit in number of active log block,
231    /// this function will fail if this limit is reached. Unlike for the ID, the
232    /// active log blocks in the Crazyflie are cleaned-up when the [LogBlock]
233    /// object is dropped.
234    pub async fn create_block(&self) -> Result<LogBlock> {
235        self.cleanup_blocks().await?;
236
237        let block_id = self.generate_next_block_id().await?;
238        let control_downlink = self.control_downlink.lock().await;
239
240        let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![CREATE_BLOCK_V2, block_id]);
241        self.uplink
242            .send_async(pk)
243            .await
244            .map_err(|_| Error::Disconnected)?;
245
246        let pk = control_downlink
247            .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[CREATE_BLOCK_V2, block_id])
248            .await?;
249        let error = pk.get_data()[2];
250
251        if error != 0 {
252            return Err(Error::LogError(format!(
253                "Protocol error when creating block: {}",
254                error
255            )));
256        }
257
258        // Todo: Create data channel for the block
259        let (tx, rx) = flume::unbounded();
260        self.data_channels.lock().await.insert(block_id, tx);
261
262        let canary = Arc::new(());
263        self.active_blocks
264            .lock()
265            .await
266            .insert(block_id, Arc::downgrade(&canary));
267
268        Ok(LogBlock {
269            _canary: canary,
270            toc: Arc::downgrade(&self.toc),
271            uplink: self.uplink.clone(),
272            control_downlink: Arc::downgrade(&self.control_downlink),
273            block_id,
274            variables: Vec::new(),
275            data_channel: rx,
276        })
277    }
278}
279
// Information stored in the log TOC for each log variable
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct LogItemInfo {
    // Type of the log variable
    item_type: ValueType,
}
284
285impl TryFrom<u8> for LogItemInfo {
286    type Error = Error;
287
288    fn try_from(log_type: u8) -> Result<Self> {
289        let item_type = match log_type {
290            1 => ValueType::U8,
291            2 => ValueType::U16,
292            3 => ValueType::U32,
293            4 => ValueType::I8,
294            5 => ValueType::I16,
295            6 => ValueType::I32,
296            7 => ValueType::F32,
297            8 => ValueType::F16,
298            _ => {
299                return Err(Error::ProtocolError(format!(
300                    "Invalid log item type: {}",
301                    log_type
302                )))
303            }
304        };
305
306        Ok(LogItemInfo { item_type })
307    }
308}
309
310impl TryInto<u8> for LogItemInfo {
311    type Error = Error;
312
313    fn try_into(self) -> Result<u8> {
314        let value = match self.item_type {
315            ValueType::U8 => 1,
316            ValueType::U16 => 2,
317            ValueType::U32 => 3,
318            ValueType::I8 => 4,
319            ValueType::I16 => 5,
320            ValueType::I32 => 6,
321            ValueType::F32 => 7,
322            ValueType::F16 => 8,
323            _ => {
324                return Err(Error::LogError(format!(
325                    "Value type {:?} not handled by log",
326                    self.item_type
327                )))
328            }
329        };
330        Ok(value)
331    }
332}
333
/// # Log Block
///
/// This object represents an IDLE LogBlock in the Crazyflie.
///
/// If the [LogBlock] object or its associated [LogStream] is dropped, the
/// Log Block will be deleted in the Crazyflie, freeing resources.
///
/// See the [log module documentation](crate::subsystems::log) for more context and information.
pub struct LogBlock {
    // Kept alive as long as the block exists: the Log subsystem holds a weak reference to it
    // to detect that the block has been dropped
    _canary: Arc<()>,
    // Log TOC: variable name -> (variable ID, type information)
    toc: Weak<BTreeMap<String, (u16, LogItemInfo)>>,
    // Channel on which packets are sent to the Crazyflie
    uplink: channel::Sender<Packet>,
    // Receiver for the answers to control commands, shared with the Log subsystem
    control_downlink: Weak<Mutex<channel::Receiver<Packet>>>,
    // ID of this block
    block_id: u8,
    // Name and type of the variables added to the block, in the order they were added
    variables: Vec<(String, ValueType)>,
    // Channel on which the data packets of this block are received
    data_channel: flume::Receiver<Packet>,
}
351
352impl LogBlock {
353    /// Start log block and return a stream to read  the value
354    ///
355    /// Since a log-block cannot be modified after being started, this function
356    /// consumes the [LogBlock] object and return a [LogStream]. The function
357    /// [LogStream::stop()] can be called on the LogStream to get back the [LogBlock] object.
358    ///
359    /// This function can fail if there is a protocol error or an error
360    /// reported by the Crazyflie. In such case, the LogBlock object will be
361    /// dropped and the block will be deleted in the Crazyflie
362    pub async fn start(self, period: LogPeriod) -> Result<LogStream> {
363        let control_uplink = self.control_downlink.upgrade().ok_or(Error::Disconnected)?;
364        let control_uplink = control_uplink.lock().await;
365
366        let pk = Packet::new(
367            LOG_PORT,
368            CONTROL_CHANNEL,
369            vec![START_BLOCK, self.block_id, period.0],
370        );
371        self.uplink
372            .send_async(pk)
373            .await
374            .map_err(|_| Error::Disconnected)?;
375
376        let answer = control_uplink
377            .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[START_BLOCK, self.block_id])
378            .await?;
379        if answer.get_data().len() != 3 {
380            return Err(Error::ProtocolError(
381                "Malformed Log control packet".to_owned(),
382            ));
383        }
384        let error_code = answer.get_data()[2];
385        if error_code != 0 {
386            return Err(Error::LogError(format!(
387                "Error starting lock: {}",
388                error_code
389            )));
390        }
391
392        Ok(LogStream { log_block: self })
393    }
394
395    /// Add a variable to the log block
396    ///
397    /// A packet will be sent to the Crazyflie to add the variable. The variable is logged in the same format as
398    /// it is stored in the Crazyflie (ie. there is no conversion done)
399    ///
400    /// This function can fail if the variable is not found in the toc or of the Crazyflie returns an error
401    /// The most common error reported by the Crazyflie would be if the log block is already too full.
402    pub async fn add_variable(&mut self, name: &str) -> Result<()> {
403        let toc = self.toc.upgrade().ok_or(Error::Disconnected)?;
404        let (variable_id, info) = toc.get(name).ok_or(Error::VariableNotFound)?;
405
406        // Add variable to Crazyflie
407        let control_uplink = self.control_downlink.upgrade().ok_or(Error::Disconnected)?;
408        let control_uplink = control_uplink.lock().await;
409
410        let mut payload = vec![APPEND_BLOCK_V2, self.block_id, (*info).try_into()?];
411        payload.extend_from_slice(&variable_id.to_le_bytes());
412        let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, payload);
413        self.uplink
414            .send_async(pk)
415            .await
416            .map_err(|_| Error::Disconnected)?;
417
418        let answer = control_uplink
419            .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[APPEND_BLOCK_V2, self.block_id])
420            .await?;
421        if answer.get_data().len() != 3 {
422            return Err(Error::ProtocolError(
423                "Malformed Log control packet".to_owned(),
424            ));
425        }
426        let error_code = answer.get_data()[2];
427        if error_code != 0 {
428            return Err(Error::LogError(format!(
429                "Error appending variable to block: {}",
430                error_code
431            )));
432        }
433
434        // Add variable to local list
435        self.variables.push((name.to_owned(), info.item_type));
436
437        Ok(())
438    }
439}
440
/// # Log Stream
///
/// This object represents a started log block that is currently returning data
/// at regular intervals.
///
/// Dropping this object or the associated [LogBlock] will delete the log block
/// in the Crazyflie.
///
/// See the [log module documentation](crate::subsystems::log) for more context and information.
pub struct LogStream {
    // The started block, given back by [LogStream::stop()]
    log_block: LogBlock,
}
453
454impl LogStream {
455    /// Stops the log block from streaming
456    ///
457    /// This method consumes the stream and returns back the log block object so that it can be started again later
458    /// with a different period.
459    ///
460    /// This function can only fail on unexpected protocol error. If it does, the log block is dropped and will be
461    /// cleaned-up next time a log block is created.
462    pub async fn stop(self) -> Result<LogBlock> {
463        let control_uplink = self
464            .log_block
465            .control_downlink
466            .upgrade()
467            .ok_or(Error::Disconnected)?;
468        let control_uplink = control_uplink.lock().await;
469
470        let pk = Packet::new(
471            LOG_PORT,
472            CONTROL_CHANNEL,
473            vec![STOP_BLOCK, self.log_block.block_id],
474        );
475        self.log_block
476            .uplink
477            .send_async(pk)
478            .await
479            .map_err(|_| Error::Disconnected)?;
480
481        let answer = control_uplink
482            .wait_packet(
483                LOG_PORT,
484                CONTROL_CHANNEL,
485                &[STOP_BLOCK, self.log_block.block_id],
486            )
487            .await?;
488        if answer.get_data().len() != 3 {
489            return Err(Error::ProtocolError(
490                "Malformed Log control packet".to_owned(),
491            ));
492        }
493        let error_code = answer.get_data()[2];
494        if error_code != 0 {
495            return Err(Error::LogError(format!(
496                "Error starting lock: {}",
497                error_code
498            )));
499        }
500
501        Ok(self.log_block)
502    }
503
504    /// Get the next log data from the log block stream
505    ///
506    /// This function will wait for the data and only return a value when the
507    /// next data is available.
508    ///
509    /// This function will return an error if the Crazyflie gets disconnected.
510    pub async fn next(&self) -> Result<LogData> {
511        let packet = self
512            .log_block
513            .data_channel
514            .recv_async()
515            .await
516            .map_err(|_| Error::Disconnected)?;
517
518        self.decode_packet(&packet.get_data()[1..])
519    }
520
521    fn decode_packet(&self, data: &[u8]) -> Result<LogData> {
522        let mut timestamp = data[0..=2].to_vec();
523        timestamp.insert(0, 0);
524        // The timestamp is 2 bytes long by design so this unwrap cannot fail
525        let timestamp = u32::from_le_bytes(timestamp.try_into().unwrap());
526
527        let mut index = 3;
528        let mut log_data = HashMap::new();
529        for (name, value_type) in &self.log_block.variables {
530            let byte_length = value_type.byte_length();
531            log_data.insert(
532                name.clone(),
533                Value::from_le_bytes(&data[index..(index + byte_length)], *value_type)?,
534            );
535            index += byte_length;
536        }
537
538        Ok(LogData {
539            timestamp,
540            data: log_data,
541        })
542    }
543}
544
/// # Log data sample
///
/// This object represents a data sample coming from a started log block. It
/// provides the Crazyflie timestamp in milliseconds when the data was sampled
/// and a hash-table of variable name and value.
///
/// Objects of this type are returned by [LogStream::next()].
///
/// See the [log module documentation](crate::subsystems::log) for more context and information.
#[derive(Debug)]
pub struct LogData {
    /// Timestamp in milliseconds of when the data sample was taken
    pub timestamp: u32,
    /// HashMap of the name of the variable vs sampled value, with one entry per variable of the log block
    pub data: HashMap<String, Value>,
}
559
/// # Log block period
///
/// This object represents a valid log period. It implements the [TryFrom<Duration>]
/// trait so it can be constructed from a [Duration] object. A [LogPeriod::from_millis()]
/// function is provided for convenience.
///
/// A valid period for a Log block is between 10ms and 2550ms.
///
/// The period is stored as a number of 10ms steps, durations that are not a multiple of 10ms
/// are rounded down.
///
/// See the [log module documentation](crate::subsystems::log) for more context and information.
pub struct LogPeriod(u8);
570
571impl LogPeriod {
572    /// Create a LogPeriod object from milliseconds
573    ///
574    /// Return an error if the millis is not valid.
575    /// A valid period for a Log block is between 10ms and 2550ms.
576    pub fn from_millis(millis: u64) -> Result<Self> {
577        Duration::from_millis(millis).try_into()
578    }
579}
580
581impl TryFrom<Duration> for LogPeriod {
582    type Error = Error;
583
584    fn try_from(value: Duration) -> Result<Self> {
585        let period_ms = value.as_millis();
586        let period_arg = period_ms / 10;
587        if period_arg == 0 || period_arg > 255 {
588            return Err(Error::LogError(
589                "Invalid log period, should be between 10ms and 2550ms".to_owned(),
590            ));
591        }
592        Ok(LogPeriod(period_arg as u8))
593    }
594}