Skip to main content

can_iso_tp/
async_node.rs

1//! Async ISO-TP node implementation.
2
3use core::time::Duration;
4
5use embedded_can::Frame;
6use embedded_can_interface::{AsyncRxFrameIo, AsyncTxFrameIo};
7
8use crate::async_io::AsyncRuntime;
9use crate::errors::{IsoTpError, TimeoutKind};
10use crate::pdu::{
11    FlowStatus, Pdu, decode_with_offset, duration_to_st_min, encode_with_prefix_sized,
12    st_min_to_duration,
13};
14use crate::rx::{RxMachine, RxOutcome, RxState, RxStorage};
15use crate::timer::Clock;
16use crate::{IsoTpConfig, RxFlowControl, id_matches};
17
18/// Async ISO-TP endpoint backed by async transmit/receive halves and a clock.
19pub struct IsoTpAsyncNode<'a, Tx, Rx, F, C>
20where
21    Tx: AsyncTxFrameIo<Frame = F>,
22    Rx: AsyncRxFrameIo<Frame = F, Error = Tx::Error>,
23    C: Clock,
24{
25    tx: Tx,
26    rx: Rx,
27    cfg: IsoTpConfig,
28    rx_flow_control: RxFlowControl,
29    clock: C,
30    rx_machine: RxMachine<'a>,
31    rx_last_activity: Option<C::Instant>,
32}
33
34impl<'a, Tx, Rx, F, C> IsoTpAsyncNode<'a, Tx, Rx, F, C>
35where
36    Tx: AsyncTxFrameIo<Frame = F>,
37    Rx: AsyncRxFrameIo<Frame = F, Error = Tx::Error>,
38    F: Frame,
39    C: Clock,
40{
41    /// Start building an [`IsoTpAsyncNode`] (requires RX storage before `build()`).
42    pub fn builder(tx: Tx, rx: Rx, cfg: IsoTpConfig, clock: C) -> IsoTpAsyncNodeBuilder<Tx, Rx, C> {
43        IsoTpAsyncNodeBuilder { tx, rx, cfg, clock }
44    }
45
46    /// Construct using a provided clock and caller-provided RX buffer.
47    pub fn with_clock(
48        tx: Tx,
49        rx: Rx,
50        cfg: IsoTpConfig,
51        clock: C,
52        rx_buffer: &'a mut [u8],
53    ) -> Result<Self, IsoTpError<()>> {
54        Self::with_clock_and_storage(tx, rx, cfg, clock, RxStorage::Borrowed(rx_buffer))
55    }
56
57    /// Construct using a provided clock and explicit RX storage.
58    pub fn with_clock_and_storage(
59        tx: Tx,
60        rx: Rx,
61        cfg: IsoTpConfig,
62        clock: C,
63        rx_storage: RxStorage<'a>,
64    ) -> Result<Self, IsoTpError<()>> {
65        let node = Self::builder(tx, rx, cfg, clock)
66            .rx_storage(rx_storage)
67            .build()?;
68        Ok(node)
69    }
70
71    /// Get the current receive-side FlowControl parameters (BS/STmin).
72    pub fn rx_flow_control(&self) -> RxFlowControl {
73        self.rx_flow_control
74    }
75
76    /// Update receive-side FlowControl parameters (BS/STmin).
77    pub fn set_rx_flow_control(&mut self, fc: RxFlowControl) {
78        self.rx_flow_control = fc;
79    }
80
81    fn update_rx_timeout_activity(&mut self, now: C::Instant) {
82        self.rx_last_activity = if self.rx_machine.state == RxState::Receiving {
83            Some(now)
84        } else {
85            None
86        };
87    }
88
89    fn expire_rx_timeout_if_needed(&mut self) {
90        let Some(last_activity) = self.rx_last_activity else {
91            return;
92        };
93        if self.rx_machine.state != RxState::Receiving {
94            self.rx_last_activity = None;
95            return;
96        }
97        if self.clock.elapsed(last_activity) >= self.cfg.n_br {
98            isotp_warn!("async recv n_br timeout; aborting in-flight rx");
99            self.rx_machine.reset();
100            self.rx_last_activity = None;
101        }
102    }
103
104    /// Blocking-by-await send until completion or timeout.
105    ///
106    /// This method performs the full ISO-TP handshake and segmentation as needed, using the
107    /// provided runtime for sleeps/timeouts.
108    pub async fn send<R: AsyncRuntime>(
109        &mut self,
110        rt: &R,
111        payload: &[u8],
112        timeout: Duration,
113    ) -> Result<(), IsoTpError<Tx::Error>> {
114        if payload.len() > self.cfg.max_payload_len {
115            isotp_warn!(
116                "async send overflow payload_len={} max={}",
117                payload.len(),
118                self.cfg.max_payload_len
119            );
120            return Err(IsoTpError::Overflow);
121        }
122
123        let start = self.clock.now();
124        if self.clock.elapsed(start) >= timeout {
125            return Err(IsoTpError::Timeout(TimeoutKind::NAs));
126        }
127
128        if payload.len() <= self.cfg.max_single_frame_payload() {
129            isotp_trace!("async send single-frame len={}", payload.len());
130            let pdu = Pdu::SingleFrame {
131                len: payload.len() as u8,
132                data: payload,
133            };
134            let frame = encode_with_prefix_sized(
135                self.cfg.tx_id,
136                &pdu,
137                self.cfg.padding,
138                self.cfg.tx_addr,
139                self.cfg.frame_len,
140            )
141            .map_err(|_| IsoTpError::InvalidFrame)?;
142            self.send_frame(rt, start, timeout, TimeoutKind::NAs, &frame)
143                .await?;
144            return Ok(());
145        }
146
147        let mut offset = payload.len().min(self.cfg.max_first_frame_payload());
148        let mut next_sn: u8 = 1;
149        let wait_count: u8 = 0;
150        isotp_debug!(
151            "async send first-frame payload_len={} ff_chunk={} bs={} st_min_ms={}",
152            payload.len(),
153            offset,
154            self.cfg.block_size,
155            self.cfg.st_min.as_millis() as u64
156        );
157
158        let pdu = Pdu::FirstFrame {
159            len: payload.len() as u16,
160            data: &payload[..offset],
161        };
162        let frame = encode_with_prefix_sized(
163            self.cfg.tx_id,
164            &pdu,
165            self.cfg.padding,
166            self.cfg.tx_addr,
167            self.cfg.frame_len,
168        )
169        .map_err(|_| IsoTpError::InvalidFrame)?;
170        self.send_frame(rt, start, timeout, TimeoutKind::NAs, &frame)
171            .await?;
172
173        let fc_start = self.clock.now();
174        let (mut block_size, mut st_min, mut wait_count) = self
175            .wait_for_flow_control(rt, start, timeout, fc_start, wait_count)
176            .await?;
177        let mut block_remaining = block_size;
178
179        let mut last_cf_sent: Option<C::Instant> = None;
180        while offset < payload.len() {
181            if block_size > 0 && block_remaining == 0 {
182                let fc_start = self.clock.now();
183                let (new_bs, new_st_min, new_wait_count) = self
184                    .wait_for_flow_control(rt, start, timeout, fc_start, wait_count)
185                    .await?;
186                block_size = new_bs;
187                block_remaining = new_bs;
188                st_min = new_st_min;
189                wait_count = new_wait_count;
190                continue;
191            }
192
193            if let Some(sent_at) = last_cf_sent {
194                let elapsed = self.clock.elapsed(sent_at);
195                if elapsed < st_min {
196                    let wait_for = st_min - elapsed;
197                    sleep_or_timeout(&self.clock, rt, start, timeout, TimeoutKind::NAs, wait_for)
198                        .await?;
199                }
200            }
201
202            let remaining = payload.len() - offset;
203            let chunk = remaining.min(self.cfg.max_consecutive_frame_payload());
204            let pdu = Pdu::ConsecutiveFrame {
205                sn: next_sn & 0x0F,
206                data: &payload[offset..offset + chunk],
207            };
208            let frame = encode_with_prefix_sized(
209                self.cfg.tx_id,
210                &pdu,
211                self.cfg.padding,
212                self.cfg.tx_addr,
213                self.cfg.frame_len,
214            )
215            .map_err(|_| IsoTpError::InvalidFrame)?;
216            self.send_frame(rt, start, timeout, TimeoutKind::NAs, &frame)
217                .await?;
218            isotp_trace!(
219                "async send cf sent sn={} chunk={} offset={} payload_len={} block_remaining={}",
220                next_sn & 0x0F,
221                chunk,
222                offset,
223                payload.len(),
224                block_remaining
225            );
226
227            last_cf_sent = Some(self.clock.now());
228            offset += chunk;
229            next_sn = (next_sn + 1) & 0x0F;
230
231            if block_size > 0 {
232                block_remaining = block_remaining.saturating_sub(1);
233            }
234        }
235
236        Ok(())
237    }
238
239    /// Blocking-by-await receive until a full payload arrives or timeout.
240    ///
241    /// The provided `deliver` callback is invoked only when a full payload has been reassembled.
242    /// The slice passed to `deliver` is valid until the next receive operation mutates the internal
243    /// reassembly buffer.
244    pub async fn recv<R: AsyncRuntime>(
245        &mut self,
246        rt: &R,
247        timeout: Duration,
248        deliver: &mut dyn FnMut(&[u8]),
249    ) -> Result<(), IsoTpError<Tx::Error>> {
250        let start = self.clock.now();
251
252        loop {
253            self.expire_rx_timeout_if_needed();
254
255            let global_remaining = remaining(timeout, self.clock.elapsed(start))
256                .ok_or(IsoTpError::Timeout(TimeoutKind::NAr))?;
257            let wait_for = if let Some(last_activity) = self.rx_last_activity {
258                if self.rx_machine.state == RxState::Receiving {
259                    let elapsed = self.clock.elapsed(last_activity);
260                    match self.cfg.n_br.checked_sub(elapsed) {
261                        Some(remaining_n_br) => global_remaining.min(remaining_n_br),
262                        None => Duration::from_millis(0),
263                    }
264                } else {
265                    global_remaining
266                }
267            } else {
268                global_remaining
269            };
270            if wait_for == Duration::from_millis(0) {
271                self.expire_rx_timeout_if_needed();
272                continue;
273            }
274
275            let frame = match rt.timeout(wait_for, self.rx.recv()).await {
276                Ok(Ok(frame)) => frame,
277                Ok(Err(err)) => return Err(IsoTpError::LinkError(err)),
278                Err(_) => {
279                    self.expire_rx_timeout_if_needed();
280                    if remaining(timeout, self.clock.elapsed(start)).is_none() {
281                        return Err(IsoTpError::Timeout(TimeoutKind::NAr));
282                    }
283                    continue;
284                }
285            };
286
287            if !id_matches(frame.id(), &self.cfg.rx_id) {
288                isotp_trace!("async recv drop frame: rx_id mismatch");
289                continue;
290            }
291            if let Some(expected) = self.cfg.rx_addr
292                && frame.data().first().copied() != Some(expected)
293            {
294                isotp_trace!(
295                    "async recv drop frame: rx_addr mismatch expected={}",
296                    expected
297                );
298                continue;
299            }
300
301            let pdu = decode_with_offset(frame.data(), self.cfg.rx_pci_offset()).map_err(|_| {
302                isotp_warn!("async recv invalid frame decode");
303                IsoTpError::InvalidFrame
304            })?;
305            if matches!(pdu, Pdu::FlowControl { .. }) {
306                isotp_trace!("async recv ignore flow-control while receiving");
307                continue;
308            }
309
310            let outcome = match self
311                .rx_machine
312                .on_pdu(&self.cfg, &self.rx_flow_control, pdu)
313            {
314                Ok(o) => o,
315                Err(IsoTpError::Overflow) => {
316                    isotp_warn!("async recv rx overflow; sending overflow fc");
317                    self.rx_machine.reset();
318                    self.rx_last_activity = None;
319                    let _ = self.send_overflow_fc(rt, start, timeout).await;
320                    return Err(IsoTpError::RxOverflow);
321                }
322                Err(IsoTpError::UnexpectedPdu) => {
323                    self.update_rx_timeout_activity(self.clock.now());
324                    continue;
325                }
326                Err(IsoTpError::BadSequence) => {
327                    isotp_warn!("async recv bad sequence");
328                    self.rx_machine.reset();
329                    self.rx_last_activity = None;
330                    return Err(IsoTpError::BadSequence);
331                }
332                Err(IsoTpError::InvalidFrame) => return Err(IsoTpError::InvalidFrame),
333                Err(IsoTpError::InvalidConfig) => return Err(IsoTpError::InvalidConfig),
334                Err(IsoTpError::Timeout(kind)) => return Err(IsoTpError::Timeout(kind)),
335                Err(IsoTpError::WouldBlock) => return Err(IsoTpError::WouldBlock),
336                Err(IsoTpError::RxOverflow) => return Err(IsoTpError::RxOverflow),
337                Err(IsoTpError::NotIdle) => return Err(IsoTpError::NotIdle),
338                Err(IsoTpError::LinkError(_)) => return Err(IsoTpError::InvalidFrame),
339            };
340
341            match outcome {
342                RxOutcome::None => {
343                    self.update_rx_timeout_activity(self.clock.now());
344                    continue;
345                }
346                RxOutcome::SendFlowControl {
347                    status,
348                    block_size,
349                    st_min,
350                } => {
351                    self.update_rx_timeout_activity(self.clock.now());
352                    isotp_trace!(
353                        "async recv send fc status={} bs={} st_min_raw={}",
354                        flow_status_code(status),
355                        block_size,
356                        st_min
357                    );
358                    self.send_flow_control(rt, start, timeout, status, block_size, st_min)
359                        .await?;
360                }
361                RxOutcome::Completed(_len) => {
362                    self.rx_last_activity = None;
363                    let data = self.rx_machine.take_completed();
364                    isotp_debug!("async recv completed len={}", data.len());
365                    deliver(data);
366                    return Ok(());
367                }
368            }
369        }
370    }
371
372    async fn wait_for_flow_control<R: AsyncRuntime>(
373        &mut self,
374        rt: &R,
375        global_start: C::Instant,
376        global_timeout: Duration,
377        mut fc_start: C::Instant,
378        mut wait_count: u8,
379    ) -> Result<(u8, Duration, u8), IsoTpError<Tx::Error>> {
380        loop {
381            if self.clock.elapsed(fc_start) >= self.cfg.n_bs {
382                return Err(IsoTpError::Timeout(TimeoutKind::NBs));
383            }
384
385            let fc_remaining = self.cfg.n_bs - self.clock.elapsed(fc_start);
386            let global_remaining = remaining(global_timeout, self.clock.elapsed(global_start))
387                .ok_or(IsoTpError::Timeout(TimeoutKind::NAs))?;
388            let wait_for = fc_remaining.min(global_remaining);
389
390            let frame = match rt.timeout(wait_for, self.rx.recv()).await {
391                Ok(Ok(f)) => f,
392                Ok(Err(err)) => return Err(IsoTpError::LinkError(err)),
393                Err(_) => {
394                    if global_remaining <= fc_remaining {
395                        return Err(IsoTpError::Timeout(TimeoutKind::NAs));
396                    }
397                    return Err(IsoTpError::Timeout(TimeoutKind::NBs));
398                }
399            };
400
401            if !id_matches(frame.id(), &self.cfg.rx_id) {
402                isotp_trace!("async wait_fc drop frame: rx_id mismatch");
403                continue;
404            }
405            if let Some(expected) = self.cfg.rx_addr
406                && frame.data().first().copied() != Some(expected)
407            {
408                isotp_trace!(
409                    "async wait_fc drop frame: rx_addr mismatch expected={}",
410                    expected
411                );
412                continue;
413            }
414
415            let pdu = decode_with_offset(frame.data(), self.cfg.rx_pci_offset()).map_err(|_| {
416                isotp_warn!("async wait_fc invalid frame decode");
417                IsoTpError::InvalidFrame
418            })?;
419            match pdu {
420                Pdu::FlowControl {
421                    status,
422                    block_size,
423                    st_min,
424                } => match status {
425                    FlowStatus::ClearToSend => {
426                        let bs = if block_size == 0 {
427                            self.cfg.block_size
428                        } else {
429                            block_size
430                        };
431                        let st_min = st_min_to_duration(st_min).unwrap_or(self.cfg.st_min);
432                        isotp_debug!(
433                            "async wait_fc cts bs={} st_min_ms={}",
434                            bs,
435                            st_min.as_millis() as u64
436                        );
437                        return Ok((bs, st_min, 0));
438                    }
439                    FlowStatus::Wait => {
440                        wait_count = wait_count.saturating_add(1);
441                        isotp_trace!(
442                            "async wait_fc wait wait_count={} max={}",
443                            wait_count,
444                            self.cfg.wft_max
445                        );
446                        if wait_count > self.cfg.wft_max {
447                            return Err(IsoTpError::Timeout(TimeoutKind::NBs));
448                        }
449                        fc_start = self.clock.now();
450                        continue;
451                    }
452                    FlowStatus::Overflow => return Err(IsoTpError::Overflow),
453                },
454                _ => continue,
455            }
456        }
457    }
458
459    async fn send_flow_control<R: AsyncRuntime>(
460        &mut self,
461        rt: &R,
462        start: C::Instant,
463        timeout: Duration,
464        status: FlowStatus,
465        block_size: u8,
466        st_min: u8,
467    ) -> Result<(), IsoTpError<Tx::Error>> {
468        isotp_trace!(
469            "async send_flow_control status={} bs={} st_min_raw={}",
470            flow_status_code(status),
471            block_size,
472            st_min
473        );
474        let fc = Pdu::FlowControl {
475            status,
476            block_size,
477            st_min,
478        };
479        let frame = encode_with_prefix_sized(
480            self.cfg.tx_id,
481            &fc,
482            self.cfg.padding,
483            self.cfg.tx_addr,
484            self.cfg.frame_len,
485        )
486        .map_err(|_| IsoTpError::InvalidFrame)?;
487        self.send_frame(rt, start, timeout, TimeoutKind::NAs, &frame)
488            .await?;
489        Ok(())
490    }
491
492    async fn send_overflow_fc<R: AsyncRuntime>(
493        &mut self,
494        rt: &R,
495        start: C::Instant,
496        timeout: Duration,
497    ) -> Result<(), IsoTpError<Tx::Error>> {
498        self.send_flow_control(
499            rt,
500            start,
501            timeout,
502            FlowStatus::Overflow,
503            0,
504            duration_to_st_min(self.cfg.st_min),
505        )
506        .await
507    }
508
509    async fn send_frame<R: AsyncRuntime>(
510        &mut self,
511        rt: &R,
512        start: C::Instant,
513        timeout: Duration,
514        kind: TimeoutKind,
515        frame: &F,
516    ) -> Result<(), IsoTpError<Tx::Error>> {
517        let remaining =
518            remaining(timeout, self.clock.elapsed(start)).ok_or(IsoTpError::Timeout(kind))?;
519        match rt.timeout(remaining, self.tx.send(frame)).await {
520            Ok(Ok(())) => Ok(()),
521            Ok(Err(err)) => Err(IsoTpError::LinkError(err)),
522            Err(_) => Err(IsoTpError::Timeout(kind)),
523        }
524    }
525}
526
/// Numeric code used when logging a [`FlowStatus`]: ClearToSend = 0, Wait = 1, Overflow = 2.
#[cfg(feature = "defmt")]
#[inline]
fn flow_status_code(status: FlowStatus) -> u8 {
    let code: u8 = match status {
        FlowStatus::Overflow => 2,
        FlowStatus::Wait => 1,
        FlowStatus::ClearToSend => 0,
    };
    code
}
536
/// Time left of `timeout` after `elapsed` has passed, or `None` once `elapsed` exceeds it.
///
/// An exactly used-up budget yields `Some(Duration::ZERO)`.
fn remaining(timeout: Duration, elapsed: Duration) -> Option<Duration> {
    if elapsed > timeout {
        return None;
    }
    Some(timeout - elapsed)
}
540
541async fn sleep_or_timeout<C: Clock, R: AsyncRuntime, E>(
542    clock: &C,
543    rt: &R,
544    start: C::Instant,
545    timeout: Duration,
546    kind: TimeoutKind,
547    duration: Duration,
548) -> Result<(), IsoTpError<E>> {
549    let remaining = remaining(timeout, clock.elapsed(start)).ok_or(IsoTpError::Timeout(kind))?;
550    let wait_for = duration.min(remaining);
551    let sleep_fut = rt.sleep(wait_for);
552    match rt.timeout(wait_for, sleep_fut).await {
553        Ok(()) => Ok(()),
554        Err(_) => Err(IsoTpError::Timeout(kind)),
555    }
556}
557
#[cfg(feature = "std")]
impl<'a, Tx, Rx, F> IsoTpAsyncNode<'a, Tx, Rx, F, crate::StdClock>
where
    F: Frame,
    Tx: AsyncTxFrameIo<Frame = F>,
    Rx: AsyncRxFrameIo<Frame = F, Error = Tx::Error>,
{
    /// Shorthand for [`IsoTpAsyncNode::with_clock`] that supplies `StdClock` as the time source.
    pub fn with_std_clock(
        tx: Tx,
        rx: Rx,
        cfg: IsoTpConfig,
        rx_buffer: &'a mut [u8],
    ) -> Result<Self, IsoTpError<()>> {
        let clock = crate::StdClock;
        Self::with_clock(tx, rx, cfg, clock, rx_buffer)
    }
}
575
576/// Builder for [`IsoTpAsyncNode`] that enforces providing RX storage before construction.
577pub struct IsoTpAsyncNodeBuilder<Tx, Rx, C> {
578    tx: Tx,
579    rx: Rx,
580    cfg: IsoTpConfig,
581    clock: C,
582}
583
584/// Builder state after RX storage has been provided.
585pub struct IsoTpAsyncNodeBuilderWithRx<'a, Tx, Rx, C> {
586    tx: Tx,
587    rx: Rx,
588    cfg: IsoTpConfig,
589    clock: C,
590    rx_storage: RxStorage<'a>,
591}
592
593impl<Tx, Rx, C> IsoTpAsyncNodeBuilder<Tx, Rx, C> {
594    /// Provide the RX buffer used for receive-side reassembly.
595    pub fn rx_buffer<'a>(self, buffer: &'a mut [u8]) -> IsoTpAsyncNodeBuilderWithRx<'a, Tx, Rx, C> {
596        self.rx_storage(RxStorage::Borrowed(buffer))
597    }
598
599    /// Provide explicit RX storage (borrowed or owned, depending on features).
600    pub fn rx_storage<'a>(
601        self,
602        rx_storage: RxStorage<'a>,
603    ) -> IsoTpAsyncNodeBuilderWithRx<'a, Tx, Rx, C> {
604        IsoTpAsyncNodeBuilderWithRx {
605            tx: self.tx,
606            rx: self.rx,
607            cfg: self.cfg,
608            clock: self.clock,
609            rx_storage,
610        }
611    }
612}
613
614impl<'a, Tx, Rx, C> IsoTpAsyncNodeBuilderWithRx<'a, Tx, Rx, C>
615where
616    Tx: AsyncTxFrameIo,
617    Rx: AsyncRxFrameIo<Frame = <Tx as AsyncTxFrameIo>::Frame, Error = <Tx as AsyncTxFrameIo>::Error>,
618    <Tx as AsyncTxFrameIo>::Frame: Frame,
619    C: Clock,
620{
621    /// Validate configuration and build an [`IsoTpAsyncNode`].
622    pub fn build(
623        self,
624    ) -> Result<IsoTpAsyncNode<'a, Tx, Rx, <Tx as AsyncTxFrameIo>::Frame, C>, IsoTpError<()>> {
625        self.cfg.validate().map_err(|_| IsoTpError::InvalidConfig)?;
626        if self.rx_storage.capacity() < self.cfg.max_payload_len {
627            return Err(IsoTpError::InvalidConfig);
628        }
629        let rx_flow_control = RxFlowControl::from_config(&self.cfg);
630        Ok(IsoTpAsyncNode {
631            tx: self.tx,
632            rx: self.rx,
633            cfg: self.cfg,
634            rx_flow_control,
635            clock: self.clock,
636            rx_machine: RxMachine::new(self.rx_storage),
637            rx_last_activity: None,
638        })
639    }
640}
641
642#[cfg(test)]
643mod tests {
644    use super::*;
645    use alloc::boxed::Box;
646    use core::future::Future;
647    use core::pin::Pin;
648    use std::sync::Mutex;
649
650    #[derive(Debug, Clone, Copy)]
651    struct TestClock {
652        now_ms: u64,
653    }
654
655    impl Clock for TestClock {
656        type Instant = u64;
657
658        fn now(&self) -> Self::Instant {
659            self.now_ms
660        }
661
662        fn elapsed(&self, earlier: Self::Instant) -> Duration {
663            Duration::from_millis(self.now_ms.saturating_sub(earlier))
664        }
665
666        fn add(&self, instant: Self::Instant, dur: Duration) -> Self::Instant {
667            instant.saturating_add(dur.as_millis() as u64)
668        }
669    }
670
671    #[derive(Default)]
672    struct TestRuntime {
673        sleeps: Mutex<Vec<Duration>>,
674    }
675
676    impl AsyncRuntime for TestRuntime {
677        type TimeoutError = ();
678
679        type Sleep<'a>
680            = core::future::Ready<()>
681        where
682            Self: 'a;
683
684        fn sleep<'a>(&'a self, duration: Duration) -> Self::Sleep<'a> {
685            self.sleeps.lock().unwrap().push(duration);
686            core::future::ready(())
687        }
688
689        type Timeout<'a, F>
690            = Pin<Box<dyn Future<Output = Result<F::Output, Self::TimeoutError>> + 'a>>
691        where
692            Self: 'a,
693            F: Future + 'a;
694
695        fn timeout<'a, F>(&'a self, _duration: Duration, future: F) -> Self::Timeout<'a, F>
696        where
697            F: Future + 'a,
698        {
699            Box::pin(async move { Ok(future.await) })
700        }
701    }
702
703    #[test]
704    fn remaining_handles_underflow() {
705        assert_eq!(
706            remaining(Duration::from_millis(10), Duration::from_millis(5)),
707            Some(Duration::from_millis(5))
708        );
709        assert_eq!(
710            remaining(Duration::from_millis(10), Duration::from_millis(10)),
711            Some(Duration::from_millis(0))
712        );
713        assert_eq!(
714            remaining(Duration::from_millis(10), Duration::from_millis(11)),
715            None
716        );
717    }
718
719    #[tokio::test(flavor = "current_thread")]
720    async fn sleep_or_timeout_uses_min_of_requested_and_remaining() {
721        let clock = TestClock { now_ms: 30 };
722        let rt = TestRuntime::default();
723        let start = 0u64;
724
725        let res = sleep_or_timeout::<_, _, ()>(
726            &clock,
727            &rt,
728            start,
729            Duration::from_millis(100),
730            TimeoutKind::NAs,
731            Duration::from_millis(200),
732        )
733        .await;
734        assert!(res.is_ok());
735
736        let sleeps = rt.sleeps.lock().unwrap().clone();
737        assert_eq!(sleeps, vec![Duration::from_millis(70)]);
738    }
739
740    #[tokio::test(flavor = "current_thread")]
741    async fn sleep_or_timeout_returns_timeout_when_elapsed_exceeds_deadline() {
742        let clock = TestClock { now_ms: 100 };
743        let rt = TestRuntime::default();
744        let start = 0u64;
745
746        let err = sleep_or_timeout::<_, _, ()>(
747            &clock,
748            &rt,
749            start,
750            Duration::from_millis(50),
751            TimeoutKind::NAs,
752            Duration::from_millis(1),
753        )
754        .await
755        .unwrap_err();
756        assert!(matches!(err, IsoTpError::Timeout(TimeoutKind::NAs)));
757
758        let sleeps = rt.sleeps.lock().unwrap().clone();
759        assert!(sleeps.is_empty());
760    }
761
762    #[test]
763    fn test_clock_now_and_add_are_exercised() {
764        let clock = TestClock { now_ms: 5 };
765        assert_eq!(clock.now(), 5);
766        assert_eq!(clock.add(10, Duration::from_millis(7)), 17);
767    }
768}