1use core::time::Duration;
4
5use embedded_can::Frame;
6use embedded_can_interface::{AsyncRxFrameIo, AsyncTxFrameIo};
7
8use crate::async_io::AsyncRuntime;
9use crate::errors::{IsoTpError, TimeoutKind};
10use crate::pdu::{
11 FlowStatus, Pdu, decode_with_offset, duration_to_st_min, encode_with_prefix_sized,
12 st_min_to_duration,
13};
14use crate::rx::{RxMachine, RxOutcome, RxState, RxStorage};
15use crate::timer::Clock;
16use crate::{IsoTpConfig, RxFlowControl, id_matches};
17
/// Asynchronous ISO-TP endpoint built on a transmit half, a receive half and a clock.
///
/// `send` segments an outgoing payload into frames and `recv` reassembles incoming frames
/// through the contained receive state machine. `'a` is the lifetime carried by the receive
/// storage handed to that state machine.
pub struct IsoTpAsyncNode<'a, Tx, Rx, F, C>
where
    Tx: AsyncTxFrameIo<Frame = F>,
    Rx: AsyncRxFrameIo<Frame = F, Error = Tx::Error>,
    C: Clock,
{
    /// Transmit half; every outgoing frame is written through it.
    tx: Tx,
    /// Receive half; both `recv` and the flow-control wait during `send` read from it.
    rx: Rx,
    /// Identifiers, addressing, framing and timing limits used by this node.
    cfg: IsoTpConfig,
    /// Flow-control parameters handed to the receive state machine for each decoded PDU.
    rx_flow_control: RxFlowControl,
    /// Time source for every timeout computation.
    clock: C,
    /// Reassembly state machine for incoming messages.
    rx_machine: RxMachine<'a>,
    /// Instant of the latest receive activity while a reception is in flight; `None` otherwise.
    rx_last_activity: Option<C::Instant>,
}
33
34impl<'a, Tx, Rx, F, C> IsoTpAsyncNode<'a, Tx, Rx, F, C>
35where
36 Tx: AsyncTxFrameIo<Frame = F>,
37 Rx: AsyncRxFrameIo<Frame = F, Error = Tx::Error>,
38 F: Frame,
39 C: Clock,
40{
    /// Starts building a node from its two I/O halves, a configuration and a clock.
    ///
    /// The returned builder has no receive storage yet; supply it with `rx_buffer` or
    /// `rx_storage` to obtain the builder stage that offers `build`.
    pub fn builder(tx: Tx, rx: Rx, cfg: IsoTpConfig, clock: C) -> IsoTpAsyncNodeBuilder<Tx, Rx, C> {
        IsoTpAsyncNodeBuilder { tx, rx, cfg, clock }
    }
45
46 pub fn with_clock(
48 tx: Tx,
49 rx: Rx,
50 cfg: IsoTpConfig,
51 clock: C,
52 rx_buffer: &'a mut [u8],
53 ) -> Result<Self, IsoTpError<()>> {
54 Self::with_clock_and_storage(tx, rx, cfg, clock, RxStorage::Borrowed(rx_buffer))
55 }
56
57 pub fn with_clock_and_storage(
59 tx: Tx,
60 rx: Rx,
61 cfg: IsoTpConfig,
62 clock: C,
63 rx_storage: RxStorage<'a>,
64 ) -> Result<Self, IsoTpError<()>> {
65 let node = Self::builder(tx, rx, cfg, clock)
66 .rx_storage(rx_storage)
67 .build()?;
68 Ok(node)
69 }
70
    /// Returns a copy of the flow-control parameters currently used for reception.
    pub fn rx_flow_control(&self) -> RxFlowControl {
        self.rx_flow_control
    }
75
    /// Replaces the flow-control parameters used for reception with `fc`.
    pub fn set_rx_flow_control(&mut self, fc: RxFlowControl) {
        self.rx_flow_control = fc;
    }
80
81 fn update_rx_timeout_activity(&mut self, now: C::Instant) {
82 self.rx_last_activity = if self.rx_machine.state == RxState::Receiving {
83 Some(now)
84 } else {
85 None
86 };
87 }
88
89 fn expire_rx_timeout_if_needed(&mut self) {
90 let Some(last_activity) = self.rx_last_activity else {
91 return;
92 };
93 if self.rx_machine.state != RxState::Receiving {
94 self.rx_last_activity = None;
95 return;
96 }
97 if self.clock.elapsed(last_activity) >= self.cfg.n_br {
98 isotp_warn!("async recv n_br timeout; aborting in-flight rx");
99 self.rx_machine.reset();
100 self.rx_last_activity = None;
101 }
102 }
103
    /// Sends `payload` as one ISO-TP message, segmenting it when it does not fit a single frame.
    ///
    /// `timeout` bounds the whole call and is measured from the clock reading taken on entry.
    /// A payload no longer than `cfg.max_single_frame_payload()` is sent as one single frame.
    /// Anything longer is sent as a first frame followed by consecutive frames, paced by the
    /// block size and minimum separation time obtained from `wait_for_flow_control`.
    ///
    /// # Errors
    ///
    /// * `IsoTpError::Overflow` when `payload` is longer than `cfg.max_payload_len`.
    /// * `IsoTpError::Timeout(TimeoutKind::NAs)` when the overall budget is spent.
    /// * `IsoTpError::InvalidFrame` when a frame cannot be encoded.
    /// * Any error returned by `wait_for_flow_control`, `send_frame` or `sleep_or_timeout`.
    pub async fn send<R: AsyncRuntime>(
        &mut self,
        rt: &R,
        payload: &[u8],
        timeout: Duration,
    ) -> Result<(), IsoTpError<Tx::Error>> {
        // Reject payloads the configuration does not allow before touching the bus.
        if payload.len() > self.cfg.max_payload_len {
            isotp_warn!(
                "async send overflow payload_len={} max={}",
                payload.len(),
                self.cfg.max_payload_len
            );
            return Err(IsoTpError::Overflow);
        }

        // `start` anchors the overall budget; a budget that is already spent (for example a
        // zero `timeout`) fails immediately.
        let start = self.clock.now();
        if self.clock.elapsed(start) >= timeout {
            return Err(IsoTpError::Timeout(TimeoutKind::NAs));
        }

        // Short payload: a single frame carries the whole message.
        if payload.len() <= self.cfg.max_single_frame_payload() {
            isotp_trace!("async send single-frame len={}", payload.len());
            let pdu = Pdu::SingleFrame {
                // NOTE(review): assumes max_single_frame_payload() never exceeds u8::MAX — confirm.
                len: payload.len() as u8,
                data: payload,
            };
            let frame = encode_with_prefix_sized(
                self.cfg.tx_id,
                &pdu,
                self.cfg.padding,
                self.cfg.tx_addr,
                self.cfg.frame_len,
            )
            .map_err(|_| IsoTpError::InvalidFrame)?;
            self.send_frame(rt, start, timeout, TimeoutKind::NAs, &frame)
                .await?;
            return Ok(());
        }

        // Segmented transfer. `offset` counts the payload bytes already handed to a frame and
        // starts at the number of bytes carried by the first frame.
        let mut offset = payload.len().min(self.cfg.max_first_frame_payload());
        // Sequence number of the next consecutive frame; only the low four bits are used.
        let mut next_sn: u8 = 1;
        // Initial wait counter passed to the first flow-control wait.
        let wait_count: u8 = 0;
        isotp_debug!(
            "async send first-frame payload_len={} ff_chunk={} bs={} st_min_ms={}",
            payload.len(),
            offset,
            self.cfg.block_size,
            self.cfg.st_min.as_millis() as u64
        );

        let pdu = Pdu::FirstFrame {
            // NOTE(review): assumes cfg.max_payload_len fits in u16 so this cast cannot
            // truncate — confirm against the configuration validation.
            len: payload.len() as u16,
            data: &payload[..offset],
        };
        let frame = encode_with_prefix_sized(
            self.cfg.tx_id,
            &pdu,
            self.cfg.padding,
            self.cfg.tx_addr,
            self.cfg.frame_len,
        )
        .map_err(|_| IsoTpError::InvalidFrame)?;
        self.send_frame(rt, start, timeout, TimeoutKind::NAs, &frame)
            .await?;

        // The peer has to answer the first frame with flow control before any consecutive
        // frame may be sent.
        let fc_start = self.clock.now();
        let (mut block_size, mut st_min, mut wait_count) = self
            .wait_for_flow_control(rt, start, timeout, fc_start, wait_count)
            .await?;
        let mut block_remaining = block_size;

        // Instant at which the previous consecutive frame went out, used for st_min pacing.
        let mut last_cf_sent: Option<C::Instant> = None;
        while offset < payload.len() {
            // Block exhausted: wait for the next flow control before continuing. A block size
            // of zero never enters this branch, so the rest is sent without further waits.
            if block_size > 0 && block_remaining == 0 {
                let fc_start = self.clock.now();
                let (new_bs, new_st_min, new_wait_count) = self
                    .wait_for_flow_control(rt, start, timeout, fc_start, wait_count)
                    .await?;
                block_size = new_bs;
                block_remaining = new_bs;
                st_min = new_st_min;
                wait_count = new_wait_count;
                continue;
            }

            // Keep at least `st_min` between consecutive frames, sleeping only for the part
            // of the gap that has not already elapsed.
            if let Some(sent_at) = last_cf_sent {
                let elapsed = self.clock.elapsed(sent_at);
                if elapsed < st_min {
                    let wait_for = st_min - elapsed;
                    sleep_or_timeout(&self.clock, rt, start, timeout, TimeoutKind::NAs, wait_for)
                        .await?;
                }
            }

            // Carve the next chunk out of the payload and send it as a consecutive frame.
            let remaining = payload.len() - offset;
            let chunk = remaining.min(self.cfg.max_consecutive_frame_payload());
            let pdu = Pdu::ConsecutiveFrame {
                sn: next_sn & 0x0F,
                data: &payload[offset..offset + chunk],
            };
            let frame = encode_with_prefix_sized(
                self.cfg.tx_id,
                &pdu,
                self.cfg.padding,
                self.cfg.tx_addr,
                self.cfg.frame_len,
            )
            .map_err(|_| IsoTpError::InvalidFrame)?;
            self.send_frame(rt, start, timeout, TimeoutKind::NAs, &frame)
                .await?;
            isotp_trace!(
                "async send cf sent sn={} chunk={} offset={} payload_len={} block_remaining={}",
                next_sn & 0x0F,
                chunk,
                offset,
                payload.len(),
                block_remaining
            );

            last_cf_sent = Some(self.clock.now());
            offset += chunk;
            // The sequence number wraps within four bits.
            next_sn = (next_sn + 1) & 0x0F;

            if block_size > 0 {
                block_remaining = block_remaining.saturating_sub(1);
            }
        }

        Ok(())
    }
238
239 pub async fn recv<R: AsyncRuntime>(
245 &mut self,
246 rt: &R,
247 timeout: Duration,
248 deliver: &mut dyn FnMut(&[u8]),
249 ) -> Result<(), IsoTpError<Tx::Error>> {
250 let start = self.clock.now();
251
252 loop {
253 self.expire_rx_timeout_if_needed();
254
255 let global_remaining = remaining(timeout, self.clock.elapsed(start))
256 .ok_or(IsoTpError::Timeout(TimeoutKind::NAr))?;
257 let wait_for = if let Some(last_activity) = self.rx_last_activity {
258 if self.rx_machine.state == RxState::Receiving {
259 let elapsed = self.clock.elapsed(last_activity);
260 match self.cfg.n_br.checked_sub(elapsed) {
261 Some(remaining_n_br) => global_remaining.min(remaining_n_br),
262 None => Duration::from_millis(0),
263 }
264 } else {
265 global_remaining
266 }
267 } else {
268 global_remaining
269 };
270 if wait_for == Duration::from_millis(0) {
271 self.expire_rx_timeout_if_needed();
272 continue;
273 }
274
275 let frame = match rt.timeout(wait_for, self.rx.recv()).await {
276 Ok(Ok(frame)) => frame,
277 Ok(Err(err)) => return Err(IsoTpError::LinkError(err)),
278 Err(_) => {
279 self.expire_rx_timeout_if_needed();
280 if remaining(timeout, self.clock.elapsed(start)).is_none() {
281 return Err(IsoTpError::Timeout(TimeoutKind::NAr));
282 }
283 continue;
284 }
285 };
286
287 if !id_matches(frame.id(), &self.cfg.rx_id) {
288 isotp_trace!("async recv drop frame: rx_id mismatch");
289 continue;
290 }
291 if let Some(expected) = self.cfg.rx_addr
292 && frame.data().first().copied() != Some(expected)
293 {
294 isotp_trace!(
295 "async recv drop frame: rx_addr mismatch expected={}",
296 expected
297 );
298 continue;
299 }
300
301 let pdu = decode_with_offset(frame.data(), self.cfg.rx_pci_offset()).map_err(|_| {
302 isotp_warn!("async recv invalid frame decode");
303 IsoTpError::InvalidFrame
304 })?;
305 if matches!(pdu, Pdu::FlowControl { .. }) {
306 isotp_trace!("async recv ignore flow-control while receiving");
307 continue;
308 }
309
310 let outcome = match self
311 .rx_machine
312 .on_pdu(&self.cfg, &self.rx_flow_control, pdu)
313 {
314 Ok(o) => o,
315 Err(IsoTpError::Overflow) => {
316 isotp_warn!("async recv rx overflow; sending overflow fc");
317 self.rx_machine.reset();
318 self.rx_last_activity = None;
319 let _ = self.send_overflow_fc(rt, start, timeout).await;
320 return Err(IsoTpError::RxOverflow);
321 }
322 Err(IsoTpError::UnexpectedPdu) => {
323 self.update_rx_timeout_activity(self.clock.now());
324 continue;
325 }
326 Err(IsoTpError::BadSequence) => {
327 isotp_warn!("async recv bad sequence");
328 self.rx_machine.reset();
329 self.rx_last_activity = None;
330 return Err(IsoTpError::BadSequence);
331 }
332 Err(IsoTpError::InvalidFrame) => return Err(IsoTpError::InvalidFrame),
333 Err(IsoTpError::InvalidConfig) => return Err(IsoTpError::InvalidConfig),
334 Err(IsoTpError::Timeout(kind)) => return Err(IsoTpError::Timeout(kind)),
335 Err(IsoTpError::WouldBlock) => return Err(IsoTpError::WouldBlock),
336 Err(IsoTpError::RxOverflow) => return Err(IsoTpError::RxOverflow),
337 Err(IsoTpError::NotIdle) => return Err(IsoTpError::NotIdle),
338 Err(IsoTpError::LinkError(_)) => return Err(IsoTpError::InvalidFrame),
339 };
340
341 match outcome {
342 RxOutcome::None => {
343 self.update_rx_timeout_activity(self.clock.now());
344 continue;
345 }
346 RxOutcome::SendFlowControl {
347 status,
348 block_size,
349 st_min,
350 } => {
351 self.update_rx_timeout_activity(self.clock.now());
352 isotp_trace!(
353 "async recv send fc status={} bs={} st_min_raw={}",
354 flow_status_code(status),
355 block_size,
356 st_min
357 );
358 self.send_flow_control(rt, start, timeout, status, block_size, st_min)
359 .await?;
360 }
361 RxOutcome::Completed(_len) => {
362 self.rx_last_activity = None;
363 let data = self.rx_machine.take_completed();
364 isotp_debug!("async recv completed len={}", data.len());
365 deliver(data);
366 return Ok(());
367 }
368 }
369 }
370 }
371
372 async fn wait_for_flow_control<R: AsyncRuntime>(
373 &mut self,
374 rt: &R,
375 global_start: C::Instant,
376 global_timeout: Duration,
377 mut fc_start: C::Instant,
378 mut wait_count: u8,
379 ) -> Result<(u8, Duration, u8), IsoTpError<Tx::Error>> {
380 loop {
381 if self.clock.elapsed(fc_start) >= self.cfg.n_bs {
382 return Err(IsoTpError::Timeout(TimeoutKind::NBs));
383 }
384
385 let fc_remaining = self.cfg.n_bs - self.clock.elapsed(fc_start);
386 let global_remaining = remaining(global_timeout, self.clock.elapsed(global_start))
387 .ok_or(IsoTpError::Timeout(TimeoutKind::NAs))?;
388 let wait_for = fc_remaining.min(global_remaining);
389
390 let frame = match rt.timeout(wait_for, self.rx.recv()).await {
391 Ok(Ok(f)) => f,
392 Ok(Err(err)) => return Err(IsoTpError::LinkError(err)),
393 Err(_) => {
394 if global_remaining <= fc_remaining {
395 return Err(IsoTpError::Timeout(TimeoutKind::NAs));
396 }
397 return Err(IsoTpError::Timeout(TimeoutKind::NBs));
398 }
399 };
400
401 if !id_matches(frame.id(), &self.cfg.rx_id) {
402 isotp_trace!("async wait_fc drop frame: rx_id mismatch");
403 continue;
404 }
405 if let Some(expected) = self.cfg.rx_addr
406 && frame.data().first().copied() != Some(expected)
407 {
408 isotp_trace!(
409 "async wait_fc drop frame: rx_addr mismatch expected={}",
410 expected
411 );
412 continue;
413 }
414
415 let pdu = decode_with_offset(frame.data(), self.cfg.rx_pci_offset()).map_err(|_| {
416 isotp_warn!("async wait_fc invalid frame decode");
417 IsoTpError::InvalidFrame
418 })?;
419 match pdu {
420 Pdu::FlowControl {
421 status,
422 block_size,
423 st_min,
424 } => match status {
425 FlowStatus::ClearToSend => {
426 let bs = if block_size == 0 {
427 self.cfg.block_size
428 } else {
429 block_size
430 };
431 let st_min = st_min_to_duration(st_min).unwrap_or(self.cfg.st_min);
432 isotp_debug!(
433 "async wait_fc cts bs={} st_min_ms={}",
434 bs,
435 st_min.as_millis() as u64
436 );
437 return Ok((bs, st_min, 0));
438 }
439 FlowStatus::Wait => {
440 wait_count = wait_count.saturating_add(1);
441 isotp_trace!(
442 "async wait_fc wait wait_count={} max={}",
443 wait_count,
444 self.cfg.wft_max
445 );
446 if wait_count > self.cfg.wft_max {
447 return Err(IsoTpError::Timeout(TimeoutKind::NBs));
448 }
449 fc_start = self.clock.now();
450 continue;
451 }
452 FlowStatus::Overflow => return Err(IsoTpError::Overflow),
453 },
454 _ => continue,
455 }
456 }
457 }
458
459 async fn send_flow_control<R: AsyncRuntime>(
460 &mut self,
461 rt: &R,
462 start: C::Instant,
463 timeout: Duration,
464 status: FlowStatus,
465 block_size: u8,
466 st_min: u8,
467 ) -> Result<(), IsoTpError<Tx::Error>> {
468 isotp_trace!(
469 "async send_flow_control status={} bs={} st_min_raw={}",
470 flow_status_code(status),
471 block_size,
472 st_min
473 );
474 let fc = Pdu::FlowControl {
475 status,
476 block_size,
477 st_min,
478 };
479 let frame = encode_with_prefix_sized(
480 self.cfg.tx_id,
481 &fc,
482 self.cfg.padding,
483 self.cfg.tx_addr,
484 self.cfg.frame_len,
485 )
486 .map_err(|_| IsoTpError::InvalidFrame)?;
487 self.send_frame(rt, start, timeout, TimeoutKind::NAs, &frame)
488 .await?;
489 Ok(())
490 }
491
492 async fn send_overflow_fc<R: AsyncRuntime>(
493 &mut self,
494 rt: &R,
495 start: C::Instant,
496 timeout: Duration,
497 ) -> Result<(), IsoTpError<Tx::Error>> {
498 self.send_flow_control(
499 rt,
500 start,
501 timeout,
502 FlowStatus::Overflow,
503 0,
504 duration_to_st_min(self.cfg.st_min),
505 )
506 .await
507 }
508
509 async fn send_frame<R: AsyncRuntime>(
510 &mut self,
511 rt: &R,
512 start: C::Instant,
513 timeout: Duration,
514 kind: TimeoutKind,
515 frame: &F,
516 ) -> Result<(), IsoTpError<Tx::Error>> {
517 let remaining =
518 remaining(timeout, self.clock.elapsed(start)).ok_or(IsoTpError::Timeout(kind))?;
519 match rt.timeout(remaining, self.tx.send(frame)).await {
520 Ok(Ok(())) => Ok(()),
521 Ok(Err(err)) => Err(IsoTpError::LinkError(err)),
522 Err(_) => Err(IsoTpError::Timeout(kind)),
523 }
524 }
525}
526
/// Maps a flow status to the small integer printed in the trace messages of this file.
///
/// Only compiled when the `defmt` feature is enabled.
#[cfg(feature = "defmt")]
#[inline]
fn flow_status_code(status: FlowStatus) -> u8 {
    match status {
        FlowStatus::ClearToSend => 0,
        FlowStatus::Wait => 1,
        FlowStatus::Overflow => 2,
    }
}
536
/// Returns how much of `timeout` is left after `elapsed`.
///
/// Yields `None` once `elapsed` exceeds `timeout`; an exactly used-up budget yields
/// `Some` of a zero duration.
fn remaining(timeout: Duration, elapsed: Duration) -> Option<Duration> {
    if elapsed > timeout {
        None
    } else {
        Some(timeout - elapsed)
    }
}
540
541async fn sleep_or_timeout<C: Clock, R: AsyncRuntime, E>(
542 clock: &C,
543 rt: &R,
544 start: C::Instant,
545 timeout: Duration,
546 kind: TimeoutKind,
547 duration: Duration,
548) -> Result<(), IsoTpError<E>> {
549 let remaining = remaining(timeout, clock.elapsed(start)).ok_or(IsoTpError::Timeout(kind))?;
550 let wait_for = duration.min(remaining);
551 let sleep_fut = rt.sleep(wait_for);
552 match rt.timeout(wait_for, sleep_fut).await {
553 Ok(()) => Ok(()),
554 Err(_) => Err(IsoTpError::Timeout(kind)),
555 }
556}
557
#[cfg(feature = "std")]
impl<'a, Tx, Rx, F> IsoTpAsyncNode<'a, Tx, Rx, F, crate::StdClock>
where
    Tx: AsyncTxFrameIo<Frame = F>,
    Rx: AsyncRxFrameIo<Frame = F, Error = Tx::Error>,
    F: Frame,
{
    /// Creates a node driven by `crate::StdClock` that reassembles into `rx_buffer`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `with_clock`.
    pub fn with_std_clock(
        tx: Tx,
        rx: Rx,
        cfg: IsoTpConfig,
        rx_buffer: &'a mut [u8],
    ) -> Result<Self, IsoTpError<()>> {
        let clock = crate::StdClock;
        Self::with_clock(tx, rx, cfg, clock, rx_buffer)
    }
}
575
/// First builder stage for `IsoTpAsyncNode`: everything except the receive storage.
pub struct IsoTpAsyncNodeBuilder<Tx, Rx, C> {
    /// Transmit half that the finished node will own.
    tx: Tx,
    /// Receive half that the finished node will own.
    rx: Rx,
    /// Configuration that the final `build` step validates.
    cfg: IsoTpConfig,
    /// Clock that the finished node will use.
    clock: C,
}
583
/// Second builder stage for `IsoTpAsyncNode`: all parts are present and `build` is available.
pub struct IsoTpAsyncNodeBuilderWithRx<'a, Tx, Rx, C> {
    /// Transmit half that the finished node will own.
    tx: Tx,
    /// Receive half that the finished node will own.
    rx: Rx,
    /// Configuration validated by `build`.
    cfg: IsoTpConfig,
    /// Clock that the finished node will use.
    clock: C,
    /// Storage handed to the receive state machine; `build` checks its capacity.
    rx_storage: RxStorage<'a>,
}
592
593impl<Tx, Rx, C> IsoTpAsyncNodeBuilder<Tx, Rx, C> {
594 pub fn rx_buffer<'a>(self, buffer: &'a mut [u8]) -> IsoTpAsyncNodeBuilderWithRx<'a, Tx, Rx, C> {
596 self.rx_storage(RxStorage::Borrowed(buffer))
597 }
598
599 pub fn rx_storage<'a>(
601 self,
602 rx_storage: RxStorage<'a>,
603 ) -> IsoTpAsyncNodeBuilderWithRx<'a, Tx, Rx, C> {
604 IsoTpAsyncNodeBuilderWithRx {
605 tx: self.tx,
606 rx: self.rx,
607 cfg: self.cfg,
608 clock: self.clock,
609 rx_storage,
610 }
611 }
612}
613
614impl<'a, Tx, Rx, C> IsoTpAsyncNodeBuilderWithRx<'a, Tx, Rx, C>
615where
616 Tx: AsyncTxFrameIo,
617 Rx: AsyncRxFrameIo<Frame = <Tx as AsyncTxFrameIo>::Frame, Error = <Tx as AsyncTxFrameIo>::Error>,
618 <Tx as AsyncTxFrameIo>::Frame: Frame,
619 C: Clock,
620{
621 pub fn build(
623 self,
624 ) -> Result<IsoTpAsyncNode<'a, Tx, Rx, <Tx as AsyncTxFrameIo>::Frame, C>, IsoTpError<()>> {
625 self.cfg.validate().map_err(|_| IsoTpError::InvalidConfig)?;
626 if self.rx_storage.capacity() < self.cfg.max_payload_len {
627 return Err(IsoTpError::InvalidConfig);
628 }
629 let rx_flow_control = RxFlowControl::from_config(&self.cfg);
630 Ok(IsoTpAsyncNode {
631 tx: self.tx,
632 rx: self.rx,
633 cfg: self.cfg,
634 rx_flow_control,
635 clock: self.clock,
636 rx_machine: RxMachine::new(self.rx_storage),
637 rx_last_activity: None,
638 })
639 }
640}
641
// Unit tests for the free helper functions of this file, driven by a frozen clock and a
// runtime stub that never actually waits.
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::boxed::Box;
    use core::future::Future;
    use core::pin::Pin;
    use std::sync::Mutex;

    /// Clock frozen at a fixed millisecond reading.
    #[derive(Debug, Clone, Copy)]
    struct TestClock {
        /// The reading returned by `now`, in milliseconds.
        now_ms: u64,
    }

    impl Clock for TestClock {
        type Instant = u64;

        fn now(&self) -> Self::Instant {
            self.now_ms
        }

        // Saturates at zero when `earlier` lies after the frozen reading.
        fn elapsed(&self, earlier: Self::Instant) -> Duration {
            Duration::from_millis(self.now_ms.saturating_sub(earlier))
        }

        fn add(&self, instant: Self::Instant, dur: Duration) -> Self::Instant {
            instant.saturating_add(dur.as_millis() as u64)
        }
    }

    /// Runtime stub that records every requested sleep and completes it immediately.
    #[derive(Default)]
    struct TestRuntime {
        /// Durations passed to `sleep`, in call order.
        sleeps: Mutex<Vec<Duration>>,
    }

    impl AsyncRuntime for TestRuntime {
        type TimeoutError = ();

        type Sleep<'a>
            = core::future::Ready<()>
        where
            Self: 'a;

        // Records the requested duration and returns an already completed future.
        fn sleep<'a>(&'a self, duration: Duration) -> Self::Sleep<'a> {
            self.sleeps.lock().unwrap().push(duration);
            core::future::ready(())
        }

        type Timeout<'a, F>
            = Pin<Box<dyn Future<Output = Result<F::Output, Self::TimeoutError>> + 'a>>
        where
            Self: 'a,
            F: Future + 'a;

        // Never times out: the wrapped future is simply awaited and its output wrapped in `Ok`.
        fn timeout<'a, F>(&'a self, _duration: Duration, future: F) -> Self::Timeout<'a, F>
        where
            F: Future + 'a,
        {
            Box::pin(async move { Ok(future.await) })
        }
    }

    // `remaining` returns the difference while budget is left, zero at the boundary and
    // `None` once the elapsed time exceeds the timeout.
    #[test]
    fn remaining_handles_underflow() {
        assert_eq!(
            remaining(Duration::from_millis(10), Duration::from_millis(5)),
            Some(Duration::from_millis(5))
        );
        assert_eq!(
            remaining(Duration::from_millis(10), Duration::from_millis(10)),
            Some(Duration::from_millis(0))
        );
        assert_eq!(
            remaining(Duration::from_millis(10), Duration::from_millis(11)),
            None
        );
    }

    // With 30 ms of a 100 ms budget used, a 200 ms sleep request is clamped to the 70 ms left.
    #[tokio::test(flavor = "current_thread")]
    async fn sleep_or_timeout_uses_min_of_requested_and_remaining() {
        let clock = TestClock { now_ms: 30 };
        let rt = TestRuntime::default();
        let start = 0u64;

        let res = sleep_or_timeout::<_, _, ()>(
            &clock,
            &rt,
            start,
            Duration::from_millis(100),
            TimeoutKind::NAs,
            Duration::from_millis(200),
        )
        .await;
        assert!(res.is_ok());

        let sleeps = rt.sleeps.lock().unwrap().clone();
        assert_eq!(sleeps, vec![Duration::from_millis(70)]);
    }

    // With the budget already overrun the helper fails without requesting any sleep.
    #[tokio::test(flavor = "current_thread")]
    async fn sleep_or_timeout_returns_timeout_when_elapsed_exceeds_deadline() {
        let clock = TestClock { now_ms: 100 };
        let rt = TestRuntime::default();
        let start = 0u64;

        let err = sleep_or_timeout::<_, _, ()>(
            &clock,
            &rt,
            start,
            Duration::from_millis(50),
            TimeoutKind::NAs,
            Duration::from_millis(1),
        )
        .await
        .unwrap_err();
        assert!(matches!(err, IsoTpError::Timeout(TimeoutKind::NAs)));

        let sleeps = rt.sleeps.lock().unwrap().clone();
        assert!(sleeps.is_empty());
    }

    // Exercises the two `TestClock` methods that the other tests never call.
    #[test]
    fn test_clock_now_and_add_are_exercised() {
        let clock = TestClock { now_ms: 5 };
        assert_eq!(clock.now(), 5);
        assert_eq!(clock.add(10, Duration::from_millis(7)), 17);
    }
}