Skip to main content

can_iso_tp/
lib.rs

1//! `can-iso-tp`: a lightweight ISO-TP (ISO 15765-2) transport layer for CAN.
2//!
3//! ISO-TP (“ISO Transport Protocol”) defines how to carry *larger* payloads over classic CAN by
4//! segmenting messages into:
5//! - a **Single Frame** (small payloads),
6//! - a **First Frame** + multiple **Consecutive Frames** (larger payloads), and
7//! - **Flow Control** frames to regulate pacing and batching.
8//!
9//! This crate provides:
10//! - A blocking/polling node ([`IsoTpNode`]) built on split CAN Tx/Rx halves from
11//!   `embedded-can-interface`.
12//! - An async node ([`IsoTpAsyncNode`]) for async runtimes via [`AsyncRuntime`].
13//! - Small supporting building blocks (`rx`, `tx`, `pdu`, config and addressing helpers).
14//!
15//! The public API is designed to be usable in `no_std` environments. Allocation is optional (see
16//! feature flags). Regardless of allocator availability, receive-side reassembly requires an
17//! explicit caller-provided buffer (borrowed or owned).
18//!
19//! # Feature flags
20//! - `std` (default): enables allocation support and exports [`StdClock`].
21//! - `alloc`: enables allocation support without the Rust standard library (e.g. to use
22//!   [`RxStorage::Owned`]).
23//! - `uds`: enables 29-bit UDS “normal fixed” addressing helpers (`can-uds`) and multi-peer
24//!   demultiplexers ([`IsoTpDemux`], [`IsoTpAsyncDemux`]) that provide `reply_to` metadata.
25//!
26//! # Concepts
27//! - **Addressing**: ISO-TP supports “normal” addressing (just CAN IDs), and extended/mixed modes
28//!   where an additional *addressing byte* is inserted before the PCI. This crate models that via
29//!   [`IsoTpConfig::tx_addr`] / [`IsoTpConfig::rx_addr`] and helpers in [`address`].
30//! - **PCI offset**: when an addressing byte is present, the Protocol Control Information starts at
31//!   byte 1 rather than byte 0. Helpers like [`IsoTpConfig::tx_pci_offset`] and
32//!   [`pdu::decode_with_offset`] keep the encoding/decoding logic consistent.
33//! - **Progress**: the non-blocking API uses [`Progress`] to indicate whether a step completed,
34//!   needs more work, or would block.
35//!
36//! # Quick start
37//! The `IsoTpNode` API is structured as:
38//! - **polling**: repeatedly call [`IsoTpNode::poll_send`] / [`IsoTpNode::poll_recv`], or
39//! - **blocking**: call [`IsoTpNode::send`] / [`IsoTpNode::recv`], which spin internally until done
40//!   or timeout.
41//!
42//! This crate does not ship a concrete CAN driver. For tests and examples, the workspace includes
43//! `embedded-can-mock` which implements `embedded-can-interface`.
44//!
45//! ```rust,ignore
46//! use core::time::Duration;
47//! use can_iso_tp::{IsoTpConfig, IsoTpNode, Progress, StdClock};
48//! use embedded_can_interface::SplitTxRx;
49//!
50//! # fn example<D, F>(driver: D, cfg: IsoTpConfig) -> Result<(), can_iso_tp::IsoTpError<D::Error>>
51//! # where
52//! #   D: SplitTxRx,
53//! #   D::Tx: embedded_can_interface::TxFrameIo<Frame = F>,
54//! #   D::Rx: embedded_can_interface::RxFrameIo<Frame = F, Error = <D::Tx as embedded_can_interface::TxFrameIo>::Error>,
55//! #   F: embedded_can::Frame,
56//! # {
57//! let (tx, rx) = driver.split();
58//! let mut rx_buf = [0u8; 4095];
59//! let mut node = IsoTpNode::with_clock(tx, rx, cfg, StdClock, &mut rx_buf).unwrap();
60//!
61//! // Send (blocking)
62//! node.send(b"hello", Duration::from_millis(100))?;
63//!
64//! // Receive (polling)
65//! let mut out = Vec::new();
66//! loop {
67//!     // In a superloop, you typically pass your current time here.
68//!     let clock = StdClock;
69//!     let now = clock.now();
70//!     match node.poll_recv(now, &mut |data| out = data.to_vec())? {
71//!         Progress::Completed => break,
72//!         Progress::InFlight | Progress::WaitingForFlowControl | Progress::WouldBlock => continue,
73//!     }
74//! }
75//! # Ok(()) }
76//! ```
77
78#![cfg_attr(not(feature = "std"), no_std)]
79
80#[cfg(any(feature = "alloc", feature = "std"))]
81extern crate alloc;
82
83#[cfg(feature = "defmt")]
84#[allow(unused_macros)]
85macro_rules! isotp_trace {
86    ($($arg:tt)*) => {
87        defmt::trace!($($arg)*);
88    };
89}
90#[cfg(not(feature = "defmt"))]
91#[allow(unused_macros)]
92macro_rules! isotp_trace {
93    ($($arg:tt)*) => {};
94}
95
96#[cfg(feature = "defmt")]
97#[allow(unused_macros)]
98macro_rules! isotp_debug {
99    ($($arg:tt)*) => {
100        defmt::debug!($($arg)*);
101    };
102}
103#[cfg(not(feature = "defmt"))]
104#[allow(unused_macros)]
105macro_rules! isotp_debug {
106    ($($arg:tt)*) => {};
107}
108
109#[cfg(feature = "defmt")]
110#[allow(unused_macros)]
111macro_rules! isotp_warn {
112    ($($arg:tt)*) => {
113        defmt::warn!($($arg)*);
114    };
115}
116#[cfg(not(feature = "defmt"))]
117#[allow(unused_macros)]
118macro_rules! isotp_warn {
119    ($($arg:tt)*) => {};
120}
121
122pub mod address;
123#[cfg(feature = "uds")]
124mod async_demux;
125pub mod async_io;
126pub mod async_node;
127pub mod config;
128#[cfg(feature = "uds")]
129pub mod demux;
130pub mod errors;
131#[cfg(feature = "isotp-interface")]
132mod interface_impl;
133pub mod pdu;
134pub mod rx;
135pub mod timer;
136pub mod tx;
137
138pub use address::{AsymmetricAddress, IsoTpAddress, RxAddress, TargetAddressType, TxAddress};
139#[cfg(feature = "uds")]
140pub use async_demux::{
141    AppRecvIntoError, IsoTpAsyncDemux, IsoTpAsyncDemuxApp, IsoTpAsyncDemuxDriver,
142};
143pub use async_io::{AsyncRuntime, TimedOut};
144pub use async_node::IsoTpAsyncNode;
145pub use config::IsoTpConfig;
146#[cfg(feature = "uds")]
147pub use demux::{IsoTpDemux, rx_storages_from_buffers};
148pub use errors::{IsoTpError, TimeoutKind};
149#[cfg(feature = "isotp-interface")]
150pub use interface_impl::AsyncWithRt;
151pub use rx::RxStorage;
152pub use timer::Clock;
153#[cfg(feature = "std")]
154pub use timer::StdClock;
155pub use tx::Progress;
156
157/// Receive-side ISO-TP flow-control parameters (BS/STmin).
158///
159/// These values are advertised to the remote sender in FlowControl (FC) frames. Updating them at
160/// runtime allows shaping the sender's rate based on backpressure.
161#[derive(Debug, Clone, Copy, PartialEq, Eq)]
162pub struct RxFlowControl {
163    /// Block size (0 = unlimited).
164    pub block_size: u8,
165    /// Minimum separation time between consecutive frames.
166    pub st_min: Duration,
167}
168
169impl RxFlowControl {
170    /// Build flow-control parameters from a node's static configuration.
171    pub fn from_config(cfg: &IsoTpConfig) -> Self {
172        Self {
173            block_size: cfg.block_size,
174            st_min: cfg.st_min,
175        }
176    }
177}
178
179use core::mem;
180use core::time::Duration;
181use embedded_can::Frame;
182use embedded_can_interface::{Id, RxFrameIo, TxFrameIo};
183
184use pdu::{
185    FlowStatus, Pdu, decode_with_offset, duration_to_st_min, encode_with_prefix_sized,
186    st_min_to_duration,
187};
188use rx::{RxMachine, RxOutcome, RxState};
189use tx::{TxSession, TxState};
190
191/// Alias for CAN identifier.
192pub type CanId = Id;
193/// Re-export of the CAN frame trait.
194pub use embedded_can::Frame as CanFrame;
195
196/// ISO-TP endpoint backed by split transmit/receive halves and a clock.
197pub struct IsoTpNode<'a, Tx, Rx, F, C>
198where
199    Tx: TxFrameIo<Frame = F>,
200    Rx: RxFrameIo<Frame = F, Error = Tx::Error>,
201    C: Clock,
202{
203    tx: Tx,
204    rx: Rx,
205    cfg: IsoTpConfig,
206    rx_flow_control: RxFlowControl,
207    clock: C,
208    tx_state: TxState<C::Instant>,
209    rx_machine: RxMachine<'a>,
210    rx_last_activity: Option<C::Instant>,
211}
212
213impl<'a, Tx, Rx, F, C> IsoTpNode<'a, Tx, Rx, F, C>
214where
215    Tx: TxFrameIo<Frame = F>,
216    Rx: RxFrameIo<Frame = F, Error = Tx::Error>,
217    F: Frame,
218    C: Clock,
219{
220    /// Start building an [`IsoTpNode`] (requires RX storage before `build()`).
221    pub fn builder(tx: Tx, rx: Rx, cfg: IsoTpConfig, clock: C) -> IsoTpNodeBuilder<Tx, Rx, C> {
222        IsoTpNodeBuilder { tx, rx, cfg, clock }
223    }
224
225    /// Construct using a provided clock and caller-provided RX buffer.
226    ///
227    /// ISO-TP receive-side reassembly requires a buffer for storing the in-flight payload.
228    /// In `no_std`/`no alloc` environments, this is typically a static or stack-allocated slice.
229    pub fn with_clock(
230        tx: Tx,
231        rx: Rx,
232        cfg: IsoTpConfig,
233        clock: C,
234        rx_buffer: &'a mut [u8],
235    ) -> Result<Self, IsoTpError<()>> {
236        Self::with_clock_and_storage(tx, rx, cfg, clock, RxStorage::Borrowed(rx_buffer))
237    }
238
239    /// Construct using a provided clock and explicit RX storage.
240    pub fn with_clock_and_storage(
241        tx: Tx,
242        rx: Rx,
243        cfg: IsoTpConfig,
244        clock: C,
245        rx_storage: RxStorage<'a>,
246    ) -> Result<Self, IsoTpError<()>> {
247        let node = Self::builder(tx, rx, cfg, clock)
248            .rx_storage(rx_storage)
249            .build()?;
250        Ok(node)
251    }
252
253    /// Get the current receive-side FlowControl parameters (BS/STmin).
254    pub fn rx_flow_control(&self) -> RxFlowControl {
255        self.rx_flow_control
256    }
257
258    /// Update receive-side FlowControl parameters (BS/STmin).
259    ///
260    /// This affects FlowControl frames emitted in response to segmented transfers.
261    pub fn set_rx_flow_control(&mut self, fc: RxFlowControl) {
262        self.rx_flow_control = fc;
263    }
264
265    fn update_rx_timeout_activity(&mut self, now: C::Instant) {
266        self.rx_last_activity = if self.rx_machine.state == RxState::Receiving {
267            Some(now)
268        } else {
269            None
270        };
271    }
272
273    fn expire_rx_timeout_if_needed(&mut self) {
274        let Some(last_activity) = self.rx_last_activity else {
275            return;
276        };
277        if self.rx_machine.state != RxState::Receiving {
278            self.rx_last_activity = None;
279            return;
280        }
281        if self.clock.elapsed(last_activity) >= self.cfg.n_br {
282            isotp_warn!("poll_recv n_br timeout; aborting in-flight rx");
283            self.rx_machine.reset();
284            self.rx_last_activity = None;
285        }
286    }
287
288    /// Advance transmission once; caller supplies current time.
289    ///
290    /// This method is appropriate for “superloop” style designs where you poll the node until it
291    /// reports [`Progress::Completed`]. The node maintains internal state between calls.
292    pub fn poll_send(
293        &mut self,
294        payload: &[u8],
295        now: C::Instant,
296    ) -> Result<Progress, IsoTpError<Tx::Error>> {
297        // if cfg!(debug_assertions) {
298        //     match &self.tx_state {
299        //         TxState::Idle => println!("DEBUG poll_send state=Idle"),
300        //         TxState::WaitingForFc { .. } => println!("DEBUG poll_send state=WaitingForFc"),
301        //         TxState::Sending {
302        //             st_min_deadline, ..
303        //         } => println!(
304        //             "DEBUG poll_send state=Sending deadline_set={}",
305        //             st_min_deadline.is_some()
306        //         ),
307        //     };
308        // }
309        if payload.len() > self.cfg.max_payload_len {
310            isotp_warn!(
311                "poll_send overflow payload_len={} max={}",
312                payload.len(),
313                self.cfg.max_payload_len
314            );
315            return Err(IsoTpError::Overflow);
316        }
317
318        let state = mem::replace(&mut self.tx_state, TxState::Idle);
319        match state {
320            TxState::Idle => self.start_send(payload, now),
321            TxState::WaitingForFc { session, deadline } => {
322                self.continue_wait_for_fc(payload, session, deadline, now)
323            }
324            TxState::Sending {
325                session,
326                st_min_deadline,
327            } => self.continue_send(payload, session, st_min_deadline, now),
328        }
329    }
330
331    /// Blocking send until completion or timeout.
332    ///
333    /// Internally this repeatedly calls [`IsoTpNode::poll_send`] until completion or a timeout is
334    /// reached.
335    pub fn send(&mut self, payload: &[u8], timeout: Duration) -> Result<(), IsoTpError<Tx::Error>> {
336        let start = self.clock.now();
337        let deadline = self.clock.add(start, timeout);
338        loop {
339            let now = self.clock.now();
340            if now >= deadline {
341                return Err(IsoTpError::Timeout(TimeoutKind::NAs));
342            }
343            match self.poll_send(payload, now)? {
344                Progress::Completed => return Ok(()),
345                Progress::WaitingForFlowControl | Progress::InFlight | Progress::WouldBlock => {
346                    continue;
347                }
348            }
349        }
350    }
351
352    /// Non-blocking receive step; delivers bytes when complete.
353    ///
354    /// The provided `deliver` callback is invoked only when a full payload has been reassembled.
355    /// The slice passed to `deliver` is valid until the next receive operation mutates the internal
356    /// reassembly buffer.
357    pub fn poll_recv(
358        &mut self,
359        now: C::Instant,
360        deliver: &mut dyn FnMut(&[u8]),
361    ) -> Result<Progress, IsoTpError<Tx::Error>> {
362        self.expire_rx_timeout_if_needed();
363        loop {
364            let frame = match self.rx.try_recv() {
365                Ok(frame) => frame,
366                Err(_) => {
367                    return Ok(Progress::WouldBlock);
368                }
369            };
370
371            if !id_matches(frame.id(), &self.cfg.rx_id) {
372                isotp_trace!("poll_recv drop frame: rx_id mismatch");
373                continue;
374            }
375            if let Some(expected) = self.cfg.rx_addr
376                && frame.data().first().copied() != Some(expected)
377            {
378                isotp_trace!(
379                    "poll_recv drop frame: rx_addr mismatch expected={}",
380                    expected
381                );
382                continue;
383            }
384
385            let pdu = decode_with_offset(frame.data(), self.cfg.rx_pci_offset()).map_err(|_| {
386                isotp_warn!("poll_recv invalid frame decode");
387                IsoTpError::InvalidFrame
388            })?;
389            if matches!(pdu, Pdu::FlowControl { .. }) {
390                isotp_trace!("poll_recv ignore flow-control while receiving");
391                continue;
392            }
393            let outcome = match self
394                .rx_machine
395                .on_pdu(&self.cfg, &self.rx_flow_control, pdu)
396            {
397                Ok(o) => o,
398                Err(IsoTpError::Overflow) => {
399                    isotp_warn!("poll_recv rx overflow; sending overflow fc");
400                    self.rx_machine.reset();
401                    self.rx_last_activity = None;
402                    let _ = self.send_overflow_fc();
403                    return Err(IsoTpError::RxOverflow);
404                }
405                Err(IsoTpError::UnexpectedPdu) => {
406                    self.update_rx_timeout_activity(now);
407                    continue;
408                }
409                Err(IsoTpError::BadSequence) => {
410                    isotp_warn!("poll_recv bad sequence");
411                    self.rx_machine.reset();
412                    self.rx_last_activity = None;
413                    return Err(IsoTpError::BadSequence);
414                }
415                Err(IsoTpError::InvalidFrame) => return Err(IsoTpError::InvalidFrame),
416                Err(IsoTpError::InvalidConfig) => return Err(IsoTpError::InvalidConfig),
417                Err(IsoTpError::Timeout(kind)) => return Err(IsoTpError::Timeout(kind)),
418                Err(IsoTpError::WouldBlock) => return Err(IsoTpError::WouldBlock),
419                Err(IsoTpError::RxOverflow) => return Err(IsoTpError::RxOverflow),
420                Err(IsoTpError::NotIdle) => return Err(IsoTpError::NotIdle),
421                Err(IsoTpError::LinkError(_)) => return Err(IsoTpError::InvalidFrame),
422            };
423
424            match outcome {
425                RxOutcome::None => {
426                    self.update_rx_timeout_activity(now);
427                    return Ok(Progress::InFlight);
428                }
429                RxOutcome::SendFlowControl {
430                    status,
431                    block_size,
432                    st_min,
433                } => {
434                    self.update_rx_timeout_activity(now);
435                    isotp_trace!(
436                        "poll_recv send fc status={} bs={} st_min_raw={}",
437                        flow_status_code(status),
438                        block_size,
439                        st_min
440                    );
441                    self.send_flow_control(status, block_size, st_min)?;
442                    return Ok(Progress::InFlight);
443                }
444                RxOutcome::Completed(_len) => {
445                    self.rx_last_activity = None;
446                    let data = self.rx_machine.take_completed();
447                    isotp_debug!("poll_recv completed len={}", data.len());
448                    deliver(data);
449                    return Ok(Progress::Completed);
450                }
451            }
452        }
453    }
454
455    /// Blocking receive until a full payload arrives or timeout.
456    ///
457    /// Internally this repeatedly calls [`IsoTpNode::poll_recv`] until completion or a timeout is
458    /// reached.
459    pub fn recv(
460        &mut self,
461        timeout: Duration,
462        deliver: &mut dyn FnMut(&[u8]),
463    ) -> Result<(), IsoTpError<Tx::Error>> {
464        let start = self.clock.now();
465        let deadline = self.clock.add(start, timeout);
466        loop {
467            let now = self.clock.now();
468            if now >= deadline {
469                return Err(IsoTpError::Timeout(TimeoutKind::NAr));
470            }
471            match self.poll_recv(now, deliver)? {
472                Progress::Completed => return Ok(()),
473                Progress::InFlight | Progress::WaitingForFlowControl | Progress::WouldBlock => {
474                    continue;
475                }
476            }
477        }
478    }
479
480    fn start_send(
481        &mut self,
482        payload: &[u8],
483        now: C::Instant,
484    ) -> Result<Progress, IsoTpError<Tx::Error>> {
485        if payload.len() <= self.cfg.max_single_frame_payload() {
486            isotp_trace!("start_send single-frame len={}", payload.len());
487            let pdu = Pdu::SingleFrame {
488                len: payload.len() as u8,
489                data: payload,
490            };
491            let frame = encode_with_prefix_sized(
492                self.cfg.tx_id,
493                &pdu,
494                self.cfg.padding,
495                self.cfg.tx_addr,
496                self.cfg.frame_len,
497            )
498            .map_err(|_| IsoTpError::InvalidFrame)?;
499            self.tx.try_send(&frame).map_err(IsoTpError::LinkError)?;
500            self.tx_state = TxState::Idle;
501            return Ok(Progress::Completed);
502        }
503
504        {
505            let mut session = TxSession::new(payload.len(), self.cfg.block_size, self.cfg.st_min);
506            let len = payload.len();
507            let chunk = payload.len().min(self.cfg.max_first_frame_payload());
508            isotp_debug!(
509                "start_send first-frame payload_len={} ff_chunk={} bs={} st_min_ms={}",
510                len,
511                chunk,
512                self.cfg.block_size,
513                self.cfg.st_min.as_millis() as u64
514            );
515            let pdu = Pdu::FirstFrame {
516                len: len as u16,
517                data: &payload[..chunk],
518            };
519            let frame = encode_with_prefix_sized(
520                self.cfg.tx_id,
521                &pdu,
522                self.cfg.padding,
523                self.cfg.tx_addr,
524                self.cfg.frame_len,
525            )
526            .map_err(|_| IsoTpError::InvalidFrame)?;
527            self.tx.try_send(&frame).map_err(IsoTpError::LinkError)?;
528
529            session.offset = chunk;
530
531            let deadline = self.clock.add(now, self.cfg.n_bs);
532            self.tx_state = TxState::WaitingForFc { session, deadline };
533            Ok(Progress::WaitingForFlowControl)
534        }
535    }
536
537    fn continue_wait_for_fc(
538        &mut self,
539        payload: &[u8],
540        mut session: TxSession,
541        deadline: C::Instant,
542        now: C::Instant,
543    ) -> Result<Progress, IsoTpError<Tx::Error>> {
544        if payload.len() != session.payload_len {
545            isotp_warn!(
546                "wait_fc payload mismatch got={} expected={}",
547                payload.len(),
548                session.payload_len
549            );
550            self.tx_state = TxState::Idle;
551            return Err(IsoTpError::NotIdle);
552        }
553        if session.wait_count > self.cfg.wft_max {
554            isotp_warn!(
555                "wait_fc exceeded wft_max wait_count={} max={}",
556                session.wait_count,
557                self.cfg.wft_max
558            );
559            self.tx_state = TxState::Idle;
560            return Err(IsoTpError::Timeout(TimeoutKind::NBs));
561        }
562        if now >= deadline {
563            isotp_warn!("wait_fc timed out (n_bs)");
564            return Err(IsoTpError::Timeout(TimeoutKind::NBs));
565        }
566        loop {
567            let frame = match self.rx.try_recv() {
568                Ok(f) => f,
569                Err(_) => {
570                    self.tx_state = TxState::WaitingForFc { session, deadline };
571                    return Ok(Progress::WaitingForFlowControl);
572                }
573            };
574            if !id_matches(frame.id(), &self.cfg.rx_id) {
575                isotp_trace!("wait_fc drop frame: rx_id mismatch");
576                continue;
577            }
578            if let Some(expected) = self.cfg.rx_addr
579                && frame.data().first().copied() != Some(expected)
580            {
581                isotp_trace!("wait_fc drop frame: rx_addr mismatch expected={}", expected);
582                continue;
583            }
584            let pdu = decode_with_offset(frame.data(), self.cfg.rx_pci_offset()).map_err(|_| {
585                isotp_warn!("wait_fc invalid frame decode");
586                IsoTpError::InvalidFrame
587            })?;
588            match pdu {
589                Pdu::FlowControl {
590                    status,
591                    block_size,
592                    st_min,
593                } => match status {
594                    FlowStatus::ClearToSend => {
595                        session.wait_count = 0;
596                        let bs = if block_size == 0 {
597                            self.cfg.block_size
598                        } else {
599                            block_size
600                        };
601                        session.block_size = bs;
602                        session.block_remaining = bs;
603                        session.st_min = st_min_to_duration(st_min).unwrap_or(self.cfg.st_min);
604                        isotp_debug!(
605                            "wait_fc cts bs={} st_min_ms={}",
606                            bs,
607                            session.st_min.as_millis() as u64
608                        );
609                        return self.continue_send(payload, session, None, now);
610                    }
611                    FlowStatus::Wait => {
612                        session.wait_count = session.wait_count.saturating_add(1);
613                        isotp_trace!(
614                            "wait_fc wait wait_count={} max={}",
615                            session.wait_count,
616                            self.cfg.wft_max
617                        );
618                        if session.wait_count > self.cfg.wft_max {
619                            let deadline = self.clock.add(now, self.cfg.n_bs);
620                            self.tx_state = TxState::WaitingForFc { session, deadline };
621                            return Err(IsoTpError::Timeout(TimeoutKind::NBs));
622                        }
623                        let new_deadline = self.clock.add(now, self.cfg.n_bs);
624                        self.tx_state = TxState::WaitingForFc {
625                            session,
626                            deadline: new_deadline,
627                        };
628                        return Ok(Progress::WaitingForFlowControl);
629                    }
630                    FlowStatus::Overflow => {
631                        isotp_warn!("wait_fc remote overflow");
632                        self.tx_state = TxState::Idle;
633                        return Err(IsoTpError::Overflow);
634                    }
635                },
636                _ => continue,
637            }
638        }
639    }
640
641    fn continue_send(
642        &mut self,
643        payload: &[u8],
644        mut session: TxSession,
645        st_min_deadline: Option<C::Instant>,
646        now: C::Instant,
647    ) -> Result<Progress, IsoTpError<Tx::Error>> {
648        if payload.len() != session.payload_len {
649            isotp_warn!(
650                "continue_send payload mismatch got={} expected={}",
651                payload.len(),
652                session.payload_len
653            );
654            self.tx_state = TxState::Idle;
655            return Err(IsoTpError::NotIdle);
656        }
657        if let Some(deadline) = st_min_deadline
658            && now < deadline
659        {
660            self.tx_state = TxState::Sending {
661                session,
662                st_min_deadline: Some(deadline),
663            };
664            return Ok(Progress::WouldBlock);
665        }
666
667        if session.offset >= session.payload_len {
668            isotp_debug!(
669                "continue_send completed payload_len={}",
670                session.payload_len
671            );
672            self.tx_state = TxState::Idle;
673            return Ok(Progress::Completed);
674        }
675
676        let remaining = session.payload_len - session.offset;
677        let chunk = remaining.min(self.cfg.max_consecutive_frame_payload());
678        let data = &payload[session.offset..session.offset + chunk];
679        let pdu = Pdu::ConsecutiveFrame {
680            sn: session.next_sn & 0x0F,
681            data,
682        };
683        let frame = encode_with_prefix_sized(
684            self.cfg.tx_id,
685            &pdu,
686            self.cfg.padding,
687            self.cfg.tx_addr,
688            self.cfg.frame_len,
689        )
690        .map_err(|_| IsoTpError::InvalidFrame)?;
691        self.tx.try_send(&frame).map_err(IsoTpError::LinkError)?;
692        isotp_trace!(
693            "continue_send cf sent sn={} chunk={} offset={} payload_len={} block_remaining={}",
694            pdu_sn(&pdu),
695            chunk,
696            session.offset,
697            session.payload_len,
698            session.block_remaining
699        );
700
701        session.offset += chunk;
702        session.next_sn = (session.next_sn + 1) & 0x0F;
703
704        if session.offset >= session.payload_len {
705            isotp_debug!(
706                "continue_send completed payload_len={}",
707                session.payload_len
708            );
709            self.tx_state = TxState::Idle;
710            return Ok(Progress::Completed);
711        }
712
713        if session.block_size > 0 {
714            session.block_remaining = session.block_remaining.saturating_sub(1);
715            if session.block_remaining == 0 {
716                session.block_remaining = session.block_size;
717                let deadline = self.clock.add(now, self.cfg.n_bs);
718                self.tx_state = TxState::WaitingForFc { session, deadline };
719                isotp_trace!("continue_send wait for fc block boundary");
720                return Ok(Progress::WaitingForFlowControl);
721            }
722        }
723
724        let next_deadline = if session.st_min > Duration::from_millis(0) {
725            Some(self.clock.add(now, session.st_min))
726        } else {
727            None
728        };
729        self.tx_state = TxState::Sending {
730            session,
731            st_min_deadline: next_deadline,
732        };
733        Ok(Progress::InFlight)
734    }
735
736    fn send_flow_control(
737        &mut self,
738        status: FlowStatus,
739        block_size: u8,
740        st_min: u8,
741    ) -> Result<(), IsoTpError<Tx::Error>> {
742        isotp_trace!(
743            "send_flow_control status={} bs={} st_min_raw={}",
744            flow_status_code(status),
745            block_size,
746            st_min
747        );
748        let fc = Pdu::FlowControl {
749            status,
750            block_size,
751            st_min,
752        };
753        let frame = encode_with_prefix_sized(
754            self.cfg.tx_id,
755            &fc,
756            self.cfg.padding,
757            self.cfg.tx_addr,
758            self.cfg.frame_len,
759        )
760        .map_err(|_| IsoTpError::InvalidFrame)?;
761        self.tx.try_send(&frame).map_err(IsoTpError::LinkError)?;
762        Ok(())
763    }
764
765    fn send_overflow_fc(&mut self) -> Result<(), IsoTpError<Tx::Error>> {
766        self.send_flow_control(FlowStatus::Overflow, 0, duration_to_st_min(self.cfg.st_min))
767    }
768}
769
770fn id_matches(actual: embedded_can::Id, expected: &Id) -> bool {
771    match (actual, expected) {
772        (embedded_can::Id::Standard(a), Id::Standard(b)) => a == *b,
773        (embedded_can::Id::Extended(a), Id::Extended(b)) => a == *b,
774        _ => false,
775    }
776}
777
778#[cfg(feature = "defmt")]
779#[inline]
780fn flow_status_code(status: FlowStatus) -> u8 {
781    match status {
782        FlowStatus::ClearToSend => 0,
783        FlowStatus::Wait => 1,
784        FlowStatus::Overflow => 2,
785    }
786}
787
788#[cfg(feature = "defmt")]
789#[inline]
790fn pdu_sn(pdu: &Pdu<'_>) -> u8 {
791    match pdu {
792        Pdu::ConsecutiveFrame { sn, .. } => *sn,
793        _ => 0,
794    }
795}
796
797#[cfg(feature = "std")]
798impl<'a, Tx, Rx, F> IsoTpNode<'a, Tx, Rx, F, StdClock>
799where
800    Tx: TxFrameIo<Frame = F>,
801    Rx: RxFrameIo<Frame = F, Error = Tx::Error>,
802    F: Frame,
803{
804    /// Convenience constructor using `StdClock`.
805    pub fn with_std_clock(
806        tx: Tx,
807        rx: Rx,
808        cfg: IsoTpConfig,
809        rx_buffer: &'a mut [u8],
810    ) -> Result<Self, IsoTpError<()>> {
811        IsoTpNode::with_clock(tx, rx, cfg, StdClock, rx_buffer)
812    }
813}
814
815/// Builder for [`IsoTpNode`] that enforces providing RX storage before construction.
816pub struct IsoTpNodeBuilder<Tx, Rx, C> {
817    tx: Tx,
818    rx: Rx,
819    cfg: IsoTpConfig,
820    clock: C,
821}
822
823/// Builder state after RX storage has been provided.
824pub struct IsoTpNodeBuilderWithRx<'a, Tx, Rx, C> {
825    tx: Tx,
826    rx: Rx,
827    cfg: IsoTpConfig,
828    clock: C,
829    rx_storage: RxStorage<'a>,
830}
831
832impl<Tx, Rx, C> IsoTpNodeBuilder<Tx, Rx, C> {
833    /// Provide the RX buffer used for receive-side reassembly.
834    pub fn rx_buffer<'a>(self, buffer: &'a mut [u8]) -> IsoTpNodeBuilderWithRx<'a, Tx, Rx, C> {
835        self.rx_storage(RxStorage::Borrowed(buffer))
836    }
837
838    /// Provide explicit RX storage (borrowed or owned, depending on features).
839    pub fn rx_storage<'a>(
840        self,
841        rx_storage: RxStorage<'a>,
842    ) -> IsoTpNodeBuilderWithRx<'a, Tx, Rx, C> {
843        IsoTpNodeBuilderWithRx {
844            tx: self.tx,
845            rx: self.rx,
846            cfg: self.cfg,
847            clock: self.clock,
848            rx_storage,
849        }
850    }
851}
852
853impl<'a, Tx, Rx, C> IsoTpNodeBuilderWithRx<'a, Tx, Rx, C>
854where
855    Tx: TxFrameIo,
856    Rx: RxFrameIo<Frame = <Tx as TxFrameIo>::Frame, Error = <Tx as TxFrameIo>::Error>,
857    <Tx as TxFrameIo>::Frame: Frame,
858    C: Clock,
859{
860    /// Validate configuration and build an [`IsoTpNode`].
861    pub fn build(
862        self,
863    ) -> Result<IsoTpNode<'a, Tx, Rx, <Tx as TxFrameIo>::Frame, C>, IsoTpError<()>> {
864        self.cfg.validate().map_err(|_| IsoTpError::InvalidConfig)?;
865        if self.rx_storage.capacity() < self.cfg.max_payload_len {
866            return Err(IsoTpError::InvalidConfig);
867        }
868        let rx_flow_control = RxFlowControl::from_config(&self.cfg);
869        Ok(IsoTpNode {
870            tx: self.tx,
871            rx: self.rx,
872            cfg: self.cfg,
873            rx_flow_control,
874            clock: self.clock,
875            tx_state: TxState::Idle,
876            rx_machine: RxMachine::new(self.rx_storage),
877            rx_last_activity: None,
878        })
879    }
880}