use crate::error::Result;
use crate::transport::transaction::Request;
use moteus_protocol::CanFdFrame;
use std::future::Future;
use std::pin::Pin;
#[cfg(feature = "tokio")]
use super::{
extract_uuid_from_response, make_uuid_prefix, make_uuid_query_frame, resolve_addresses,
};
#[cfg(feature = "tokio")]
use crate::device_address::DeviceAddress;
#[cfg(feature = "tokio")]
use crate::error::Error;
#[cfg(feature = "tokio")]
use crate::transport::async_factory::{create_async_transports, AsyncTransportOptions};
#[cfg(feature = "tokio")]
use crate::transport::device::AsyncTransportDevice;
#[cfg(feature = "tokio")]
use crate::transport::device::TransportDeviceInfo;
#[cfg(feature = "tokio")]
use crate::transport::transaction::{FrameFilter, ResponseCollector};
#[cfg(feature = "tokio")]
use crate::transport::DeviceInfo;
#[cfg(feature = "tokio")]
use std::collections::HashMap;
#[cfg(feature = "tokio")]
use std::sync::Arc;
#[cfg(feature = "tokio")]
use tokio::sync::Mutex;
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
#[cfg(feature = "tokio")]
pub type SharedDevice = Arc<Mutex<Box<dyn AsyncTransportDevice>>>;
pub trait AsyncTransport: Send {
fn cycle<'a>(&'a mut self, requests: &'a mut [Request]) -> BoxFuture<'a, Result<()>>;
fn write<'a>(&'a mut self, frame: &'a CanFdFrame) -> BoxFuture<'a, Result<()>>;
fn read(&mut self, channel: Option<usize>) -> BoxFuture<'_, Result<Option<CanFdFrame>>>;
fn flush_read(&mut self, channel: Option<usize>) -> BoxFuture<'_, Result<()>>;
}
#[cfg(feature = "tokio")]
pub struct AsyncRouter {
devices: Vec<SharedDevice>,
device_infos: Vec<TransportDeviceInfo>,
parent_indices: Vec<usize>,
routing_table: HashMap<DeviceAddress, usize>,
}
#[cfg(feature = "tokio")]
fn compute_parent_indices(infos: &[TransportDeviceInfo]) -> Vec<usize> {
let mut indices: Vec<usize> = infos
.iter()
.enumerate()
.map(|(i, d)| d.parent_index.unwrap_or(i))
.collect();
indices.sort_unstable();
indices.dedup();
indices
}
#[cfg(feature = "tokio")]
struct DeviceWork {
device_idx: usize,
broadcast_with_reply: Vec<(usize, Request)>,
other: Vec<(usize, Request)>,
}
#[cfg(feature = "tokio")]
struct DeviceWorkResult {
device_idx: usize,
responses: Vec<(usize, Vec<CanFdFrame>)>,
error: Option<Error>,
}
#[cfg(feature = "tokio")]
async fn run_device_work(device: SharedDevice, mut work: DeviceWork) -> DeviceWorkResult {
let device_idx = work.device_idx;
let mut responses: Vec<(usize, Vec<CanFdFrame>)> = Vec::new();
let mut error: Option<Error> = None;
let mut guard = device.lock().await;
if let Err(e) = guard.recover().await {
return DeviceWorkResult {
device_idx,
responses: vec![],
error: Some(e),
};
}
for (orig_idx, req) in work.broadcast_with_reply.drain(..) {
let mut reqs = vec![req];
match guard.transaction(&mut reqs).await {
Ok(()) => {
let frames = reqs[0].responses.take();
responses.push((orig_idx, frames));
}
Err(e) => {
if error.is_none() {
error = Some(e);
}
}
}
}
if !work.other.is_empty() {
let orig_indices: Vec<usize> = work.other.iter().map(|(i, _)| *i).collect();
let mut reqs: Vec<Request> = work.other.into_iter().map(|(_, r)| r).collect();
match guard.transaction(&mut reqs).await {
Ok(()) => {
for (local_idx, orig_idx) in orig_indices.into_iter().enumerate() {
let frames = reqs[local_idx].responses.take();
responses.push((orig_idx, frames));
}
}
Err(e) => {
if error.is_none() {
error = Some(e);
}
}
}
}
DeviceWorkResult {
device_idx,
responses,
error,
}
}
#[cfg(feature = "tokio")]
impl AsyncRouter {
pub fn new(devices: Vec<Box<dyn AsyncTransportDevice>>) -> Self {
let device_infos: Vec<TransportDeviceInfo> = devices
.iter()
.map(|d| {
let mut info = d.info().clone();
info.empty_bus_tx_safe = d.empty_bus_tx_safe();
info
})
.collect();
let parent_indices = compute_parent_indices(&device_infos);
let shared = devices
.into_iter()
.map(|d| Arc::new(Mutex::new(d)))
.collect();
Self {
devices: shared,
device_infos,
parent_indices,
routing_table: HashMap::new(),
}
}
pub fn from_device(device: impl AsyncTransportDevice + 'static) -> Self {
Self::new(vec![Box::new(device)])
}
pub fn from_devices<I, T>(devices: I) -> Self
where
I: IntoIterator<Item = T>,
T: AsyncTransportDevice + 'static,
{
Self::new(
devices
.into_iter()
.map(|d| Box::new(d) as Box<dyn AsyncTransportDevice>)
.collect(),
)
}
pub async fn with_options(options: &AsyncTransportOptions) -> Result<Self> {
let devices = create_async_transports(options).await?;
if devices.is_empty() {
return Err(Error::NotConnected);
}
Ok(Self::new(devices))
}
pub fn device_count(&self) -> usize {
self.devices.len()
}
pub fn device_info(&self) -> Vec<&TransportDeviceInfo> {
self.device_infos.iter().collect()
}
pub fn add_route(
&mut self,
address: impl Into<DeviceAddress>,
device_idx: usize,
) -> Result<()> {
if device_idx >= self.devices.len() {
return Err(Error::Protocol(format!(
"Device index {} out of range (have {} devices)",
device_idx,
self.devices.len()
)));
}
self.routing_table.insert(address.into(), device_idx);
Ok(())
}
pub fn clear_routes(&mut self) {
self.routing_table.clear();
}
fn get_device_for_address(&self, address: &DeviceAddress) -> Option<usize> {
if self.devices.len() == 1 {
return Some(0);
}
self.routing_table.get(address).copied()
}
async fn discover_device(
&mut self,
address: &DeviceAddress,
source: u8,
can_prefix: u16,
) -> Result<usize> {
if self.devices.is_empty() {
return Err(Error::NotConnected);
}
if self.devices.len() == 1 {
return Ok(0);
}
let can_id = address.as_can_id().unwrap_or(0x7F);
let mut query_frame = CanFdFrame::new();
query_frame.arbitration_id =
moteus_protocol::calculate_arbitration_id(source as i8, can_id as i8, can_prefix, true);
if address.can_id.is_none() {
if let Some(uuid) = address.as_uuid() {
let prefix_data = make_uuid_prefix(uuid);
query_frame.data[..prefix_data.len()].copy_from_slice(&prefix_data);
query_frame.size = prefix_data.len() as u8;
}
}
let offset = query_frame.size as usize;
query_frame.data[offset] = 0x50; query_frame.size += 1;
for device in &self.devices {
let mut guard = device.lock().await;
let _ = guard.flush().await;
}
for (idx, device) in self.devices.iter().enumerate() {
if self.device_infos[idx].empty_bus_tx_safe {
let mut guard = device.lock().await;
let _ = guard.write(&query_frame).await;
}
}
let mut join_set = tokio::task::JoinSet::new();
for (idx, device) in self.devices.iter().enumerate() {
let device = Arc::clone(device);
join_set.spawn(async move {
let mut guard = device.lock().await;
let _ = guard.recover().await;
let mut requests = vec![Request::receive_only(FrameFilter::Any)];
let ok = guard.transaction(&mut requests).await.is_ok();
let mut found = false;
if ok {
for response in requests[0].responses.take() {
let source_id = ((response.arbitration_id >> 8) & 0x7F) as u8;
if source_id != 0x7F {
found = true;
break;
}
}
}
(idx, found)
});
}
let mut found: Vec<usize> = Vec::new();
while let Some(result) = join_set.join_next().await {
if let Ok((idx, was_found)) = result {
if was_found {
found.push(idx);
}
}
}
match found.len() {
0 => Err(Error::DeviceNotFound(format!(
"{} not found on any CAN bus",
address
))),
1 => {
self.routing_table.insert(address.clone(), found[0]);
Ok(found[0])
}
_ => Err(Error::Protocol(format!(
"More than one {} found across connected CAN busses",
address
))),
}
}
async fn get_or_discover_device(
&mut self,
address: &DeviceAddress,
source: u8,
can_prefix: u16,
) -> Result<usize> {
if let Some(idx) = self.get_device_for_address(address) {
return Ok(idx);
}
self.discover_device(address, source, can_prefix).await
}
pub async fn discover(&mut self, can_prefix: u16, source: u8) -> Result<Vec<DeviceInfo>> {
if self.devices.is_empty() {
return Err(Error::NotConnected);
}
let _ = self.flush_read(None).await;
let (query_frame, _reply_size) = make_uuid_query_frame(source, can_prefix);
for (idx, device) in self.devices.iter().enumerate() {
if self.device_infos[idx].empty_bus_tx_safe {
let mut guard = device.lock().await;
let _ = guard.write(&query_frame).await;
}
}
let mut join_set = tokio::task::JoinSet::new();
for (idx, device) in self.devices.iter().enumerate() {
let device = Arc::clone(device);
let transport_device = self.device_infos[idx].to_string();
join_set.spawn(async move {
let mut guard = device.lock().await;
let _ = guard.recover().await;
let mut requests =
vec![Request::receive_only(FrameFilter::Any).with_expected_replies(127)];
let _ = guard.transaction(&mut requests).await;
let responses = requests[0].responses.take();
(idx, transport_device, responses)
});
}
let mut discovered = Vec::new();
while let Some(result) = join_set.join_next().await {
if let Ok((idx, transport_device, responses)) = result {
for response in responses {
let source_id = ((response.arbitration_id >> 8) & 0x7F) as u8;
let dest_id = (response.arbitration_id & 0x7F) as u8;
if source_id == 0x7F || dest_id != source {
continue;
}
let uuid = extract_uuid_from_response(&response);
let device_info = if let Some(uuid) = uuid {
DeviceInfo::with_uuid(source_id, uuid, idx, transport_device.clone())
} else {
DeviceInfo::new(source_id, idx, transport_device.clone())
};
discovered.push(device_info);
}
}
}
resolve_addresses(&mut discovered);
discovered.sort_by(|a, b| match a.can_id.cmp(&b.can_id) {
std::cmp::Ordering::Equal => a.uuid.cmp(&b.uuid),
other => other,
});
Ok(discovered)
}
async fn execute_cycle(&mut self, requests: &mut [Request]) -> Result<()> {
if self.devices.is_empty() {
return Err(Error::NotConnected);
}
if self.devices.len() == 1 {
let mut guard = self.devices[0].lock().await;
guard.recover().await?;
let result = guard.transaction(requests).await;
for req in requests.iter() {
let frames = req.responses.take();
for mut frame in frames {
frame.channel = Some(0);
req.responses.push(frame);
}
}
return result;
}
let mut device_groups: HashMap<usize, DeviceWork> = HashMap::new();
#[allow(clippy::needless_range_loop, clippy::manual_map)]
for i in 0..requests.len() {
if let Some(device_idx) = requests[i].channel {
let work = device_groups
.entry(device_idx)
.or_insert_with(|| DeviceWork {
device_idx,
broadcast_with_reply: Vec::new(),
other: Vec::new(),
});
let mut req = requests[i].clone();
req.responses = ResponseCollector::new();
work.other.push((i, req));
continue;
}
let dest_id = requests[i]
.frame
.as_ref()
.map(|frame| (frame.arbitration_id & 0x7F) as u8);
if let Some(dest_id) = dest_id {
let is_true_broadcast = dest_id == 0x7F
&& !matches!(
&requests[i].address,
Some(addr) if addr.uuid.is_some()
);
if is_true_broadcast {
let is_bwr = requests[i].expected_reply_count > 0;
for &device_idx in &self.parent_indices.clone() {
if self.device_infos[device_idx].empty_bus_tx_safe {
let work =
device_groups
.entry(device_idx)
.or_insert_with(|| DeviceWork {
device_idx,
broadcast_with_reply: Vec::new(),
other: Vec::new(),
});
let mut req = requests[i].clone();
req.responses = ResponseCollector::new();
if is_bwr {
work.broadcast_with_reply.push((i, req));
} else {
work.other.push((i, req));
}
}
}
} else {
let lookup_addr = match &requests[i].address {
Some(addr) => addr.clone(),
None => DeviceAddress::can_id(dest_id),
};
let (source, can_prefix) = match &requests[i].frame {
Some(f) => {
let (src, _dest, pfx) =
moteus_protocol::parse_arbitration_id(f.arbitration_id);
(src as u8, pfx)
}
None => (0, 0),
};
let mut target_idx = self
.get_or_discover_device(&lookup_addr, source, can_prefix)
.await?;
if let Some(parent_idx) = self.device_infos[target_idx].parent_index {
requests[i].child_device = Some(target_idx);
target_idx = parent_idx;
}
let work = device_groups
.entry(target_idx)
.or_insert_with(|| DeviceWork {
device_idx: target_idx,
broadcast_with_reply: Vec::new(),
other: Vec::new(),
});
let mut req = requests[i].clone();
req.responses = ResponseCollector::new();
work.other.push((i, req));
}
}
}
let mut join_set = tokio::task::JoinSet::new();
for (_, work) in device_groups {
let device = Arc::clone(&self.devices[work.device_idx]);
join_set.spawn(run_device_work(device, work));
}
let mut first_error: Option<Error> = None;
while let Some(result) = join_set.join_next().await {
match result {
Ok(work_result) => {
for (orig_idx, frames) in work_result.responses {
for mut frame in frames {
frame.channel = Some(work_result.device_idx);
requests[orig_idx].responses.push(frame);
}
}
if let Some(e) = work_result.error {
if first_error.is_none() {
first_error = Some(e);
}
}
}
Err(join_err) => {
if first_error.is_none() {
first_error =
Some(Error::Protocol(format!("task join error: {}", join_err)));
}
}
}
}
if let Some(e) = first_error {
return Err(e);
}
Ok(())
}
pub async fn cycle(&mut self, requests: &mut [Request]) -> Result<()> {
self.execute_cycle(requests).await
}
pub async fn write(&mut self, frame: &CanFdFrame) -> Result<()> {
if self.devices.is_empty() {
return Err(Error::NotConnected);
}
let dest_id = (frame.arbitration_id & 0x7F) as u8;
if dest_id == 0x7F {
for (idx, device) in self.devices.iter().enumerate() {
if self.device_infos[idx].empty_bus_tx_safe {
let mut guard = device.lock().await;
guard.write(frame).await?;
}
}
Ok(())
} else {
let addr = DeviceAddress::can_id(dest_id);
let (src, _dest, pfx) = moteus_protocol::parse_arbitration_id(frame.arbitration_id);
let device_idx = self.get_or_discover_device(&addr, src as u8, pfx).await?;
let mut guard = self.devices[device_idx].lock().await;
guard.write(frame).await
}
}
pub async fn read(&mut self, channel: Option<usize>) -> Result<Option<CanFdFrame>> {
if self.devices.is_empty() {
return Err(Error::NotConnected);
}
if let Some(idx) = channel {
if idx < self.devices.len() {
let mut guard = self.devices[idx].lock().await;
match guard.read().await? {
Some(mut frame) => {
frame.channel = Some(idx);
Ok(Some(frame))
}
None => Ok(None),
}
} else {
Err(Error::DeviceNotFound(format!("Channel {} not found", idx)))
}
} else {
for &idx in &self.parent_indices {
let mut guard = self.devices[idx].lock().await;
if let Ok(Some(mut frame)) = guard.read().await {
frame.channel = Some(idx);
return Ok(Some(frame));
}
}
Ok(None)
}
}
pub async fn flush_read(&mut self, channel: Option<usize>) -> Result<()> {
if self.devices.is_empty() {
return Err(Error::NotConnected);
}
if let Some(idx) = channel {
if idx < self.devices.len() {
let mut guard = self.devices[idx].lock().await;
guard.flush().await
} else {
Err(Error::DeviceNotFound(format!("Channel {} not found", idx)))
}
} else {
for &idx in &self.parent_indices {
let mut guard = self.devices[idx].lock().await;
guard.flush().await?;
}
Ok(())
}
}
}
#[cfg(feature = "tokio")]
impl<D: AsyncTransportDevice + 'static> From<D> for AsyncRouter {
fn from(device: D) -> Self {
AsyncRouter::from_device(device)
}
}
#[cfg(feature = "tokio")]
impl AsyncTransport for AsyncRouter {
fn cycle<'a>(&'a mut self, requests: &'a mut [Request]) -> BoxFuture<'a, Result<()>> {
Box::pin(AsyncRouter::cycle(self, requests))
}
fn write<'a>(&'a mut self, frame: &'a CanFdFrame) -> BoxFuture<'a, Result<()>> {
Box::pin(AsyncRouter::write(self, frame))
}
fn read<'a>(&'a mut self, channel: Option<usize>) -> BoxFuture<'a, Result<Option<CanFdFrame>>> {
Box::pin(AsyncRouter::read(self, channel))
}
fn flush_read<'a>(&'a mut self, channel: Option<usize>) -> BoxFuture<'a, Result<()>> {
Box::pin(AsyncRouter::flush_read(self, channel))
}
}
#[cfg(feature = "tokio")]
impl<T: AsyncTransport + 'static> AsyncTransport for Arc<tokio::sync::Mutex<T>> {
fn cycle<'a>(&'a mut self, requests: &'a mut [Request]) -> BoxFuture<'a, Result<()>> {
let arc = Arc::clone(self);
Box::pin(async move {
let mut guard = arc.lock().await;
guard.cycle(requests).await
})
}
fn write<'a>(&'a mut self, frame: &'a CanFdFrame) -> BoxFuture<'a, Result<()>> {
let arc = Arc::clone(self);
Box::pin(async move {
let mut guard = arc.lock().await;
guard.write(frame).await
})
}
fn read(&mut self, channel: Option<usize>) -> BoxFuture<'_, Result<Option<CanFdFrame>>> {
let arc = Arc::clone(self);
Box::pin(async move {
let mut guard = arc.lock().await;
guard.read(channel).await
})
}
fn flush_read(&mut self, channel: Option<usize>) -> BoxFuture<'_, Result<()>> {
let arc = Arc::clone(self);
Box::pin(async move {
let mut guard = arc.lock().await;
guard.flush_read(channel).await
})
}
}
#[cfg(test)]
mod tests {
#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_async_transport_empty() {
use super::*;
use crate::error::Error;
let devices: Vec<Box<dyn AsyncTransportDevice>> = vec![];
let mut transport = AsyncRouter::new(devices);
let frame = CanFdFrame::new();
let mut requests = vec![Request::new(frame)];
let result = transport.cycle(&mut requests).await;
assert!(matches!(result, Err(Error::NotConnected)));
}
#[cfg(feature = "tokio")]
mod cancel_safety {
use super::super::*;
use crate::transport::device::{AsyncTransportDevice, TransportDeviceInfo};
use crate::transport::transaction::Request;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
struct CancellableMockDevice {
info: TransportDeviceInfo,
recover_count: Arc<AtomicUsize>,
needs_recovery: bool,
fast_mode: Arc<AtomicBool>,
}
impl CancellableMockDevice {
fn new(fast_mode: Arc<AtomicBool>) -> Self {
Self {
info: TransportDeviceInfo::new(0, "CancellableMock"),
recover_count: Arc::new(AtomicUsize::new(0)),
needs_recovery: false,
fast_mode,
}
}
fn with_counter(recover_count: Arc<AtomicUsize>, fast_mode: Arc<AtomicBool>) -> Self {
Self {
info: TransportDeviceInfo::new(0, "CancellableMock"),
recover_count,
needs_recovery: false,
fast_mode,
}
}
}
impl AsyncTransportDevice for CancellableMockDevice {
fn recover(&mut self) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
if self.needs_recovery {
self.recover_count.fetch_add(1, Ordering::Release);
self.needs_recovery = false;
}
Ok(())
})
}
fn transaction<'a>(
&'a mut self,
_requests: &'a mut [Request],
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
self.needs_recovery = true;
if self.fast_mode.load(Ordering::Acquire) {
self.needs_recovery = false;
return Ok(());
}
tokio::time::sleep(Duration::from_secs(86400)).await;
self.needs_recovery = false;
Ok(())
})
}
fn write<'a>(&'a mut self, _frame: &'a CanFdFrame) -> BoxFuture<'a, Result<()>> {
Box::pin(async { Ok(()) })
}
fn read(&mut self) -> BoxFuture<'_, Result<Option<CanFdFrame>>> {
Box::pin(async { Ok(None) })
}
fn flush(&mut self) -> BoxFuture<'_, Result<()>> {
Box::pin(async { Ok(()) })
}
fn info(&self) -> &TransportDeviceInfo {
&self.info
}
}
struct FastMockDevice {
info: TransportDeviceInfo,
recover_count: Arc<AtomicUsize>,
needs_recovery: bool,
}
impl FastMockDevice {
fn with_counter(recover_count: Arc<AtomicUsize>) -> Self {
Self {
info: TransportDeviceInfo::new(0, "FastMock"),
recover_count,
needs_recovery: false,
}
}
}
impl AsyncTransportDevice for FastMockDevice {
fn recover(&mut self) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
if self.needs_recovery {
self.recover_count.fetch_add(1, Ordering::Release);
self.needs_recovery = false;
}
Ok(())
})
}
fn transaction<'a>(
&'a mut self,
_requests: &'a mut [Request],
) -> BoxFuture<'a, Result<()>> {
Box::pin(async { Ok(()) })
}
fn write<'a>(&'a mut self, _frame: &'a CanFdFrame) -> BoxFuture<'a, Result<()>> {
Box::pin(async { Ok(()) })
}
fn read(&mut self) -> BoxFuture<'_, Result<Option<CanFdFrame>>> {
Box::pin(async { Ok(None) })
}
fn flush(&mut self) -> BoxFuture<'_, Result<()>> {
Box::pin(async { Ok(()) })
}
fn info(&self) -> &TransportDeviceInfo {
&self.info
}
}
#[tokio::test]
async fn test_single_device_survives_cancellation() {
let fast_mode = Arc::new(AtomicBool::new(false));
let device = CancellableMockDevice::new(fast_mode);
let mut transport = AsyncRouter::new(vec![Box::new(device)]);
let mut requests = vec![Request::new(CanFdFrame::new())];
let result =
tokio::time::timeout(Duration::from_millis(10), transport.cycle(&mut requests))
.await;
assert!(result.is_err());
assert_eq!(transport.device_count(), 1);
}
#[tokio::test]
async fn test_multi_device_survives_cancellation() {
let fast1 = Arc::new(AtomicBool::new(false));
let fast2 = Arc::new(AtomicBool::new(false));
let mut transport = AsyncRouter::new(vec![
Box::new(CancellableMockDevice::new(fast1)),
Box::new(CancellableMockDevice::new(fast2)),
]);
transport
.add_route(crate::DeviceAddress::can_id(1), 0)
.unwrap();
transport
.add_route(crate::DeviceAddress::can_id(2), 1)
.unwrap();
let mut frame1 = CanFdFrame::new();
frame1.arbitration_id = 0x0001; let mut frame2 = CanFdFrame::new();
frame2.arbitration_id = 0x0002; let mut requests = vec![Request::new(frame1), Request::new(frame2)];
let result =
tokio::time::timeout(Duration::from_millis(10), transport.cycle(&mut requests))
.await;
assert!(result.is_err());
assert_eq!(transport.device_count(), 2);
}
#[tokio::test]
async fn test_recover_called_after_cancellation() {
let recover_count = Arc::new(AtomicUsize::new(0));
let fast_mode = Arc::new(AtomicBool::new(false));
let device =
CancellableMockDevice::with_counter(recover_count.clone(), fast_mode.clone());
let mut transport = AsyncRouter::new(vec![Box::new(device)]);
let mut requests = vec![Request::new(CanFdFrame::new()).with_expected_replies(0)];
let _ = tokio::time::timeout(Duration::from_millis(10), transport.cycle(&mut requests))
.await;
fast_mode.store(true, Ordering::Release);
let mut requests = vec![Request::new(CanFdFrame::new()).with_expected_replies(0)];
transport.cycle(&mut requests).await.unwrap();
assert!(recover_count.load(Ordering::Acquire) > 0);
}
#[tokio::test]
async fn test_recover_noop_without_cancellation() {
let recover_count = Arc::new(AtomicUsize::new(0));
let device = FastMockDevice::with_counter(recover_count.clone());
let mut transport = AsyncRouter::new(vec![Box::new(device)]);
let mut requests = vec![Request::new(CanFdFrame::new()).with_expected_replies(0)];
transport.cycle(&mut requests).await.unwrap();
let mut requests = vec![Request::new(CanFdFrame::new()).with_expected_replies(0)];
transport.cycle(&mut requests).await.unwrap();
assert_eq!(recover_count.load(Ordering::Acquire), 0);
}
}
}