use super::CmsisDapDevice;
use crate::{
probe::{DebugProbeInfo, DebugProbeType, ProbeCreationError},
DebugProbeSelector,
};
use hidapi::HidApi;
use rusb::{constants::LIBUSB_CLASS_HID, Device, DeviceDescriptor, UsbContext};
use std::time::Duration;
/// Finds all CMSIS-DAP probes currently attached to the system.
///
/// Devices are first enumerated through libusb (`rusb`); every device for
/// which [`get_cmsisdap_info`] returns information is included. The HID
/// device list is then scanned as well, and any CMSIS-DAP device found there
/// that is not already in the list is appended.
///
/// Failure to create the libusb context or the HID API is not an error: the
/// corresponding enumeration step simply contributes no probes.
#[tracing::instrument(skip_all)]
pub fn list_cmsisdap_devices() -> Vec<DebugProbeInfo> {
    tracing::debug!("Searching for CMSIS-DAP probes using libusb");
    let mut probes: Vec<DebugProbeInfo> =
        match rusb::Context::new().and_then(|ctx| ctx.devices()) {
            Ok(devices) => devices
                .iter()
                .filter_map(|device| get_cmsisdap_info(&device))
                .collect(),
            Err(_) => vec![],
        };

    tracing::debug!(
        "Found {} CMSIS-DAP probes using libusb, searching HID",
        probes.len()
    );

    if let Ok(api) = hidapi::HidApi::new() {
        for device in api.device_list() {
            if let Some(info) = get_cmsisdap_hid_info(device) {
                // Two probes are only the same physical device if the serial
                // number matches too. Comparing VID/PID alone would drop every
                // additional probe of the same model.
                let already_listed = probes.iter().any(|p| {
                    p.vendor_id == info.vendor_id
                        && p.product_id == info.product_id
                        && p.serial_number == info.serial_number
                });

                if already_listed {
                    tracing::trace!("Ignoring duplicate {:?}", info);
                } else {
                    tracing::trace!("Adding new HID-only probe {:?}", info);
                    probes.push(info);
                }
            }
        }
    }

    tracing::debug!("Found {} CMSIS-DAP probes total", probes.len());
    probes
}
/// Inspects a libusb device and returns probe information if it looks like a
/// CMSIS-DAP probe.
///
/// A device qualifies when its product string, or the string descriptor of
/// any of its interfaces, contains `"CMSIS-DAP"`. If one of the matching
/// interfaces has the HID class code, its interface number is recorded in
/// `hid_interface`.
///
/// Returns `None` if the device is not a CMSIS-DAP probe, or if the device
/// descriptor, the device handle, the language list, the product string or
/// the active configuration descriptor cannot be obtained. A serial number
/// that cannot be read is reported as `None` rather than failing the lookup.
fn get_cmsisdap_info(device: &Device<rusb::Context>) -> Option<DebugProbeInfo> {
    let timeout = Duration::from_millis(100);

    let d_desc = device.device_descriptor().ok()?;
    let handle = device.open().ok()?;
    // Only the first language reported by the device is used for string reads.
    let language = handle.read_languages(timeout).ok()?.first().cloned()?;
    let prod_str = handle
        .read_product_string(language, &d_desc, timeout)
        .ok()?;
    let sn_str = handle
        .read_serial_number_string(language, &d_desc, timeout)
        .ok();
    let config_descriptor = device.active_config_descriptor().ok()?;

    let mut cmsis_dap_interface = false;
    let mut hid_interface = None;

    for interface in config_descriptor.interfaces() {
        let number = interface.number();
        for descriptor in interface.descriptors() {
            match handle.read_interface_string(language, &descriptor, timeout) {
                Ok(name) if name.contains("CMSIS-DAP") => {
                    tracing::trace!(" Interface {}: {}", number, name);
                    cmsis_dap_interface = true;
                    if descriptor.class_code() == LIBUSB_CLASS_HID {
                        tracing::trace!(" HID interface found");
                        hid_interface = Some(number);
                    }
                }
                // Readable string, but not a CMSIS-DAP interface.
                Ok(_) => {}
                Err(_) => {
                    tracing::trace!(
                        "Could not read string for interface {}, skipping",
                        number
                    );
                }
            }
        }
    }

    if !cmsis_dap_interface && !prod_str.contains("CMSIS-DAP") {
        return None;
    }

    tracing::trace!(
        "{}: CMSIS-DAP device with {} interfaces",
        prod_str,
        config_descriptor.num_interfaces()
    );
    match hid_interface {
        Some(interface) => {
            tracing::trace!("Will use interface number {} for CMSIS-DAPv1", interface);
        }
        None => {
            tracing::trace!("No HID interface for CMSIS-DAP found.");
        }
    }

    Some(DebugProbeInfo {
        identifier: prod_str,
        vendor_id: d_desc.vendor_id(),
        product_id: d_desc.product_id(),
        serial_number: sn_str,
        probe_type: DebugProbeType::CmsisDap,
        hid_interface,
    })
}
/// Builds probe information from a HID device entry if it looks like a
/// CMSIS-DAP probe.
///
/// A device qualifies when its product string or its device path contains
/// `"CMSIS-DAP"`. A missing product string or a non-UTF-8 path is treated as
/// an empty string for this check.
///
/// The `hid_interface` field is only populated when the interface number
/// reported by hidapi fits in a `u8`. hidapi reports `-1` when the interface
/// number is not known; that is mapped to `None` instead of being truncated
/// into a bogus interface number.
fn get_cmsisdap_hid_info(device: &hidapi::DeviceInfo) -> Option<DebugProbeInfo> {
    let prod_str = device.product_string().unwrap_or("");
    let path = device.path().to_str().unwrap_or("");

    if !(prod_str.contains("CMSIS-DAP") || path.contains("CMSIS-DAP")) {
        return None;
    }

    let interface_number = device.interface_number();

    tracing::trace!("CMSIS-DAP device with USB path: {:?}", device.path());
    tracing::trace!(" product_string: {:?}", prod_str);
    tracing::trace!(
        " interface: {}",
        interface_number
    );

    // Checked conversion: a plain `as u8` cast would turn -1 into 255.
    let hid_interface = <u8 as std::convert::TryFrom<i32>>::try_from(interface_number).ok();

    Some(DebugProbeInfo {
        identifier: prod_str.to_owned(),
        vendor_id: device.vendor_id(),
        product_id: device.product_id(),
        serial_number: device.serial_number().map(|s| s.to_owned()),
        probe_type: DebugProbeType::CmsisDap,
        hid_interface,
    })
}
/// Tries to open the given USB device as a CMSIS-DAP v2 (bulk endpoint) probe.
///
/// The interfaces of configuration 0 are searched for one whose string
/// descriptor contains `"CMSIS-DAP"` and which has two or three endpoints,
/// where the first is bulk OUT and the second is bulk IN. If a third endpoint
/// exists and is bulk IN, it is recorded as the SWO endpoint. The first such
/// interface that can be claimed is used.
///
/// Returns `None` if the device cannot be opened or queried, or if no
/// suitable interface could be claimed.
pub fn open_v2_device(device: Device<rusb::Context>) -> Option<CmsisDapDevice> {
    let timeout = Duration::from_millis(100);

    let d_desc = device.device_descriptor().ok()?;
    let (vid, pid) = (d_desc.vendor_id(), d_desc.product_id());

    let mut handle = device.open().ok()?;
    // Only the first language reported by the device is used for string reads.
    let language = handle.read_languages(timeout).ok()?.first().cloned()?;
    let c_desc = device.config_descriptor(0).ok()?;

    for interface in c_desc.interfaces() {
        for i_desc in interface.descriptors() {
            // Skip interfaces whose name is unreadable or is not CMSIS-DAP.
            let named_cmsis_dap = handle
                .read_interface_string(language, &i_desc, timeout)
                .map(|name| name.contains("CMSIS-DAP"))
                .unwrap_or(false);
            if !named_cmsis_dap {
                continue;
            }

            // The endpoint count check also guards the indexing below.
            if !(2..=3).contains(&i_desc.num_endpoints()) {
                continue;
            }

            let endpoints: Vec<_> = i_desc.endpoint_descriptors().collect();
            let out_desc = &endpoints[0];
            let in_desc = &endpoints[1];

            let out_is_bulk_out = out_desc.transfer_type() == rusb::TransferType::Bulk
                && out_desc.direction() == rusb::Direction::Out;
            let in_is_bulk_in = in_desc.transfer_type() == rusb::TransferType::Bulk
                && in_desc.direction() == rusb::Direction::In;
            if !(out_is_bulk_out && in_is_bulk_in) {
                continue;
            }

            // Optional third endpoint: only used when it is bulk IN.
            let swo_ep = endpoints
                .get(2)
                .filter(|ep| {
                    ep.transfer_type() == rusb::TransferType::Bulk
                        && ep.direction() == rusb::Direction::In
                })
                .map(|ep| (ep.address(), ep.max_packet_size() as usize));

            if handle.claim_interface(interface.number()).is_ok() {
                tracing::debug!("Opening {:04x}:{:04x} in CMSIS-DAPv2 mode", vid, pid);
                return Some(CmsisDapDevice::V2 {
                    handle,
                    out_ep: out_desc.address(),
                    in_ep: in_desc.address(),
                    swo_ep,
                    max_packet_size: in_desc.max_packet_size() as usize,
                });
            }
        }
    }

    tracing::debug!(
        "Could not open {:04x}:{:04x} in CMSIS-DAP v2 mode",
        vid,
        pid
    );
    None
}
/// Checks whether a USB device corresponds to the given probe selector.
///
/// The vendor and product IDs must always match. The serial number is only
/// compared when the selector specifies one; in that case `serial_str` must be
/// equal to it (a device without a readable serial number does not match).
fn device_matches(
    device_descriptor: DeviceDescriptor,
    selector: &DebugProbeSelector,
    serial_str: Option<String>,
) -> bool {
    let ids_match = device_descriptor.vendor_id() == selector.vendor_id
        && device_descriptor.product_id() == selector.product_id;

    // A selector without a serial number accepts any serial number.
    ids_match && (selector.serial_number.is_none() || serial_str == selector.serial_number)
}
/// Opens the CMSIS-DAP probe described by `selector`.
///
/// The function first walks the libusb device list and, for every device that
/// matches the selector and is recognised by `get_cmsisdap_info`, tries to open
/// it with `open_v2_device`. If that does not yield a device, it falls back to
/// the HID API and opens the first HID device matching the selector as a
/// CMSIS-DAP v1 probe.
///
/// # Errors
///
/// Returns `ProbeCreationError::NotFound` if no HID device matches the
/// selector, or if the opened HID device's product string does not contain
/// `"CMSIS-DAP"`. Errors from creating the HID API or opening the HID device
/// are propagated via `?`.
pub fn open_device_from_selector(
selector: impl Into<DebugProbeSelector>,
) -> Result<CmsisDapDevice, ProbeCreationError> {
let selector = selector.into();
tracing::trace!("Attempting to open device matching {}", selector);
// Info about the most recently inspected matching device from the libusb
// scan; used further down to pick the right HID interface for the v1 fallback.
let mut hid_device_info: Option<DebugProbeInfo> = None;
if let Ok(devices) = rusb::Context::new().and_then(|ctx| ctx.devices()) {
for device in devices.iter() {
tracing::trace!("Trying device {:?}", device);
let d_desc = match device.device_descriptor() {
Ok(d_desc) => d_desc,
Err(err) => {
tracing::trace!("Error reading descriptor: {:?}", err);
continue;
}
};
let handle = match device.open() {
Ok(handle) => handle,
Err(err) => {
tracing::trace!("Error opening: {:?}", err);
continue;
}
};
let timeout = Duration::from_millis(100);
// Read the serial number using the first reported language. A failed
// serial number read yields `None`; a failed language read skips the device.
let sn_str = match handle.read_languages(timeout) {
Ok(langs) => langs.get(0).and_then(|lang| {
handle
.read_serial_number_string(*lang, &d_desc, timeout)
.ok()
}),
Err(err) => {
tracing::trace!("Error getting languages: {:?}", err);
continue;
}
};
// Release this handle before the helpers below open the device again
// (both get_cmsisdap_info and open_v2_device call `device.open()`).
drop(handle);
if device_matches(d_desc, &selector, sn_str) {
hid_device_info = get_cmsisdap_info(&device);
if hid_device_info.is_some() {
// Prefer CMSIS-DAP v2; if it cannot be opened that way, keep
// scanning and eventually fall through to the HID path.
if let Some(device) = open_v2_device(device) {
return Ok(device);
}
}
}
}
} else {
// Reached when the libusb context or device list could not be obtained.
tracing::debug!("No devices matched using rusb");
}
let vid = selector.vendor_id;
let pid = selector.product_id;
let sn = &selector.serial_number;
tracing::debug!(
"Attempting to open {:04x}:{:04x} in CMSIS-DAP v1 mode",
vid,
pid
);
let hid_api = HidApi::new()?;
let mut device_list = hid_api.device_list();
let device_info = device_list
.find(|info| {
// VID/PID must match; the serial number and HID interface number are
// only checked when the selector / libusb scan provided them.
let mut device_match = info.vendor_id() == vid && info.product_id() == pid;
if let Some(sn) = sn {
device_match &= Some(sn.as_ref()) == info.serial_number();
}
if let Some(hid_interface) =
hid_device_info.as_ref().and_then(|info| info.hid_interface)
{
device_match &= info.interface_number() == hid_interface as i32;
}
device_match
})
.ok_or(ProbeCreationError::NotFound)?;
let device = device_info.open_device(&hid_api)?;
// Only accept the HID device if its product string identifies it as
// CMSIS-DAP. The report size is fixed at 64 here.
match device.get_product_string() {
Ok(Some(s)) if s.contains("CMSIS-DAP") => Ok(CmsisDapDevice::V1 {
handle: device,
report_size: 64,
}),
_ => {
Err(ProbeCreationError::NotFound)
}
}
}