crazyflie_lib/subsystems/
log.rs1use crate::crtp_utils::WaitForPacket;
36use crate::{Error, Result, Value, ValueType};
37use crazyflie_link::Packet;
38use flume as channel;
39use futures::lock::Mutex;
40use std::collections::HashMap;
41use std::convert::TryInto;
42use std::sync::Weak;
43use std::{collections::BTreeMap, convert::TryFrom, sync::Arc, time::Duration};
44
45use crate::crazyflie::LOG_PORT;
46
47#[derive(Debug)]
53pub struct Log {
54 uplink: channel::Sender<Packet>,
55 control_downlink: Arc<Mutex<channel::Receiver<Packet>>>,
56 toc: Arc<BTreeMap<String, (u16, LogItemInfo)>>,
57 next_block_id: Mutex<u8>,
58 data_channels: Arc<Mutex<BTreeMap<u8, flume::Sender<Packet>>>>,
59 active_blocks: Mutex<BTreeMap<u8, Weak<()>>>,
60}
61
62fn not_found(name: &str) -> Error {
63 Error::ParamError(format!("Log variable {} not found", name))
64}
65
66const CONTROL_CHANNEL: u8 = 1;
67
68const DELETE_BLOCK: u8 = 2;
69const START_BLOCK: u8 = 3;
70const STOP_BLOCK: u8 = 4;
71const RESET: u8 = 5;
72const CREATE_BLOCK_V2: u8 = 6;
73const APPEND_BLOCK_V2: u8 = 7;
74
75impl Log {
76 pub(crate) async fn new(
77 downlink: channel::Receiver<Packet>,
78 uplink: channel::Sender<Packet>,
79 ) -> Result<Self> {
80 let (toc_downlink, control_downlink, data_downlink, _) =
81 crate::crtp_utils::crtp_channel_dispatcher(downlink);
82
83 let toc = crate::crtp_utils::fetch_toc(LOG_PORT, uplink.clone(), toc_downlink).await?;
84 let toc = Arc::new(toc);
85
86 let control_downlink = Arc::new(Mutex::new(control_downlink));
87
88 let next_block_id = Mutex::new(0);
89
90 let data_channels = Arc::new(Mutex::new(BTreeMap::new()));
91
92 let active_blocks = Mutex::new(BTreeMap::new());
93
94 let log = Self {
95 uplink,
96 control_downlink,
97 toc,
98 next_block_id,
99 data_channels,
100 active_blocks,
101 };
102 log.reset().await?;
103 log.spawn_data_dispatcher(data_downlink).await;
104
105 Ok(log)
106 }
107
108 async fn reset(&self) -> Result<()> {
109 let downlink = self.control_downlink.lock().await;
110
111 let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![RESET]);
112 self.uplink
113 .send_async(pk)
114 .await
115 .map_err(|_| Error::Disconnected)?;
116
117 let pk = downlink
118 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[RESET])
119 .await?;
120 assert_eq!(pk.get_data()[2], 0);
121
122 Ok(())
123 }
124
125 async fn spawn_data_dispatcher(&self, data_downlink: flume::Receiver<Packet>) {
126 let data_channels = self.data_channels.clone();
127 tokio::spawn(async move {
128 while let Ok(packet) = data_downlink.recv_async().await {
129 if packet.get_data().len() > 1 {
130 let block_id = packet.get_data()[0];
131 let data_channels = data_channels.lock().await;
132 if data_channels.contains_key(&block_id)
133 && data_channels
134 .get(&block_id)
135 .unwrap()
136 .send_async(packet)
137 .await
138 .is_err()
139 {
140 break;
141 }
142 }
143 }
144 });
145 }
146
147 pub fn names(&self) -> Vec<String> {
152 self.toc.keys().cloned().collect()
153 }
154
155 pub fn get_type(&self, name: &str) -> Result<ValueType> {
157 Ok(self
158 .toc
159 .get(name)
160 .ok_or_else(|| not_found(name))?
161 .1
162 .item_type)
163 }
164
165 async fn generate_next_block_id(&self) -> Result<u8> {
166 let mut next_block_id = self.next_block_id.lock().await;
167 if *next_block_id == u8::MAX {
168 return Err(Error::LogError("No more block ID available!".into()));
169 }
170 let id = *next_block_id;
171 *next_block_id += 1;
172 Ok(id)
173 }
174
175 async fn cleanup_blocks(&self) -> Result<()> {
177 let mut active_blocks = self.active_blocks.lock().await;
178
179 for (block_id, canary) in active_blocks.clone().into_iter() {
180 if canary.upgrade() == None {
181 let control_downlink = self.control_downlink.lock().await;
183
184 let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![DELETE_BLOCK, block_id]);
185 self.uplink
186 .send_async(pk)
187 .await
188 .map_err(|_| Error::Disconnected)?;
189
190 let pk = control_downlink
191 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[DELETE_BLOCK, block_id])
192 .await?;
193 let error = pk.get_data()[2];
194
195 if error != 0 {
196 return Err(Error::LogError(format!(
197 "Protocol error when deleting block: {}",
198 error
199 )));
200 }
201
202 active_blocks.remove_entry(&block_id);
203 }
204 }
205
206 Ok(())
207 }
208
209 pub async fn create_block(&self) -> Result<LogBlock> {
226 self.cleanup_blocks().await?;
227
228 let block_id = self.generate_next_block_id().await?;
229 let control_downlink = self.control_downlink.lock().await;
230
231 let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![CREATE_BLOCK_V2, block_id]);
232 self.uplink
233 .send_async(pk)
234 .await
235 .map_err(|_| Error::Disconnected)?;
236
237 let pk = control_downlink
238 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[CREATE_BLOCK_V2, block_id])
239 .await?;
240 let error = pk.get_data()[2];
241
242 if error != 0 {
243 return Err(Error::LogError(format!(
244 "Protocol error when creating block: {}",
245 error
246 )));
247 }
248
249 let (tx, rx) = flume::unbounded();
251 self.data_channels.lock().await.insert(block_id, tx);
252
253 let canary = Arc::new(());
254 self.active_blocks
255 .lock()
256 .await
257 .insert(block_id, Arc::downgrade(&canary));
258
259 Ok(LogBlock {
260 _canary: canary,
261 toc: Arc::downgrade(&self.toc),
262 uplink: self.uplink.clone(),
263 control_downlink: Arc::downgrade(&self.control_downlink),
264 block_id,
265 variables: Vec::new(),
266 data_channel: rx,
267 })
268 }
269}
270
271#[derive(Debug, Clone, Copy)]
272struct LogItemInfo {
273 item_type: ValueType,
274}
275
276impl TryFrom<u8> for LogItemInfo {
277 type Error = Error;
278
279 fn try_from(log_type: u8) -> Result<Self> {
280 let item_type = match log_type {
281 1 => ValueType::U8,
282 2 => ValueType::U16,
283 3 => ValueType::U32,
284 4 => ValueType::I8,
285 5 => ValueType::I16,
286 6 => ValueType::I32,
287 7 => ValueType::F32,
288 8 => ValueType::F16,
289 _ => {
290 return Err(Error::ProtocolError(format!(
291 "Invalid log item type: {}",
292 log_type
293 )))
294 }
295 };
296
297 Ok(LogItemInfo { item_type })
298 }
299}
300
301impl TryInto<u8> for LogItemInfo {
302 type Error = Error;
303
304 fn try_into(self) -> Result<u8> {
305 let value = match self.item_type {
306 ValueType::U8 => 1,
307 ValueType::U16 => 2,
308 ValueType::U32 => 3,
309 ValueType::I8 => 4,
310 ValueType::I16 => 5,
311 ValueType::I32 => 6,
312 ValueType::F32 => 7,
313 ValueType::F16 => 8,
314 _ => {
315 return Err(Error::LogError(format!(
316 "Value type {:?} not handled by log",
317 self.item_type
318 )))
319 }
320 };
321 Ok(value)
322 }
323}
324
325pub struct LogBlock {
334 _canary: Arc<()>,
335 toc: Weak<BTreeMap<String, (u16, LogItemInfo)>>,
336 uplink: channel::Sender<Packet>,
337 control_downlink: Weak<Mutex<channel::Receiver<Packet>>>,
338 block_id: u8,
339 variables: Vec<(String, ValueType)>,
340 data_channel: flume::Receiver<Packet>,
341}
342
343impl LogBlock {
344 pub async fn start(self, period: LogPeriod) -> Result<LogStream> {
354 let control_uplink = self.control_downlink.upgrade().ok_or(Error::Disconnected)?;
355 let control_uplink = control_uplink.lock().await;
356
357 let pk = Packet::new(
358 LOG_PORT,
359 CONTROL_CHANNEL,
360 vec![START_BLOCK, self.block_id, period.0],
361 );
362 self.uplink
363 .send_async(pk)
364 .await
365 .map_err(|_| Error::Disconnected)?;
366
367 let answer = control_uplink
368 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[START_BLOCK, self.block_id])
369 .await?;
370 if answer.get_data().len() != 3 {
371 return Err(Error::ProtocolError(
372 "Malformed Log control packet".to_owned(),
373 ));
374 }
375 let error_code = answer.get_data()[2];
376 if error_code != 0 {
377 return Err(Error::LogError(format!(
378 "Error starting lock: {}",
379 error_code
380 )));
381 }
382
383 Ok(LogStream { log_block: self })
384 }
385
386 pub async fn add_variable(&mut self, name: &str) -> Result<()> {
394 let toc = self.toc.upgrade().ok_or(Error::Disconnected)?;
395 let (variable_id, info) = toc.get(name).ok_or(Error::VariableNotFound)?;
396
397 let control_uplink = self.control_downlink.upgrade().ok_or(Error::Disconnected)?;
399 let control_uplink = control_uplink.lock().await;
400
401 let mut payload = vec![APPEND_BLOCK_V2, self.block_id, (*info).try_into()?];
402 payload.extend_from_slice(&variable_id.to_le_bytes());
403 let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, payload);
404 self.uplink
405 .send_async(pk)
406 .await
407 .map_err(|_| Error::Disconnected)?;
408
409 let answer = control_uplink
410 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[APPEND_BLOCK_V2, self.block_id])
411 .await?;
412 if answer.get_data().len() != 3 {
413 return Err(Error::ProtocolError(
414 "Malformed Log control packet".to_owned(),
415 ));
416 }
417 let error_code = answer.get_data()[2];
418 if error_code != 0 {
419 return Err(Error::LogError(format!(
420 "Error appending variable to block: {}",
421 error_code
422 )));
423 }
424
425 self.variables.push((name.to_owned(), info.item_type));
427
428 Ok(())
429 }
430}
431
432pub struct LogStream {
442 log_block: LogBlock,
443}
444
445impl LogStream {
446 pub async fn stop(self) -> Result<LogBlock> {
454 let control_uplink = self
455 .log_block
456 .control_downlink
457 .upgrade()
458 .ok_or(Error::Disconnected)?;
459 let control_uplink = control_uplink.lock().await;
460
461 let pk = Packet::new(
462 LOG_PORT,
463 CONTROL_CHANNEL,
464 vec![STOP_BLOCK, self.log_block.block_id],
465 );
466 self.log_block
467 .uplink
468 .send_async(pk)
469 .await
470 .map_err(|_| Error::Disconnected)?;
471
472 let answer = control_uplink
473 .wait_packet(
474 LOG_PORT,
475 CONTROL_CHANNEL,
476 &[STOP_BLOCK, self.log_block.block_id],
477 )
478 .await?;
479 if answer.get_data().len() != 3 {
480 return Err(Error::ProtocolError(
481 "Malformed Log control packet".to_owned(),
482 ));
483 }
484 let error_code = answer.get_data()[2];
485 if error_code != 0 {
486 return Err(Error::LogError(format!(
487 "Error starting lock: {}",
488 error_code
489 )));
490 }
491
492 Ok(self.log_block)
493 }
494
495 pub async fn next(&self) -> Result<LogData> {
502 let packet = self
503 .log_block
504 .data_channel
505 .recv_async()
506 .await
507 .map_err(|_| Error::Disconnected)?;
508
509 self.decode_packet(&packet.get_data()[1..])
510 }
511
512 fn decode_packet(&self, data: &[u8]) -> Result<LogData> {
513 let mut timestamp = data[0..=2].to_vec();
514 timestamp.insert(0, 0);
515 let timestamp = u32::from_le_bytes(timestamp.try_into().unwrap());
517
518 let mut index = 3;
519 let mut log_data = HashMap::new();
520 for (name, value_type) in &self.log_block.variables {
521 let byte_length = value_type.byte_length();
522 log_data.insert(
523 name.clone(),
524 Value::from_le_bytes(&data[index..(index + byte_length)], *value_type)?,
525 );
526 index += byte_length;
527 }
528
529 Ok(LogData {
530 timestamp,
531 data: log_data,
532 })
533 }
534}
535
536#[derive(Debug)]
544pub struct LogData {
545 pub timestamp: u32,
547 pub data: HashMap<String, Value>,
549}
550
551pub struct LogPeriod(u8);
561
562impl LogPeriod {
563 pub fn from_millis(millis: u64) -> Result<Self> {
568 Duration::from_millis(millis).try_into()
569 }
570}
571
572impl TryFrom<Duration> for LogPeriod {
573 type Error = Error;
574
575 fn try_from(value: Duration) -> Result<Self> {
576 let period_ms = value.as_millis();
577 let period_arg = period_ms / 10;
578 if period_arg == 0 || period_arg > 255 {
579 return Err(Error::LogError(
580 "Invalid log period, should be between 10ms and 2550ms".to_owned(),
581 ));
582 }
583 Ok(LogPeriod(period_arg as u8))
584 }
585}