Skip to main content

modbus_bridge/
client_session.rs

1//! [`ClientSession`] — a live TCP session bound to a [`Client`](crate::Client).
2
3use crate::{
4    client::Client,
5    error::ModbusError,
6    event::{BridgeError, BridgeEvent, FunctionCode, Transaction, Warning},
7    frame,
8    tcp::ModbusTcp,
9    NoDelay,
10};
11use embedded_hal::digital::OutputPin;
12
13/// An active Modbus RTU → TCP client session.
14///
15/// Returned by [`Client::connect`](crate::Client::connect). Mutably borrows the
16/// client for its lifetime, preventing a second session from being opened until
17/// this one is finished.
18///
19/// Drive the session by calling [`next`](ClientSession::next) in a loop.
20pub struct ClientSession<'b, S, TX, TS, D = NoDelay> {
21    pub(crate) client: &'b mut Client<S, TX, D>,
22    pub(crate) tcp: ModbusTcp<TS>,
23}
24
25impl<'b, S, TX, TS, D> ClientSession<'b, S, TX, TS, D> {
26    pub(crate) fn new(client: &'b mut Client<S, TX, D>, stream: TS) -> Self {
27        Self {
28            client,
29            tcp: ModbusTcp::new(stream),
30        }
31    }
32
33    /// Consumes the session and returns the underlying TCP stream.
34    ///
35    /// # Examples
36    ///
37    /// ```rust,ignore
38    /// let tcp_stream = session.into_stream();
39    /// ```
40    pub fn into_stream(self) -> TS {
41        self.tcp.into_inner()
42    }
43}
44
45// ── Async next() — no timeout ─────────────────────────────────────────────────
46
47#[cfg(feature = "async")]
48impl<S, TX, TS> ClientSession<'_, S, TX, TS, NoDelay>
49where
50    S: embedded_io_async::Read + embedded_io_async::Write,
51    TX: OutputPin,
52    TS: embedded_io_async::Read + embedded_io_async::Write,
53{
54    /// Drives one complete Modbus request/response cycle asynchronously.
55    ///
56    /// Reads an RTU request from the serial port, forwards it to the upstream
57    /// Modbus TCP server, and returns the response to the RTU master.
58    ///
59    /// # Errors
60    ///
61    /// Returns [`BridgeError::RtuClosed`](crate::BridgeError::RtuClosed) when the
62    /// RTU master closes the connection cleanly.
63    pub async fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
64        let rtu_req = self.client.rtu.listen().await.map_err(|e| match e {
65            ModbusError::Serial(se) => BridgeError::RtuIo(se),
66            ModbusError::Crc => BridgeError::RtuCrcMismatch,
67            ModbusError::PayloadTooShort => BridgeError::RtuClosed,
68            _ => BridgeError::BufferOverflow,
69        })?;
70
71        let (unit_id, fc_byte, start_address, register_count) =
72            frame::parse_rtu_request(&rtu_req).unwrap_or((0, 0, 0, 0));
73
74        let tcp_req = frame::rtu_to_tcp(&rtu_req, self.tcp.next_transaction_id())
75            .map_err(|_| BridgeError::BufferOverflow)?;
76
77        let tid = u16::from_be_bytes([tcp_req[0], tcp_req[1]]);
78
79        self.tcp.send(&tcp_req).await.map_err(BridgeError::TcpIo)?;
80
81        let tcp_resp = self.tcp.listen().await.map_err(|e| match e {
82            ModbusError::PayloadTooShort => BridgeError::TcpClosed,
83            ModbusError::Tcp(te) => BridgeError::TcpIo(te),
84            _ => BridgeError::BufferOverflow,
85        })?;
86
87        let (rtu_resp, tid_warning) = match frame::tcp_resp_to_rtu(&tcp_resp, tid) {
88            Ok(r) => (r, None),
89            Err(ModbusError::InvalidTransactionId) => {
90                let rx_tid = tcp_resp
91                    .get(..2)
92                    .map(|b| u16::from_be_bytes([b[0], b[1]]))
93                    .unwrap_or(0);
94                let fallback = frame::tcp_resp_to_rtu(&tcp_resp, rx_tid)
95                    .map_err(|_| BridgeError::BufferOverflow)?;
96                (
97                    fallback,
98                    Some(Warning::TransactionIdMismatch {
99                        expected: tid,
100                        got: rx_tid,
101                    }),
102                )
103            }
104            Err(_) => return Err(BridgeError::BufferOverflow),
105        };
106
107        self.client
108            .rtu
109            .send(&rtu_resp)
110            .await
111            .map_err(BridgeError::RtuIo)?;
112
113        if let Some(w) = tid_warning {
114            return Ok(BridgeEvent::Warning(w));
115        }
116        Ok(BridgeEvent::Transaction(Transaction {
117            unit_id,
118            function_code: FunctionCode::from(fc_byte),
119            start_address,
120            register_count,
121        }))
122    }
123}
124
125// ── Async next() — with timeout ───────────────────────────────────────────────
126
127#[cfg(feature = "async")]
128impl<S, TX, TS, D> ClientSession<'_, S, TX, TS, D>
129where
130    S: embedded_io_async::Read + embedded_io_async::Write,
131    TX: OutputPin,
132    TS: embedded_io_async::Read + embedded_io_async::Write,
133    D: embedded_hal_async::delay::DelayNs,
134{
135    /// Drives one complete Modbus request/response cycle asynchronously, with timeouts.
136    ///
137    /// Reads an RTU request from the serial port, forwards it to the upstream
138    /// Modbus TCP server, and returns the response to the RTU master.
139    /// Applies `rtu_timeout_ms` around the RTU listen and `tcp_timeout_ms`
140    /// around the TCP response.
141    ///
142    /// # Errors
143    ///
144    /// - [`BridgeError::RtuClosed`](crate::BridgeError::RtuClosed) — RTU master sent EOF (normal disconnect).
145    /// - [`BridgeError::RtuIo`](crate::BridgeError::RtuIo) — Serial port I/O error.
146    /// - [`BridgeError::RtuCrcMismatch`](crate::BridgeError::RtuCrcMismatch) — RTU request failed CRC-16 check.
147    /// - [`BridgeError::TcpClosed`](crate::BridgeError::TcpClosed) — Upstream TCP server closed the connection.
148    /// - [`BridgeError::TcpIo`](crate::BridgeError::TcpIo) — TCP stream I/O error.
149    /// - [`BridgeError::BufferOverflow`](crate::BridgeError::BufferOverflow) — Frame exceeded internal buffer capacity.
150    /// - [`BridgeError::Timeout`](crate::BridgeError::Timeout) — An I/O operation did not complete within the configured deadline.
151    pub async fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
152        use core::pin::pin;
153        use futures_util::future::{select, Either};
154
155        let rtu_req = if let Some(ms) = self.client.rtu_timeout_ms {
156            let rtu_fut = pin!(self.client.rtu.listen());
157            let delay_fut = pin!(self.client.delay.delay_ms(ms));
158            match select(rtu_fut, delay_fut).await {
159                Either::Left((r, _)) => r.map_err(|e| match e {
160                    ModbusError::Serial(se) => BridgeError::RtuIo(se),
161                    ModbusError::Crc => BridgeError::RtuCrcMismatch,
162                    ModbusError::PayloadTooShort => BridgeError::RtuClosed,
163                    _ => BridgeError::BufferOverflow,
164                })?,
165                Either::Right(_) => return Err(BridgeError::Timeout),
166            }
167        } else {
168            self.client.rtu.listen().await.map_err(|e| match e {
169                ModbusError::Serial(se) => BridgeError::RtuIo(se),
170                ModbusError::Crc => BridgeError::RtuCrcMismatch,
171                ModbusError::PayloadTooShort => BridgeError::RtuClosed,
172                _ => BridgeError::BufferOverflow,
173            })?
174        };
175
176        let (unit_id, fc_byte, start_address, register_count) =
177            frame::parse_rtu_request(&rtu_req).unwrap_or((0, 0, 0, 0));
178
179        let tcp_req = frame::rtu_to_tcp(&rtu_req, self.tcp.next_transaction_id())
180            .map_err(|_| BridgeError::BufferOverflow)?;
181
182        let tid = u16::from_be_bytes([tcp_req[0], tcp_req[1]]);
183
184        self.tcp.send(&tcp_req).await.map_err(BridgeError::TcpIo)?;
185
186        let tcp_resp = if let Some(ms) = self.client.tcp_timeout_ms {
187            let tcp_fut = pin!(self.tcp.listen());
188            let delay_fut = pin!(self.client.delay.delay_ms(ms));
189            match select(tcp_fut, delay_fut).await {
190                Either::Left((r, _)) => r.map_err(|e| match e {
191                    ModbusError::PayloadTooShort => BridgeError::TcpClosed,
192                    ModbusError::Tcp(te) => BridgeError::TcpIo(te),
193                    _ => BridgeError::BufferOverflow,
194                })?,
195                Either::Right(_) => return Err(BridgeError::Timeout),
196            }
197        } else {
198            self.tcp.listen().await.map_err(|e| match e {
199                ModbusError::PayloadTooShort => BridgeError::TcpClosed,
200                ModbusError::Tcp(te) => BridgeError::TcpIo(te),
201                _ => BridgeError::BufferOverflow,
202            })?
203        };
204
205        let (rtu_resp, tid_warning) = match frame::tcp_resp_to_rtu(&tcp_resp, tid) {
206            Ok(r) => (r, None),
207            Err(ModbusError::InvalidTransactionId) => {
208                let rx_tid = tcp_resp
209                    .get(..2)
210                    .map(|b| u16::from_be_bytes([b[0], b[1]]))
211                    .unwrap_or(0);
212                let fallback = frame::tcp_resp_to_rtu(&tcp_resp, rx_tid)
213                    .map_err(|_| BridgeError::BufferOverflow)?;
214                (
215                    fallback,
216                    Some(Warning::TransactionIdMismatch {
217                        expected: tid,
218                        got: rx_tid,
219                    }),
220                )
221            }
222            Err(_) => return Err(BridgeError::BufferOverflow),
223        };
224
225        self.client
226            .rtu
227            .send(&rtu_resp)
228            .await
229            .map_err(BridgeError::RtuIo)?;
230
231        if let Some(w) = tid_warning {
232            return Ok(BridgeEvent::Warning(w));
233        }
234        Ok(BridgeEvent::Transaction(Transaction {
235            unit_id,
236            function_code: FunctionCode::from(fc_byte),
237            start_address,
238            register_count,
239        }))
240    }
241}
242
243// ── Sync next() — no timeout ──────────────────────────────────────────────────
244
245#[cfg(feature = "sync")]
246impl<S, TX, TS> ClientSession<'_, S, TX, TS, NoDelay>
247where
248    S: embedded_io::Read + embedded_io::Write,
249    TX: OutputPin,
250    TS: embedded_io::Read + embedded_io::Write,
251{
252    /// Drives one complete Modbus request/response cycle (blocking).
253    ///
254    /// Reads an RTU request from the serial port, forwards it to the upstream
255    /// Modbus TCP server, and returns the response to the RTU master.
256    ///
257    /// # Errors
258    ///
259    /// - [`BridgeError::RtuClosed`](crate::BridgeError::RtuClosed) — RTU master sent EOF (normal disconnect).
260    /// - [`BridgeError::RtuIo`](crate::BridgeError::RtuIo) — Serial port I/O error.
261    /// - [`BridgeError::RtuCrcMismatch`](crate::BridgeError::RtuCrcMismatch) — RTU request failed CRC-16 check.
262    /// - [`BridgeError::TcpClosed`](crate::BridgeError::TcpClosed) — Upstream TCP server closed the connection.
263    /// - [`BridgeError::TcpIo`](crate::BridgeError::TcpIo) — TCP stream I/O error.
264    /// - [`BridgeError::BufferOverflow`](crate::BridgeError::BufferOverflow) — Frame exceeded internal buffer capacity.
265    #[expect(
266        clippy::should_implement_trait,
267        reason = "drives one request/response cycle, not an iterator"
268    )]
269    pub fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
270        let rtu_req = self.client.rtu.listen().map_err(|e| match e {
271            ModbusError::Serial(se) => BridgeError::RtuIo(se),
272            ModbusError::Crc => BridgeError::RtuCrcMismatch,
273            ModbusError::PayloadTooShort => BridgeError::RtuClosed,
274            _ => BridgeError::BufferOverflow,
275        })?;
276
277        let (unit_id, fc_byte, start_address, register_count) =
278            frame::parse_rtu_request(&rtu_req).unwrap_or((0, 0, 0, 0));
279
280        let tcp_req = frame::rtu_to_tcp(&rtu_req, self.tcp.next_transaction_id())
281            .map_err(|_| BridgeError::BufferOverflow)?;
282
283        let tid = u16::from_be_bytes([tcp_req[0], tcp_req[1]]);
284
285        self.tcp.send(&tcp_req).map_err(BridgeError::TcpIo)?;
286
287        let tcp_resp = self.tcp.listen().map_err(|e| match e {
288            ModbusError::PayloadTooShort => BridgeError::TcpClosed,
289            ModbusError::Tcp(te) => BridgeError::TcpIo(te),
290            ModbusError::Push => BridgeError::BufferOverflow,
291            _ => BridgeError::BufferOverflow,
292        })?;
293
294        let (rtu_resp, tid_warning) = match frame::tcp_resp_to_rtu(&tcp_resp, tid) {
295            Ok(r) => (r, None),
296            Err(ModbusError::InvalidTransactionId) => {
297                let rx_tid = if tcp_resp.len() >= 2 {
298                    u16::from_be_bytes([tcp_resp[0], tcp_resp[1]])
299                } else {
300                    0
301                };
302                let fallback = frame::tcp_resp_to_rtu(&tcp_resp, rx_tid)
303                    .map_err(|_| BridgeError::BufferOverflow)?;
304                (
305                    fallback,
306                    Some(Warning::TransactionIdMismatch {
307                        expected: tid,
308                        got: rx_tid,
309                    }),
310                )
311            }
312            Err(_) => return Err(BridgeError::BufferOverflow),
313        };
314
315        self.client
316            .rtu
317            .send(&rtu_resp)
318            .map_err(BridgeError::RtuIo)?;
319
320        if let Some(w) = tid_warning {
321            return Ok(BridgeEvent::Warning(w));
322        }
323        Ok(BridgeEvent::Transaction(Transaction {
324            unit_id,
325            function_code: FunctionCode::from(fc_byte),
326            start_address,
327            register_count,
328        }))
329    }
330}
331
332// ── Sync next() — with timeout ────────────────────────────────────────────────
333
334#[cfg(feature = "sync")]
335impl<S, TX, TS, D> ClientSession<'_, S, TX, TS, D>
336where
337    S: embedded_io::Read + embedded_io::Write + embedded_io::ReadReady,
338    TX: OutputPin,
339    TS: embedded_io::Read + embedded_io::Write + embedded_io::ReadReady,
340    D: embedded_hal::delay::DelayNs,
341{
342    /// Drives one complete Modbus request/response cycle (blocking) with timeout support.
343    ///
344    /// Reads an RTU request from the serial port, forwards it to the upstream
345    /// Modbus TCP server, and returns the response to the RTU master.
346    /// Polls `ReadReady` before each I/O operation to enforce the timeout budget.
347    ///
348    /// # Errors
349    ///
350    /// - [`BridgeError::RtuClosed`](crate::BridgeError::RtuClosed) — RTU master sent EOF (normal disconnect).
351    /// - [`BridgeError::RtuIo`](crate::BridgeError::RtuIo) — Serial port I/O error.
352    /// - [`BridgeError::RtuCrcMismatch`](crate::BridgeError::RtuCrcMismatch) — RTU request failed CRC-16 check.
353    /// - [`BridgeError::TcpClosed`](crate::BridgeError::TcpClosed) — Upstream TCP server closed the connection.
354    /// - [`BridgeError::TcpIo`](crate::BridgeError::TcpIo) — TCP stream I/O error.
355    /// - [`BridgeError::BufferOverflow`](crate::BridgeError::BufferOverflow) — Frame exceeded internal buffer capacity.
356    /// - [`BridgeError::Timeout`](crate::BridgeError::Timeout) — An I/O operation did not complete within the configured deadline.
357    #[expect(
358        clippy::should_implement_trait,
359        reason = "drives one request/response cycle, not an iterator"
360    )]
361    pub fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
362        if let Some(timeout_ms) = self.client.rtu_timeout_ms {
363            let mut elapsed = 0u32;
364            loop {
365                match self.client.rtu.serial.read_ready() {
366                    Ok(true) => break,
367                    Ok(false) => {
368                        if elapsed >= timeout_ms {
369                            return Err(BridgeError::Timeout);
370                        }
371                        self.client.delay.delay_ms(1);
372                        elapsed = elapsed.saturating_add(1);
373                    }
374                    Err(e) => return Err(BridgeError::RtuIo(e)),
375                }
376            }
377        }
378
379        let rtu_req = self.client.rtu.listen().map_err(|e| match e {
380            ModbusError::Serial(se) => BridgeError::RtuIo(se),
381            ModbusError::Crc => BridgeError::RtuCrcMismatch,
382            ModbusError::PayloadTooShort => BridgeError::RtuClosed,
383            _ => BridgeError::BufferOverflow,
384        })?;
385
386        let (unit_id, fc_byte, start_address, register_count) =
387            frame::parse_rtu_request(&rtu_req).unwrap_or((0, 0, 0, 0));
388
389        let tcp_req = frame::rtu_to_tcp(&rtu_req, self.tcp.next_transaction_id())
390            .map_err(|_| BridgeError::BufferOverflow)?;
391
392        let tid = u16::from_be_bytes([tcp_req[0], tcp_req[1]]);
393
394        self.tcp.send(&tcp_req).map_err(BridgeError::TcpIo)?;
395
396        if let Some(timeout_ms) = self.client.tcp_timeout_ms {
397            let mut elapsed = 0u32;
398            loop {
399                match self.tcp.stream.read_ready() {
400                    Ok(true) => break,
401                    Ok(false) => {
402                        if elapsed >= timeout_ms {
403                            return Err(BridgeError::Timeout);
404                        }
405                        self.client.delay.delay_ms(1);
406                        elapsed = elapsed.saturating_add(1);
407                    }
408                    Err(e) => return Err(BridgeError::TcpIo(e)),
409                }
410            }
411        }
412
413        let tcp_resp = self.tcp.listen().map_err(|e| match e {
414            ModbusError::PayloadTooShort => BridgeError::TcpClosed,
415            ModbusError::Tcp(te) => BridgeError::TcpIo(te),
416            ModbusError::Push => BridgeError::BufferOverflow,
417            _ => BridgeError::BufferOverflow,
418        })?;
419
420        let (rtu_resp, tid_warning) = match frame::tcp_resp_to_rtu(&tcp_resp, tid) {
421            Ok(r) => (r, None),
422            Err(ModbusError::InvalidTransactionId) => {
423                let rx_tid = if tcp_resp.len() >= 2 {
424                    u16::from_be_bytes([tcp_resp[0], tcp_resp[1]])
425                } else {
426                    0
427                };
428                let fallback = frame::tcp_resp_to_rtu(&tcp_resp, rx_tid)
429                    .map_err(|_| BridgeError::BufferOverflow)?;
430                (
431                    fallback,
432                    Some(Warning::TransactionIdMismatch {
433                        expected: tid,
434                        got: rx_tid,
435                    }),
436                )
437            }
438            Err(_) => return Err(BridgeError::BufferOverflow),
439        };
440
441        self.client
442            .rtu
443            .send(&rtu_resp)
444            .map_err(BridgeError::RtuIo)?;
445
446        if let Some(w) = tid_warning {
447            return Ok(BridgeEvent::Warning(w));
448        }
449        Ok(BridgeEvent::Transaction(Transaction {
450            unit_id,
451            function_code: FunctionCode::from(fc_byte),
452            start_address,
453            register_count,
454        }))
455    }
456}