1use crate::error::ModbusError;
2use crate::layers::application::{ApplicationLayer, ApplicationProtocol, ApplicationRole, Framing};
3use crate::layers::physical::PhysicalLayer;
4use crate::master_session::{MasterSession, PreCheck, PreCheckOutcome, WaiterKey};
5use crate::types::{
6 ApplicationDataUnit, CustomFunctionCode, DeviceIdentification, DeviceObject, MasterResponse,
7 ServerId,
8};
9use crate::utils::{parse_coils, parse_registers};
10use std::sync::atomic::{AtomicU16, AtomicU8, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13use tokio::sync::broadcast::error::RecvError;
14use tokio::task::JoinHandle;
15
16#[derive(Clone, Copy, Debug)]
19pub struct ModbusMasterOptions {
20 pub timeout_ms: u64,
23 pub concurrent: bool,
28}
29
30impl Default for ModbusMasterOptions {
31 fn default() -> Self {
32 Self {
33 timeout_ms: 1000,
34 concurrent: false,
35 }
36 }
37}
38
39pub struct ModbusMaster<A: ApplicationLayer, P: PhysicalLayer> {
40 application: Arc<A>,
41 physical: Arc<P>,
42 session: Arc<MasterSession>,
43 pub timeout_ms: u64,
44 pub concurrent: bool,
45 next_tid: AtomicU16,
46 clean_level: AtomicU8,
47 queue_lock: tokio::sync::Mutex<()>,
48 tasks: Mutex<Vec<JoinHandle<()>>>,
49}
50
51impl<A: ApplicationLayer + 'static, P: PhysicalLayer + 'static> ModbusMaster<A, P> {
52 pub fn new(application: Arc<A>, physical: Arc<P>, options: ModbusMasterOptions) -> Self {
53 if options.concurrent && application.protocol() != ApplicationProtocol::Tcp {
54 panic!("concurrent mode requires a Modbus TCP application layer");
55 }
56 application
57 .set_role(ApplicationRole::Master)
58 .expect("application layer is already bound to a different role");
59 let session = Arc::new(MasterSession::new());
60
61 let session_for_framing = Arc::clone(&session);
62 let mut framing_rx = application.subscribe_framing();
63 let framing_task = tokio::spawn(async move {
64 loop {
65 match framing_rx.recv().await {
66 Ok(frame) => session_for_framing.handle_frame(frame),
67 Err(RecvError::Lagged(_)) => continue,
68 Err(RecvError::Closed) => break,
69 }
70 }
71 });
72
73 let session_for_error = Arc::clone(&session);
74 let mut error_rx = application.subscribe_framing_error();
75 let error_task = tokio::spawn(async move {
76 loop {
77 match error_rx.recv().await {
78 Ok(err) => session_for_error.handle_error(err),
79 Err(RecvError::Lagged(_)) => continue,
80 Err(RecvError::Closed) => break,
81 }
82 }
83 });
84
85 Self {
86 application,
87 physical,
88 session,
89 timeout_ms: options.timeout_ms,
90 concurrent: options.concurrent,
91 next_tid: AtomicU16::new(1),
92 clean_level: AtomicU8::new(0),
93 queue_lock: tokio::sync::Mutex::new(()),
94 tasks: Mutex::new(vec![framing_task, error_task]),
95 }
96 }
97
98 fn allocate_tid(&self) -> u16 {
101 self.next_tid
102 .fetch_update(Ordering::Release, Ordering::Acquire, |t| {
103 let next = if t == 65535 { 1 } else { t + 1 };
104 Some(next)
105 })
106 .unwrap()
107 }
108
109 fn clean(&self, level: u8) {
110 let current = self.clean_level.load(Ordering::Acquire);
111 if current == 2 {
112 return;
113 }
114 if current == 1 && level == 1 {
115 return;
116 }
117 let err = if level == 2 {
118 ModbusError::InvalidState("Master destroyed".into())
119 } else {
120 ModbusError::InvalidState("Master closed".into())
121 };
122 self.session.stop_all(err);
123 self.clean_level.store(level, Ordering::Release);
124 }
125
126 pub fn is_open(&self) -> bool {
127 self.physical.is_open()
128 }
129
130 pub fn is_destroyed(&self) -> bool {
131 self.clean_level.load(Ordering::Acquire) == 2 || self.physical.is_destroyed()
132 }
133
134 pub async fn open(&self, options: P::OpenOptions) -> Result<(), ModbusError> {
135 if self.clean_level.load(Ordering::Acquire) == 2 {
136 return Err(ModbusError::PortDestroyed);
137 }
138 self.clean_level.store(0, Ordering::Release);
139 self.next_tid.store(1, Ordering::Release);
140 self.physical.open(options).await?;
141 Ok(())
142 }
143
144 pub async fn close(&self) -> Result<(), ModbusError> {
145 if self.clean_level.load(Ordering::Acquire) == 2 {
146 return Ok(());
147 }
148 self.clean(1);
149 self.physical.close().await
150 }
151
152 pub async fn destroy(&self) {
153 if self.clean_level.load(Ordering::Acquire) == 2 {
154 return;
155 }
156 self.clean(2);
157 {
158 let mut tasks = self.tasks.lock().unwrap();
159 for task in tasks.drain(..) {
160 task.abort();
161 }
162 }
163 self.application.destroy().await;
164 let _ = self.physical.destroy().await;
165 }
166
167 fn check_unit_fc(unit: u8, fc: u8) -> PreCheck {
168 Arc::new(move |f: &Framing| {
169 if f.adu.unit == unit && f.adu.fc == fc {
170 PreCheckOutcome::Pass
171 } else {
172 PreCheckOutcome::Fail(ModbusError::InvalidResponse)
173 }
174 })
175 }
176
177 fn check_length(expected: usize) -> PreCheck {
178 Arc::new(move |_: &Framing| PreCheckOutcome::NeedLength(expected))
179 }
180
181 fn check_byte_count(expected: usize) -> PreCheck {
182 Arc::new(move |f: &Framing| {
183 if !f.adu.data.is_empty() && f.adu.data[0] as usize == expected {
184 PreCheckOutcome::Pass
185 } else {
186 PreCheckOutcome::Fail(ModbusError::InvalidResponse)
187 }
188 })
189 }
190
191 fn check_echo(expected: Vec<u8>) -> PreCheck {
192 Arc::new(move |f: &Framing| {
193 if f.adu.data == expected {
194 PreCheckOutcome::Pass
195 } else {
196 PreCheckOutcome::Fail(ModbusError::InvalidResponse)
197 }
198 })
199 }
200
201 async fn wait_response(
202 &self,
203 request: &ApplicationDataUnit,
204 checks: Vec<PreCheck>,
205 timeout_ms: u64,
206 ) -> Result<Option<Framing>, ModbusError> {
207 if self.clean_level.load(Ordering::Acquire) >= 1 {
211 return Err(ModbusError::InvalidState("Master closed".into()));
212 }
213
214 let _queue_guard = if self.concurrent {
219 None
220 } else {
221 Some(self.queue_lock.lock().await)
222 };
223
224 if self.clean_level.load(Ordering::Acquire) >= 1 {
227 return Err(ModbusError::InvalidState("Master closed".into()));
228 }
229
230 if !self.concurrent {
234 self.application.flush();
235 }
236
237 let broadcast = request.unit == 0;
238 let uses_tid = self.application.protocol() == ApplicationProtocol::Tcp && !broadcast;
239
240 let (encoded, key) = if uses_tid {
245 let tid = self.allocate_tid();
246 let adu = ApplicationDataUnit {
247 transaction: Some(tid),
248 unit: request.unit,
249 fc: request.fc,
250 data: request.data.clone(),
251 };
252 (self.application.encode(&adu), WaiterKey::Tid(tid))
253 } else {
254 (self.application.encode(request), WaiterKey::Fifo)
255 };
256
257 let final_checks: Vec<PreCheck> = if let WaiterKey::Tid(tid) = key {
261 let mut v: Vec<PreCheck> = Vec::with_capacity(checks.len() + 1);
262 v.push(Arc::new(move |f: &Framing| {
263 if f.adu.transaction == Some(tid) {
264 PreCheckOutcome::Pass
265 } else {
266 PreCheckOutcome::Fail(ModbusError::InvalidResponse)
267 }
268 }));
269 v.extend(checks);
270 v
271 } else {
272 checks
273 };
274
275 let rx = self.session.start(key, final_checks);
279 if let Err(err) = self.physical.write(&encoded).await {
280 self.session.stop(key);
281 return Err(err);
282 }
283
284 if broadcast {
285 self.session.stop(key);
288 return Ok(None);
289 }
290
291 let timeout = Duration::from_millis(timeout_ms);
292 match tokio::time::timeout(timeout, rx).await {
293 Ok(Ok(Ok(frame))) => Ok(Some(frame)),
294 Ok(Ok(Err(err))) => Err(err),
295 Ok(Err(_)) => {
296 Err(ModbusError::InvalidState(
298 "master session was cleared while waiting".into(),
299 ))
300 }
301 Err(_) => {
302 self.session.stop(key);
303 Err(ModbusError::Timeout)
304 }
305 }
306 }
307
308 pub async fn read_coils(
310 &self,
311 unit: u8,
312 address: u16,
313 length: u16,
314 timeout_ms: Option<u64>,
315 ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
316 let fc = 0x01;
317 let byte_count = ((length + 7) / 8) as usize;
318
319 let mut buf = vec![0u8; 4];
320 buf[0..2].copy_from_slice(&address.to_be_bytes());
321 buf[2..4].copy_from_slice(&length.to_be_bytes());
322
323 let request = ApplicationDataUnit::new(unit, fc, buf);
324
325 let frame = self
326 .wait_response(
327 &request,
328 vec![
329 Self::check_unit_fc(unit, fc),
330 Self::check_length(1 + byte_count),
331 Self::check_byte_count(byte_count),
332 ],
333 timeout_ms.unwrap_or(self.timeout_ms),
334 )
335 .await?;
336
337 match frame {
338 Some(f) => Ok(Some(MasterResponse {
339 transaction: f.adu.transaction,
340 unit: f.adu.unit,
341 fc: f.adu.fc,
342 data: parse_coils(&f.adu.data, length),
343 raw: f.raw,
344 })),
345 None => Ok(None),
346 }
347 }
348
349 pub async fn write_fc1(
351 &self,
352 unit: u8,
353 address: u16,
354 length: u16,
355 timeout_ms: Option<u64>,
356 ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
357 self.read_coils(unit, address, length, timeout_ms).await
358 }
359
360 pub async fn read_discrete_inputs(
362 &self,
363 unit: u8,
364 address: u16,
365 length: u16,
366 timeout_ms: Option<u64>,
367 ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
368 let fc = 0x02;
369 let byte_count = ((length + 7) / 8) as usize;
370
371 let mut buf = vec![0u8; 4];
372 buf[0..2].copy_from_slice(&address.to_be_bytes());
373 buf[2..4].copy_from_slice(&length.to_be_bytes());
374
375 let request = ApplicationDataUnit::new(unit, fc, buf);
376
377 let frame = self
378 .wait_response(
379 &request,
380 vec![
381 Self::check_unit_fc(unit, fc),
382 Self::check_length(1 + byte_count),
383 Self::check_byte_count(byte_count),
384 ],
385 timeout_ms.unwrap_or(self.timeout_ms),
386 )
387 .await?;
388
389 match frame {
390 Some(f) => Ok(Some(MasterResponse {
391 transaction: f.adu.transaction,
392 unit: f.adu.unit,
393 fc: f.adu.fc,
394 data: parse_coils(&f.adu.data, length),
395 raw: f.raw,
396 })),
397 None => Ok(None),
398 }
399 }
400
401 pub async fn write_fc2(
403 &self,
404 unit: u8,
405 address: u16,
406 length: u16,
407 timeout_ms: Option<u64>,
408 ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
409 self.read_discrete_inputs(unit, address, length, timeout_ms)
410 .await
411 }
412
413 pub async fn read_holding_registers(
415 &self,
416 unit: u8,
417 address: u16,
418 length: u16,
419 timeout_ms: Option<u64>,
420 ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
421 let fc = 0x03;
422 let byte_count = (length * 2) as usize;
423
424 let mut buf = vec![0u8; 4];
425 buf[0..2].copy_from_slice(&address.to_be_bytes());
426 buf[2..4].copy_from_slice(&length.to_be_bytes());
427
428 let request = ApplicationDataUnit::new(unit, fc, buf);
429
430 let frame = self
431 .wait_response(
432 &request,
433 vec![
434 Self::check_unit_fc(unit, fc),
435 Self::check_length(1 + byte_count),
436 Self::check_byte_count(byte_count),
437 ],
438 timeout_ms.unwrap_or(self.timeout_ms),
439 )
440 .await?;
441
442 match frame {
443 Some(f) => Ok(Some(MasterResponse {
444 transaction: f.adu.transaction,
445 unit: f.adu.unit,
446 fc: f.adu.fc,
447 data: parse_registers(&f.adu.data, length),
448 raw: f.raw,
449 })),
450 None => Ok(None),
451 }
452 }
453
454 pub async fn write_fc3(
456 &self,
457 unit: u8,
458 address: u16,
459 length: u16,
460 timeout_ms: Option<u64>,
461 ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
462 self.read_holding_registers(unit, address, length, timeout_ms)
463 .await
464 }
465
466 pub async fn read_input_registers(
468 &self,
469 unit: u8,
470 address: u16,
471 length: u16,
472 timeout_ms: Option<u64>,
473 ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
474 let fc = 0x04;
475 let byte_count = (length * 2) as usize;
476
477 let mut buf = vec![0u8; 4];
478 buf[0..2].copy_from_slice(&address.to_be_bytes());
479 buf[2..4].copy_from_slice(&length.to_be_bytes());
480
481 let request = ApplicationDataUnit::new(unit, fc, buf);
482
483 let frame = self
484 .wait_response(
485 &request,
486 vec![
487 Self::check_unit_fc(unit, fc),
488 Self::check_length(1 + byte_count),
489 Self::check_byte_count(byte_count),
490 ],
491 timeout_ms.unwrap_or(self.timeout_ms),
492 )
493 .await?;
494
495 match frame {
496 Some(f) => Ok(Some(MasterResponse {
497 transaction: f.adu.transaction,
498 unit: f.adu.unit,
499 fc: f.adu.fc,
500 data: parse_registers(&f.adu.data, length),
501 raw: f.raw,
502 })),
503 None => Ok(None),
504 }
505 }
506
507 pub async fn write_fc4(
509 &self,
510 unit: u8,
511 address: u16,
512 length: u16,
513 timeout_ms: Option<u64>,
514 ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
515 self.read_input_registers(unit, address, length, timeout_ms)
516 .await
517 }
518
519 pub async fn write_single_coil(
521 &self,
522 unit: u8,
523 address: u16,
524 value: bool,
525 timeout_ms: Option<u64>,
526 ) -> Result<Option<MasterResponse<bool>>, ModbusError> {
527 let fc = 0x05;
528
529 let mut buf = vec![0u8; 4];
530 buf[0..2].copy_from_slice(&address.to_be_bytes());
531 let value_u16: u16 = if value { 0xff00 } else { 0x0000 };
532 buf[2..4].copy_from_slice(&value_u16.to_be_bytes());
533
534 let request = ApplicationDataUnit::new(unit, fc, buf.clone());
535
536 let frame = self
537 .wait_response(
538 &request,
539 vec![
540 Self::check_unit_fc(unit, fc),
541 Self::check_length(4),
542 Self::check_echo(buf),
543 ],
544 timeout_ms.unwrap_or(self.timeout_ms),
545 )
546 .await?;
547
548 match frame {
549 Some(f) => Ok(Some(MasterResponse {
550 transaction: f.adu.transaction,
551 unit: f.adu.unit,
552 fc: f.adu.fc,
553 data: value,
554 raw: f.raw,
555 })),
556 None => Ok(None),
557 }
558 }
559
560 pub async fn write_fc5(
562 &self,
563 unit: u8,
564 address: u16,
565 value: bool,
566 timeout_ms: Option<u64>,
567 ) -> Result<Option<MasterResponse<bool>>, ModbusError> {
568 self.write_single_coil(unit, address, value, timeout_ms)
569 .await
570 }
571
572 pub async fn write_single_register(
574 &self,
575 unit: u8,
576 address: u16,
577 value: u16,
578 timeout_ms: Option<u64>,
579 ) -> Result<Option<MasterResponse<u16>>, ModbusError> {
580 let fc = 0x06;
581
582 let mut buf = vec![0u8; 4];
583 buf[0..2].copy_from_slice(&address.to_be_bytes());
584 buf[2..4].copy_from_slice(&value.to_be_bytes());
585
586 let request = ApplicationDataUnit::new(unit, fc, buf.clone());
587
588 let frame = self
589 .wait_response(
590 &request,
591 vec![
592 Self::check_unit_fc(unit, fc),
593 Self::check_length(4),
594 Self::check_echo(buf),
595 ],
596 timeout_ms.unwrap_or(self.timeout_ms),
597 )
598 .await?;
599
600 match frame {
601 Some(f) => Ok(Some(MasterResponse {
602 transaction: f.adu.transaction,
603 unit: f.adu.unit,
604 fc: f.adu.fc,
605 data: value,
606 raw: f.raw,
607 })),
608 None => Ok(None),
609 }
610 }
611
612 pub async fn write_fc6(
614 &self,
615 unit: u8,
616 address: u16,
617 value: u16,
618 timeout_ms: Option<u64>,
619 ) -> Result<Option<MasterResponse<u16>>, ModbusError> {
620 self.write_single_register(unit, address, value, timeout_ms)
621 .await
622 }
623
624 pub async fn write_multiple_coils(
626 &self,
627 unit: u8,
628 address: u16,
629 values: &[bool],
630 timeout_ms: Option<u64>,
631 ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
632 let fc = 0x0f;
633 let byte_count = ((values.len() + 7) / 8) as u8;
634
635 let mut buf = vec![0u8; 5 + byte_count as usize];
636 buf[0..2].copy_from_slice(&address.to_be_bytes());
637 buf[2..4].copy_from_slice(&(values.len() as u16).to_be_bytes());
638 buf[4] = byte_count;
639 for (byte_idx, chunk) in values.chunks(8).enumerate() {
640 let mut byte = 0u8;
641 for (bit_idx, &v) in chunk.iter().enumerate() {
642 if v {
643 byte |= 1 << bit_idx;
644 }
645 }
646 buf[5 + byte_idx] = byte;
647 }
648
649 let tx_buf = buf.clone();
650 let request = ApplicationDataUnit::new(unit, fc, buf);
651
652 let frame = self
653 .wait_response(
654 &request,
655 vec![
656 Self::check_unit_fc(unit, fc),
657 Self::check_length(4),
658 Self::check_echo(tx_buf[..4].to_vec()),
659 ],
660 timeout_ms.unwrap_or(self.timeout_ms),
661 )
662 .await?;
663
664 match frame {
665 Some(f) => Ok(Some(MasterResponse {
666 transaction: f.adu.transaction,
667 unit: f.adu.unit,
668 fc: f.adu.fc,
669 data: values.to_vec(),
670 raw: f.raw,
671 })),
672 None => Ok(None),
673 }
674 }
675
676 pub async fn write_fc15(
678 &self,
679 unit: u8,
680 address: u16,
681 values: &[bool],
682 timeout_ms: Option<u64>,
683 ) -> Result<Option<MasterResponse<Vec<bool>>>, ModbusError> {
684 self.write_multiple_coils(unit, address, values, timeout_ms)
685 .await
686 }
687
688 pub async fn write_multiple_registers(
690 &self,
691 unit: u8,
692 address: u16,
693 values: &[u16],
694 timeout_ms: Option<u64>,
695 ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
696 let fc = 0x10;
697 let byte_count = (values.len() * 2) as u8;
698
699 let mut buf = vec![0u8; 5 + byte_count as usize];
700 buf[0..2].copy_from_slice(&address.to_be_bytes());
701 buf[2..4].copy_from_slice(&(values.len() as u16).to_be_bytes());
702 buf[4] = byte_count;
703 for (i, &v) in values.iter().enumerate() {
704 buf[5 + i * 2..7 + i * 2].copy_from_slice(&v.to_be_bytes());
705 }
706
707 let tx_buf = buf.clone();
708 let request = ApplicationDataUnit::new(unit, fc, buf);
709
710 let frame = self
711 .wait_response(
712 &request,
713 vec![
714 Self::check_unit_fc(unit, fc),
715 Self::check_length(4),
716 Self::check_echo(tx_buf[..4].to_vec()),
717 ],
718 timeout_ms.unwrap_or(self.timeout_ms),
719 )
720 .await?;
721
722 match frame {
723 Some(f) => Ok(Some(MasterResponse {
724 transaction: f.adu.transaction,
725 unit: f.adu.unit,
726 fc: f.adu.fc,
727 data: values.to_vec(),
728 raw: f.raw,
729 })),
730 None => Ok(None),
731 }
732 }
733
734 pub async fn write_fc16(
736 &self,
737 unit: u8,
738 address: u16,
739 values: &[u16],
740 timeout_ms: Option<u64>,
741 ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
742 self.write_multiple_registers(unit, address, values, timeout_ms)
743 .await
744 }
745
746 pub async fn report_server_id(
748 &self,
749 unit: u8,
750 server_id_length: usize,
751 timeout_ms: Option<u64>,
752 ) -> Result<Option<MasterResponse<ServerId>>, ModbusError> {
753 let fc = 0x11;
754 let request = ApplicationDataUnit::new(unit, fc, vec![]);
755
756 let frame = self
757 .wait_response(
758 &request,
759 vec![
760 Self::check_unit_fc(unit, fc),
761 Arc::new(move |f: &Framing| {
762 if !f.adu.data.is_empty() {
763 let len = 1 + f.adu.data[0] as usize;
764 PreCheckOutcome::NeedLength(len)
765 } else {
766 PreCheckOutcome::InsufficientData
767 }
768 }),
769 ],
770 timeout_ms.unwrap_or(self.timeout_ms),
771 )
772 .await?;
773
774 match frame {
775 Some(f) => {
776 let run_status_index = 1 + server_id_length;
777 if f.adu.data.len() < run_status_index + 1 {
778 return Err(ModbusError::InvalidResponse);
779 }
780 Ok(Some(MasterResponse {
781 transaction: f.adu.transaction,
782 unit: f.adu.unit,
783 fc: f.adu.fc,
784 data: ServerId {
785 server_id: f.adu.data[1..run_status_index].to_vec(),
786 run_indicator_status: f.adu.data[run_status_index] == 0xff,
787 additional_data: f.adu.data[run_status_index + 1..].to_vec(),
788 },
789 raw: f.raw,
790 }))
791 }
792 None => Ok(None),
793 }
794 }
795
796 pub async fn handle_fc17(
798 &self,
799 unit: u8,
800 server_id_length: usize,
801 timeout_ms: Option<u64>,
802 ) -> Result<Option<MasterResponse<ServerId>>, ModbusError> {
803 self.report_server_id(unit, server_id_length, timeout_ms)
804 .await
805 }
806
807 pub async fn mask_write_register(
809 &self,
810 unit: u8,
811 address: u16,
812 and_mask: u16,
813 or_mask: u16,
814 timeout_ms: Option<u64>,
815 ) -> Result<Option<MasterResponse<(u16, u16)>>, ModbusError> {
816 let fc = 0x16;
817
818 let mut buf = vec![0u8; 6];
819 buf[0..2].copy_from_slice(&address.to_be_bytes());
820 buf[2..4].copy_from_slice(&and_mask.to_be_bytes());
821 buf[4..6].copy_from_slice(&or_mask.to_be_bytes());
822
823 let request = ApplicationDataUnit::new(unit, fc, buf.clone());
824
825 let frame = self
826 .wait_response(
827 &request,
828 vec![
829 Self::check_unit_fc(unit, fc),
830 Self::check_length(6),
831 Self::check_echo(buf),
832 ],
833 timeout_ms.unwrap_or(self.timeout_ms),
834 )
835 .await?;
836
837 match frame {
838 Some(f) => Ok(Some(MasterResponse {
839 transaction: f.adu.transaction,
840 unit: f.adu.unit,
841 fc: f.adu.fc,
842 data: (and_mask, or_mask),
843 raw: f.raw,
844 })),
845 None => Ok(None),
846 }
847 }
848
849 pub async fn handle_fc22(
851 &self,
852 unit: u8,
853 address: u16,
854 and_mask: u16,
855 or_mask: u16,
856 timeout_ms: Option<u64>,
857 ) -> Result<Option<MasterResponse<(u16, u16)>>, ModbusError> {
858 self.mask_write_register(unit, address, and_mask, or_mask, timeout_ms)
859 .await
860 }
861
862 pub async fn read_and_write_multiple_registers(
864 &self,
865 unit: u8,
866 read_address: u16,
867 read_length: u16,
868 write_address: u16,
869 write_values: &[u16],
870 timeout_ms: Option<u64>,
871 ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
872 let fc = 0x17;
873 let byte_count = (write_values.len() * 2) as u8;
874
875 let mut buf = vec![0u8; 9 + byte_count as usize];
876 buf[0..2].copy_from_slice(&read_address.to_be_bytes());
877 buf[2..4].copy_from_slice(&read_length.to_be_bytes());
878 buf[4..6].copy_from_slice(&write_address.to_be_bytes());
879 buf[6..8].copy_from_slice(&(write_values.len() as u16).to_be_bytes());
880 buf[8] = byte_count;
881 for (i, &v) in write_values.iter().enumerate() {
882 buf[9 + i * 2..11 + i * 2].copy_from_slice(&v.to_be_bytes());
883 }
884
885 let request = ApplicationDataUnit::new(unit, fc, buf);
886 let read_byte_count = (read_length * 2) as usize;
887
888 let frame = self
889 .wait_response(
890 &request,
891 vec![
892 Self::check_unit_fc(unit, fc),
893 Self::check_length(1 + read_byte_count),
894 Self::check_byte_count(read_byte_count),
895 ],
896 timeout_ms.unwrap_or(self.timeout_ms),
897 )
898 .await?;
899
900 match frame {
901 Some(f) => Ok(Some(MasterResponse {
902 transaction: f.adu.transaction,
903 unit: f.adu.unit,
904 fc: f.adu.fc,
905 data: parse_registers(&f.adu.data, read_length),
906 raw: f.raw,
907 })),
908 None => Ok(None),
909 }
910 }
911
912 pub async fn handle_fc23(
914 &self,
915 unit: u8,
916 read_address: u16,
917 read_length: u16,
918 write_address: u16,
919 write_values: &[u16],
920 timeout_ms: Option<u64>,
921 ) -> Result<Option<MasterResponse<Vec<u16>>>, ModbusError> {
922 self.read_and_write_multiple_registers(
923 unit,
924 read_address,
925 read_length,
926 write_address,
927 write_values,
928 timeout_ms,
929 )
930 .await
931 }
932
933 pub async fn read_device_identification(
935 &self,
936 unit: u8,
937 read_device_id_code: u8,
938 object_id: u8,
939 timeout_ms: Option<u64>,
940 ) -> Result<Option<MasterResponse<DeviceIdentification>>, ModbusError> {
941 let fc = 0x2b;
942 let request =
943 ApplicationDataUnit::new(unit, fc, vec![0x0e, read_device_id_code, object_id]);
944
945 let frame = self
946 .wait_response(
947 &request,
948 vec![
949 Self::check_unit_fc(unit, fc),
950 Arc::new(move |f: &Framing| {
951 if f.adu.data.len() >= 6
952 && f.adu.data[0] == 0x0e
953 && f.adu.data[1] == read_device_id_code
954 {
955 let num_objects = f.adu.data[5] as usize;
956 let mut total = 6usize;
957 let mut idx = 6;
958 for _ in 0..num_objects {
959 if idx + 2 > f.adu.data.len() {
960 return PreCheckOutcome::InsufficientData;
961 }
962 let obj_len = f.adu.data[idx + 1] as usize;
963 total += 2 + obj_len;
964 idx += 2 + obj_len;
965 }
966 PreCheckOutcome::NeedLength(total)
967 } else {
968 PreCheckOutcome::Fail(ModbusError::InvalidResponse)
969 }
970 }),
971 ],
972 timeout_ms.unwrap_or(self.timeout_ms),
973 )
974 .await?;
975
976 match frame {
977 Some(f) => {
978 let mut objects = Vec::new();
979 let num_objects = f.adu.data[5] as usize;
980 let mut idx = 6;
981 for _ in 0..num_objects {
982 let obj_id = f.adu.data[idx];
983 let obj_len = f.adu.data[idx + 1] as usize;
984 let obj_value =
985 String::from_utf8_lossy(&f.adu.data[idx + 2..idx + 2 + obj_len])
986 .to_string();
987 objects.push(DeviceObject {
988 id: obj_id,
989 value: obj_value,
990 });
991 idx += 2 + obj_len;
992 }
993 Ok(Some(MasterResponse {
994 transaction: f.adu.transaction,
995 unit: f.adu.unit,
996 fc: f.adu.fc,
997 data: DeviceIdentification {
998 read_device_id_code: f.adu.data[1],
999 conformity_level: f.adu.data[2],
1000 more_follows: f.adu.data[3] == 0xff,
1001 next_object_id: f.adu.data[4],
1002 objects,
1003 },
1004 raw: f.raw,
1005 }))
1006 }
1007 None => Ok(None),
1008 }
1009 }
1010
1011 pub async fn handle_fc43_14(
1013 &self,
1014 unit: u8,
1015 read_device_id_code: u8,
1016 object_id: u8,
1017 timeout_ms: Option<u64>,
1018 ) -> Result<Option<MasterResponse<DeviceIdentification>>, ModbusError> {
1019 self.read_device_identification(unit, read_device_id_code, object_id, timeout_ms)
1020 .await
1021 }
1022
1023 pub fn add_custom_function_code(&self, cfc: CustomFunctionCode) {
1024 self.application.add_custom_function_code(cfc);
1025 }
1026
1027 pub fn remove_custom_function_code(&self, fc: u8) {
1028 self.application.remove_custom_function_code(fc);
1029 }
1030
1031 pub async fn send_custom_fc(
1039 &self,
1040 unit: u8,
1041 fc: u8,
1042 data: Vec<u8>,
1043 timeout_ms: Option<u64>,
1044 ) -> Result<Option<MasterResponse<Vec<u8>>>, ModbusError> {
1045 let request = ApplicationDataUnit::new(unit, fc, data);
1046 let frame = self
1047 .wait_response(
1048 &request,
1049 vec![Self::check_unit_fc(unit, fc)],
1050 timeout_ms.unwrap_or(self.timeout_ms),
1051 )
1052 .await?;
1053 match frame {
1054 Some(f) => Ok(Some(MasterResponse {
1055 transaction: f.adu.transaction,
1056 unit: f.adu.unit,
1057 fc: f.adu.fc,
1058 data: f.adu.data,
1059 raw: f.raw,
1060 })),
1061 None => Ok(None),
1062 }
1063 }
1064}