use crate::connection::{ConnectionTrait, RadioLinkStatistics};
use crate::error::{Error, Result};
use crate::{ConnectionStatus, LinkContext, Packet};
use async_trait::async_trait;
use futures_channel::oneshot;
use futures_util::lock::Mutex;
use log::{debug, error, info, warn};
use rusb::{DeviceHandle, DeviceList, GlobalContext};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;
use url::Url;
pub struct CrazyflieUSBConnection {
status: Arc<Mutex<ConnectionStatus>>,
uplink: flume::Sender<Vec<u8>>,
downlink: flume::Receiver<Vec<u8>>,
disconnect_channel: flume::Receiver<()>,
disconnect: Arc<AtomicBool>,
}
impl CrazyflieUSBConnection {
pub async fn open(
_link_context: &LinkContext,
uri: &str,
) -> Result<Option<CrazyflieUSBConnection>> {
let serial = Self::parse_uri(uri)?;
let (_device_desc, handle, handle_ctrl, serial) = tokio::task::spawn_blocking(move || {
for device in DeviceList::new()?.iter() {
let device_desc = match device.device_descriptor() {
Ok(d) => d,
Err(_) => continue,
};
if device_desc.vendor_id() == 0x0483 && device_desc.product_id() == 0x5740 {
let timeout = Duration::from_secs(1);
let handle = match device.open() {
Ok(d) => d,
Err(_) => continue,
};
handle.claim_interface(0)?;
let handle_ctrl = match device.open() {
Ok(d) => d,
Err(_) => continue,
};
let language = match handle.read_languages(timeout).unwrap_or_default().first()
{
Some(l) => *l,
None => continue,
};
let detected_serial =
handle.read_serial_number_string(language, &device_desc, timeout)?;
if detected_serial == serial {
return Ok((device_desc, handle, handle_ctrl, detected_serial));
}
}
}
Err(Error::InvalidUri)
})
.await
.unwrap()?;
let connection = CrazyflieUSBConnection::new(handle, handle_ctrl, serial).await?;
Ok(Some(connection))
}
async fn new(
usb_handle: DeviceHandle<GlobalContext>,
usb_handle_ctrl: DeviceHandle<GlobalContext>,
#[allow(unused_variables)] serial: String,
) -> Result<CrazyflieUSBConnection> {
let status = Arc::new(Mutex::new(ConnectionStatus::Connecting));
let (disconnect_channel_tx, disconnect_channel_rx) = flume::bounded(0);
let disconnect = Arc::new(AtomicBool::new(false));
let (uplink_send, uplink_recv) = flume::unbounded::<Vec<u8>>();
let (downlink_send, downlink_recv) = flume::unbounded::<Vec<u8>>();
let (connection_initialized_send, connection_initialized) = oneshot::channel();
let conn_disconnect = disconnect.clone();
let conn_status = status.clone();
#[cfg(feature = "packet_capture")]
let capture_serial = serial;
tokio::task::spawn_blocking(move || {
#[cfg(feature = "packet_capture")]
let capture_serial = capture_serial;
match usb_handle_ctrl.write_control(64, 0x01, 0x01, 0x01, &[], Duration::from_secs(1)) {
Ok(_) => debug!("Switched to USB communication mode"),
Err(e) => {
error!("Error switching to USB communication mode: {:?}", e);
return;
}
}
let inner_status = conn_status.clone();
tokio::runtime::Handle::current().block_on(async move {
let mut status = inner_status.lock().await;
*status = ConnectionStatus::Connected;
});
connection_initialized_send.send(()).unwrap();
#[cfg(feature = "packet_capture")]
let capture_serial = capture_serial;
let usb_handle = Arc::new(usb_handle);
let usb_handle_r = usb_handle.clone();
let usb_handle_w = usb_handle;
let conn_disconnect_r = conn_disconnect.clone();
let conn_disconnect_w = conn_disconnect.clone();
#[cfg(feature = "packet_capture")]
let capture_serial_r = capture_serial.clone();
#[cfg(feature = "packet_capture")]
let capture_serial_w = capture_serial;
struct DisconnectGuard(Arc<AtomicBool>);
impl Drop for DisconnectGuard {
fn drop(&mut self) {
self.0.store(true, Relaxed);
}
}
let reader = std::thread::spawn::<_, Result<()>>(move || {
info!("Reader thread started");
let _guard = DisconnectGuard(conn_disconnect_r.clone());
loop {
if conn_disconnect_r.load(Relaxed) {
return Ok(());
}
let mut buf = vec![0; 64];
match usb_handle_r.read_bulk(0x81, &mut buf, Duration::from_millis(20)) {
Ok(n) => {
if n > 0 {
let packet = buf[0..n].to_vec();
#[cfg(feature = "packet_capture")]
crate::capture::send_packet(
crate::capture::LINK_TYPE_USB,
crate::capture::DIRECTION_RX,
&[],
0,
&capture_serial_r,
&packet,
);
if downlink_send.send(packet).is_err() {
return Ok(());
}
}
}
Err(rusb::Error::Timeout) => {}
Err(e) => {
warn!("Reader thread error: {:?}", e);
return Err(e.into());
}
}
}
});
let writer = std::thread::spawn::<_, Result<()>>(move || {
info!("Writer thread started");
let _guard = DisconnectGuard(conn_disconnect_w.clone());
loop {
if conn_disconnect_w.load(Relaxed) {
return Ok(());
}
let packet = match uplink_recv.recv_timeout(Duration::from_millis(20)) {
Ok(p) => p,
Err(flume::RecvTimeoutError::Timeout) => continue,
Err(flume::RecvTimeoutError::Disconnected) => return Ok(()),
};
#[cfg(feature = "packet_capture")]
crate::capture::send_packet(
crate::capture::LINK_TYPE_USB,
crate::capture::DIRECTION_TX,
&[],
0,
&capture_serial_w,
&packet,
);
loop {
match usb_handle_w.write_bulk(0x01, &packet, Duration::from_millis(500)) {
Ok(_) => break,
Err(rusb::Error::Timeout) => {
if conn_disconnect_w.load(Relaxed) {
return Ok(());
}
}
Err(e) => {
warn!("Writer thread error: {:?}", e);
return Err(e.into());
}
}
}
}
});
fn panic_payload(payload: Box<dyn std::any::Any + Send>) -> String {
if let Some(s) = payload.downcast_ref::<&'static str>() {
(*s).to_string()
} else if let Some(s) = payload.downcast_ref::<String>() {
s.clone()
} else {
"unknown panic payload".to_string()
}
}
let reader_result = reader.join();
let writer_result = writer.join();
let disconnect_message = match (reader_result, writer_result) {
(Ok(Ok(())), Ok(Ok(()))) => "Connection closed".to_string(),
(Ok(Err(e)), _) => {
error!("Reader thread error: {:?}", e);
format!("USB error (reader): {:?}", e)
}
(_, Ok(Err(e))) => {
error!("Writer thread error: {:?}", e);
format!("USB error (writer): {:?}", e)
}
(Err(p), _) => {
let msg = panic_payload(p);
error!("Reader thread panicked: {}", msg);
format!("Reader thread panicked: {}", msg)
}
(_, Err(p)) => {
let msg = panic_payload(p);
error!("Writer thread panicked: {}", msg);
format!("Writer thread panicked: {}", msg)
}
};
let inner_status = conn_status.clone();
tokio::runtime::Handle::current().block_on(async move {
let mut status = inner_status.lock().await;
*status = ConnectionStatus::Disconnected(disconnect_message)
});
let _ = disconnect_channel_tx.send(());
match usb_handle_ctrl.write_control(64, 0x01, 0x01, 0x00, &[], Duration::from_secs(1)) {
Ok(_) => debug!("Switched back to CR mode in CF"),
Err(e) => {
error!("Error switching back to CR mode in CF: {:?}", e);
}
}
});
connection_initialized.await.unwrap();
Ok(CrazyflieUSBConnection {
status,
disconnect_channel: disconnect_channel_rx,
uplink: uplink_send,
downlink: downlink_recv,
disconnect,
})
}
pub(crate) async fn scan() -> Result<Vec<String>> {
let found = tokio::task::spawn_blocking(|| {
let mut found = Vec::new();
for device in DeviceList::new()?.iter() {
let device_desc = match device.device_descriptor() {
Ok(d) => d,
Err(_) => continue,
};
if device_desc.vendor_id() == 0x0483 && device_desc.product_id() == 0x5740 {
let timeout = Duration::from_secs(1);
let handle = match device.open() {
Ok(d) => d,
Err(_) => continue,
};
let language = match handle.read_languages(timeout).unwrap_or_default().first()
{
Some(l) => *l,
None => continue,
};
let serial =
handle.read_serial_number_string(language, &device_desc, timeout)?;
found.push(format!("usb://{}", serial));
}
}
Result::<_>::Ok(found)
})
.await
.unwrap()?;
Ok(found)
}
pub(crate) async fn scan_selected(uris: Vec<&str>) -> Result<Vec<String>> {
let found = Self::scan().await?;
Ok(found
.into_iter()
.filter(|uri| uris.iter().any(|u| uri.contains(u)))
.collect())
}
fn parse_uri(uri: &str) -> Result<String> {
let uri = Url::parse(uri)?;
if uri.scheme() != "usb" {
return Err(Error::InvalidUriScheme);
}
let serial = uri.domain().ok_or(Error::InvalidUri)?;
if uri.path_segments().is_some() {
return Err(Error::InvalidUri);
}
Ok(serial.to_owned())
}
}
#[async_trait]
impl ConnectionTrait for CrazyflieUSBConnection {
async fn wait_close(&self) -> String {
let _ = self.disconnect_channel.recv_async().await;
if let ConnectionStatus::Disconnected(reason) = self.status().await {
reason
} else {
"Still connected!".to_owned()
}
}
async fn close(&self) {
self.disconnect.store(true, Relaxed);
let _ = self.disconnect_channel.recv_async().await;
}
async fn status(&self) -> ConnectionStatus {
self.status.lock().await.clone()
}
async fn wait_disconnect(&self) {
let _ = self.disconnect_channel.recv_async().await;
}
async fn send_packet(&self, packet: Packet) -> Result<()> {
self.uplink.send_async(packet.into()).await?;
Ok(())
}
async fn recv_packet(&self) -> Result<Packet> {
let packet = self.downlink.recv_async().await?;
Ok(packet.into())
}
async fn link_statistics(&self) -> Option<RadioLinkStatistics> {
None
}
}
impl Drop for CrazyflieUSBConnection {
fn drop(&mut self) {
self.disconnect.store(true, Relaxed);
}
}