crazyflie_lib/subsystems/
console.rs

1//! # Console subsystem
2//!
3//! The Crazyflie has a text console that is used to communicate various information
4//! and debug messages to the ground.
5//!
6//! The console output is available either as a data stream that produces the same data as
7//! returned by the Crazyflie (i.e. lines can be incomplete):
8//! ``` no_run
9//! use futures::StreamExt;
10//!
11//! # async fn as_stream(crazyflie: &crazyflie_lib::Crazyflie) {
12//! let mut console_stream = crazyflie.console.stream().await;
13//!
14//! while let Some(data) = console_stream.next().await {
15//!     println!("{}", data);
16//! }
17//! // If the Crazyflie sends "Hello .................................................... World!"
18//! // The println would show:
19//! // Hello ........................
20//! // ............................ W
21//! // orld!
22//! # }
23//! ```
24//!
25//! Or as a line stream that assembles and returns full lines:
26//! ``` no_run
27//! use futures::StreamExt;
28//!
29//! # async fn as_stream(crazyflie: &crazyflie_lib::Crazyflie) {
30//! let mut line_stream = crazyflie.console.line_stream().await;
31//!
32//! while let Some(data) = line_stream.next().await {
33//!     println!("{}", data);
34//! }
35//! // If the Crazyflie sends "Hello .................................................... World!"
36//! // The println would show:
37//! // Hello .................................................... World!
38//! # }
39//! ```
40//!
41//! The data received from the Crazyflie is decoded as
42//! [UTF8 lossy](String::from_utf8_lossy()) before being sent as [String] to the
43//! streams.
44//!
45//! ## History or no History
46//!
47//! By default, the [Console::stream()] and [Console::line_stream()] functions
48//! return a stream that first produces the full console history since connection
49//! and then produces the console data as it arrives from the Crazyflie. This is needed
50//! if the startup messages need to be displayed, but it can be a problem for more
51//! advanced use cases that only want to observe the console some time after the connection.
52//!
53//! There exist functions for both data stream and line stream to get the stream
54//! without getting the history first.
55
56use std::sync::Arc;
57
58use crate::Result;
59use async_broadcast::{broadcast, Receiver};
60use tokio::task::JoinHandle;
61use crazyflie_link::Packet;
62use flume as channel;
63use futures::{lock::Mutex, Stream, StreamExt};
64
65/// # Access to the console subsystem
66///
67/// See the [console module documentation](crate::subsystems::console) for more context and information.
68pub struct Console {
69    stream_broadcast_receiver: Receiver<String>,
70    console_buffer: Arc<Mutex<String>>,
71    line_broadcast_receiver: Receiver<String>,
72    console_lines: Arc<Mutex<Vec<String>>>,
73    _console_task: JoinHandle<()>,
74}
75
76impl Console {
77    pub(crate) async fn new(
78        downlink: channel::Receiver<Packet>,
79    ) -> Result<Self> {
80        let (mut stream_broadcast, stream_broadcast_receiver) = broadcast(1000);
81        let console_buffer: Arc<Mutex<String>> = Default::default();
82
83        let (mut line_broadcast, line_broadcast_receiver) = broadcast(1000);
84
85        // Enable overflow mode so old messages are dropped instead of blocking
86        stream_broadcast.set_overflow(true);
87        line_broadcast.set_overflow(true);
88        let console_lines: Arc<Mutex<Vec<String>>> = Default::default();
89
90        let buffer = console_buffer.clone();
91        let lines = console_lines.clone();
92
93        let _console_task = tokio::spawn(async move {
94            let mut line_buffer = String::new();
95            while let Ok(pk) = downlink.recv_async().await {
96                // Decode text from the console
97                let text = String::from_utf8_lossy(pk.get_data());
98
99                buffer.lock().await.push_str(&text);
100
101                // Push the text to all active streams, we ignore any error there
102                let _ = stream_broadcast.broadcast(text.clone().into_owned()).await;
103
104                // Extract lines and push them to all active line streams
105                line_buffer.push_str(&text);
106                if let Some((line, rest)) = line_buffer.clone().split_once("\n") {
107                    line_buffer = rest.to_owned();
108                    lines.lock().await.push(line.to_owned().clone());
109                    let _ = line_broadcast.broadcast(line.to_owned()).await;
110                }
111            }
112        });
113
114        Ok(Self {
115            stream_broadcast_receiver,
116            console_buffer,
117            line_broadcast_receiver,
118            console_lines,
119            _console_task,
120        })
121    }
122
123    /// Return a [Stream] that generates a [String] each time a console packet
124    /// is received from the Crazyflie.
125    ///
126    /// With the current Crazyflie algorithms, packets are up to 30 character
127    /// long and a new line triggers the send of a packet. Though this is not a
128    /// guarantee and nothing should be expected from this Stream other that
129    /// getting the console data when they are received.
130    ///
131    /// The lib keeps track of the console history since connection, the stream
132    /// will first produce the full history since connection in one String and then
133    /// will start returning Strings as they come from the Crazyflie.
134    pub async fn stream(&self) -> impl Stream<Item = String> + use<> {
135        let buffer = self.console_buffer.lock().await;
136        let history_buffer = buffer.clone();
137        let history_stream = futures::stream::once(async { history_buffer }).boxed();
138
139        history_stream.chain(self.stream_broadcast_receiver.clone())
140    }
141
142    /// Version of [Console::stream()] but that does not produce the history
143    /// first.
144    pub async fn stream_no_history(&self) -> impl Stream<Item = String> + use<> {
145        self.stream_broadcast_receiver.clone()
146    }
147
148    /// Return a [Stream] that generate a [String] each time a line is received
149    /// from the Crazyflie.
150    ///
151    /// This is a useful function if you want to receive the console line by line.
152    /// (for example to print it in a terminal or a file)
153    ///
154    /// Similar to [Console::stream()], this stream will generate first the
155    /// console history since connection. The history is generated by the Stream
156    /// line-by-line.
157    pub async fn line_stream(&self) -> impl Stream<Item = String> + use<> {
158        let lines = self.console_lines.lock().await;
159        let history_lines = lines.clone();
160        let history_stream = futures::stream::iter(history_lines.into_iter()).boxed();
161
162        history_stream.chain(self.line_broadcast_receiver.clone())
163    }
164
165    /// Version of [Console::line_stream()] but that does not produce the history
166    /// first.
167    pub async fn line_stream_no_history(&self) -> impl Stream<Item = String> + use<> {
168        self.line_broadcast_receiver.clone()
169    }
170}