crazyflie_lib/subsystems/
log.rs1use crate::crtp_utils::{TocCache, WaitForPacket};
40use crate::{Error, Result, Value, ValueType};
41use crazyflie_link::Packet;
42use flume as channel;
43use futures::lock::Mutex;
44use serde::{Deserialize, Serialize};
45use std::collections::HashMap;
46use std::convert::TryInto;
47use std::sync::Weak;
48use std::{collections::BTreeMap, convert::TryFrom, sync::Arc, time::Duration};
49
50use crate::crazyflie::LOG_PORT;
51
52#[derive(Debug)]
58pub struct Log {
59 uplink: channel::Sender<Packet>,
60 control_downlink: Arc<Mutex<channel::Receiver<Packet>>>,
61 toc: Arc<BTreeMap<String, (u16, LogItemInfo)>>,
62 next_block_id: Mutex<u8>,
63 data_channels: Arc<Mutex<BTreeMap<u8, flume::Sender<Packet>>>>,
64 active_blocks: Mutex<BTreeMap<u8, Weak<()>>>,
65}
66
67fn not_found(name: &str) -> Error {
68 Error::ParamError(format!("Log variable {} not found", name))
69}
70
71const CONTROL_CHANNEL: u8 = 1;
72
73const DELETE_BLOCK: u8 = 2;
74const START_BLOCK: u8 = 3;
75const STOP_BLOCK: u8 = 4;
76const RESET: u8 = 5;
77const CREATE_BLOCK_V2: u8 = 6;
78const APPEND_BLOCK_V2: u8 = 7;
79
80impl Log {
81 pub(crate) async fn new<T>(
82 downlink: channel::Receiver<Packet>,
83 uplink: channel::Sender<Packet>,
84 toc_cache: T,
85 ) -> Result<Self>
86 where
87 T: TocCache,
88 {
89 let (toc_downlink, control_downlink, data_downlink, _) =
90 crate::crtp_utils::crtp_channel_dispatcher(downlink);
91
92 let toc = crate::crtp_utils::fetch_toc(LOG_PORT, uplink.clone(), toc_downlink, toc_cache).await?;
93 let toc = Arc::new(toc);
94
95 let control_downlink = Arc::new(Mutex::new(control_downlink));
96
97 let next_block_id = Mutex::new(0);
98
99 let data_channels = Arc::new(Mutex::new(BTreeMap::new()));
100
101 let active_blocks = Mutex::new(BTreeMap::new());
102
103 let log = Self {
104 uplink,
105 control_downlink,
106 toc,
107 next_block_id,
108 data_channels,
109 active_blocks,
110 };
111 log.reset().await?;
112 log.spawn_data_dispatcher(data_downlink).await;
113
114 Ok(log)
115 }
116
117 async fn reset(&self) -> Result<()> {
118 let downlink = self.control_downlink.lock().await;
119
120 let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![RESET]);
121 self.uplink
122 .send_async(pk)
123 .await
124 .map_err(|_| Error::Disconnected)?;
125
126 let pk = downlink
127 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[RESET])
128 .await?;
129 assert_eq!(pk.get_data()[2], 0);
130
131 Ok(())
132 }
133
134 async fn spawn_data_dispatcher(&self, data_downlink: flume::Receiver<Packet>) {
135 let data_channels = self.data_channels.clone();
136 tokio::spawn(async move {
137 while let Ok(packet) = data_downlink.recv_async().await {
138 if packet.get_data().len() > 1 {
139 let block_id = packet.get_data()[0];
140 let data_channels = data_channels.lock().await;
141 if data_channels.contains_key(&block_id)
142 && data_channels
143 .get(&block_id)
144 .unwrap()
145 .send_async(packet)
146 .await
147 .is_err()
148 {
149 break;
150 }
151 }
152 }
153 });
154 }
155
156 pub fn names(&self) -> Vec<String> {
161 self.toc.keys().cloned().collect()
162 }
163
164 pub fn get_type(&self, name: &str) -> Result<ValueType> {
166 Ok(self
167 .toc
168 .get(name)
169 .ok_or_else(|| not_found(name))?
170 .1
171 .item_type)
172 }
173
174 async fn generate_next_block_id(&self) -> Result<u8> {
175 let mut next_block_id = self.next_block_id.lock().await;
176 if *next_block_id == u8::MAX {
177 return Err(Error::LogError("No more block ID available!".into()));
178 }
179 let id = *next_block_id;
180 *next_block_id += 1;
181 Ok(id)
182 }
183
184 async fn cleanup_blocks(&self) -> Result<()> {
186 let mut active_blocks = self.active_blocks.lock().await;
187
188 for (block_id, canary) in active_blocks.clone().into_iter() {
189 if canary.upgrade() == None {
190 let control_downlink = self.control_downlink.lock().await;
192
193 let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![DELETE_BLOCK, block_id]);
194 self.uplink
195 .send_async(pk)
196 .await
197 .map_err(|_| Error::Disconnected)?;
198
199 let pk = control_downlink
200 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[DELETE_BLOCK, block_id])
201 .await?;
202 let error = pk.get_data()[2];
203
204 if error != 0 {
205 return Err(Error::LogError(format!(
206 "Protocol error when deleting block: {}",
207 error
208 )));
209 }
210
211 active_blocks.remove_entry(&block_id);
212 }
213 }
214
215 Ok(())
216 }
217
218 pub async fn create_block(&self) -> Result<LogBlock> {
235 self.cleanup_blocks().await?;
236
237 let block_id = self.generate_next_block_id().await?;
238 let control_downlink = self.control_downlink.lock().await;
239
240 let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, vec![CREATE_BLOCK_V2, block_id]);
241 self.uplink
242 .send_async(pk)
243 .await
244 .map_err(|_| Error::Disconnected)?;
245
246 let pk = control_downlink
247 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[CREATE_BLOCK_V2, block_id])
248 .await?;
249 let error = pk.get_data()[2];
250
251 if error != 0 {
252 return Err(Error::LogError(format!(
253 "Protocol error when creating block: {}",
254 error
255 )));
256 }
257
258 let (tx, rx) = flume::unbounded();
260 self.data_channels.lock().await.insert(block_id, tx);
261
262 let canary = Arc::new(());
263 self.active_blocks
264 .lock()
265 .await
266 .insert(block_id, Arc::downgrade(&canary));
267
268 Ok(LogBlock {
269 _canary: canary,
270 toc: Arc::downgrade(&self.toc),
271 uplink: self.uplink.clone(),
272 control_downlink: Arc::downgrade(&self.control_downlink),
273 block_id,
274 variables: Vec::new(),
275 data_channel: rx,
276 })
277 }
278}
279
280#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
281struct LogItemInfo {
282 item_type: ValueType,
283}
284
285impl TryFrom<u8> for LogItemInfo {
286 type Error = Error;
287
288 fn try_from(log_type: u8) -> Result<Self> {
289 let item_type = match log_type {
290 1 => ValueType::U8,
291 2 => ValueType::U16,
292 3 => ValueType::U32,
293 4 => ValueType::I8,
294 5 => ValueType::I16,
295 6 => ValueType::I32,
296 7 => ValueType::F32,
297 8 => ValueType::F16,
298 _ => {
299 return Err(Error::ProtocolError(format!(
300 "Invalid log item type: {}",
301 log_type
302 )))
303 }
304 };
305
306 Ok(LogItemInfo { item_type })
307 }
308}
309
310impl TryInto<u8> for LogItemInfo {
311 type Error = Error;
312
313 fn try_into(self) -> Result<u8> {
314 let value = match self.item_type {
315 ValueType::U8 => 1,
316 ValueType::U16 => 2,
317 ValueType::U32 => 3,
318 ValueType::I8 => 4,
319 ValueType::I16 => 5,
320 ValueType::I32 => 6,
321 ValueType::F32 => 7,
322 ValueType::F16 => 8,
323 _ => {
324 return Err(Error::LogError(format!(
325 "Value type {:?} not handled by log",
326 self.item_type
327 )))
328 }
329 };
330 Ok(value)
331 }
332}
333
334pub struct LogBlock {
343 _canary: Arc<()>,
344 toc: Weak<BTreeMap<String, (u16, LogItemInfo)>>,
345 uplink: channel::Sender<Packet>,
346 control_downlink: Weak<Mutex<channel::Receiver<Packet>>>,
347 block_id: u8,
348 variables: Vec<(String, ValueType)>,
349 data_channel: flume::Receiver<Packet>,
350}
351
352impl LogBlock {
353 pub async fn start(self, period: LogPeriod) -> Result<LogStream> {
363 let control_uplink = self.control_downlink.upgrade().ok_or(Error::Disconnected)?;
364 let control_uplink = control_uplink.lock().await;
365
366 let pk = Packet::new(
367 LOG_PORT,
368 CONTROL_CHANNEL,
369 vec![START_BLOCK, self.block_id, period.0],
370 );
371 self.uplink
372 .send_async(pk)
373 .await
374 .map_err(|_| Error::Disconnected)?;
375
376 let answer = control_uplink
377 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[START_BLOCK, self.block_id])
378 .await?;
379 if answer.get_data().len() != 3 {
380 return Err(Error::ProtocolError(
381 "Malformed Log control packet".to_owned(),
382 ));
383 }
384 let error_code = answer.get_data()[2];
385 if error_code != 0 {
386 return Err(Error::LogError(format!(
387 "Error starting lock: {}",
388 error_code
389 )));
390 }
391
392 Ok(LogStream { log_block: self })
393 }
394
395 pub async fn add_variable(&mut self, name: &str) -> Result<()> {
403 let toc = self.toc.upgrade().ok_or(Error::Disconnected)?;
404 let (variable_id, info) = toc.get(name).ok_or(Error::VariableNotFound)?;
405
406 let control_uplink = self.control_downlink.upgrade().ok_or(Error::Disconnected)?;
408 let control_uplink = control_uplink.lock().await;
409
410 let mut payload = vec![APPEND_BLOCK_V2, self.block_id, (*info).try_into()?];
411 payload.extend_from_slice(&variable_id.to_le_bytes());
412 let pk = Packet::new(LOG_PORT, CONTROL_CHANNEL, payload);
413 self.uplink
414 .send_async(pk)
415 .await
416 .map_err(|_| Error::Disconnected)?;
417
418 let answer = control_uplink
419 .wait_packet(LOG_PORT, CONTROL_CHANNEL, &[APPEND_BLOCK_V2, self.block_id])
420 .await?;
421 if answer.get_data().len() != 3 {
422 return Err(Error::ProtocolError(
423 "Malformed Log control packet".to_owned(),
424 ));
425 }
426 let error_code = answer.get_data()[2];
427 if error_code != 0 {
428 return Err(Error::LogError(format!(
429 "Error appending variable to block: {}",
430 error_code
431 )));
432 }
433
434 self.variables.push((name.to_owned(), info.item_type));
436
437 Ok(())
438 }
439}
440
441pub struct LogStream {
451 log_block: LogBlock,
452}
453
454impl LogStream {
455 pub async fn stop(self) -> Result<LogBlock> {
463 let control_uplink = self
464 .log_block
465 .control_downlink
466 .upgrade()
467 .ok_or(Error::Disconnected)?;
468 let control_uplink = control_uplink.lock().await;
469
470 let pk = Packet::new(
471 LOG_PORT,
472 CONTROL_CHANNEL,
473 vec![STOP_BLOCK, self.log_block.block_id],
474 );
475 self.log_block
476 .uplink
477 .send_async(pk)
478 .await
479 .map_err(|_| Error::Disconnected)?;
480
481 let answer = control_uplink
482 .wait_packet(
483 LOG_PORT,
484 CONTROL_CHANNEL,
485 &[STOP_BLOCK, self.log_block.block_id],
486 )
487 .await?;
488 if answer.get_data().len() != 3 {
489 return Err(Error::ProtocolError(
490 "Malformed Log control packet".to_owned(),
491 ));
492 }
493 let error_code = answer.get_data()[2];
494 if error_code != 0 {
495 return Err(Error::LogError(format!(
496 "Error starting lock: {}",
497 error_code
498 )));
499 }
500
501 Ok(self.log_block)
502 }
503
504 pub async fn next(&self) -> Result<LogData> {
511 let packet = self
512 .log_block
513 .data_channel
514 .recv_async()
515 .await
516 .map_err(|_| Error::Disconnected)?;
517
518 self.decode_packet(&packet.get_data()[1..])
519 }
520
521 fn decode_packet(&self, data: &[u8]) -> Result<LogData> {
522 let mut timestamp = data[0..=2].to_vec();
523 timestamp.insert(0, 0);
524 let timestamp = u32::from_le_bytes(timestamp.try_into().unwrap());
526
527 let mut index = 3;
528 let mut log_data = HashMap::new();
529 for (name, value_type) in &self.log_block.variables {
530 let byte_length = value_type.byte_length();
531 log_data.insert(
532 name.clone(),
533 Value::from_le_bytes(&data[index..(index + byte_length)], *value_type)?,
534 );
535 index += byte_length;
536 }
537
538 Ok(LogData {
539 timestamp,
540 data: log_data,
541 })
542 }
543}
544
545#[derive(Debug)]
553pub struct LogData {
554 pub timestamp: u32,
556 pub data: HashMap<String, Value>,
558}
559
560pub struct LogPeriod(u8);
570
571impl LogPeriod {
572 pub fn from_millis(millis: u64) -> Result<Self> {
577 Duration::from_millis(millis).try_into()
578 }
579}
580
581impl TryFrom<Duration> for LogPeriod {
582 type Error = Error;
583
584 fn try_from(value: Duration) -> Result<Self> {
585 let period_ms = value.as_millis();
586 let period_arg = period_ms / 10;
587 if period_arg == 0 || period_arg > 255 {
588 return Err(Error::LogError(
589 "Invalid log period, should be between 10ms and 2550ms".to_owned(),
590 ));
591 }
592 Ok(LogPeriod(period_arg as u8))
593 }
594}