1use crate::{
4 bridge::Bridge,
5 error::ModbusError,
6 event::{BridgeError, BridgeEvent, FunctionCode, Transaction, Warning},
7 frame,
8 tcp::ModbusTcp,
9 NoDelay,
10};
11use embedded_hal::digital::OutputPin;
12
/// One bridged session: an exclusive borrow of a [`Bridge`] paired with a
/// [`ModbusTcp`] wrapper around the stream type `TS`.
///
/// The delay type parameter `D` defaults to [`NoDelay`].
pub struct Connection<'b, S, TX, TS, D = NoDelay> {
    /// Mutable borrow of the bridge, held for the lifetime `'b`.
    pub(crate) bridge: &'b mut Bridge<S, TX, D>,
    /// TCP-side wrapper; `Connection::new` builds it from the supplied stream.
    pub(crate) tcp: ModbusTcp<TS>,
}
24
25impl<'b, S, TX, TS, D> Connection<'b, S, TX, TS, D> {
26 pub(crate) fn new(bridge: &'b mut Bridge<S, TX, D>, stream: TS) -> Self {
27 Self {
28 bridge,
29 tcp: ModbusTcp::new(stream),
30 }
31 }
32
33 pub fn into_stream(self) -> TS {
42 self.tcp.into_inner()
43 }
44}
45
#[cfg(feature = "async")]
impl<S, TX, TS> Connection<'_, S, TX, TS, NoDelay>
where
    S: embedded_io_async::Read + embedded_io_async::Write,
    TX: OutputPin,
    TS: embedded_io_async::Read + embedded_io_async::Write,
{
    /// Drives one request/response cycle with no timeout applied.
    ///
    /// Awaits a request from the TCP side, converts it to an RTU frame,
    /// exchanges it through the bridge's RTU side, converts the reply back
    /// and sends it to the TCP peer.
    ///
    /// Returns [`BridgeEvent::Transaction`] describing the forwarded request,
    /// or [`BridgeEvent::Warning`] when the reply had to be rebuilt after a
    /// transaction-id mismatch (the rebuilt reply is still sent).
    ///
    /// # Errors
    ///
    /// * [`BridgeError::TcpClosed`] – `listen` reported `PayloadTooShort`.
    /// * [`BridgeError::TcpIo`] – a TCP error from `listen` or `send`.
    /// * [`BridgeError::RtuIo`] – a serial error from the RTU exchange.
    /// * [`BridgeError::RtuCrcMismatch`] – the RTU exchange reported `Crc`
    ///   or `PayloadTooShort`.
    /// * [`BridgeError::BufferOverflow`] – any other failure, including
    ///   frame conversion errors.
    pub async fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        let request = match self.tcp.listen().await {
            Ok(frame) => frame,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Only used to describe the transaction in the returned event; an
        // unparsable request is reported with all-zero fields.
        let (unit, fc, address, count) =
            frame::parse_tcp_request(&request).unwrap_or((0, 0, 0, 0));

        let Ok((rtu_request, tid)) = frame::tcp_to_rtu(&request) else {
            return Err(BridgeError::BufferOverflow);
        };

        let rtu_reply = match self.bridge.rtu.send_and_receive(&rtu_request).await {
            Ok(frame) => frame,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc | ModbusError::PayloadTooShort) => {
                return Err(BridgeError::RtuCrcMismatch)
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        let (reply, mismatch) = match frame::rtu_resp_to_tcp(&rtu_reply, tid) {
            Ok(converted) => (converted, None),
            Err(ModbusError::InvalidTransactionId) => {
                // Rebuild the reply with transaction id 0 and remember what
                // the first two reply bytes decoded to for the warning.
                let Ok(rebuilt) = frame::rtu_resp_to_tcp(&rtu_reply, 0) else {
                    return Err(BridgeError::BufferOverflow);
                };
                let got = match rtu_reply.get(..2) {
                    Some(head) => u16::from_be_bytes([head[0], head[1]]),
                    None => 0,
                };
                let warning = Warning::TransactionIdMismatch { expected: tid, got };
                (rebuilt, Some(warning))
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        self.tcp.send(&reply).await.map_err(BridgeError::TcpIo)?;

        Ok(match mismatch {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id: unit,
                function_code: FunctionCode::from(fc),
                start_address: address,
                register_count: count,
            }),
        })
    }
}
120
#[cfg(feature = "async")]
impl<S, TX, TS, D> Connection<'_, S, TX, TS, D>
where
    S: embedded_io_async::Read + embedded_io_async::Write,
    TX: OutputPin,
    TS: embedded_io_async::Read + embedded_io_async::Write,
    D: embedded_hal_async::delay::DelayNs,
{
    /// Drives one request/response cycle, racing each wait against the
    /// bridge's delay when a timeout is configured.
    ///
    /// The TCP wait is raced against `tcp_timeout_ms` and the RTU exchange
    /// against `rtu_timeout_ms`; when either is `None` that step is awaited
    /// without a limit.
    ///
    /// Returns [`BridgeEvent::Transaction`] describing the forwarded request,
    /// or [`BridgeEvent::Warning`] when the reply had to be rebuilt after a
    /// transaction-id mismatch (the rebuilt reply is still sent).
    ///
    /// # Errors
    ///
    /// * [`BridgeError::Timeout`] – the delay finished before the TCP
    ///   request arrived or before the RTU exchange completed.
    /// * [`BridgeError::TcpClosed`] – `listen` reported `PayloadTooShort`.
    /// * [`BridgeError::TcpIo`] – a TCP error from `listen` or `send`.
    /// * [`BridgeError::RtuIo`] – a serial error from the RTU exchange.
    /// * [`BridgeError::RtuCrcMismatch`] – the RTU exchange reported `Crc`
    ///   or `PayloadTooShort`.
    /// * [`BridgeError::BufferOverflow`] – any other failure, including
    ///   frame conversion errors.
    pub async fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        use core::pin::pin;
        use futures_util::future::{select, Either};

        // Step 1: obtain the TCP request, optionally bounded by the delay.
        let tcp_req = if let Some(ms) = self.bridge.tcp_timeout_ms {
            let tcp_fut = pin!(self.tcp.listen());
            let delay_fut = pin!(self.bridge.delay.delay_ms(ms));
            // Whichever future completes first wins; the other is dropped.
            // NOTE(review): on timeout the pending `listen` future is dropped
            // mid-flight — confirm it is safe to cancel.
            match select(tcp_fut, delay_fut).await {
                Either::Left((r, _)) => r.map_err(|e| match e {
                    ModbusError::PayloadTooShort => BridgeError::TcpClosed,
                    ModbusError::Tcp(te) => BridgeError::TcpIo(te),
                    _ => BridgeError::BufferOverflow,
                })?,
                Either::Right(_) => return Err(BridgeError::Timeout),
            }
        } else {
            self.tcp.listen().await.map_err(|e| match e {
                ModbusError::PayloadTooShort => BridgeError::TcpClosed,
                ModbusError::Tcp(te) => BridgeError::TcpIo(te),
                _ => BridgeError::BufferOverflow,
            })?
        };

        // Only used to fill the returned `Transaction`; a request that fails
        // to parse is reported with all-zero fields.
        let (unit_id, fc_byte, start_address, register_count) =
            frame::parse_tcp_request(&tcp_req).unwrap_or((0, 0, 0, 0));

        // Step 2: convert to an RTU frame, keeping the transaction id.
        let (rtu_req, tid) =
            frame::tcp_to_rtu(&tcp_req).map_err(|_| BridgeError::BufferOverflow)?;

        // Step 3: RTU exchange, optionally bounded by the delay.
        let rtu_resp = if let Some(ms) = self.bridge.rtu_timeout_ms {
            // Borrow the two bridge fields separately so both futures can be
            // alive at the same time.
            let rtu = &mut self.bridge.rtu;
            let delay = &mut self.bridge.delay;
            let rtu_fut = pin!(rtu.send_and_receive(&rtu_req));
            let delay_fut = pin!(delay.delay_ms(ms));
            // NOTE(review): on timeout the in-flight `send_and_receive`
            // future is dropped — confirm this leaves the serial side in a
            // usable state.
            match select(rtu_fut, delay_fut).await {
                Either::Left((r, _)) => r.map_err(|e| match e {
                    ModbusError::Serial(se) => BridgeError::RtuIo(se),
                    ModbusError::Crc => BridgeError::RtuCrcMismatch,
                    ModbusError::PayloadTooShort => BridgeError::RtuCrcMismatch,
                    _ => BridgeError::BufferOverflow,
                })?,
                Either::Right(_) => return Err(BridgeError::Timeout),
            }
        } else {
            self.bridge
                .rtu
                .send_and_receive(&rtu_req)
                .await
                .map_err(|e| match e {
                    ModbusError::Serial(se) => BridgeError::RtuIo(se),
                    ModbusError::Crc => BridgeError::RtuCrcMismatch,
                    ModbusError::PayloadTooShort => BridgeError::RtuCrcMismatch,
                    _ => BridgeError::BufferOverflow,
                })?
        };

        // Step 4: convert the reply back; on a transaction-id mismatch rebuild
        // it with id 0 and record a warning instead of failing.
        let (tcp_resp, tid_warning) = match frame::rtu_resp_to_tcp(&rtu_resp, tid) {
            Ok(r) => (r, None),
            Err(ModbusError::InvalidTransactionId) => {
                let fallback = frame::rtu_resp_to_tcp(&rtu_resp, 0)
                    .map_err(|_| BridgeError::BufferOverflow)?;
                // First two reply bytes read as a big-endian u16, or 0 when
                // the reply is shorter than two bytes.
                let rx_tid = rtu_resp
                    .get(..2)
                    .map(|b| u16::from_be_bytes([b[0], b[1]]))
                    .unwrap_or(0);
                (
                    fallback,
                    Some(Warning::TransactionIdMismatch {
                        expected: tid,
                        got: rx_tid,
                    }),
                )
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 5: forward the reply to the TCP peer.
        self.tcp.send(&tcp_resp).await.map_err(BridgeError::TcpIo)?;

        // A recorded warning takes precedence over the transaction summary.
        if let Some(w) = tid_warning {
            return Ok(BridgeEvent::Warning(w));
        }
        Ok(BridgeEvent::Transaction(Transaction {
            unit_id,
            function_code: FunctionCode::from(fc_byte),
            start_address,
            register_count,
        }))
    }
}
228
#[cfg(feature = "sync")]
impl<S, TX, TS> Connection<'_, S, TX, TS, NoDelay>
where
    S: embedded_io::Read + embedded_io::Write,
    TX: OutputPin,
    TS: embedded_io::Read + embedded_io::Write,
{
    /// Runs one blocking request/response cycle with no timeout applied.
    ///
    /// Reads a request from the TCP side, converts it to an RTU frame,
    /// exchanges it through the bridge's RTU side, converts the reply back
    /// and sends it to the TCP peer.
    ///
    /// Returns [`BridgeEvent::Transaction`] describing the forwarded request,
    /// or [`BridgeEvent::Warning`] when the reply had to be rebuilt after a
    /// transaction-id mismatch (the rebuilt reply is still sent).
    ///
    /// # Errors
    ///
    /// * [`BridgeError::TcpClosed`] – `listen` reported `PayloadTooShort`.
    /// * [`BridgeError::TcpIo`] – a TCP error from `listen` or `send`.
    /// * [`BridgeError::RtuIo`] – a serial error from the RTU exchange.
    /// * [`BridgeError::RtuCrcMismatch`] – the RTU exchange reported `Crc`
    ///   or `PayloadTooShort`.
    /// * [`BridgeError::BufferOverflow`] – any other failure, including
    ///   frame conversion errors.
    #[expect(
        clippy::should_implement_trait,
        reason = "drives one request/response cycle, not an iterator"
    )]
    pub fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        let request = match self.tcp.listen() {
            Ok(frame) => frame,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Only used to describe the transaction in the returned event; an
        // unparsable request is reported with all-zero fields.
        let (unit, fc, address, count) =
            frame::parse_tcp_request(&request).unwrap_or((0, 0, 0, 0));

        let Ok((rtu_request, tid)) = frame::tcp_to_rtu(&request) else {
            return Err(BridgeError::BufferOverflow);
        };

        let rtu_reply = match self.bridge.rtu.send_and_receive(&rtu_request) {
            Ok(frame) => frame,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc | ModbusError::PayloadTooShort) => {
                return Err(BridgeError::RtuCrcMismatch)
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        let (reply, mismatch) = match frame::rtu_resp_to_tcp(&rtu_reply, tid) {
            Ok(converted) => (converted, None),
            Err(ModbusError::InvalidTransactionId) => {
                // Rebuild the reply with transaction id 0 and remember what
                // the first two reply bytes decoded to for the warning.
                let Ok(rebuilt) = frame::rtu_resp_to_tcp(&rtu_reply, 0) else {
                    return Err(BridgeError::BufferOverflow);
                };
                let got = if rtu_reply.len() < 2 {
                    0
                } else {
                    u16::from_be_bytes([rtu_reply[0], rtu_reply[1]])
                };
                let warning = Warning::TransactionIdMismatch { expected: tid, got };
                (rebuilt, Some(warning))
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        self.tcp.send(&reply).map_err(BridgeError::TcpIo)?;

        Ok(match mismatch {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id: unit,
                function_code: FunctionCode::from(fc),
                start_address: address,
                register_count: count,
            }),
        })
    }
}
313
#[cfg(feature = "sync")]
impl<S, TX, TS, D> Connection<'_, S, TX, TS, D>
where
    S: embedded_io::Read + embedded_io::Write + embedded_io::ReadReady,
    TX: OutputPin,
    TS: embedded_io::Read + embedded_io::Write + embedded_io::ReadReady,
    D: embedded_hal::delay::DelayNs,
{
    /// Runs one blocking request/response cycle, polling for readiness in
    /// 1 ms steps when a timeout is configured.
    ///
    /// When `tcp_timeout_ms` is `Some`, the TCP stream is polled with
    /// `read_ready` before `listen` is called; when `rtu_timeout_ms` is
    /// `Some`, the serial port is polled the same way before
    /// `send_and_receive` is called. With `None` the corresponding call is
    /// made directly.
    ///
    /// Returns [`BridgeEvent::Transaction`] describing the forwarded request,
    /// or [`BridgeEvent::Warning`] when the reply had to be rebuilt after a
    /// transaction-id mismatch (the rebuilt reply is still sent).
    ///
    /// # Errors
    ///
    /// * [`BridgeError::Timeout`] – a readiness poll did not report data
    ///   within the configured number of milliseconds.
    /// * [`BridgeError::TcpClosed`] – `listen` reported `PayloadTooShort`.
    /// * [`BridgeError::TcpIo`] – a TCP error from `read_ready`, `listen` or
    ///   `send`.
    /// * [`BridgeError::RtuIo`] – a serial error from `read_ready` or the
    ///   RTU exchange.
    /// * [`BridgeError::RtuCrcMismatch`] – the RTU exchange reported `Crc`
    ///   or `PayloadTooShort`.
    /// * [`BridgeError::BufferOverflow`] – any other failure, including
    ///   frame conversion errors.
    #[expect(
        clippy::should_implement_trait,
        reason = "drives one request/response cycle, not an iterator"
    )]
    pub fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        // Step 1: wait (bounded) until the TCP stream reports readable data.
        if let Some(timeout_ms) = self.bridge.tcp_timeout_ms {
            let mut elapsed = 0u32;
            loop {
                match self.tcp.stream.read_ready() {
                    Ok(true) => break,
                    Ok(false) => {
                        // The check precedes the delay, so a timeout of 0
                        // fails after a single poll.
                        if elapsed >= timeout_ms {
                            return Err(BridgeError::Timeout);
                        }
                        self.bridge.delay.delay_ms(1);
                        elapsed = elapsed.saturating_add(1);
                    }
                    Err(e) => return Err(BridgeError::TcpIo(e)),
                }
            }
        }

        let tcp_req = self.tcp.listen().map_err(|e| match e {
            ModbusError::PayloadTooShort => BridgeError::TcpClosed,
            ModbusError::Tcp(te) => BridgeError::TcpIo(te),
            ModbusError::Push => BridgeError::BufferOverflow,
            _ => BridgeError::BufferOverflow,
        })?;

        // Only used to fill the returned `Transaction`; a request that fails
        // to parse is reported with all-zero fields.
        let (unit_id, fc_byte, start_address, register_count) =
            frame::parse_tcp_request(&tcp_req).unwrap_or((0, 0, 0, 0));

        // Step 2: convert to an RTU frame, keeping the transaction id.
        let (rtu_req, tid) =
            frame::tcp_to_rtu(&tcp_req).map_err(|_| BridgeError::BufferOverflow)?;

        // Step 3: wait (bounded) until the serial port reports readable data.
        // NOTE(review): this poll runs BEFORE `send_and_receive` transmits the
        // request below. If the RTU device only answers after receiving a
        // request, `read_ready` would stay false and this path would always
        // end in `Timeout` — confirm the intended ordering.
        if let Some(timeout_ms) = self.bridge.rtu_timeout_ms {
            let mut elapsed = 0u32;
            loop {
                match self.bridge.rtu.serial.read_ready() {
                    Ok(true) => break,
                    Ok(false) => {
                        if elapsed >= timeout_ms {
                            return Err(BridgeError::Timeout);
                        }
                        self.bridge.delay.delay_ms(1);
                        elapsed = elapsed.saturating_add(1);
                    }
                    Err(e) => return Err(BridgeError::RtuIo(e)),
                }
            }
        }

        let rtu_resp = self
            .bridge
            .rtu
            .send_and_receive(&rtu_req)
            .map_err(|e| match e {
                ModbusError::Serial(se) => BridgeError::RtuIo(se),
                ModbusError::Crc => BridgeError::RtuCrcMismatch,
                ModbusError::PayloadTooShort => BridgeError::RtuCrcMismatch,
                _ => BridgeError::BufferOverflow,
            })?;

        // Step 4: convert the reply back; on a transaction-id mismatch rebuild
        // it with id 0 and record a warning instead of failing.
        let (tcp_resp, tid_warning) = match frame::rtu_resp_to_tcp(&rtu_resp, tid) {
            Ok(r) => (r, None),
            Err(ModbusError::InvalidTransactionId) => {
                let fallback = frame::rtu_resp_to_tcp(&rtu_resp, 0)
                    .map_err(|_| BridgeError::BufferOverflow)?;
                // First two reply bytes read as a big-endian u16, or 0 when
                // the reply is shorter than two bytes.
                let rx_tid = if rtu_resp.len() >= 2 {
                    u16::from_be_bytes([rtu_resp[0], rtu_resp[1]])
                } else {
                    0
                };
                (
                    fallback,
                    Some(Warning::TransactionIdMismatch {
                        expected: tid,
                        got: rx_tid,
                    }),
                )
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 5: forward the reply to the TCP peer.
        self.tcp.send(&tcp_resp).map_err(BridgeError::TcpIo)?;

        // A recorded warning takes precedence over the transaction summary.
        if let Some(w) = tid_warning {
            return Ok(BridgeEvent::Warning(w));
        }
        Ok(BridgeEvent::Transaction(Transaction {
            unit_id,
            function_code: FunctionCode::from(fc_byte),
            start_address,
            register_count,
        }))
    }
}