crazyflie_lib/
crazyflie.rs1use crate::subsystems::commander::Commander;
2use crate::subsystems::high_level_commander::HighLevelCommander;
3use crate::subsystems::console::Console;
4use crate::subsystems::localization::Localization;
5use crate::subsystems::log::Log;
6use crate::subsystems::memory::Memory;
7use crate::subsystems::param::Param;
8
9use crate::crtp_utils::{CrtpDispatch, TocCache};
10use crate::subsystems::platform::Platform;
11use crate::{Error, Result};
12use crate::{MIN_SUPPORTED_PROTOCOL_VERSION, MAX_SUPPORTED_PROTOCOL_VERSION};
13use flume as channel;
14use futures::lock::Mutex;
15use tokio::task::JoinHandle;
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering::Relaxed;
18use std::sync::Arc;
19use std::time::Duration;
20
21pub(crate) const CONSOLE_PORT: u8 = 0;
23pub(crate) const PARAM_PORT: u8 = 2;
24pub(crate) const COMMANDER_PORT: u8 = 3;
25pub(crate) const MEMORY_PORT: u8 = 4;
26pub(crate) const LOG_PORT: u8 = 5;
27pub(crate) const LOCALIZATION_PORT: u8 = 6;
28pub(crate) const GENERIC_SETPOINT_PORT: u8 = 7;
29pub(crate) const HL_COMMANDER_PORT: u8 = 8;
30pub(crate) const PLATFORM_PORT: u8 = 13;
31pub(crate) const _LINK_PORT: u8 = 15;
32
33pub struct Crazyflie {
41 pub log: Log,
43 pub param: Param,
45 pub memory: Memory,
47 pub commander: Commander,
49 pub high_level_commander: HighLevelCommander,
51 pub console: Console,
53 pub localization: Localization,
55 pub platform: Platform,
57 uplink_task: Mutex<Option<JoinHandle<()>>>,
58 dispatch_task: Mutex<Option<JoinHandle<()>>>,
59 disconnect: Arc<AtomicBool>,
60 link: Arc<crazyflie_link::Connection>,
61}
62
63impl Crazyflie {
64 pub async fn connect_from_uri<T>(
73 link_context: &crazyflie_link::LinkContext,
74 uri: &str,
75 toc_cache: T,
76 ) -> Result<Self>
77 where
78 T: TocCache + Send + Sync + 'static,
79 {
80 let link = link_context.open_link(uri).await?;
81
82 Self::connect_from_link(link, toc_cache).await
83 }
84
85 pub async fn connect_from_link<T>(
94 link: crazyflie_link::Connection,
95 toc_cache: T,
96 ) -> Result<Self>
97 where
98 T: TocCache + Send + Sync + 'static,
99 {
100 let disconnect = Arc::new(AtomicBool::new(false));
101
102 let link = Arc::new(link);
104 let mut dispatcher = CrtpDispatch::new(link.clone(), disconnect.clone());
105
106 let disconnect_uplink = disconnect.clone();
108 let (uplink, rx) = channel::unbounded();
109 let link_uplink = link.clone();
110 let uplink_task = tokio::spawn(async move {
111 while !disconnect_uplink.load(Relaxed) {
112 match tokio::time::timeout(
113 Duration::from_millis(100), rx.recv_async()
114 ).await
115 {
116 Ok(Ok(pk)) => {
117 if link_uplink.send_packet(pk).await.is_err() {
118 return;
119 }
120 }
121 Err(_) => (),
122 Ok(Err(flume::RecvError::Disconnected)) => return,
123 }
124 }
125 });
126
127 let platform_downlink = dispatcher.get_port_receiver(PLATFORM_PORT).unwrap();
129 let log_downlink = dispatcher.get_port_receiver(LOG_PORT).unwrap();
130 let param_downlink = dispatcher.get_port_receiver(PARAM_PORT).unwrap();
131 let console_downlink = dispatcher.get_port_receiver(CONSOLE_PORT).unwrap();
132 let localization_downlink = dispatcher.get_port_receiver(LOCALIZATION_PORT).unwrap();
133 let memory_downlink = dispatcher.get_port_receiver(MEMORY_PORT).unwrap();
134
135 let dispatch_task = dispatcher.run().await?;
137
138 let platform = Platform::new(uplink.clone(), platform_downlink);
140
141 let protocol_version = platform.protocol_version().await?;
142
143 if !(MIN_SUPPORTED_PROTOCOL_VERSION..=MAX_SUPPORTED_PROTOCOL_VERSION)
144 .contains(&protocol_version)
145 {
146 return Err(Error::ProtocolVersionNotSupported {
147 min_supported: MIN_SUPPORTED_PROTOCOL_VERSION,
148 max_supported: MAX_SUPPORTED_PROTOCOL_VERSION,
149 found: protocol_version,
150 });
151 }
152
153 let log_future = Log::new(log_downlink, uplink.clone(), toc_cache.clone());
157 let param_future = Param::new(param_downlink, uplink.clone(), toc_cache.clone());
158 let memory_future = Memory::new(memory_downlink, uplink.clone());
159
160 let commander = Commander::new(uplink.clone());
161 let high_level_commander = HighLevelCommander::new(uplink.clone());
162 let console = Console::new(console_downlink).await?;
163 let localization = Localization::new(uplink.clone(), localization_downlink);
164 let (log, param, memory) = futures::join!(log_future, param_future, memory_future);
166 Ok(Crazyflie {
167 log: log?,
168 param: param?,
169 memory: memory?,
170 commander,
171 high_level_commander,
172 console,
173 localization,
174 platform,
175 uplink_task: Mutex::new(Some(uplink_task)),
176 dispatch_task: Mutex::new(Some(dispatch_task)),
177 disconnect,
178 link,
179 })
180 }
181
182 pub async fn disconnect(&self) {
190 self.disconnect.store(true, Relaxed);
192
193 if let Some(uplink_task) = self.uplink_task.lock().await.take() {
195 uplink_task.await.expect("Uplink task failed");
196 }
197 if let Some(dispatch_task) = self.dispatch_task.lock().await.take() {
198 dispatch_task.await.expect("Dispatcher task failed");
199 }
200
201 self.link.close().await;
202 }
203
204 pub async fn wait_disconnect(&self) -> String {
212 let reason = self.link.wait_close().await;
213
214 self.disconnect().await;
215
216 reason
217 }
218}
219
220impl Drop for Crazyflie {
221 fn drop(&mut self) {
222 self.disconnect.store(true, Relaxed);
223 }
224}