1use std::collections::HashMap;
16#[cfg(feature = "traffic")]
17use std::panic::{AssertUnwindSafe, catch_unwind};
18use std::sync::atomic::{AtomicU16, Ordering};
19use std::sync::mpsc::{Receiver, Sender};
20use std::sync::{Arc, Mutex, mpsc};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24#[cfg(feature = "coils")]
25use mbus_client::app::CoilResponse;
26#[cfg(feature = "diagnostics")]
27use mbus_client::app::DiagnosticsResponse;
28#[cfg(feature = "discrete-inputs")]
29use mbus_client::app::DiscreteInputResponse;
30#[cfg(feature = "fifo")]
31use mbus_client::app::FifoQueueResponse;
32#[cfg(feature = "file-record")]
33use mbus_client::app::FileRecordResponse;
34#[cfg(feature = "registers")]
35use mbus_client::app::RegisterResponse;
36use mbus_client::app::RequestErrorNotifier;
37#[cfg(feature = "traffic")]
38use mbus_client::app::{TrafficDirection, TrafficNotifier};
39use mbus_client::services::ClientServices;
40#[cfg(feature = "coils")]
41use mbus_client::services::coil::Coils;
42#[cfg(feature = "diagnostics")]
43use mbus_client::services::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
44#[cfg(feature = "discrete-inputs")]
45use mbus_client::services::discrete_input::DiscreteInputs;
46#[cfg(feature = "fifo")]
47use mbus_client::services::fifo_queue::FifoQueue;
48#[cfg(feature = "file-record")]
49use mbus_client::services::file_record::{SubRequest, SubRequestParams};
50#[cfg(feature = "registers")]
51use mbus_client::services::register::Registers;
52use mbus_core::errors::MbusError;
53#[cfg(feature = "diagnostics")]
54use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
55#[cfg(feature = "tcp")]
56use mbus_core::transport::ModbusTcpConfig;
57use mbus_core::transport::{ModbusConfig, TimeKeeper, Transport, UnitIdOrSlaveAddr};
58#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
59use mbus_core::transport::{ModbusSerialConfig, SerialMode};
60#[cfg(feature = "tcp")]
61use mbus_network::StdTcpTransport;
62#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
63use mbus_serial::StdSerialTransport;
64use tokio::sync::oneshot;
65
66#[cfg(feature = "diagnostics")]
67#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct DiagnosticsDataResponse {
70 pub sub_function: DiagnosticSubFunction,
72 pub data: Vec<u16>,
74}
75#[cfg(feature = "diagnostics")]
76pub type CommEventLogResponse = (u16, u16, u16, Vec<u8>);
78
79#[derive(Debug, PartialEq, Eq)]
81pub enum AsyncError {
82 Mbus(MbusError),
84 WorkerClosed,
86 UnexpectedResponseType,
88}
89
90impl From<MbusError> for AsyncError {
91 fn from(value: MbusError) -> Self {
92 Self::Mbus(value)
93 }
94}
95
96impl std::fmt::Display for AsyncError {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 match self {
99 Self::Mbus(err) => write!(f, "Modbus error: {err}"),
100 Self::WorkerClosed => write!(f, "async worker channel closed"),
101 Self::UnexpectedResponseType => write!(f, "unexpected response type from worker"),
102 }
103 }
104}
105
106impl std::error::Error for AsyncError {
107 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
108 match self {
109 Self::Mbus(err) => Some(err),
110 _ => None,
111 }
112 }
113}
114
115type PendingSender = oneshot::Sender<Result<WorkerResponse, MbusError>>;
116type PendingStore = Arc<Mutex<HashMap<u16, PendingSender>>>;
117#[cfg(feature = "traffic")]
118type TrafficHandler = Box<dyn FnMut(&TrafficEvent) + Send + 'static>;
119#[cfg(feature = "traffic")]
120type TrafficHandlerStore = Arc<Mutex<Option<TrafficHandler>>>;
121#[cfg(feature = "traffic")]
122type TrafficSender = Sender<TrafficEvent>;
123
124#[cfg(feature = "traffic")]
125#[derive(Debug, Clone, PartialEq, Eq)]
127pub struct TrafficEvent {
128 pub direction: TrafficDirection,
130 pub txn_id: u16,
132 pub unit_id_slave_addr: UnitIdOrSlaveAddr,
134 pub frame: Vec<u8>,
136 pub error: Option<MbusError>,
138}
139
140enum WorkerCommand {
141 Connect {
142 sender: PendingSender,
143 },
144 #[cfg(feature = "coils")]
145 ReadMultipleCoils {
146 txn_id: u16,
147 unit: UnitIdOrSlaveAddr,
148 address: u16,
149 quantity: u16,
150 sender: PendingSender,
151 },
152 #[cfg(feature = "registers")]
153 ReadHoldingRegisters {
154 txn_id: u16,
155 unit: UnitIdOrSlaveAddr,
156 address: u16,
157 quantity: u16,
158 sender: PendingSender,
159 },
160 #[cfg(feature = "registers")]
161 ReadInputRegisters {
162 txn_id: u16,
163 unit: UnitIdOrSlaveAddr,
164 address: u16,
165 quantity: u16,
166 sender: PendingSender,
167 },
168 #[cfg(feature = "registers")]
169 WriteSingleRegister {
170 txn_id: u16,
171 unit: UnitIdOrSlaveAddr,
172 address: u16,
173 value: u16,
174 sender: PendingSender,
175 },
176 #[cfg(feature = "coils")]
177 WriteSingleCoil {
178 txn_id: u16,
179 unit: UnitIdOrSlaveAddr,
180 address: u16,
181 value: bool,
182 sender: PendingSender,
183 },
184 #[cfg(feature = "coils")]
185 WriteMultipleCoils {
186 txn_id: u16,
187 unit: UnitIdOrSlaveAddr,
188 address: u16,
189 coils: Coils,
190 sender: PendingSender,
191 },
192 #[cfg(feature = "registers")]
193 WriteMultipleRegisters {
194 txn_id: u16,
195 unit: UnitIdOrSlaveAddr,
196 address: u16,
197 values: Vec<u16>,
198 sender: PendingSender,
199 },
200 #[cfg(feature = "registers")]
201 ReadWriteMultipleRegisters {
202 txn_id: u16,
203 unit: UnitIdOrSlaveAddr,
204 read_address: u16,
205 read_quantity: u16,
206 write_address: u16,
207 write_values: Vec<u16>,
208 sender: PendingSender,
209 },
210 #[cfg(feature = "registers")]
211 MaskWriteRegister {
212 txn_id: u16,
213 unit: UnitIdOrSlaveAddr,
214 address: u16,
215 and_mask: u16,
216 or_mask: u16,
217 sender: PendingSender,
218 },
219 #[cfg(feature = "discrete-inputs")]
220 ReadDiscreteInputs {
221 txn_id: u16,
222 unit: UnitIdOrSlaveAddr,
223 address: u16,
224 quantity: u16,
225 sender: PendingSender,
226 },
227 #[cfg(feature = "fifo")]
228 ReadFifoQueue {
229 txn_id: u16,
230 unit: UnitIdOrSlaveAddr,
231 address: u16,
232 sender: PendingSender,
233 },
234 #[cfg(feature = "file-record")]
235 ReadFileRecord {
236 txn_id: u16,
237 unit: UnitIdOrSlaveAddr,
238 sub_request: SubRequest,
239 sender: PendingSender,
240 },
241 #[cfg(feature = "file-record")]
242 WriteFileRecord {
243 txn_id: u16,
244 unit: UnitIdOrSlaveAddr,
245 sub_request: SubRequest,
246 sender: PendingSender,
247 },
248 #[cfg(feature = "diagnostics")]
249 ReadDeviceIdentification {
250 txn_id: u16,
251 unit: UnitIdOrSlaveAddr,
252 read_device_id_code: ReadDeviceIdCode,
253 object_id: ObjectId,
254 sender: PendingSender,
255 },
256 #[cfg(feature = "diagnostics")]
257 EncapsulatedInterfaceTransport {
258 txn_id: u16,
259 unit: UnitIdOrSlaveAddr,
260 mei_type: EncapsulatedInterfaceType,
261 data: Vec<u8>,
262 sender: PendingSender,
263 },
264 #[cfg(feature = "diagnostics")]
265 ReadExceptionStatus {
266 txn_id: u16,
267 unit: UnitIdOrSlaveAddr,
268 sender: PendingSender,
269 },
270 #[cfg(feature = "diagnostics")]
271 Diagnostics {
272 txn_id: u16,
273 unit: UnitIdOrSlaveAddr,
274 sub_function: DiagnosticSubFunction,
275 data: Vec<u16>,
276 sender: PendingSender,
277 },
278 #[cfg(feature = "diagnostics")]
279 GetCommEventCounter {
280 txn_id: u16,
281 unit: UnitIdOrSlaveAddr,
282 sender: PendingSender,
283 },
284 #[cfg(feature = "diagnostics")]
285 GetCommEventLog {
286 txn_id: u16,
287 unit: UnitIdOrSlaveAddr,
288 sender: PendingSender,
289 },
290 #[cfg(feature = "diagnostics")]
291 ReportServerId {
292 txn_id: u16,
293 unit: UnitIdOrSlaveAddr,
294 sender: PendingSender,
295 },
296 Shutdown,
297}
298
299enum WorkerResponse {
300 Ack,
301 #[cfg(feature = "coils")]
302 Coils(Coils),
303 #[cfg(feature = "registers")]
304 Registers(Registers),
305 #[cfg(feature = "registers")]
306 SingleRegisterWrite {
307 address: u16,
308 value: u16,
309 },
310 #[cfg(feature = "registers")]
311 MaskWriteRegister,
312 #[cfg(feature = "discrete-inputs")]
313 DiscreteInputs(DiscreteInputs),
314 #[cfg(feature = "fifo")]
315 FifoQueue(FifoQueue),
316 #[cfg(feature = "file-record")]
317 FileRecordRead(Vec<SubRequestParams>),
318 #[cfg(feature = "file-record")]
319 FileRecordWrite,
320 #[cfg(feature = "diagnostics")]
321 DeviceIdentification(DeviceIdentificationResponse),
322 #[cfg(feature = "diagnostics")]
323 EncapsulatedInterfaceTransport {
324 mei_type: EncapsulatedInterfaceType,
325 data: Vec<u8>,
326 },
327 #[cfg(feature = "diagnostics")]
328 ExceptionStatus(u8),
329 #[cfg(feature = "diagnostics")]
330 DiagnosticsData(DiagnosticsDataResponse),
331 #[cfg(feature = "diagnostics")]
332 CommEventCounter {
333 status: u16,
334 event_count: u16,
335 },
336 #[cfg(feature = "diagnostics")]
337 CommEventLog(CommEventLogResponse),
338 #[cfg(feature = "diagnostics")]
339 ReportServerId(Vec<u8>),
340}
341
342struct AsyncApp {
343 pending: PendingStore,
344 #[cfg(feature = "traffic")]
345 traffic_sender: TrafficSender,
346}
347
348impl AsyncApp {
349 fn complete(&self, txn_id: u16, response: Result<WorkerResponse, MbusError>) {
350 if let Ok(mut pending) = self.pending.lock()
351 && let Some(sender) = pending.remove(&txn_id)
352 {
353 let _ = sender.send(response);
354 }
355 }
356
357 fn resolve(&self, txn_id: u16, response: WorkerResponse) {
358 self.complete(txn_id, Ok(response));
359 }
360
361 fn reject(&self, txn_id: u16, error: MbusError) {
362 self.complete(txn_id, Err(error));
363 }
364
365 #[cfg(feature = "traffic")]
366 fn emit_traffic(
367 &self,
368 direction: TrafficDirection,
369 txn_id: u16,
370 unit_id_slave_addr: UnitIdOrSlaveAddr,
371 error: Option<MbusError>,
372 frame_bytes: &[u8],
373 ) {
374 let event = TrafficEvent {
375 direction,
376 txn_id,
377 unit_id_slave_addr,
378 frame: frame_bytes.to_vec(),
379 error,
380 };
381
382 let _ = self.traffic_sender.send(event);
383 }
384}
385
386impl TimeKeeper for AsyncApp {
387 fn current_millis(&self) -> u64 {
388 SystemTime::now()
389 .duration_since(UNIX_EPOCH)
390 .map(|d| d.as_millis() as u64)
391 .unwrap_or(0)
392 }
393}
394
395impl RequestErrorNotifier for AsyncApp {
396 fn request_failed(
397 &mut self,
398 txn_id: u16,
399 _unit_id_slave_addr: UnitIdOrSlaveAddr,
400 error: MbusError,
401 ) {
402 self.reject(txn_id, error);
403 }
404}
405
406#[cfg(feature = "traffic")]
407impl TrafficNotifier for AsyncApp {
408 fn on_tx_frame(
409 &mut self,
410 txn_id: u16,
411 unit_id_slave_addr: UnitIdOrSlaveAddr,
412 frame_bytes: &[u8],
413 ) {
414 self.emit_traffic(
415 TrafficDirection::Tx,
416 txn_id,
417 unit_id_slave_addr,
418 None,
419 frame_bytes,
420 );
421 }
422
423 fn on_rx_frame(
424 &mut self,
425 txn_id: u16,
426 unit_id_slave_addr: UnitIdOrSlaveAddr,
427 frame_bytes: &[u8],
428 ) {
429 self.emit_traffic(
430 TrafficDirection::Rx,
431 txn_id,
432 unit_id_slave_addr,
433 None,
434 frame_bytes,
435 );
436 }
437
438 fn on_tx_error(
439 &mut self,
440 txn_id: u16,
441 unit_id_slave_addr: UnitIdOrSlaveAddr,
442 error: MbusError,
443 frame_bytes: &[u8],
444 ) {
445 self.emit_traffic(
446 TrafficDirection::Tx,
447 txn_id,
448 unit_id_slave_addr,
449 Some(error),
450 frame_bytes,
451 );
452 }
453
454 fn on_rx_error(
455 &mut self,
456 txn_id: u16,
457 unit_id_slave_addr: UnitIdOrSlaveAddr,
458 error: MbusError,
459 frame_bytes: &[u8],
460 ) {
461 self.emit_traffic(
462 TrafficDirection::Rx,
463 txn_id,
464 unit_id_slave_addr,
465 Some(error),
466 frame_bytes,
467 );
468 }
469}
470
471#[cfg(feature = "traffic")]
472fn run_traffic_dispatcher(receiver: Receiver<TrafficEvent>, traffic_handler: TrafficHandlerStore) {
473 while let Ok(event) = receiver.recv() {
474 if let Ok(mut handler_slot) = traffic_handler.lock()
475 && let Some(handler) = handler_slot.as_mut()
476 {
477 let _ = catch_unwind(AssertUnwindSafe(|| handler(&event)));
478 }
479 }
480}
481
482#[cfg(feature = "coils")]
483impl CoilResponse for AsyncApp {
484 fn read_coils_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, coils: &Coils) {
485 self.resolve(txn_id, WorkerResponse::Coils(coils.clone()));
486 }
487
488 fn read_single_coil_response(
489 &mut self,
490 txn_id: u16,
491 _unit: UnitIdOrSlaveAddr,
492 address: u16,
493 value: bool,
494 ) {
495 match Coils::new(address, 1).and_then(|mut c| {
496 c.set_value(address, value)?;
497 Ok(c)
498 }) {
499 Ok(coils) => self.resolve(txn_id, WorkerResponse::Coils(coils)),
500 Err(err) => self.reject(txn_id, err),
501 }
502 }
503
504 fn write_single_coil_response(
505 &mut self,
506 txn_id: u16,
507 _unit: UnitIdOrSlaveAddr,
508 address: u16,
509 value: bool,
510 ) {
511 match Coils::new(address, 1).and_then(|mut c| {
512 c.set_value(address, value)?;
513 Ok(c)
514 }) {
515 Ok(coils) => self.resolve(txn_id, WorkerResponse::Coils(coils)),
516 Err(err) => self.reject(txn_id, err),
517 }
518 }
519
520 fn write_multiple_coils_response(
521 &mut self,
522 txn_id: u16,
523 _unit: UnitIdOrSlaveAddr,
524 address: u16,
525 quantity: u16,
526 ) {
527 match Coils::new(address, quantity) {
528 Ok(coils) => self.resolve(txn_id, WorkerResponse::Coils(coils)),
529 Err(err) => self.reject(txn_id, err),
530 }
531 }
532}
533
534#[cfg(feature = "registers")]
535impl RegisterResponse for AsyncApp {
536 fn read_multiple_input_registers_response(
537 &mut self,
538 txn_id: u16,
539 _unit: UnitIdOrSlaveAddr,
540 registers: &Registers,
541 ) {
542 self.resolve(txn_id, WorkerResponse::Registers(registers.clone()));
543 }
544
545 fn read_single_input_register_response(
546 &mut self,
547 txn_id: u16,
548 _unit: UnitIdOrSlaveAddr,
549 address: u16,
550 value: u16,
551 ) {
552 match Registers::new(address, 1).and_then(|mut r| {
553 r.set_value(address, value)?;
554 Ok(r)
555 }) {
556 Ok(registers) => self.resolve(txn_id, WorkerResponse::Registers(registers)),
557 Err(err) => self.reject(txn_id, err),
558 }
559 }
560
561 fn read_multiple_holding_registers_response(
562 &mut self,
563 txn_id: u16,
564 _unit: UnitIdOrSlaveAddr,
565 registers: &Registers,
566 ) {
567 self.resolve(txn_id, WorkerResponse::Registers(registers.clone()));
568 }
569
570 fn write_single_register_response(
571 &mut self,
572 txn_id: u16,
573 _unit: UnitIdOrSlaveAddr,
574 address: u16,
575 value: u16,
576 ) {
577 self.resolve(
578 txn_id,
579 WorkerResponse::SingleRegisterWrite { address, value },
580 );
581 }
582
583 fn write_multiple_registers_response(
584 &mut self,
585 txn_id: u16,
586 _unit: UnitIdOrSlaveAddr,
587 starting_address: u16,
588 quantity: u16,
589 ) {
590 match Registers::new(starting_address, quantity) {
591 Ok(registers) => self.resolve(txn_id, WorkerResponse::Registers(registers)),
592 Err(err) => self.reject(txn_id, err),
593 }
594 }
595
596 fn read_write_multiple_registers_response(
597 &mut self,
598 txn_id: u16,
599 _unit: UnitIdOrSlaveAddr,
600 registers: &Registers,
601 ) {
602 self.resolve(txn_id, WorkerResponse::Registers(registers.clone()));
603 }
604
605 fn read_single_holding_register_response(
606 &mut self,
607 txn_id: u16,
608 _unit: UnitIdOrSlaveAddr,
609 address: u16,
610 value: u16,
611 ) {
612 match Registers::new(address, 1).and_then(|mut r| {
613 r.set_value(address, value)?;
614 Ok(r)
615 }) {
616 Ok(registers) => self.resolve(txn_id, WorkerResponse::Registers(registers)),
617 Err(err) => self.reject(txn_id, err),
618 }
619 }
620
621 fn read_single_register_response(
622 &mut self,
623 txn_id: u16,
624 _unit: UnitIdOrSlaveAddr,
625 address: u16,
626 value: u16,
627 ) {
628 match Registers::new(address, 1).and_then(|mut r| {
629 r.set_value(address, value)?;
630 Ok(r)
631 }) {
632 Ok(registers) => self.resolve(txn_id, WorkerResponse::Registers(registers)),
633 Err(err) => self.reject(txn_id, err),
634 }
635 }
636
637 fn mask_write_register_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
638 self.resolve(txn_id, WorkerResponse::MaskWriteRegister);
639 }
640}
641
642#[cfg(feature = "discrete-inputs")]
643impl DiscreteInputResponse for AsyncApp {
644 fn read_multiple_discrete_inputs_response(
645 &mut self,
646 txn_id: u16,
647 _unit: UnitIdOrSlaveAddr,
648 discrete_inputs: &DiscreteInputs,
649 ) {
650 self.resolve(
651 txn_id,
652 WorkerResponse::DiscreteInputs(discrete_inputs.clone()),
653 );
654 }
655
656 fn read_single_discrete_input_response(
657 &mut self,
658 txn_id: u16,
659 _unit: UnitIdOrSlaveAddr,
660 address: u16,
661 value: bool,
662 ) {
663 let bit = if value { 0b0000_0001 } else { 0 };
664 match DiscreteInputs::new(address, 1).and_then(|d| d.with_values(&[bit], 1)) {
665 Ok(discrete_inputs) => {
666 self.resolve(txn_id, WorkerResponse::DiscreteInputs(discrete_inputs))
667 }
668 Err(err) => self.reject(txn_id, err),
669 }
670 }
671}
672
673#[cfg(feature = "fifo")]
674impl FifoQueueResponse for AsyncApp {
675 fn read_fifo_queue_response(
676 &mut self,
677 txn_id: u16,
678 _unit: UnitIdOrSlaveAddr,
679 fifo_queue: &FifoQueue,
680 ) {
681 self.resolve(txn_id, WorkerResponse::FifoQueue(fifo_queue.clone()));
682 }
683}
684
685#[cfg(feature = "file-record")]
686impl FileRecordResponse for AsyncApp {
687 fn read_file_record_response(
688 &mut self,
689 txn_id: u16,
690 _unit: UnitIdOrSlaveAddr,
691 data: &[SubRequestParams],
692 ) {
693 self.resolve(txn_id, WorkerResponse::FileRecordRead(data.to_vec()));
694 }
695
696 fn write_file_record_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
697 self.resolve(txn_id, WorkerResponse::FileRecordWrite);
698 }
699}
700
701#[cfg(feature = "diagnostics")]
702impl DiagnosticsResponse for AsyncApp {
703 fn read_device_identification_response(
704 &mut self,
705 txn_id: u16,
706 _unit: UnitIdOrSlaveAddr,
707 response: &DeviceIdentificationResponse,
708 ) {
709 self.resolve(
710 txn_id,
711 WorkerResponse::DeviceIdentification(response.clone()),
712 );
713 }
714
715 fn encapsulated_interface_transport_response(
716 &mut self,
717 txn_id: u16,
718 _unit: UnitIdOrSlaveAddr,
719 mei_type: EncapsulatedInterfaceType,
720 data: &[u8],
721 ) {
722 self.resolve(
723 txn_id,
724 WorkerResponse::EncapsulatedInterfaceTransport {
725 mei_type,
726 data: data.to_vec(),
727 },
728 );
729 }
730
731 fn read_exception_status_response(
732 &mut self,
733 txn_id: u16,
734 _unit: UnitIdOrSlaveAddr,
735 status: u8,
736 ) {
737 self.resolve(txn_id, WorkerResponse::ExceptionStatus(status));
738 }
739
740 fn diagnostics_response(
741 &mut self,
742 txn_id: u16,
743 _unit: UnitIdOrSlaveAddr,
744 sub_function: DiagnosticSubFunction,
745 data: &[u16],
746 ) {
747 self.resolve(
748 txn_id,
749 WorkerResponse::DiagnosticsData(DiagnosticsDataResponse {
750 sub_function,
751 data: data.to_vec(),
752 }),
753 );
754 }
755
756 fn get_comm_event_counter_response(
757 &mut self,
758 txn_id: u16,
759 _unit: UnitIdOrSlaveAddr,
760 status: u16,
761 event_count: u16,
762 ) {
763 self.resolve(
764 txn_id,
765 WorkerResponse::CommEventCounter {
766 status,
767 event_count,
768 },
769 );
770 }
771
772 fn get_comm_event_log_response(
773 &mut self,
774 txn_id: u16,
775 _unit: UnitIdOrSlaveAddr,
776 status: u16,
777 event_count: u16,
778 message_count: u16,
779 events: &[u8],
780 ) {
781 self.resolve(
782 txn_id,
783 WorkerResponse::CommEventLog((status, event_count, message_count, events.to_vec())),
784 );
785 }
786
787 fn report_server_id_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, data: &[u8]) {
788 self.resolve(txn_id, WorkerResponse::ReportServerId(data.to_vec()));
789 }
790}
791
792fn register_pending(
793 pending: &PendingStore,
794 txn_id: u16,
795 sender: PendingSender,
796) -> Result<(), MbusError> {
797 let mut guard = pending.lock().map_err(|_| MbusError::Unexpected)?;
798 guard.insert(txn_id, sender);
799 Ok(())
800}
801
802fn reject_pending(pending: &PendingStore, txn_id: u16, error: MbusError) {
803 if let Ok(mut guard) = pending.lock()
804 && let Some(sender) = guard.remove(&txn_id)
805 {
806 let _ = sender.send(Err(error));
807 }
808}
809
810fn submit_or_reject(pending: &PendingStore, txn_id: u16, result: Result<(), MbusError>) {
811 if let Err(err) = result {
812 reject_pending(pending, txn_id, err);
813 }
814}
815
816fn handle_command<TRANSPORT, const N: usize>(
817 client: &mut ClientServices<TRANSPORT, AsyncApp, N>,
818 pending: &PendingStore,
819 command: WorkerCommand,
820) where
821 TRANSPORT: Transport,
822{
823 match command {
824 WorkerCommand::Connect { sender } => {
825 let _ = sender.send(client.connect().map(|_| WorkerResponse::Ack));
826 }
827 #[cfg(feature = "coils")]
828 WorkerCommand::ReadMultipleCoils {
829 txn_id,
830 unit,
831 address,
832 quantity,
833 sender,
834 } => {
835 if register_pending(pending, txn_id, sender).is_ok() {
836 let result = client.read_multiple_coils(txn_id, unit, address, quantity);
837 submit_or_reject(pending, txn_id, result);
838 }
839 }
840 #[cfg(feature = "registers")]
841 WorkerCommand::ReadHoldingRegisters {
842 txn_id,
843 unit,
844 address,
845 quantity,
846 sender,
847 } => {
848 if register_pending(pending, txn_id, sender).is_ok() {
849 let result = client.read_holding_registers(txn_id, unit, address, quantity);
850 submit_or_reject(pending, txn_id, result);
851 }
852 }
853 #[cfg(feature = "registers")]
854 WorkerCommand::ReadInputRegisters {
855 txn_id,
856 unit,
857 address,
858 quantity,
859 sender,
860 } => {
861 if register_pending(pending, txn_id, sender).is_ok() {
862 let result = client.read_input_registers(txn_id, unit, address, quantity);
863 submit_or_reject(pending, txn_id, result);
864 }
865 }
866 #[cfg(feature = "registers")]
867 WorkerCommand::WriteSingleRegister {
868 txn_id,
869 unit,
870 address,
871 value,
872 sender,
873 } => {
874 if register_pending(pending, txn_id, sender).is_ok() {
875 let result = client.write_single_register(txn_id, unit, address, value);
876 submit_or_reject(pending, txn_id, result);
877 }
878 }
879 #[cfg(feature = "coils")]
880 WorkerCommand::WriteSingleCoil {
881 txn_id,
882 unit,
883 address,
884 value,
885 sender,
886 } => {
887 if register_pending(pending, txn_id, sender).is_ok() {
888 let result = client.write_single_coil(txn_id, unit, address, value);
889 submit_or_reject(pending, txn_id, result);
890 }
891 }
892 #[cfg(feature = "coils")]
893 WorkerCommand::WriteMultipleCoils {
894 txn_id,
895 unit,
896 address,
897 coils,
898 sender,
899 } => {
900 if register_pending(pending, txn_id, sender).is_ok() {
901 let result = client.write_multiple_coils(txn_id, unit, address, &coils);
902 submit_or_reject(pending, txn_id, result);
903 }
904 }
905 #[cfg(feature = "registers")]
906 WorkerCommand::WriteMultipleRegisters {
907 txn_id,
908 unit,
909 address,
910 values,
911 sender,
912 } => {
913 if register_pending(pending, txn_id, sender).is_ok() {
914 let result = client.write_multiple_registers(
915 txn_id,
916 unit,
917 address,
918 values.len() as u16,
919 &values,
920 );
921 submit_or_reject(pending, txn_id, result);
922 }
923 }
924 #[cfg(feature = "registers")]
925 WorkerCommand::ReadWriteMultipleRegisters {
926 txn_id,
927 unit,
928 read_address,
929 read_quantity,
930 write_address,
931 write_values,
932 sender,
933 } => {
934 if register_pending(pending, txn_id, sender).is_ok() {
935 let result = client.read_write_multiple_registers(
936 txn_id,
937 unit,
938 read_address,
939 read_quantity,
940 write_address,
941 &write_values,
942 );
943 submit_or_reject(pending, txn_id, result);
944 }
945 }
946 #[cfg(feature = "registers")]
947 WorkerCommand::MaskWriteRegister {
948 txn_id,
949 unit,
950 address,
951 and_mask,
952 or_mask,
953 sender,
954 } => {
955 if register_pending(pending, txn_id, sender).is_ok() {
956 let result = client.mask_write_register(txn_id, unit, address, and_mask, or_mask);
957 submit_or_reject(pending, txn_id, result);
958 }
959 }
960 #[cfg(feature = "discrete-inputs")]
961 WorkerCommand::ReadDiscreteInputs {
962 txn_id,
963 unit,
964 address,
965 quantity,
966 sender,
967 } => {
968 if register_pending(pending, txn_id, sender).is_ok() {
969 let result = client.read_discrete_inputs(txn_id, unit, address, quantity);
970 submit_or_reject(pending, txn_id, result);
971 }
972 }
973 #[cfg(feature = "fifo")]
974 WorkerCommand::ReadFifoQueue {
975 txn_id,
976 unit,
977 address,
978 sender,
979 } => {
980 if register_pending(pending, txn_id, sender).is_ok() {
981 let result = client.read_fifo_queue(txn_id, unit, address);
982 submit_or_reject(pending, txn_id, result);
983 }
984 }
985 #[cfg(feature = "file-record")]
986 WorkerCommand::ReadFileRecord {
987 txn_id,
988 unit,
989 sub_request,
990 sender,
991 } => {
992 if register_pending(pending, txn_id, sender).is_ok() {
993 let result = client.read_file_record(txn_id, unit, &sub_request);
994 submit_or_reject(pending, txn_id, result);
995 }
996 }
997 #[cfg(feature = "file-record")]
998 WorkerCommand::WriteFileRecord {
999 txn_id,
1000 unit,
1001 sub_request,
1002 sender,
1003 } => {
1004 if register_pending(pending, txn_id, sender).is_ok() {
1005 let result = client.write_file_record(txn_id, unit, &sub_request);
1006 submit_or_reject(pending, txn_id, result);
1007 }
1008 }
1009 #[cfg(feature = "diagnostics")]
1010 WorkerCommand::ReadDeviceIdentification {
1011 txn_id,
1012 unit,
1013 read_device_id_code,
1014 object_id,
1015 sender,
1016 } => {
1017 if register_pending(pending, txn_id, sender).is_ok() {
1018 let result =
1019 client.read_device_identification(txn_id, unit, read_device_id_code, object_id);
1020 submit_or_reject(pending, txn_id, result);
1021 }
1022 }
1023 #[cfg(feature = "diagnostics")]
1024 WorkerCommand::EncapsulatedInterfaceTransport {
1025 txn_id,
1026 unit,
1027 mei_type,
1028 data,
1029 sender,
1030 } => {
1031 if register_pending(pending, txn_id, sender).is_ok() {
1032 let result = client.encapsulated_interface_transport(txn_id, unit, mei_type, &data);
1033 submit_or_reject(pending, txn_id, result);
1034 }
1035 }
1036 #[cfg(feature = "diagnostics")]
1037 WorkerCommand::ReadExceptionStatus {
1038 txn_id,
1039 unit,
1040 sender,
1041 } => {
1042 if register_pending(pending, txn_id, sender).is_ok() {
1043 let result = client.read_exception_status(txn_id, unit);
1044 submit_or_reject(pending, txn_id, result);
1045 }
1046 }
1047 #[cfg(feature = "diagnostics")]
1048 WorkerCommand::Diagnostics {
1049 txn_id,
1050 unit,
1051 sub_function,
1052 data,
1053 sender,
1054 } => {
1055 if register_pending(pending, txn_id, sender).is_ok() {
1056 let result = client.diagnostics(txn_id, unit, sub_function, &data);
1057 submit_or_reject(pending, txn_id, result);
1058 }
1059 }
1060 #[cfg(feature = "diagnostics")]
1061 WorkerCommand::GetCommEventCounter {
1062 txn_id,
1063 unit,
1064 sender,
1065 } => {
1066 if register_pending(pending, txn_id, sender).is_ok() {
1067 let result = client.get_comm_event_counter(txn_id, unit);
1068 submit_or_reject(pending, txn_id, result);
1069 }
1070 }
1071 #[cfg(feature = "diagnostics")]
1072 WorkerCommand::GetCommEventLog {
1073 txn_id,
1074 unit,
1075 sender,
1076 } => {
1077 if register_pending(pending, txn_id, sender).is_ok() {
1078 let result = client.get_comm_event_log(txn_id, unit);
1079 submit_or_reject(pending, txn_id, result);
1080 }
1081 }
1082 #[cfg(feature = "diagnostics")]
1083 WorkerCommand::ReportServerId {
1084 txn_id,
1085 unit,
1086 sender,
1087 } => {
1088 if register_pending(pending, txn_id, sender).is_ok() {
1089 let result = client.report_server_id(txn_id, unit);
1090 submit_or_reject(pending, txn_id, result);
1091 }
1092 }
1093 WorkerCommand::Shutdown => {}
1094 }
1095}
1096
1097fn run_worker<TRANSPORT, const N: usize>(
1098 mut client: ClientServices<TRANSPORT, AsyncApp, N>,
1099 pending: PendingStore,
1100 receiver: Receiver<WorkerCommand>,
1101 poll_interval: Duration,
1102) where
1103 TRANSPORT: Transport,
1104{
1105 loop {
1106 match receiver.recv_timeout(poll_interval) {
1110 Ok(WorkerCommand::Shutdown) => return,
1111 Ok(command) => {
1112 handle_command(&mut client, &pending, command);
1113 loop {
1114 match receiver.try_recv() {
1115 Ok(WorkerCommand::Shutdown) => return,
1116 Ok(command) => handle_command(&mut client, &pending, command),
1117 Err(mpsc::TryRecvError::Empty) => break,
1118 Err(mpsc::TryRecvError::Disconnected) => return,
1119 }
1120 }
1121 }
1122 Err(mpsc::RecvTimeoutError::Timeout) => {}
1123 Err(mpsc::RecvTimeoutError::Disconnected) => return,
1124 }
1125
1126 let has_pending = pending
1127 .lock()
1128 .map(|store| !store.is_empty())
1129 .unwrap_or(false);
1130
1131 if client.is_connected() && has_pending {
1132 client.poll();
1133 }
1134 }
1135}
1136
1137mod client_core;
1140mod network_client;
1141mod serial_client;
1142
1143pub(crate) use client_core::AsyncClientCore;
1144pub use network_client::AsyncTcpClient;
1145pub use serial_client::AsyncSerialClient;
1146
1147#[cfg(all(test, feature = "traffic"))]
1148mod tests {
1149 use super::*;
1150
1151 #[test]
1152 fn test_async_app_emits_traffic_event_to_channel() {
1153 let pending = Arc::new(Mutex::new(HashMap::new()));
1154 let (traffic_sender, traffic_receiver) = mpsc::channel();
1155
1156 let mut app = AsyncApp {
1157 pending,
1158 traffic_sender,
1159 };
1160
1161 let unit = UnitIdOrSlaveAddr::new(1).unwrap();
1162 app.on_tx_frame(42, unit, &[0xAA, 0x55]);
1163
1164 let event = traffic_receiver
1165 .recv_timeout(Duration::from_millis(100))
1166 .unwrap();
1167 assert_eq!(event.direction, TrafficDirection::Tx);
1168 assert_eq!(event.txn_id, 42);
1169 assert_eq!(event.unit_id_slave_addr, unit);
1170 assert_eq!(event.frame, vec![0xAA, 0x55]);
1171 assert_eq!(event.error, None);
1172 }
1173
1174 #[test]
1175 fn test_async_app_emits_traffic_error_event_to_channel() {
1176 let pending = Arc::new(Mutex::new(HashMap::new()));
1177 let (traffic_sender, traffic_receiver) = mpsc::channel();
1178
1179 let mut app = AsyncApp {
1180 pending,
1181 traffic_sender,
1182 };
1183
1184 let unit = UnitIdOrSlaveAddr::new(1).unwrap();
1185 app.on_rx_error(77, unit, MbusError::ChecksumError, &[0xAB]);
1186
1187 let event = traffic_receiver
1188 .recv_timeout(Duration::from_millis(100))
1189 .unwrap();
1190 assert_eq!(event.direction, TrafficDirection::Rx);
1191 assert_eq!(event.txn_id, 77);
1192 assert_eq!(event.unit_id_slave_addr, unit);
1193 assert_eq!(event.frame, vec![0xAB]);
1194 assert_eq!(event.error, Some(MbusError::ChecksumError));
1195 }
1196
1197 #[test]
1198 fn test_async_client_core_set_and_clear_traffic_handler() {
1199 let (sender, _receiver) = mpsc::channel();
1200 let traffic_handler: TrafficHandlerStore = Arc::new(Mutex::new(None));
1201 let core = AsyncClientCore::new(sender, traffic_handler.clone());
1202
1203 core.set_traffic_handler(|_evt| {});
1204 assert!(traffic_handler.lock().unwrap().is_some());
1205
1206 core.clear_traffic_handler();
1207 assert!(traffic_handler.lock().unwrap().is_none());
1208 }
1209}