1use crate::{
4 client::Client,
5 error::ModbusError,
6 event::{BridgeError, BridgeEvent, FunctionCode, Transaction, Warning},
7 frame,
8 tcp::ModbusTcp,
9 NoDelay,
10};
11use embedded_hal::digital::OutputPin;
12
13pub struct ClientSession<'b, S, TX, TS, D = NoDelay> {
21 pub(crate) client: &'b mut Client<S, TX, D>,
22 pub(crate) tcp: ModbusTcp<TS>,
23}
24
25impl<'b, S, TX, TS, D> ClientSession<'b, S, TX, TS, D> {
26 pub(crate) fn new(client: &'b mut Client<S, TX, D>, stream: TS) -> Self {
27 Self {
28 client,
29 tcp: ModbusTcp::new(stream),
30 }
31 }
32
33 pub fn into_stream(self) -> TS {
41 self.tcp.into_inner()
42 }
43}
44
#[cfg(feature = "async")]
impl<S, TX, TS> ClientSession<'_, S, TX, TS, NoDelay>
where
    S: embedded_io_async::Read + embedded_io_async::Write,
    TX: OutputPin,
    TS: embedded_io_async::Read + embedded_io_async::Write,
{
    /// Drives one request/response cycle without any timeout.
    ///
    /// Steps, in order:
    /// 1. Await a request from `client.rtu.listen()`.
    /// 2. Extract `(unit_id, function code, start address, register count)`
    ///    via `frame::parse_rtu_request`; if that yields nothing, all four
    ///    values fall back to `0` and the cycle continues.
    /// 3. Convert the request with `frame::rtu_to_tcp`, tagging it with
    ///    `tcp.next_transaction_id()`, and send it over TCP.
    /// 4. Await the TCP response and convert it back with
    ///    `frame::tcp_resp_to_rtu`.
    /// 5. Send the converted response on the RTU side.
    ///
    /// Returns `BridgeEvent::Transaction` describing the request. If the
    /// conversion in step 4 reported `ModbusError::InvalidTransactionId`, the
    /// response is converted again using the id read from its first two bytes,
    /// still forwarded, and `BridgeEvent::Warning(TransactionIdMismatch)` is
    /// returned instead of the transaction event.
    ///
    /// # Errors
    ///
    /// * RTU listen: `Serial` → `RtuIo`, `Crc` → `RtuCrcMismatch`,
    ///   `PayloadTooShort` → `RtuClosed`, anything else → `BufferOverflow`.
    /// * TCP listen: `PayloadTooShort` → `TcpClosed`, `Tcp` → `TcpIo`,
    ///   anything else → `BufferOverflow`.
    /// * Frame conversion failures → `BufferOverflow`.
    /// * TCP send failure → `TcpIo`; RTU send failure → `RtuIo`.
    pub async fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        // Step 1: wait for a request on the RTU side.
        let request = match self.client.rtu.listen().await {
            Ok(received) => received,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc) => return Err(BridgeError::RtuCrcMismatch),
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::RtuClosed),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 2: best-effort metadata for the event; zeros when parsing fails.
        let (unit, function, address, count) =
            frame::parse_rtu_request(&request).unwrap_or((0, 0, 0, 0));

        // Step 3: re-frame for TCP and transmit.
        let outgoing = match frame::rtu_to_tcp(&request, self.tcp.next_transaction_id()) {
            Ok(converted) => converted,
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // The transaction id is read back from the first two bytes of the
        // TCP frame (big-endian).
        let expected_tid = u16::from_be_bytes([outgoing[0], outgoing[1]]);

        if let Err(io) = self.tcp.send(&outgoing).await {
            return Err(BridgeError::TcpIo(io));
        }

        // Step 4: wait for the TCP response.
        let reply = match self.tcp.listen().await {
            Ok(received) => received,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        let mut mismatch = None;
        let rtu_reply = match frame::tcp_resp_to_rtu(&reply, expected_tid) {
            Ok(converted) => converted,
            Err(ModbusError::InvalidTransactionId) => {
                // Retry the conversion with whatever id the response carries
                // (0 when fewer than two bytes are available).
                let received_tid = match reply.get(..2) {
                    Some(prefix) => u16::from_be_bytes([prefix[0], prefix[1]]),
                    None => 0,
                };
                let converted = match frame::tcp_resp_to_rtu(&reply, received_tid) {
                    Ok(converted) => converted,
                    Err(_) => return Err(BridgeError::BufferOverflow),
                };
                mismatch = Some(Warning::TransactionIdMismatch {
                    expected: expected_tid,
                    got: received_tid,
                });
                converted
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 5: forward the response to the RTU side.
        if let Err(io) = self.client.rtu.send(&rtu_reply).await {
            return Err(BridgeError::RtuIo(io));
        }

        // A transaction-id mismatch takes precedence over the normal event.
        Ok(match mismatch {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id: unit,
                function_code: FunctionCode::from(function),
                start_address: address,
                register_count: count,
            }),
        })
    }
}
124
#[cfg(feature = "async")]
impl<S, TX, TS, D> ClientSession<'_, S, TX, TS, D>
where
    S: embedded_io_async::Read + embedded_io_async::Write,
    TX: OutputPin,
    TS: embedded_io_async::Read + embedded_io_async::Write,
    D: embedded_hal_async::delay::DelayNs,
{
    /// Drives one request/response cycle, honouring the client's optional
    /// timeouts.
    ///
    /// When `client.rtu_timeout_ms` is `Some(ms)`, waiting for the RTU
    /// request is raced (via `futures_util::future::select`) against
    /// `client.delay.delay_ms(ms)`; likewise `client.tcp_timeout_ms` bounds
    /// the wait for the TCP response. If the delay completes first the call
    /// returns `BridgeError::Timeout`. A timeout of `None` waits without a
    /// limit.
    ///
    /// Apart from the timeouts the cycle is: listen on RTU, parse request
    /// metadata with `frame::parse_rtu_request` (all zeros if parsing yields
    /// nothing), convert with `frame::rtu_to_tcp` using
    /// `tcp.next_transaction_id()`, send over TCP, listen for the TCP
    /// response, convert with `frame::tcp_resp_to_rtu`, and send on RTU.
    ///
    /// Returns `BridgeEvent::Transaction` describing the request. If the
    /// response conversion reported `ModbusError::InvalidTransactionId`, the
    /// response is converted again using the id read from its first two bytes,
    /// still forwarded, and `BridgeEvent::Warning(TransactionIdMismatch)` is
    /// returned instead.
    ///
    /// # Errors
    ///
    /// * `Timeout` when either configured delay elapses first.
    /// * RTU listen: `Serial` → `RtuIo`, `Crc` → `RtuCrcMismatch`,
    ///   `PayloadTooShort` → `RtuClosed`, anything else → `BufferOverflow`.
    /// * TCP listen: `PayloadTooShort` → `TcpClosed`, `Tcp` → `TcpIo`,
    ///   anything else → `BufferOverflow`.
    /// * Frame conversion failures → `BufferOverflow`.
    /// * TCP send failure → `TcpIo`; RTU send failure → `RtuIo`.
    pub async fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        use core::pin::pin;
        use futures_util::future::{select, Either};

        // Wait for the RTU request, optionally bounded by the RTU timeout.
        let rtu_outcome = match self.client.rtu_timeout_ms {
            Some(ms) => {
                let listening = pin!(self.client.rtu.listen());
                let deadline = pin!(self.client.delay.delay_ms(ms));
                match select(listening, deadline).await {
                    Either::Left((outcome, _)) => outcome,
                    Either::Right(_) => return Err(BridgeError::Timeout),
                }
            }
            None => self.client.rtu.listen().await,
        };
        let request = match rtu_outcome {
            Ok(received) => received,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc) => return Err(BridgeError::RtuCrcMismatch),
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::RtuClosed),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Best-effort metadata for the event; zeros when parsing fails.
        let (unit, function, address, count) =
            frame::parse_rtu_request(&request).unwrap_or((0, 0, 0, 0));

        let outgoing = match frame::rtu_to_tcp(&request, self.tcp.next_transaction_id()) {
            Ok(converted) => converted,
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // The transaction id is read back from the first two bytes of the
        // TCP frame (big-endian).
        let expected_tid = u16::from_be_bytes([outgoing[0], outgoing[1]]);

        if let Err(io) = self.tcp.send(&outgoing).await {
            return Err(BridgeError::TcpIo(io));
        }

        // Wait for the TCP response, optionally bounded by the TCP timeout.
        let tcp_outcome = match self.client.tcp_timeout_ms {
            Some(ms) => {
                let listening = pin!(self.tcp.listen());
                let deadline = pin!(self.client.delay.delay_ms(ms));
                match select(listening, deadline).await {
                    Either::Left((outcome, _)) => outcome,
                    Either::Right(_) => return Err(BridgeError::Timeout),
                }
            }
            None => self.tcp.listen().await,
        };
        let reply = match tcp_outcome {
            Ok(received) => received,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        let mut mismatch = None;
        let rtu_reply = match frame::tcp_resp_to_rtu(&reply, expected_tid) {
            Ok(converted) => converted,
            Err(ModbusError::InvalidTransactionId) => {
                // Retry the conversion with whatever id the response carries
                // (0 when fewer than two bytes are available).
                let received_tid = match reply.get(..2) {
                    Some(prefix) => u16::from_be_bytes([prefix[0], prefix[1]]),
                    None => 0,
                };
                let converted = match frame::tcp_resp_to_rtu(&reply, received_tid) {
                    Ok(converted) => converted,
                    Err(_) => return Err(BridgeError::BufferOverflow),
                };
                mismatch = Some(Warning::TransactionIdMismatch {
                    expected: expected_tid,
                    got: received_tid,
                });
                converted
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        if let Err(io) = self.client.rtu.send(&rtu_reply).await {
            return Err(BridgeError::RtuIo(io));
        }

        // A transaction-id mismatch takes precedence over the normal event.
        Ok(match mismatch {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id: unit,
                function_code: FunctionCode::from(function),
                start_address: address,
                register_count: count,
            }),
        })
    }
}
242
#[cfg(feature = "sync")]
impl<S, TX, TS> ClientSession<'_, S, TX, TS, NoDelay>
where
    S: embedded_io::Read + embedded_io::Write,
    TX: OutputPin,
    TS: embedded_io::Read + embedded_io::Write,
{
    /// Drives one blocking request/response cycle without any timeout.
    ///
    /// Steps, in order:
    /// 1. Obtain a request from `client.rtu.listen()`.
    /// 2. Extract `(unit_id, function code, start address, register count)`
    ///    via `frame::parse_rtu_request`; if that yields nothing, all four
    ///    values fall back to `0` and the cycle continues.
    /// 3. Convert the request with `frame::rtu_to_tcp`, tagging it with
    ///    `tcp.next_transaction_id()`, and send it over TCP.
    /// 4. Obtain the TCP response and convert it back with
    ///    `frame::tcp_resp_to_rtu`.
    /// 5. Send the converted response on the RTU side.
    ///
    /// Returns `BridgeEvent::Transaction` describing the request. If the
    /// conversion in step 4 reported `ModbusError::InvalidTransactionId`, the
    /// response is converted again using the id read from its first two bytes,
    /// still forwarded, and `BridgeEvent::Warning(TransactionIdMismatch)` is
    /// returned instead of the transaction event.
    ///
    /// # Errors
    ///
    /// * RTU listen: `Serial` → `RtuIo`, `Crc` → `RtuCrcMismatch`,
    ///   `PayloadTooShort` → `RtuClosed`, anything else → `BufferOverflow`.
    /// * TCP listen: `PayloadTooShort` → `TcpClosed`, `Tcp` → `TcpIo`,
    ///   anything else → `BufferOverflow`.
    /// * Frame conversion failures → `BufferOverflow`.
    /// * TCP send failure → `TcpIo`; RTU send failure → `RtuIo`.
    #[expect(
        clippy::should_implement_trait,
        reason = "drives one request/response cycle, not an iterator"
    )]
    pub fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        // Step 1: obtain a request from the RTU side.
        let request = match self.client.rtu.listen() {
            Ok(received) => received,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc) => return Err(BridgeError::RtuCrcMismatch),
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::RtuClosed),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 2: best-effort metadata for the event; zeros when parsing fails.
        let (unit, function, address, count) =
            frame::parse_rtu_request(&request).unwrap_or((0, 0, 0, 0));

        // Step 3: re-frame for TCP and transmit.
        let outgoing = match frame::rtu_to_tcp(&request, self.tcp.next_transaction_id()) {
            Ok(converted) => converted,
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // The transaction id is read back from the first two bytes of the
        // TCP frame (big-endian).
        let expected_tid = u16::from_be_bytes([outgoing[0], outgoing[1]]);

        if let Err(io) = self.tcp.send(&outgoing) {
            return Err(BridgeError::TcpIo(io));
        }

        // Step 4: obtain the TCP response. Every error other than the two
        // named ones (including `ModbusError::Push`) maps to `BufferOverflow`.
        let reply = match self.tcp.listen() {
            Ok(received) => received,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        let mut mismatch = None;
        let rtu_reply = match frame::tcp_resp_to_rtu(&reply, expected_tid) {
            Ok(converted) => converted,
            Err(ModbusError::InvalidTransactionId) => {
                // Retry the conversion with whatever id the response carries
                // (0 when fewer than two bytes are available).
                let received_tid = if reply.len() < 2 {
                    0
                } else {
                    u16::from_be_bytes([reply[0], reply[1]])
                };
                let converted = match frame::tcp_resp_to_rtu(&reply, received_tid) {
                    Ok(converted) => converted,
                    Err(_) => return Err(BridgeError::BufferOverflow),
                };
                mismatch = Some(Warning::TransactionIdMismatch {
                    expected: expected_tid,
                    got: received_tid,
                });
                converted
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Step 5: forward the response to the RTU side.
        if let Err(io) = self.client.rtu.send(&rtu_reply) {
            return Err(BridgeError::RtuIo(io));
        }

        // A transaction-id mismatch takes precedence over the normal event.
        Ok(match mismatch {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id: unit,
                function_code: FunctionCode::from(function),
                start_address: address,
                register_count: count,
            }),
        })
    }
}
331
#[cfg(feature = "sync")]
impl<S, TX, TS, D> ClientSession<'_, S, TX, TS, D>
where
    S: embedded_io::Read + embedded_io::Write + embedded_io::ReadReady,
    TX: OutputPin,
    TS: embedded_io::Read + embedded_io::Write + embedded_io::ReadReady,
    D: embedded_hal::delay::DelayNs,
{
    /// Drives one blocking request/response cycle, honouring the client's
    /// optional timeouts.
    ///
    /// When `client.rtu_timeout_ms` is `Some(limit)`, the serial port's
    /// `read_ready()` is polled before listening, with `client.delay.delay_ms(1)`
    /// between polls; once `limit` delays have been issued without the port
    /// becoming ready, `BridgeError::Timeout` is returned. The same scheme,
    /// driven by `client.tcp_timeout_ms`, guards the wait for the TCP
    /// response. The elapsed time is a count of 1 ms delays, not a wall-clock
    /// measurement, and the timeout only covers the wait for readiness: the
    /// subsequent `listen()` call is made without a limit in this function.
    ///
    /// Apart from the timeouts the cycle is: listen on RTU, parse request
    /// metadata with `frame::parse_rtu_request` (all zeros if parsing yields
    /// nothing), convert with `frame::rtu_to_tcp` using
    /// `tcp.next_transaction_id()`, send over TCP, listen for the TCP
    /// response, convert with `frame::tcp_resp_to_rtu`, and send on RTU.
    ///
    /// Returns `BridgeEvent::Transaction` describing the request. If the
    /// response conversion reported `ModbusError::InvalidTransactionId`, the
    /// response is converted again using the id read from its first two bytes,
    /// still forwarded, and `BridgeEvent::Warning(TransactionIdMismatch)` is
    /// returned instead.
    ///
    /// # Errors
    ///
    /// * `Timeout` when either readiness wait exceeds its limit.
    /// * `read_ready()` failure → `RtuIo` (serial) or `TcpIo` (TCP stream).
    /// * RTU listen: `Serial` → `RtuIo`, `Crc` → `RtuCrcMismatch`,
    ///   `PayloadTooShort` → `RtuClosed`, anything else → `BufferOverflow`.
    /// * TCP listen: `PayloadTooShort` → `TcpClosed`, `Tcp` → `TcpIo`,
    ///   anything else → `BufferOverflow`.
    /// * Frame conversion failures → `BufferOverflow`.
    /// * TCP send failure → `TcpIo`; RTU send failure → `RtuIo`.
    #[expect(
        clippy::should_implement_trait,
        reason = "drives one request/response cycle, not an iterator"
    )]
    pub fn next(&mut self) -> Result<BridgeEvent, BridgeError<S::Error, TS::Error>> {
        // Optionally wait (in 1 ms steps) until the serial port has data.
        if let Some(limit) = self.client.rtu_timeout_ms {
            let mut waited = 0u32;
            while !self
                .client
                .rtu
                .serial
                .read_ready()
                .map_err(BridgeError::RtuIo)?
            {
                if waited >= limit {
                    return Err(BridgeError::Timeout);
                }
                self.client.delay.delay_ms(1);
                waited = waited.saturating_add(1);
            }
        }

        let request = match self.client.rtu.listen() {
            Ok(received) => received,
            Err(ModbusError::Serial(io)) => return Err(BridgeError::RtuIo(io)),
            Err(ModbusError::Crc) => return Err(BridgeError::RtuCrcMismatch),
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::RtuClosed),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // Best-effort metadata for the event; zeros when parsing fails.
        let (unit, function, address, count) =
            frame::parse_rtu_request(&request).unwrap_or((0, 0, 0, 0));

        let outgoing = match frame::rtu_to_tcp(&request, self.tcp.next_transaction_id()) {
            Ok(converted) => converted,
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        // The transaction id is read back from the first two bytes of the
        // TCP frame (big-endian).
        let expected_tid = u16::from_be_bytes([outgoing[0], outgoing[1]]);

        if let Err(io) = self.tcp.send(&outgoing) {
            return Err(BridgeError::TcpIo(io));
        }

        // Optionally wait (in 1 ms steps) until the TCP stream has data.
        if let Some(limit) = self.client.tcp_timeout_ms {
            let mut waited = 0u32;
            while !self.tcp.stream.read_ready().map_err(BridgeError::TcpIo)? {
                if waited >= limit {
                    return Err(BridgeError::Timeout);
                }
                self.client.delay.delay_ms(1);
                waited = waited.saturating_add(1);
            }
        }

        // Every error other than the two named ones (including
        // `ModbusError::Push`) maps to `BufferOverflow`.
        let reply = match self.tcp.listen() {
            Ok(received) => received,
            Err(ModbusError::PayloadTooShort) => return Err(BridgeError::TcpClosed),
            Err(ModbusError::Tcp(io)) => return Err(BridgeError::TcpIo(io)),
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        let mut mismatch = None;
        let rtu_reply = match frame::tcp_resp_to_rtu(&reply, expected_tid) {
            Ok(converted) => converted,
            Err(ModbusError::InvalidTransactionId) => {
                // Retry the conversion with whatever id the response carries
                // (0 when fewer than two bytes are available).
                let received_tid = if reply.len() < 2 {
                    0
                } else {
                    u16::from_be_bytes([reply[0], reply[1]])
                };
                let converted = match frame::tcp_resp_to_rtu(&reply, received_tid) {
                    Ok(converted) => converted,
                    Err(_) => return Err(BridgeError::BufferOverflow),
                };
                mismatch = Some(Warning::TransactionIdMismatch {
                    expected: expected_tid,
                    got: received_tid,
                });
                converted
            }
            Err(_) => return Err(BridgeError::BufferOverflow),
        };

        if let Err(io) = self.client.rtu.send(&rtu_reply) {
            return Err(BridgeError::RtuIo(io));
        }

        // A transaction-id mismatch takes precedence over the normal event.
        Ok(match mismatch {
            Some(warning) => BridgeEvent::Warning(warning),
            None => BridgeEvent::Transaction(Transaction {
                unit_id: unit,
                function_code: FunctionCode::from(function),
                start_address: address,
                register_count: count,
            }),
        })
    }
}