Skip to main content

modbus_bridge/
connection.rs

//! [`Connection`] — a live TCP session bound to a [`Bridge`](crate::Bridge).
2
3use crate::{
4    bridge::Bridge,
5    error::ModbusError,
6    event::{BridgeError, BridgeEvent, FunctionCode, Transaction, Warning},
7    frame,
8    tcp::ModbusTcp,
9    NoDelay,
10};
11use embedded_hal::digital::OutputPin;
12
13/// An active Modbus TCP → RTU bridge connection.
14///
15/// Returned by [`Bridge::accept`](crate::Bridge::accept). Mutably borrows the
16/// bridge for its lifetime, preventing a second connection from being accepted
17/// until this one is finished.
18///
19/// Drive the connection by calling [`next`](Connection::next) in a loop.
20pub struct Connection<'b, S, TX, TS, D = NoDelay> {
21    pub(crate) bridge: &'b mut Bridge<S, TX, D>,
22    pub(crate) tcp: ModbusTcp<TS>,
23}
24
25impl<'b, S, TX, TS, D> Connection<'b, S, TX, TS, D> {
26    pub(crate) fn new(bridge: &'b mut Bridge<S, TX, D>, stream: TS) -> Self {
27        Self {
28            bridge,
29            tcp: ModbusTcp::new(stream),
30        }
31    }
32
33    /// Consumes the connection and returns the underlying TCP stream.
34    ///
35    /// # Examples
36    ///
37    /// ```rust,ignore
38    /// let socket = conn.into_stream();
39    /// socket.close();
40    /// ```
41    pub fn into_stream(self) -> TS {
42        self.tcp.into_inner()
43    }
44}
45
// ── Async next() — no timeout ─────────────────────────────────────────────────
47
#[cfg(feature = "async")]
impl<S, TX, TS> Connection<'_, S, TX, TS, NoDelay>
where
    S: embedded_io_async::Read + embedded_io_async::Write,
    TX: OutputPin,
    TS: embedded_io_async::Read + embedded_io_async::Write,
{
    /// Services a single Modbus request/response cycle asynchronously.
    ///
    /// One call waits for a TCP request, converts it to an RTU request,
    /// performs the RTU exchange, converts the RTU response back into a TCP
    /// response and sends that response to the client.
    ///
    /// On success the result is a [`BridgeEvent::Transaction`] describing the
    /// request. If the response conversion reported a transaction-id mismatch
    /// and the retry with id `0` succeeded, the response is still sent, but the
    /// call yields a [`BridgeEvent::Warning`] instead.
    ///
    /// If the request header cannot be parsed for reporting, the reported
    /// transaction fields fall back to zero; the request is forwarded anyway.
    ///
    /// # Errors
    ///
    /// - [`BridgeError::TcpClosed`](crate::BridgeError::TcpClosed) — the TCP
    ///   client closed the connection cleanly.
    /// - [`BridgeError::TcpIo`](crate::BridgeError::TcpIo) — reading the request
    ///   or sending the response failed on the TCP stream.
    /// - [`BridgeError::RtuIo`](crate::BridgeError::RtuIo) — the RTU exchange
    ///   failed with a serial I/O error.
    /// - [`BridgeError::RtuCrcMismatch`](crate::BridgeError::RtuCrcMismatch) —
    ///   the RTU exchange reported a CRC failure or a too-short payload.
    /// - [`BridgeError::BufferOverflow`](crate::BridgeError::BufferOverflow) —
    ///   any other failure while receiving or converting frames.
    pub async fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        // Step 1: wait for the next request from the TCP client.
        let tcp_req = match self.tcp.listen().await {
            Ok(req) => req,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Header fields are only used for the reported event, so a parse
        // failure degrades to zeros instead of aborting the cycle.
        let (unit_id, function_byte, start_address, register_count) =
            frame::parse_tcp_request(&tcp_req).unwrap_or((0, 0, 0, 0));

        // Step 2: translate the request to RTU, remembering the transaction id.
        let (rtu_req, tid) = match frame::tcp_to_rtu(&tcp_req) {
            Ok(converted) => converted,
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 3: run the RTU exchange.
        let exchange = self.bridge.rtu.send_and_receive(&rtu_req).await;
        let rtu_resp = match exchange {
            Ok(resp) => resp,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc | ModbusError::PayloadTooShort) => {
                return Err(BridgeError::RtuCrcMismatch)
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 4: translate the response back to TCP. A transaction-id
        // mismatch is retried with id 0 and remembered as a warning.
        let mut tid_warning = None;
        let tcp_resp = match frame::rtu_resp_to_tcp(&rtu_resp, tid) {
            Ok(resp) => resp,
            Err(ModbusError::InvalidTransactionId) => {
                let fallback = match frame::rtu_resp_to_tcp(&rtu_resp, 0) {
                    Ok(resp) => resp,
                    Err(_) => return Err(BridgeError::BufferOverflow),
                };
                // Report the first two response bytes (big-endian) as the id
                // that was seen, or 0 when fewer than two bytes are present.
                let got = rtu_resp
                    .get(..2)
                    .map_or(0, |pair| u16::from_be_bytes([pair[0], pair[1]]));
                tid_warning = Some(Warning::TransactionIdMismatch { expected: tid, got });
                fallback
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 5: deliver the response before reporting anything.
        if let Err(io) = self.tcp.send(&tcp_resp).await {
            return Err(BridgeError::TcpIo(io));
        }

        Ok(match tid_warning {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id,
                function_code: FunctionCode::from(function_byte),
                start_address,
                register_count,
            }),
        })
    }
}
120
// ── Async next() — with timeout ───────────────────────────────────────────────
122
#[cfg(feature = "async")]
impl<S, TX, TS, D> Connection<'_, S, TX, TS, D>
where
    S: embedded_io_async::Read + embedded_io_async::Write,
    TX: OutputPin,
    TS: embedded_io_async::Read + embedded_io_async::Write,
    D: embedded_hal_async::delay::DelayNs,
{
    /// Services a single Modbus request/response cycle asynchronously,
    /// honouring the bridge's optional timeouts.
    ///
    /// When `tcp_timeout_ms` is set, waiting for the TCP request is raced
    /// against a delay of that many milliseconds; when `rtu_timeout_ms` is
    /// set, the RTU send+receive exchange is raced the same way. If the delay
    /// finishes first, the pending operation is dropped and the call fails
    /// with a timeout. A timeout that is `None` means the corresponding step is
    /// awaited without a deadline.
    ///
    /// On success the result is a [`BridgeEvent::Transaction`] describing the
    /// request, or a [`BridgeEvent::Warning`] when the response had to be
    /// converted with transaction id `0` after a transaction-id mismatch. The
    /// response is sent to the client in both cases.
    ///
    /// # Errors
    ///
    /// - [`BridgeError::Timeout`](crate::BridgeError::Timeout) — a configured
    ///   deadline expired.
    /// - [`BridgeError::TcpClosed`](crate::BridgeError::TcpClosed) — the TCP
    ///   client closed the connection cleanly.
    /// - [`BridgeError::TcpIo`](crate::BridgeError::TcpIo) — reading the request
    ///   or sending the response failed on the TCP stream.
    /// - [`BridgeError::RtuIo`](crate::BridgeError::RtuIo) — the RTU exchange
    ///   failed with a serial I/O error.
    /// - [`BridgeError::RtuCrcMismatch`](crate::BridgeError::RtuCrcMismatch) —
    ///   the RTU exchange reported a CRC failure or a too-short payload.
    /// - [`BridgeError::BufferOverflow`](crate::BridgeError::BufferOverflow) —
    ///   any other failure while receiving or converting frames.
    pub async fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        use core::pin::pin;
        use futures_util::future::{select, Either};

        // Step 1: obtain the raw listen result, with or without a deadline.
        let listened = match self.bridge.tcp_timeout_ms {
            Some(ms) => {
                let request = pin!(self.tcp.listen());
                let deadline = pin!(self.bridge.delay.delay_ms(ms));
                match select(request, deadline).await {
                    Either::Left((outcome, _)) => outcome,
                    Either::Right(_) => return Err(BridgeError::Timeout),
                }
            }
            None => self.tcp.listen().await,
        };
        let tcp_req = match listened {
            Ok(req) => req,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Header fields are only used for the reported event, so a parse
        // failure degrades to zeros instead of aborting the cycle.
        let (unit_id, function_byte, start_address, register_count) =
            frame::parse_tcp_request(&tcp_req).unwrap_or((0, 0, 0, 0));

        // Step 2: translate the request to RTU, remembering the transaction id.
        let (rtu_req, tid) = match frame::tcp_to_rtu(&tcp_req) {
            Ok(converted) => converted,
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 3: run the RTU exchange, with or without a deadline.
        let exchanged = match self.bridge.rtu_timeout_ms {
            Some(ms) => {
                // Borrow the two bridge fields separately so both futures can
                // be alive at the same time.
                let (link, timer) = (&mut self.bridge.rtu, &mut self.bridge.delay);
                let transfer = pin!(link.send_and_receive(&rtu_req));
                let deadline = pin!(timer.delay_ms(ms));
                match select(transfer, deadline).await {
                    Either::Left((outcome, _)) => outcome,
                    Either::Right(_) => return Err(BridgeError::Timeout),
                }
            }
            None => self.bridge.rtu.send_and_receive(&rtu_req).await,
        };
        let rtu_resp = match exchanged {
            Ok(resp) => resp,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc | ModbusError::PayloadTooShort) => {
                return Err(BridgeError::RtuCrcMismatch)
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 4: translate the response back to TCP. A transaction-id
        // mismatch is retried with id 0 and remembered as a warning.
        let mut tid_warning = None;
        let tcp_resp = match frame::rtu_resp_to_tcp(&rtu_resp, tid) {
            Ok(resp) => resp,
            Err(ModbusError::InvalidTransactionId) => {
                let fallback = match frame::rtu_resp_to_tcp(&rtu_resp, 0) {
                    Ok(resp) => resp,
                    Err(_) => return Err(BridgeError::BufferOverflow),
                };
                // Report the first two response bytes (big-endian) as the id
                // that was seen, or 0 when fewer than two bytes are present.
                let got = rtu_resp
                    .get(..2)
                    .map_or(0, |pair| u16::from_be_bytes([pair[0], pair[1]]));
                tid_warning = Some(Warning::TransactionIdMismatch { expected: tid, got });
                fallback
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 5: deliver the response before reporting anything.
        if let Err(io) = self.tcp.send(&tcp_resp).await {
            return Err(BridgeError::TcpIo(io));
        }

        Ok(match tid_warning {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id,
                function_code: FunctionCode::from(function_byte),
                start_address,
                register_count,
            }),
        })
    }
}
228
// ── Sync next() — no timeout ──────────────────────────────────────────────────
230
#[cfg(feature = "sync")]
impl<S, TX, TS> Connection<'_, S, TX, TS, NoDelay>
where
    S: embedded_io::Read + embedded_io::Write,
    TX: OutputPin,
    TS: embedded_io::Read + embedded_io::Write,
{
    /// Services a single Modbus request/response cycle, blocking until done.
    ///
    /// One call receives a TCP request, converts it to an RTU request,
    /// performs the RTU exchange, converts the RTU response back into a TCP
    /// response and writes that response to the client.
    ///
    /// On success the result is a [`BridgeEvent::Transaction`] describing the
    /// request, or a [`BridgeEvent::Warning`] when the response had to be
    /// converted with transaction id `0` after a transaction-id mismatch. The
    /// response is sent to the client in both cases.
    ///
    /// # Errors
    ///
    /// - [`BridgeError::TcpClosed`](crate::BridgeError::TcpClosed) — TCP client sent EOF (normal disconnect).
    /// - [`BridgeError::TcpIo`](crate::BridgeError::TcpIo) — TCP stream I/O error.
    /// - [`BridgeError::RtuIo`](crate::BridgeError::RtuIo) — Serial port I/O error.
    /// - [`BridgeError::RtuCrcMismatch`](crate::BridgeError::RtuCrcMismatch) — RTU exchange reported a CRC
    ///   failure or a too-short payload.
    /// - [`BridgeError::BufferOverflow`](crate::BridgeError::BufferOverflow) — any other failure while
    ///   receiving or converting frames.
    #[expect(
        clippy::should_implement_trait,
        reason = "drives one request/response cycle, not an iterator"
    )]
    pub fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        // Step 1: block until the next request arrives from the TCP client.
        let tcp_req = match self.tcp.listen() {
            Ok(req) => req,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Header fields are only used for the reported event, so a parse
        // failure degrades to zeros instead of aborting the cycle.
        let (unit_id, function_byte, start_address, register_count) =
            frame::parse_tcp_request(&tcp_req).unwrap_or((0, 0, 0, 0));

        // Step 2: translate the request to RTU, remembering the transaction id.
        let (rtu_req, tid) = match frame::tcp_to_rtu(&tcp_req) {
            Ok(converted) => converted,
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 3: run the RTU exchange.
        let rtu_resp = match self.bridge.rtu.send_and_receive(&rtu_req) {
            Ok(resp) => resp,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc | ModbusError::PayloadTooShort) => {
                return Err(BridgeError::RtuCrcMismatch)
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 4: translate the response back to TCP. A transaction-id
        // mismatch is retried with id 0 and remembered as a warning.
        let mut tid_warning = None;
        let tcp_resp = match frame::rtu_resp_to_tcp(&rtu_resp, tid) {
            Ok(resp) => resp,
            Err(ModbusError::InvalidTransactionId) => {
                let fallback = match frame::rtu_resp_to_tcp(&rtu_resp, 0) {
                    Ok(resp) => resp,
                    Err(_) => return Err(BridgeError::BufferOverflow),
                };
                // Report the first two response bytes (big-endian) as the id
                // that was seen, or 0 when fewer than two bytes are present.
                let got = if rtu_resp.len() < 2 {
                    0
                } else {
                    u16::from_be_bytes([rtu_resp[0], rtu_resp[1]])
                };
                tid_warning = Some(Warning::TransactionIdMismatch { expected: tid, got });
                fallback
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 5: deliver the response before reporting anything.
        if let Err(io) = self.tcp.send(&tcp_resp) {
            return Err(BridgeError::TcpIo(io));
        }

        Ok(match tid_warning {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id,
                function_code: FunctionCode::from(function_byte),
                start_address,
                register_count,
            }),
        })
    }
}
313
// ── Sync next() — with timeout ────────────────────────────────────────────────
315
#[cfg(feature = "sync")]
impl<S, TX, TS, D> Connection<'_, S, TX, TS, D>
where
    S: embedded_io::Read + embedded_io::Write + embedded_io::ReadReady,
    TX: OutputPin,
    TS: embedded_io::Read + embedded_io::Write + embedded_io::ReadReady,
    D: embedded_hal::delay::DelayNs,
{
    /// Services a single Modbus request/response cycle (blocking), honouring
    /// the bridge's optional timeouts.
    ///
    /// When `tcp_timeout_ms` is set, the TCP stream's `read_ready` is polled
    /// in 1 ms steps before the request is read; when `rtu_timeout_ms` is set,
    /// the serial port's `read_ready` is polled the same way before the RTU
    /// exchange. A poll that has not reported readiness after the configured
    /// number of milliseconds ends the call with a timeout. Once readiness is
    /// reported, the following read itself is a plain blocking call.
    ///
    /// On success the result is a [`BridgeEvent::Transaction`] describing the
    /// request, or a [`BridgeEvent::Warning`] when the response had to be
    /// converted with transaction id `0` after a transaction-id mismatch. The
    /// response is sent to the client in both cases.
    ///
    /// # Errors
    ///
    /// - [`BridgeError::TcpClosed`](crate::BridgeError::TcpClosed) — TCP client sent EOF (normal disconnect).
    /// - [`BridgeError::TcpIo`](crate::BridgeError::TcpIo) — TCP stream I/O error, including a failed
    ///   readiness poll.
    /// - [`BridgeError::RtuIo`](crate::BridgeError::RtuIo) — Serial port I/O error, including a failed
    ///   readiness poll.
    /// - [`BridgeError::RtuCrcMismatch`](crate::BridgeError::RtuCrcMismatch) — RTU exchange reported a CRC
    ///   failure or a too-short payload.
    /// - [`BridgeError::BufferOverflow`](crate::BridgeError::BufferOverflow) — any other failure while
    ///   receiving or converting frames.
    /// - [`BridgeError::Timeout`](crate::BridgeError::Timeout) — a readiness poll did not succeed within
    ///   the configured deadline.
    #[expect(
        clippy::should_implement_trait,
        reason = "drives one request/response cycle, not an iterator"
    )]
    pub fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        // Step 0: optionally wait (1 ms granularity) for the TCP stream to
        // report pending data.
        if let Some(budget_ms) = self.bridge.tcp_timeout_ms {
            let mut waited_ms = 0u32;
            loop {
                let ready = match self.tcp.stream.read_ready() {
                    Ok(flag) => flag,
                    Err(io) => return Err(BridgeError::TcpIo(io)),
                };
                if ready {
                    break;
                }
                if waited_ms >= budget_ms {
                    return Err(BridgeError::Timeout);
                }
                self.bridge.delay.delay_ms(1);
                waited_ms = waited_ms.saturating_add(1);
            }
        }

        // Step 1: read the request from the TCP client.
        let tcp_req = match self.tcp.listen() {
            Ok(req) => req,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Header fields are only used for the reported event, so a parse
        // failure degrades to zeros instead of aborting the cycle.
        let (unit_id, function_byte, start_address, register_count) =
            frame::parse_tcp_request(&tcp_req).unwrap_or((0, 0, 0, 0));

        // Step 2: translate the request to RTU, remembering the transaction id.
        let (rtu_req, tid) = match frame::tcp_to_rtu(&tcp_req) {
            Ok(converted) => converted,
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Optionally wait (1 ms granularity) for the serial port to report
        // pending data.
        // NOTE(review): this poll runs before `send_and_receive` has been
        // called for this request. Confirm the serial port is expected to be
        // readable at this point; otherwise the wait would run to the timeout.
        if let Some(budget_ms) = self.bridge.rtu_timeout_ms {
            let mut waited_ms = 0u32;
            loop {
                let ready = match self.bridge.rtu.serial.read_ready() {
                    Ok(flag) => flag,
                    Err(io) => return Err(BridgeError::RtuIo(io)),
                };
                if ready {
                    break;
                }
                if waited_ms >= budget_ms {
                    return Err(BridgeError::Timeout);
                }
                self.bridge.delay.delay_ms(1);
                waited_ms = waited_ms.saturating_add(1);
            }
        }

        // Step 3: run the RTU exchange.
        let rtu_resp = match self.bridge.rtu.send_and_receive(&rtu_req) {
            Ok(resp) => resp,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc | ModbusError::PayloadTooShort) => {
                return Err(BridgeError::RtuCrcMismatch)
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 4: translate the response back to TCP. A transaction-id
        // mismatch is retried with id 0 and remembered as a warning.
        let mut tid_warning = None;
        let tcp_resp = match frame::rtu_resp_to_tcp(&rtu_resp, tid) {
            Ok(resp) => resp,
            Err(ModbusError::InvalidTransactionId) => {
                let fallback = match frame::rtu_resp_to_tcp(&rtu_resp, 0) {
                    Ok(resp) => resp,
                    Err(_) => return Err(BridgeError::BufferOverflow),
                };
                // Report the first two response bytes (big-endian) as the id
                // that was seen, or 0 when fewer than two bytes are present.
                let got = if rtu_resp.len() < 2 {
                    0
                } else {
                    u16::from_be_bytes([rtu_resp[0], rtu_resp[1]])
                };
                tid_warning = Some(Warning::TransactionIdMismatch { expected: tid, got });
                fallback
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 5: deliver the response before reporting anything.
        if let Err(io) = self.tcp.send(&tcp_resp) {
            return Err(BridgeError::TcpIo(io));
        }

        Ok(match tid_warning {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id,
                function_code: FunctionCode::from(function_byte),
                start_address,
                register_count,
            }),
        })
    }
}