crazyflie_lib/subsystems/
log.rs

1//! # Data logging subsystem
2//!
3//! The Crazyflie log subsystem allows to asynchronously log the value of exposed Crazyflie variables from the ground.
4//!
5//! At connection time, a Table Of Content (TOC) of the log variable is fetched from the Crazyflie which allows to
6//! log variables using their names. To log variable a [LogBlock] needs to be created. The variable to be logged are
7//! added to the LogBlock and then the LogBlock can be started returning a LogStream that will yield the log data.
8//!
9//! ```no_run
10//! # use crazyflie_lib::{Crazyflie, Value, Error, subsystems::log::LogPeriod};
11//! # use crazyflie_link::LinkContext;
12//! # async fn example() -> Result<(), Error> {
13//! # let context = LinkContext::new();
14//! # let cf = Crazyflie::connect_from_uri(&context, "radio://0/60/2M/E7E7E7E7E7").await?;
15//! // Create the log block
16//! let mut block = cf.log.create_block().await?;
17//!
18//! // Append Variables
19//! block.add_variable("stateEstimate.roll").await?;
20//! block.add_variable("stateEstimate.pitch").await?;
21//! block.add_variable("stateEstimate.yaw").await?;
22//!
23//! // Start the block
24//! let period = LogPeriod::from_millis(100)?;
25//! let stream = block.start(period).await?;
26//!
27//! // Get Data!
28//! while let Ok(data) = stream.next().await {
29//!     println!("Yaw is {:?}", data.data["stateEstimate.yaw"]);
30//! }
31//! # Ok(())
32//! # };
33//! ```
34
35use crate::crtp_utils::WaitForPacket;
36use crate::{Error, Result, Value, ValueType};
37use crazyflie_link::Packet;
38use flume as channel;
39use futures::lock::Mutex;
40use std::collections::HashMap;
41use std::convert::TryInto;
42use std::sync::Weak;
43use std::{collections::BTreeMap, convert::TryFrom, sync::Arc, time::Duration};
44
45use crate::crazyflie::LOG_PORT;
46
47/// # Access to the Crazyflie Log Subsystem
48///
49/// This struct provide functions to interact with the Crazyflie Log subsystem.
50///
51/// See the [log module documentation](crate::subsystems::log) for more context and information.
52#[derive(Debug)]
53pub struct Log {
54    uplink: channel::Sender<Packet>,
55    control_downlink: Arc<Mutex<channel::Receiver<Packet>>>,
56    toc: Arc<BTreeMap<String, (u16, LogItemInfo)>>,
57    next_block_id: Mutex<u8>,
58    data_channels: Arc<Mutex<BTreeMap<u8, flume::Sender<Packet>>>>,
59    active_blocks: Mutex<BTreeMap<u8, Weak<()>>>,
60}
61
62fn not_found(name: &str) -> Error {
63    Error::ParamError(format!("Log variable {} not found", name))
64}
65
66const CONTROL_CHANNEL: u8 = 1;
67
68const DELETE_BLOCK: u8 = 2;
69const START_BLOCK: u8 = 3;
70const STOP_BLOCK: u8 = 4;
71const RESET: u8 = 5;
72const CREATE_BLOCK_V2: u8 = 6;
73const APPEND_BLOCK_V2: u8 = 7;
74
75impl Log {
76    pub(crate) async fn new(
77        downlink: channel::Receiver<Packet>,
78        uplink: channel::Sender<Packet>,
79    ) -> Result<Self> {
80        let (toc_downlink, control_downlink, data_downlink, _) =
81            crate::crtp_utils::crtp_channel_dispatcher(downlink);
82
83        let toc = crate::crtp_utils::fetch_toc(LOG_PORT, uplink.clone(), toc_downlink).await?;
84        let toc = Arc::new(toc);
85
86        let control_downlink = Arc::new(Mutex::new(control_downlink));
87
88        let next_block_id = Mutex::new(0);
89
90        let data_channels = Arc::new(Mutex::new(BTreeMap::new()));
91
92        let active_blocks = Mutex::new(BTreeMap::new());
93
94        let log = Self {
95            uplink,
96            control_downlink,
97            toc,
98            next_block_id,
99            data_channels,
100            active_blocks,
101        };
102        log.reset().await?;
103        log.spawn_data_dispatcher(data_downlink).await;
104
105        Ok(log)
106    }
107
108    async fn reset(&self) -> Result<()> {
109        let downlink = self.control_downlink.lock().await;
110
111        let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![RESET]);
112        self.uplink
113            .send_async(pk)
114            .await
115            .map_err(|_| Error::Disconnected)?;
116
117        let pk = downlink
118            .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[RESET])
119            .await?;
120        assert_eq!(pk.get_data()[2], 0);
121
122        Ok(())
123    }
124
125    async fn spawn_data_dispatcher(&self, data_downlink: flume::Receiver<Packet>) {
126        let data_channels = self.data_channels.clone();
127        tokio::spawn(async move {
128            while let Ok(packet) = data_downlink.recv_async().await {
129                if packet.get_data().len() > 1 {
130                    let block_id = packet.get_data()[0];
131                    let data_channels = data_channels.lock().await;
132                    if data_channels.contains_key(&block_id)
133                        && data_channels
134                            .get(&block_id)
135                            .unwrap()
136                            .send_async(packet)
137                            .await
138                            .is_err()
139                    {
140                        break;
141                    }
142                }
143            }
144        });
145    }
146
147    /// Get the names of all the log variables
148    ///
149    /// The names contain group and name of the log variable formatted as
150    /// "group.name".
151    pub fn names(&self) -> Vec<String> {
152        self.toc.keys().cloned().collect()
153    }
154
155    /// Return the type of a log variable or an Error if the parameter does not exist.
156    pub fn get_type(&self, name: &str) -> Result<ValueType> {
157        Ok(self
158            .toc
159            .get(name)
160            .ok_or_else(|| not_found(name))?
161            .1
162            .item_type)
163    }
164
165    async fn generate_next_block_id(&self) -> Result<u8> {
166        let mut next_block_id = self.next_block_id.lock().await;
167        if *next_block_id == u8::MAX {
168            return Err(Error::LogError("No more block ID available!".into()));
169        }
170        let id = *next_block_id;
171        *next_block_id += 1;
172        Ok(id)
173    }
174
175    /// Cleanup dropped LogBlocks
176    async fn cleanup_blocks(&self) -> Result<()> {
177        let mut active_blocks = self.active_blocks.lock().await;
178
179        for (block_id, canary) in active_blocks.clone().into_iter() {
180            if canary.upgrade() == None {
181                // Delete the block!
182                let control_downlink = self.control_downlink.lock().await;
183
184                let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![DELETE_BLOCK, block_id]);
185                self.uplink
186                    .send_async(pk)
187                    .await
188                    .map_err(|_| Error::Disconnected)?;
189
190                let pk = control_downlink
191                    .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[DELETE_BLOCK, block_id])
192                    .await?;
193                let error = pk.get_data()[2];
194
195                if error != 0 {
196                    return Err(Error::LogError(format!(
197                        "Protocol error when deleting block: {}",
198                        error
199                    )));
200                }
201
202                active_blocks.remove_entry(&block_id);
203            }
204        }
205
206        Ok(())
207    }
208
209    /// Create a Log block
210    ///
211    /// This will create a log block in the Crazyflie firmware and return a
212    /// [LogBlock] object that can be used to add variable to the block and start
213    /// logging
214    ///
215    /// This function can fail if there is no more log block ID available: each
216    /// log block is assigned a 8 bit ID by the lib and so far they are not
217    /// re-used. So during a Crazyflie connection lifetime, up to 256 log
218    /// blocks can be created. If this becomes a problem for any use-case, it
219    /// can be solved by a more clever ID generation algorithm.
220    ///
221    /// The Crazyflie firmware also has a limit in number of active log block,
222    /// this function will fail if this limit is reached. Unlike for the ID, the
223    /// active log blocks in the Crazyflie are cleaned-up when the [LogBlock]
224    /// object is dropped.
225    pub async fn create_block(&self) -> Result<LogBlock> {
226        self.cleanup_blocks().await?;
227
228        let block_id = self.generate_next_block_id().await?;
229        let control_downlink = self.control_downlink.lock().await;
230
231        let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![CREATE_BLOCK_V2, block_id]);
232        self.uplink
233            .send_async(pk)
234            .await
235            .map_err(|_| Error::Disconnected)?;
236
237        let pk = control_downlink
238            .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[CREATE_BLOCK_V2, block_id])
239            .await?;
240        let error = pk.get_data()[2];
241
242        if error != 0 {
243            return Err(Error::LogError(format!(
244                "Protocol error when creating block: {}",
245                error
246            )));
247        }
248
249        // Todo: Create data channel for the block
250        let (tx, rx) = flume::unbounded();
251        self.data_channels.lock().await.insert(block_id, tx);
252
253        let canary = Arc::new(());
254        self.active_blocks
255            .lock()
256            .await
257            .insert(block_id, Arc::downgrade(&canary));
258
259        Ok(LogBlock {
260            _canary: canary,
261            toc: Arc::downgrade(&self.toc),
262            uplink: self.uplink.clone(),
263            control_downlink: Arc::downgrade(&self.control_downlink),
264            block_id,
265            variables: Vec::new(),
266            data_channel: rx,
267        })
268    }
269}
270
271#[derive(Debug, Clone, Copy)]
272struct LogItemInfo {
273    item_type: ValueType,
274}
275
276impl TryFrom<u8> for LogItemInfo {
277    type Error = Error;
278
279    fn try_from(log_type: u8) -> Result<Self> {
280        let item_type = match log_type {
281            1 => ValueType::U8,
282            2 => ValueType::U16,
283            3 => ValueType::U32,
284            4 => ValueType::I8,
285            5 => ValueType::I16,
286            6 => ValueType::I32,
287            7 => ValueType::F32,
288            8 => ValueType::F16,
289            _ => {
290                return Err(Error::ProtocolError(format!(
291                    "Invalid log item type: {}",
292                    log_type
293                )))
294            }
295        };
296
297        Ok(LogItemInfo { item_type })
298    }
299}
300
301impl TryInto<u8> for LogItemInfo {
302    type Error = Error;
303
304    fn try_into(self) -> Result<u8> {
305        let value = match self.item_type {
306            ValueType::U8 => 1,
307            ValueType::U16 => 2,
308            ValueType::U32 => 3,
309            ValueType::I8 => 4,
310            ValueType::I16 => 5,
311            ValueType::I32 => 6,
312            ValueType::F32 => 7,
313            ValueType::F16 => 8,
314            _ => {
315                return Err(Error::LogError(format!(
316                    "Value type {:?} not handled by log",
317                    self.item_type
318                )))
319            }
320        };
321        Ok(value)
322    }
323}
324
325/// # Log Block
326///
327/// This object represent an IDLE LogBlock in the Crazyflie.
328///
329/// If the [LogBlock] object is dropped or its associated [LogStream], the
330/// Log Block will be deleted in the Crazyflie freeing resources.
331///
332/// See the [log module documentation](crate::subsystems::log) for more context and information.
333pub struct LogBlock {
334    _canary: Arc<()>,
335    toc: Weak<BTreeMap<String, (u16, LogItemInfo)>>,
336    uplink: channel::Sender<Packet>,
337    control_downlink: Weak<Mutex<channel::Receiver<Packet>>>,
338    block_id: u8,
339    variables: Vec<(String, ValueType)>,
340    data_channel: flume::Receiver<Packet>,
341}
342
343impl LogBlock {
344    /// Start log block and return a stream to read  the value
345    ///
346    /// Since a log-block cannot be modified after being started, this function
347    /// consumes the [LogBlock] object and return a [LogStream]. The function
348    /// [LogStream::stop()] can be called on the LogStream to get back the [LogBlock] object.
349    ///
350    /// This function can fail if there is a protocol error or an error
351    /// reported by the Crazyflie. In such case, the LogBlock object will be
352    /// dropped and the block will be deleted in the Crazyflie
353    pub async fn start(self, period: LogPeriod) -> Result<LogStream> {
354        let control_uplink = self.control_downlink.upgrade().ok_or(Error::Disconnected)?;
355        let control_uplink = control_uplink.lock().await;
356
357        let pk = Packet::new(
358            LOG_PORT,
359            CONTROL_CHANNEL,
360            vec![START_BLOCK, self.block_id, period.0],
361        );
362        self.uplink
363            .send_async(pk)
364            .await
365            .map_err(|_| Error::Disconnected)?;
366
367        let answer = control_uplink
368            .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[START_BLOCK, self.block_id])
369            .await?;
370        if answer.get_data().len() != 3 {
371            return Err(Error::ProtocolError(
372                "Malformed Log control packet".to_owned(),
373            ));
374        }
375        let error_code = answer.get_data()[2];
376        if error_code != 0 {
377            return Err(Error::LogError(format!(
378                "Error starting lock: {}",
379                error_code
380            )));
381        }
382
383        Ok(LogStream { log_block: self })
384    }
385
386    /// Add a variable to the log block
387    ///
388    /// A packet will be sent to the Crazyflie to add the variable. The variable is logged in the same format as
389    /// it is stored in the Crazyflie (ie. there is no conversion done)
390    ///
391    /// This function can fail if the variable is not found in the toc or of the Crazyflie returns an error
392    /// The most common error reported by the Crazyflie would be if the log block is already too full.
393    pub async fn add_variable(&mut self, name: &str) -> Result<()> {
394        let toc = self.toc.upgrade().ok_or(Error::Disconnected)?;
395        let (variable_id, info) = toc.get(name).ok_or(Error::VariableNotFound)?;
396
397        // Add variable to Crazyflie
398        let control_uplink = self.control_downlink.upgrade().ok_or(Error::Disconnected)?;
399        let control_uplink = control_uplink.lock().await;
400
401        let mut payload = vec![APPEND_BLOCK_V2, self.block_id, (*info).try_into()?];
402        payload.extend_from_slice(&variable_id.to_le_bytes());
403        let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, payload);
404        self.uplink
405            .send_async(pk)
406            .await
407            .map_err(|_| Error::Disconnected)?;
408
409        let answer = control_uplink
410            .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[APPEND_BLOCK_V2, self.block_id])
411            .await?;
412        if answer.get_data().len() != 3 {
413            return Err(Error::ProtocolError(
414                "Malformed Log control packet".to_owned(),
415            ));
416        }
417        let error_code = answer.get_data()[2];
418        if error_code != 0 {
419            return Err(Error::LogError(format!(
420                "Error appending variable to block: {}",
421                error_code
422            )));
423        }
424
425        // Add variable to local list
426        self.variables.push((name.to_owned(), info.item_type));
427
428        Ok(())
429    }
430}
431
432/// # Log Steam
433///
434/// This object represents a started log block that is currently returning data
435/// at regular intervals.
436///
437/// Dropping this object or the associated [LogBlock] will delete the log block
438/// in the Crazyflie.
439///
440/// See the [log module documentation](crate::subsystems::log) for more context and information.
441pub struct LogStream {
442    log_block: LogBlock,
443}
444
445impl LogStream {
446    /// Stops the log block from streaming
447    ///
448    /// This method consumes the stream and returns back the log block object so that it can be started again later
449    /// with a different period.
450    ///
451    /// This function can only fail on unexpected protocol error. If it does, the log block is dropped and will be
452    /// cleaned-up next time a log block is created.
453    pub async fn stop(self) -> Result<LogBlock> {
454        let control_uplink = self
455            .log_block
456            .control_downlink
457            .upgrade()
458            .ok_or(Error::Disconnected)?;
459        let control_uplink = control_uplink.lock().await;
460
461        let pk = Packet::new(
462            LOG_PORT,
463            CONTROL_CHANNEL,
464            vec![STOP_BLOCK, self.log_block.block_id],
465        );
466        self.log_block
467            .uplink
468            .send_async(pk)
469            .await
470            .map_err(|_| Error::Disconnected)?;
471
472        let answer = control_uplink
473            .wait_packet(
474                LOG_PORT,
475                CONTROL_CHANNEL,
476                &[STOP_BLOCK, self.log_block.block_id],
477            )
478            .await?;
479        if answer.get_data().len() != 3 {
480            return Err(Error::ProtocolError(
481                "Malformed Log control packet".to_owned(),
482            ));
483        }
484        let error_code = answer.get_data()[2];
485        if error_code != 0 {
486            return Err(Error::LogError(format!(
487                "Error starting lock: {}",
488                error_code
489            )));
490        }
491
492        Ok(self.log_block)
493    }
494
495    /// Get the next log data from the log block stream
496    ///
497    /// This function will wait for the data and only return a value when the
498    /// next data is available.
499    ///
500    /// This function will return an error if the Crazyflie gets disconnected.
501    pub async fn next(&self) -> Result<LogData> {
502        let packet = self
503            .log_block
504            .data_channel
505            .recv_async()
506            .await
507            .map_err(|_| Error::Disconnected)?;
508
509        self.decode_packet(&packet.get_data()[1..])
510    }
511
512    fn decode_packet(&self, data: &[u8]) -> Result<LogData> {
513        let mut timestamp = data[0..=2].to_vec();
514        timestamp.insert(0, 0);
515        // The timestamp is 2 bytes long by design so this unwrap cannot fail
516        let timestamp = u32::from_le_bytes(timestamp.try_into().unwrap());
517
518        let mut index = 3;
519        let mut log_data = HashMap::new();
520        for (name, value_type) in &self.log_block.variables {
521            let byte_length = value_type.byte_length();
522            log_data.insert(
523                name.clone(),
524                Value::from_le_bytes(&data[index..(index + byte_length)], *value_type)?,
525            );
526            index += byte_length;
527        }
528
529        Ok(LogData {
530            timestamp,
531            data: log_data,
532        })
533    }
534}
535
536/// # Log data sample
537///
538/// This object represents a data sample coming from a started log block. It
539/// provides the Crazyflie timestamp in milliseconds when the data was sampled
540/// and a hash-table of variable name and value.
541///
542/// See the [log module documentation](crate::subsystems::log) for more context and information.
543#[derive(Debug)]
544pub struct LogData {
545    /// Timestamp in milliseconds of when the data sample was taken
546    pub timestamp: u32,
547    /// HashMap of the name of the variable vs sampled value
548    pub data: HashMap<String, Value>,
549}
550
551/// # Log block period
552///
553/// This object represent a valid log period. It implements the [TryFrom<Duration>]
554/// trait so it can be constructed from a [Duration] object. a [LogPeriod::from_millis()]
555/// function is provided for convenience.
556///
557/// A valid period for a Log block is between 10ms and 2550ms.
558///
559/// See the [log module documentation](crate::subsystems::log) for more context and information.
560pub struct LogPeriod(u8);
561
562impl LogPeriod {
563    /// Create a LogPeriod object from milliseconds
564    ///
565    /// Return an error if the millis is not valid.
566    /// A valid period for a Log block is between 10ms and 2550ms.
567    pub fn from_millis(millis: u64) -> Result<Self> {
568        Duration::from_millis(millis).try_into()
569    }
570}
571
572impl TryFrom<Duration> for LogPeriod {
573    type Error = Error;
574
575    fn try_from(value: Duration) -> Result<Self> {
576        let period_ms = value.as_millis();
577        let period_arg = period_ms / 10;
578        if period_arg == 0 || period_arg > 255 {
579            return Err(Error::LogError(
580                "Invalid log period, should be between 10ms and 2550ms".to_owned(),
581            ));
582        }
583        Ok(LogPeriod(period_arg as u8))
584    }
585}