1use std::collections::HashMap;
16#[cfg(feature = "traffic")]
17use std::panic::{AssertUnwindSafe, catch_unwind};
18use std::sync::atomic::{AtomicU16, Ordering};
19use std::sync::mpsc::{Receiver, Sender};
20use std::sync::{Arc, Mutex, mpsc};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24#[cfg(feature = "coils")]
25use mbus_client::app::CoilResponse;
26#[cfg(feature = "diagnostics")]
27use mbus_client::app::DiagnosticsResponse;
28#[cfg(feature = "discrete-inputs")]
29use mbus_client::app::DiscreteInputResponse;
30#[cfg(feature = "fifo")]
31use mbus_client::app::FifoQueueResponse;
32#[cfg(feature = "file-record")]
33use mbus_client::app::FileRecordResponse;
34#[cfg(feature = "registers")]
35use mbus_client::app::RegisterResponse;
36use mbus_client::app::RequestErrorNotifier;
37#[cfg(feature = "traffic")]
38use mbus_client::app::{TrafficDirection, TrafficNotifier};
39use mbus_client::services::ClientServices;
40#[cfg(feature = "coils")]
41use mbus_client::services::coil::Coils;
42#[cfg(feature = "diagnostics")]
43use mbus_client::services::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
44#[cfg(feature = "discrete-inputs")]
45use mbus_client::services::discrete_input::DiscreteInputs;
46#[cfg(feature = "fifo")]
47use mbus_client::services::fifo_queue::FifoQueue;
48#[cfg(feature = "file-record")]
49use mbus_client::services::file_record::{SubRequest, SubRequestParams};
50#[cfg(feature = "registers")]
51use mbus_client::services::register::Registers;
52use mbus_core::errors::MbusError;
53#[cfg(feature = "diagnostics")]
54use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
55#[cfg(feature = "tcp")]
56use mbus_core::transport::ModbusTcpConfig;
57use mbus_core::transport::{ModbusConfig, TimeKeeper, Transport, UnitIdOrSlaveAddr};
58#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
59use mbus_core::transport::{ModbusSerialConfig, SerialMode};
60#[cfg(feature = "tcp")]
61use mbus_network::StdTcpTransport;
62#[cfg(feature = "serial-ascii")]
63use mbus_serial::StdAsciiTransport;
64#[cfg(feature = "serial-rtu")]
65use mbus_serial::StdRtuTransport;
66#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
67use mbus_serial::StdSerialTransport;
68use tokio::sync::oneshot;
69
/// Result of a Diagnostics request as handed back to async callers.
///
/// Built by the worker from the sub-function and data words passed to the
/// diagnostics response callback.
#[cfg(feature = "diagnostics")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiagnosticsDataResponse {
    /// Diagnostic sub-function reported with the response.
    pub sub_function: DiagnosticSubFunction,
    /// 16-bit data words carried by the response.
    pub data: Vec<u16>,
}
/// Result of a Get Comm Event Log request, as the tuple
/// `(status, event_count, message_count, events)`.
#[cfg(feature = "diagnostics")]
pub type CommEventLogResponse = (u16, u16, u16, Vec<u8>);
82
/// Errors surfaced by the async client layer.
#[derive(Debug, PartialEq, Eq)]
pub enum AsyncError {
    /// A Modbus error wrapped unchanged.
    Mbus(MbusError),
    /// The channel to or from the worker was closed.
    WorkerClosed,
    /// The worker answered with a response variant the caller did not expect.
    UnexpectedResponseType,
}
93
94impl From<MbusError> for AsyncError {
95 fn from(value: MbusError) -> Self {
96 Self::Mbus(value)
97 }
98}
99
100impl std::fmt::Display for AsyncError {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 match self {
103 Self::Mbus(err) => write!(f, "Modbus error: {err}"),
104 Self::WorkerClosed => write!(f, "async worker channel closed"),
105 Self::UnexpectedResponseType => write!(f, "unexpected response type from worker"),
106 }
107 }
108}
109
110impl std::error::Error for AsyncError {
111 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
112 match self {
113 Self::Mbus(err) => Some(err),
114 _ => None,
115 }
116 }
117}
118
/// One-shot sender through which the outcome of a single request is delivered.
type PendingSender = oneshot::Sender<Result<WorkerResponse, MbusError>>;
/// Shared map from transaction id to the sender waiting for that transaction.
type PendingStore = Arc<Mutex<HashMap<u16, PendingSender>>>;
/// Boxed callback invoked with each traffic event.
#[cfg(feature = "traffic")]
type TrafficHandler = Box<dyn FnMut(&TrafficEvent) + Send + 'static>;
/// Shared, optional slot holding the currently installed traffic callback.
#[cfg(feature = "traffic")]
type TrafficHandlerStore = Arc<Mutex<Option<TrafficHandler>>>;
/// Channel sender used to hand traffic events to the dispatcher.
#[cfg(feature = "traffic")]
type TrafficSender = Sender<TrafficEvent>;
127
/// A single observed frame, or frame error, reported by the traffic callbacks.
#[cfg(feature = "traffic")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrafficEvent {
    /// Whether the frame was transmitted (`Tx`) or received (`Rx`).
    pub direction: TrafficDirection,
    /// Transaction id passed to the traffic callback.
    pub txn_id: u16,
    /// Unit id / slave address passed to the traffic callback.
    pub unit_id_slave_addr: UnitIdOrSlaveAddr,
    /// Owned copy of the frame bytes passed to the traffic callback.
    pub frame: Vec<u8>,
    /// `Some` for events raised by the error callbacks, `None` for plain frames.
    pub error: Option<MbusError>,
}
143
/// Messages sent to the worker thread.
///
/// Every request variant carries a `sender` on which the worker (or a later
/// response callback) delivers the outcome. Request variants that go over the
/// wire also carry the `txn_id` used as the key in the pending map and the
/// target `unit`.
enum WorkerCommand {
    /// Call `connect()` on the client and report the result.
    Connect {
        sender: PendingSender,
    },
    /// Report whether the client currently has pending requests.
    HasPendingRequests {
        sender: PendingSender,
    },
    /// Read `quantity` coils starting at `address`.
    #[cfg(feature = "coils")]
    ReadMultipleCoils {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
        sender: PendingSender,
    },
    /// Read `quantity` holding registers starting at `address`.
    #[cfg(feature = "registers")]
    ReadHoldingRegisters {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
        sender: PendingSender,
    },
    /// Read `quantity` input registers starting at `address`.
    #[cfg(feature = "registers")]
    ReadInputRegisters {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
        sender: PendingSender,
    },
    /// Write `value` to the register at `address`.
    #[cfg(feature = "registers")]
    WriteSingleRegister {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
        sender: PendingSender,
    },
    /// Write `value` to the coil at `address`.
    #[cfg(feature = "coils")]
    WriteSingleCoil {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
        sender: PendingSender,
    },
    /// Write the given `coils` starting at `address`.
    #[cfg(feature = "coils")]
    WriteMultipleCoils {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        coils: Coils,
        sender: PendingSender,
    },
    /// Write `values` to consecutive registers starting at `address`.
    #[cfg(feature = "registers")]
    WriteMultipleRegisters {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        values: Vec<u16>,
        sender: PendingSender,
    },
    /// Combined read/write of multiple registers in one request.
    #[cfg(feature = "registers")]
    ReadWriteMultipleRegisters {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        read_address: u16,
        read_quantity: u16,
        write_address: u16,
        write_values: Vec<u16>,
        sender: PendingSender,
    },
    /// Mask-write the register at `address` with `and_mask` / `or_mask`.
    #[cfg(feature = "registers")]
    MaskWriteRegister {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        and_mask: u16,
        or_mask: u16,
        sender: PendingSender,
    },
    /// Read `quantity` discrete inputs starting at `address`.
    #[cfg(feature = "discrete-inputs")]
    ReadDiscreteInputs {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
        sender: PendingSender,
    },
    /// Read the FIFO queue at `address`.
    #[cfg(feature = "fifo")]
    ReadFifoQueue {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        sender: PendingSender,
    },
    /// Read file records described by `sub_request`.
    #[cfg(feature = "file-record")]
    ReadFileRecord {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sub_request: SubRequest,
        sender: PendingSender,
    },
    /// Write file records described by `sub_request`.
    #[cfg(feature = "file-record")]
    WriteFileRecord {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sub_request: SubRequest,
        sender: PendingSender,
    },
    /// Read device identification for the given code and object id.
    #[cfg(feature = "diagnostics")]
    ReadDeviceIdentification {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        read_device_id_code: ReadDeviceIdCode,
        object_id: ObjectId,
        sender: PendingSender,
    },
    /// Send an encapsulated-interface-transport request with raw `data`.
    #[cfg(feature = "diagnostics")]
    EncapsulatedInterfaceTransport {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        mei_type: EncapsulatedInterfaceType,
        data: Vec<u8>,
        sender: PendingSender,
    },
    /// Read the exception status.
    #[cfg(feature = "diagnostics")]
    ReadExceptionStatus {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sender: PendingSender,
    },
    /// Issue a diagnostics request with the given sub-function and data words.
    #[cfg(feature = "diagnostics")]
    Diagnostics {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sub_function: DiagnosticSubFunction,
        data: Vec<u16>,
        sender: PendingSender,
    },
    /// Fetch the communication event counter.
    #[cfg(feature = "diagnostics")]
    GetCommEventCounter {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sender: PendingSender,
    },
    /// Fetch the communication event log.
    #[cfg(feature = "diagnostics")]
    GetCommEventLog {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sender: PendingSender,
    },
    /// Fetch the server id report.
    #[cfg(feature = "diagnostics")]
    ReportServerId {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sender: PendingSender,
    },
    /// Stop the worker loop.
    Shutdown,
}
305
/// Successful outcomes the worker can deliver through a [`PendingSender`].
enum WorkerResponse {
    /// Sent after a successful `connect()`.
    Ack,
    /// Answer to `WorkerCommand::HasPendingRequests`.
    HasPendingRequests(bool),
    /// Coil data produced by the coil response callbacks.
    #[cfg(feature = "coils")]
    Coils(Coils),
    /// Register data produced by the register response callbacks.
    #[cfg(feature = "registers")]
    Registers(Registers),
    /// Address and value reported by the write-single-register callback.
    #[cfg(feature = "registers")]
    SingleRegisterWrite {
        address: u16,
        value: u16,
    },
    /// Raised by the mask-write-register callback; carries no data.
    #[cfg(feature = "registers")]
    MaskWriteRegister,
    /// Discrete input data produced by the discrete-input callbacks.
    #[cfg(feature = "discrete-inputs")]
    DiscreteInputs(DiscreteInputs),
    /// FIFO queue contents.
    #[cfg(feature = "fifo")]
    FifoQueue(FifoQueue),
    /// Sub-request results of a file record read.
    #[cfg(feature = "file-record")]
    FileRecordRead(Vec<SubRequestParams>),
    /// Raised by the write-file-record callback; carries no data.
    #[cfg(feature = "file-record")]
    FileRecordWrite,
    /// Device identification response.
    #[cfg(feature = "diagnostics")]
    DeviceIdentification(DeviceIdentificationResponse),
    /// Encapsulated-interface-transport response payload.
    #[cfg(feature = "diagnostics")]
    EncapsulatedInterfaceTransport {
        mei_type: EncapsulatedInterfaceType,
        data: Vec<u8>,
    },
    /// Exception status byte.
    #[cfg(feature = "diagnostics")]
    ExceptionStatus(u8),
    /// Diagnostics sub-function and data words.
    #[cfg(feature = "diagnostics")]
    DiagnosticsData(DiagnosticsDataResponse),
    /// Communication event counter values.
    #[cfg(feature = "diagnostics")]
    CommEventCounter {
        status: u16,
        event_count: u16,
    },
    /// Communication event log tuple.
    #[cfg(feature = "diagnostics")]
    CommEventLog(CommEventLogResponse),
    /// Raw bytes of the server id report.
    #[cfg(feature = "diagnostics")]
    ReportServerId(Vec<u8>),
}
349
/// Callback object plugged into `ClientServices`.
///
/// It turns response and error callbacks into messages on the matching
/// pending one-shot sender, and traffic callbacks into `TrafficEvent`s.
struct AsyncApp {
    /// Senders awaiting an outcome, keyed by transaction id.
    pending: PendingStore,
    /// Channel on which traffic events are emitted.
    #[cfg(feature = "traffic")]
    traffic_sender: TrafficSender,
}
355
356impl AsyncApp {
357 fn complete(&self, txn_id: u16, response: Result<WorkerResponse, MbusError>) {
358 if let Ok(mut pending) = self.pending.lock()
359 && let Some(sender) = pending.remove(&txn_id)
360 {
361 let _ = sender.send(response);
362 }
363 }
364
365 fn resolve(&self, txn_id: u16, response: WorkerResponse) {
366 self.complete(txn_id, Ok(response));
367 }
368
369 fn reject(&self, txn_id: u16, error: MbusError) {
370 self.complete(txn_id, Err(error));
371 }
372
373 #[cfg(feature = "traffic")]
374 fn emit_traffic(
375 &self,
376 direction: TrafficDirection,
377 txn_id: u16,
378 unit_id_slave_addr: UnitIdOrSlaveAddr,
379 error: Option<MbusError>,
380 frame_bytes: &[u8],
381 ) {
382 let event = TrafficEvent {
383 direction,
384 txn_id,
385 unit_id_slave_addr,
386 frame: frame_bytes.to_vec(),
387 error,
388 };
389
390 let _ = self.traffic_sender.send(event);
391 }
392}
393
394impl TimeKeeper for AsyncApp {
395 fn current_millis(&self) -> u64 {
396 SystemTime::now()
397 .duration_since(UNIX_EPOCH)
398 .map(|d| d.as_millis() as u64)
399 .unwrap_or(0)
400 }
401}
402
403impl RequestErrorNotifier for AsyncApp {
404 fn request_failed(
405 &mut self,
406 txn_id: u16,
407 _unit_id_slave_addr: UnitIdOrSlaveAddr,
408 error: MbusError,
409 ) {
410 self.reject(txn_id, error);
411 }
412}
413
#[cfg(feature = "traffic")]
impl TrafficNotifier for AsyncApp {
    /// Emits a `Tx` event with no error for a transmitted frame.
    fn on_tx_frame(
        &mut self,
        txn_id: u16,
        unit_id_slave_addr: UnitIdOrSlaveAddr,
        frame_bytes: &[u8],
    ) {
        let (direction, error) = (TrafficDirection::Tx, None);
        self.emit_traffic(direction, txn_id, unit_id_slave_addr, error, frame_bytes);
    }

    /// Emits an `Rx` event with no error for a received frame.
    fn on_rx_frame(
        &mut self,
        txn_id: u16,
        unit_id_slave_addr: UnitIdOrSlaveAddr,
        frame_bytes: &[u8],
    ) {
        let (direction, error) = (TrafficDirection::Rx, None);
        self.emit_traffic(direction, txn_id, unit_id_slave_addr, error, frame_bytes);
    }

    /// Emits a `Tx` event carrying `error`.
    fn on_tx_error(
        &mut self,
        txn_id: u16,
        unit_id_slave_addr: UnitIdOrSlaveAddr,
        error: MbusError,
        frame_bytes: &[u8],
    ) {
        let direction = TrafficDirection::Tx;
        let failure = Some(error);
        self.emit_traffic(direction, txn_id, unit_id_slave_addr, failure, frame_bytes);
    }

    /// Emits an `Rx` event carrying `error`.
    fn on_rx_error(
        &mut self,
        txn_id: u16,
        unit_id_slave_addr: UnitIdOrSlaveAddr,
        error: MbusError,
        frame_bytes: &[u8],
    ) {
        let direction = TrafficDirection::Rx;
        let failure = Some(error);
        self.emit_traffic(direction, txn_id, unit_id_slave_addr, failure, frame_bytes);
    }
}
478
/// Forwards traffic events from `receiver` to the installed handler until every
/// sender has been dropped.
///
/// Events are discarded when the handler slot cannot be locked or is empty.
/// A panic inside the handler is caught and ignored so the loop keeps running.
#[cfg(feature = "traffic")]
fn run_traffic_dispatcher(receiver: Receiver<TrafficEvent>, traffic_handler: TrafficHandlerStore) {
    for event in receiver.iter() {
        let Ok(mut slot) = traffic_handler.lock() else {
            continue;
        };
        if let Some(callback) = slot.as_mut() {
            let _ = catch_unwind(AssertUnwindSafe(|| callback(&event)));
        }
    }
}
489
#[cfg(feature = "coils")]
impl CoilResponse for AsyncApp {
    /// Resolves the request with a copy of the coils that were read.
    fn read_coils_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, coils: &Coils) {
        let snapshot = coils.clone();
        self.resolve(txn_id, WorkerResponse::Coils(snapshot));
    }

    /// Resolves the request with a one-coil `Coils` holding `value` at `address`,
    /// or rejects it when that container cannot be built.
    fn read_single_coil_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        let outcome = Coils::new(address, 1)
            .and_then(|mut single| {
                single.set_value(address, value)?;
                Ok(single)
            })
            .map(WorkerResponse::Coils);
        self.complete(txn_id, outcome);
    }

    /// Resolves the request with a one-coil `Coils` holding the written `value`,
    /// or rejects it when that container cannot be built.
    fn write_single_coil_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        let outcome = Coils::new(address, 1)
            .and_then(|mut single| {
                single.set_value(address, value)?;
                Ok(single)
            })
            .map(WorkerResponse::Coils);
        self.complete(txn_id, outcome);
    }

    /// Resolves the request with a freshly constructed `Coils` spanning the
    /// written range, or rejects it when construction fails.
    fn write_multiple_coils_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
    ) {
        let outcome = Coils::new(address, quantity).map(WorkerResponse::Coils);
        self.complete(txn_id, outcome);
    }
}
541
#[cfg(feature = "registers")]
impl RegisterResponse for AsyncApp {
    /// Resolves the request with a copy of the input registers that were read.
    fn read_multiple_input_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let snapshot = registers.clone();
        self.resolve(txn_id, WorkerResponse::Registers(snapshot));
    }

    /// Resolves the request with a one-register `Registers` holding `value` at
    /// `address`, or rejects it when that container cannot be built.
    fn read_single_input_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let outcome = Registers::new(address, 1)
            .and_then(|mut single| {
                single.set_value(address, value)?;
                Ok(single)
            })
            .map(WorkerResponse::Registers);
        self.complete(txn_id, outcome);
    }

    /// Resolves the request with a copy of the holding registers that were read.
    fn read_multiple_holding_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let snapshot = registers.clone();
        self.resolve(txn_id, WorkerResponse::Registers(snapshot));
    }

    /// Resolves the request with the address and value reported for the write.
    fn write_single_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let written = WorkerResponse::SingleRegisterWrite { address, value };
        self.resolve(txn_id, written);
    }

    /// Resolves the request with a freshly constructed `Registers` spanning the
    /// written range, or rejects it when construction fails.
    fn write_multiple_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        starting_address: u16,
        quantity: u16,
    ) {
        let outcome = Registers::new(starting_address, quantity).map(WorkerResponse::Registers);
        self.complete(txn_id, outcome);
    }

    /// Resolves the request with a copy of the registers returned by the
    /// combined read/write.
    fn read_write_multiple_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let snapshot = registers.clone();
        self.resolve(txn_id, WorkerResponse::Registers(snapshot));
    }

    /// Resolves the request with a one-register `Registers` holding `value` at
    /// `address`, or rejects it when that container cannot be built.
    fn read_single_holding_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let outcome = Registers::new(address, 1)
            .and_then(|mut single| {
                single.set_value(address, value)?;
                Ok(single)
            })
            .map(WorkerResponse::Registers);
        self.complete(txn_id, outcome);
    }

    /// Resolves the request with a one-register `Registers` holding `value` at
    /// `address`, or rejects it when that container cannot be built.
    fn read_single_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let outcome = Registers::new(address, 1)
            .and_then(|mut single| {
                single.set_value(address, value)?;
                Ok(single)
            })
            .map(WorkerResponse::Registers);
        self.complete(txn_id, outcome);
    }

    /// Resolves the request with the data-less mask-write acknowledgement.
    fn mask_write_register_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
        let acknowledged = WorkerResponse::MaskWriteRegister;
        self.resolve(txn_id, acknowledged);
    }
}
649
#[cfg(feature = "discrete-inputs")]
impl DiscreteInputResponse for AsyncApp {
    /// Resolves the request with a copy of the discrete inputs that were read.
    fn read_multiple_discrete_inputs_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        discrete_inputs: &DiscreteInputs,
    ) {
        let snapshot = discrete_inputs.clone();
        self.resolve(txn_id, WorkerResponse::DiscreteInputs(snapshot));
    }

    /// Resolves the request with a one-input `DiscreteInputs` whose single packed
    /// value has its lowest bit set when `value` is true, or rejects it when that
    /// container cannot be built.
    fn read_single_discrete_input_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        let lsb = if value { 0b0000_0001 } else { 0 };
        let outcome = DiscreteInputs::new(address, 1)
            .and_then(|inputs| inputs.with_values(&[lsb], 1))
            .map(WorkerResponse::DiscreteInputs);
        self.complete(txn_id, outcome);
    }
}
680
#[cfg(feature = "fifo")]
impl FifoQueueResponse for AsyncApp {
    /// Resolves the request with a copy of the FIFO queue that was read.
    fn read_fifo_queue_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        fifo_queue: &FifoQueue,
    ) {
        let snapshot = fifo_queue.clone();
        self.resolve(txn_id, WorkerResponse::FifoQueue(snapshot));
    }
}
692
#[cfg(feature = "file-record")]
impl FileRecordResponse for AsyncApp {
    /// Resolves the request with an owned copy of the returned sub-request data.
    fn read_file_record_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        data: &[SubRequestParams],
    ) {
        let records = data.to_vec();
        self.resolve(txn_id, WorkerResponse::FileRecordRead(records));
    }

    /// Resolves the request with the data-less file-record write acknowledgement.
    fn write_file_record_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
        let acknowledged = WorkerResponse::FileRecordWrite;
        self.resolve(txn_id, acknowledged);
    }
}
708
#[cfg(feature = "diagnostics")]
impl DiagnosticsResponse for AsyncApp {
    /// Resolves the request with a copy of the device identification response.
    fn read_device_identification_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        response: &DeviceIdentificationResponse,
    ) {
        let identification = response.clone();
        self.resolve(txn_id, WorkerResponse::DeviceIdentification(identification));
    }

    /// Resolves the request with the MEI type and an owned copy of the payload.
    fn encapsulated_interface_transport_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        mei_type: EncapsulatedInterfaceType,
        data: &[u8],
    ) {
        let payload = WorkerResponse::EncapsulatedInterfaceTransport {
            mei_type,
            data: data.to_vec(),
        };
        self.resolve(txn_id, payload);
    }

    /// Resolves the request with the exception status byte.
    fn read_exception_status_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u8,
    ) {
        let payload = WorkerResponse::ExceptionStatus(status);
        self.resolve(txn_id, payload);
    }

    /// Resolves the request with the sub-function and an owned copy of the data words.
    fn diagnostics_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        sub_function: DiagnosticSubFunction,
        data: &[u16],
    ) {
        let details = DiagnosticsDataResponse {
            sub_function,
            data: data.to_vec(),
        };
        self.resolve(txn_id, WorkerResponse::DiagnosticsData(details));
    }

    /// Resolves the request with the event counter status and count.
    fn get_comm_event_counter_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u16,
        event_count: u16,
    ) {
        let payload = WorkerResponse::CommEventCounter {
            status,
            event_count,
        };
        self.resolve(txn_id, payload);
    }

    /// Resolves the request with the event log tuple, copying the event bytes.
    fn get_comm_event_log_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u16,
        event_count: u16,
        message_count: u16,
        events: &[u8],
    ) {
        let log: CommEventLogResponse = (status, event_count, message_count, events.to_vec());
        self.resolve(txn_id, WorkerResponse::CommEventLog(log));
    }

    /// Resolves the request with an owned copy of the server id bytes.
    fn report_server_id_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, data: &[u8]) {
        let report = data.to_vec();
        self.resolve(txn_id, WorkerResponse::ReportServerId(report));
    }
}
799
800fn register_pending(
801 pending: &PendingStore,
802 txn_id: u16,
803 sender: PendingSender,
804) -> Result<(), MbusError> {
805 let mut guard = pending.lock().map_err(|_| MbusError::Unexpected)?;
806 guard.insert(txn_id, sender);
807 Ok(())
808}
809
810fn reject_pending(pending: &PendingStore, txn_id: u16, error: MbusError) {
811 if let Ok(mut guard) = pending.lock()
812 && let Some(sender) = guard.remove(&txn_id)
813 {
814 let _ = sender.send(Err(error));
815 }
816}
817
818fn submit_or_reject(pending: &PendingStore, txn_id: u16, result: Result<(), MbusError>) {
819 if let Err(err) = result {
820 reject_pending(pending, txn_id, err);
821 }
822}
823
824fn handle_command<TRANSPORT, const N: usize>(
825 client: &mut ClientServices<TRANSPORT, AsyncApp, N>,
826 pending: &PendingStore,
827 command: WorkerCommand,
828) where
829 TRANSPORT: Transport,
830{
831 match command {
832 WorkerCommand::Connect { sender } => {
833 let _ = sender.send(client.connect().map(|_| WorkerResponse::Ack));
834 }
835 WorkerCommand::HasPendingRequests { sender } => {
836 let _ = sender.send(Ok(WorkerResponse::HasPendingRequests(
837 client.has_pending_requests(),
838 )));
839 }
840 #[cfg(feature = "coils")]
841 WorkerCommand::ReadMultipleCoils {
842 txn_id,
843 unit,
844 address,
845 quantity,
846 sender,
847 } => {
848 if register_pending(pending, txn_id, sender).is_ok() {
849 let result = client.read_multiple_coils(txn_id, unit, address, quantity);
850 submit_or_reject(pending, txn_id, result);
851 }
852 }
853 #[cfg(feature = "registers")]
854 WorkerCommand::ReadHoldingRegisters {
855 txn_id,
856 unit,
857 address,
858 quantity,
859 sender,
860 } => {
861 if register_pending(pending, txn_id, sender).is_ok() {
862 let result = client.read_holding_registers(txn_id, unit, address, quantity);
863 submit_or_reject(pending, txn_id, result);
864 }
865 }
866 #[cfg(feature = "registers")]
867 WorkerCommand::ReadInputRegisters {
868 txn_id,
869 unit,
870 address,
871 quantity,
872 sender,
873 } => {
874 if register_pending(pending, txn_id, sender).is_ok() {
875 let result = client.read_input_registers(txn_id, unit, address, quantity);
876 submit_or_reject(pending, txn_id, result);
877 }
878 }
879 #[cfg(feature = "registers")]
880 WorkerCommand::WriteSingleRegister {
881 txn_id,
882 unit,
883 address,
884 value,
885 sender,
886 } => {
887 if register_pending(pending, txn_id, sender).is_ok() {
888 let result = client.write_single_register(txn_id, unit, address, value);
889 submit_or_reject(pending, txn_id, result);
890 }
891 }
892 #[cfg(feature = "coils")]
893 WorkerCommand::WriteSingleCoil {
894 txn_id,
895 unit,
896 address,
897 value,
898 sender,
899 } => {
900 if register_pending(pending, txn_id, sender).is_ok() {
901 let result = client.write_single_coil(txn_id, unit, address, value);
902 submit_or_reject(pending, txn_id, result);
903 }
904 }
905 #[cfg(feature = "coils")]
906 WorkerCommand::WriteMultipleCoils {
907 txn_id,
908 unit,
909 address,
910 coils,
911 sender,
912 } => {
913 if register_pending(pending, txn_id, sender).is_ok() {
914 let result = client.write_multiple_coils(txn_id, unit, address, &coils);
915 submit_or_reject(pending, txn_id, result);
916 }
917 }
918 #[cfg(feature = "registers")]
919 WorkerCommand::WriteMultipleRegisters {
920 txn_id,
921 unit,
922 address,
923 values,
924 sender,
925 } => {
926 if register_pending(pending, txn_id, sender).is_ok() {
927 let result = client.write_multiple_registers(
928 txn_id,
929 unit,
930 address,
931 values.len() as u16,
932 &values,
933 );
934 submit_or_reject(pending, txn_id, result);
935 }
936 }
937 #[cfg(feature = "registers")]
938 WorkerCommand::ReadWriteMultipleRegisters {
939 txn_id,
940 unit,
941 read_address,
942 read_quantity,
943 write_address,
944 write_values,
945 sender,
946 } => {
947 if register_pending(pending, txn_id, sender).is_ok() {
948 let result = client.read_write_multiple_registers(
949 txn_id,
950 unit,
951 read_address,
952 read_quantity,
953 write_address,
954 &write_values,
955 );
956 submit_or_reject(pending, txn_id, result);
957 }
958 }
959 #[cfg(feature = "registers")]
960 WorkerCommand::MaskWriteRegister {
961 txn_id,
962 unit,
963 address,
964 and_mask,
965 or_mask,
966 sender,
967 } => {
968 if register_pending(pending, txn_id, sender).is_ok() {
969 let result = client.mask_write_register(txn_id, unit, address, and_mask, or_mask);
970 submit_or_reject(pending, txn_id, result);
971 }
972 }
973 #[cfg(feature = "discrete-inputs")]
974 WorkerCommand::ReadDiscreteInputs {
975 txn_id,
976 unit,
977 address,
978 quantity,
979 sender,
980 } => {
981 if register_pending(pending, txn_id, sender).is_ok() {
982 let result = client.read_discrete_inputs(txn_id, unit, address, quantity);
983 submit_or_reject(pending, txn_id, result);
984 }
985 }
986 #[cfg(feature = "fifo")]
987 WorkerCommand::ReadFifoQueue {
988 txn_id,
989 unit,
990 address,
991 sender,
992 } => {
993 if register_pending(pending, txn_id, sender).is_ok() {
994 let result = client.read_fifo_queue(txn_id, unit, address);
995 submit_or_reject(pending, txn_id, result);
996 }
997 }
998 #[cfg(feature = "file-record")]
999 WorkerCommand::ReadFileRecord {
1000 txn_id,
1001 unit,
1002 sub_request,
1003 sender,
1004 } => {
1005 if register_pending(pending, txn_id, sender).is_ok() {
1006 let result = client.read_file_record(txn_id, unit, &sub_request);
1007 submit_or_reject(pending, txn_id, result);
1008 }
1009 }
1010 #[cfg(feature = "file-record")]
1011 WorkerCommand::WriteFileRecord {
1012 txn_id,
1013 unit,
1014 sub_request,
1015 sender,
1016 } => {
1017 if register_pending(pending, txn_id, sender).is_ok() {
1018 let result = client.write_file_record(txn_id, unit, &sub_request);
1019 submit_or_reject(pending, txn_id, result);
1020 }
1021 }
1022 #[cfg(feature = "diagnostics")]
1023 WorkerCommand::ReadDeviceIdentification {
1024 txn_id,
1025 unit,
1026 read_device_id_code,
1027 object_id,
1028 sender,
1029 } => {
1030 if register_pending(pending, txn_id, sender).is_ok() {
1031 let result =
1032 client.read_device_identification(txn_id, unit, read_device_id_code, object_id);
1033 submit_or_reject(pending, txn_id, result);
1034 }
1035 }
1036 #[cfg(feature = "diagnostics")]
1037 WorkerCommand::EncapsulatedInterfaceTransport {
1038 txn_id,
1039 unit,
1040 mei_type,
1041 data,
1042 sender,
1043 } => {
1044 if register_pending(pending, txn_id, sender).is_ok() {
1045 let result = client.encapsulated_interface_transport(txn_id, unit, mei_type, &data);
1046 submit_or_reject(pending, txn_id, result);
1047 }
1048 }
1049 #[cfg(feature = "diagnostics")]
1050 WorkerCommand::ReadExceptionStatus {
1051 txn_id,
1052 unit,
1053 sender,
1054 } => {
1055 if register_pending(pending, txn_id, sender).is_ok() {
1056 let result = client.read_exception_status(txn_id, unit);
1057 submit_or_reject(pending, txn_id, result);
1058 }
1059 }
1060 #[cfg(feature = "diagnostics")]
1061 WorkerCommand::Diagnostics {
1062 txn_id,
1063 unit,
1064 sub_function,
1065 data,
1066 sender,
1067 } => {
1068 if register_pending(pending, txn_id, sender).is_ok() {
1069 let result = client.diagnostics(txn_id, unit, sub_function, &data);
1070 submit_or_reject(pending, txn_id, result);
1071 }
1072 }
1073 #[cfg(feature = "diagnostics")]
1074 WorkerCommand::GetCommEventCounter {
1075 txn_id,
1076 unit,
1077 sender,
1078 } => {
1079 if register_pending(pending, txn_id, sender).is_ok() {
1080 let result = client.get_comm_event_counter(txn_id, unit);
1081 submit_or_reject(pending, txn_id, result);
1082 }
1083 }
1084 #[cfg(feature = "diagnostics")]
1085 WorkerCommand::GetCommEventLog {
1086 txn_id,
1087 unit,
1088 sender,
1089 } => {
1090 if register_pending(pending, txn_id, sender).is_ok() {
1091 let result = client.get_comm_event_log(txn_id, unit);
1092 submit_or_reject(pending, txn_id, result);
1093 }
1094 }
1095 #[cfg(feature = "diagnostics")]
1096 WorkerCommand::ReportServerId {
1097 txn_id,
1098 unit,
1099 sender,
1100 } => {
1101 if register_pending(pending, txn_id, sender).is_ok() {
1102 let result = client.report_server_id(txn_id, unit);
1103 submit_or_reject(pending, txn_id, result);
1104 }
1105 }
1106 WorkerCommand::Shutdown => {}
1107 }
1108}
1109
1110fn run_worker<TRANSPORT, const N: usize>(
1111 mut client: ClientServices<TRANSPORT, AsyncApp, N>,
1112 pending: PendingStore,
1113 receiver: Receiver<WorkerCommand>,
1114 poll_interval: Duration,
1115) where
1116 TRANSPORT: Transport,
1117{
1118 loop {
1119 loop {
1122 match receiver.try_recv() {
1123 Ok(WorkerCommand::Shutdown) => return,
1124 Ok(command) => handle_command(&mut client, &pending, command),
1125 Err(mpsc::TryRecvError::Empty) => break,
1126 Err(mpsc::TryRecvError::Disconnected) => return,
1127 }
1128 }
1129
1130 let should_poll = client.is_connected() && client.has_pending_requests();
1131
1132 if should_poll {
1133 client.poll();
1134
1135 match receiver.recv_timeout(poll_interval) {
1138 Ok(WorkerCommand::Shutdown) => return,
1139 Ok(command) => handle_command(&mut client, &pending, command),
1140 Err(mpsc::RecvTimeoutError::Timeout) => {}
1141 Err(mpsc::RecvTimeoutError::Disconnected) => return,
1142 }
1143
1144 continue;
1145 }
1146
1147 match receiver.recv() {
1149 Ok(WorkerCommand::Shutdown) => return,
1150 Ok(command) => handle_command(&mut client, &pending, command),
1151 Err(_) => return,
1152 }
1153 }
1154}
1155
// Submodules holding the client front-ends built on top of this worker.
mod client_core;
mod network_client;
mod serial_client;

// `AsyncClientCore` stays crate-internal; the TCP and serial clients are public.
pub(crate) use client_core::AsyncClientCore;
pub use network_client::AsyncTcpClient;
pub use serial_client::AsyncSerialClient;
1165
#[cfg(all(test, feature = "traffic"))]
mod tests {
    use super::*;

    /// Builds an `AsyncApp` with an empty pending map, together with the
    /// receiving end of its traffic channel.
    fn app_with_traffic_channel() -> (AsyncApp, Receiver<TrafficEvent>) {
        let (traffic_sender, traffic_receiver) = mpsc::channel();
        let app = AsyncApp {
            pending: Arc::new(Mutex::new(HashMap::new())),
            traffic_sender,
        };
        (app, traffic_receiver)
    }

    /// A transmitted frame shows up on the channel as a `Tx` event without an error.
    #[test]
    fn test_async_app_emits_traffic_event_to_channel() {
        let (mut app, events) = app_with_traffic_channel();
        let unit = UnitIdOrSlaveAddr::new(1).unwrap();

        app.on_tx_frame(42, unit, &[0xAA, 0x55]);

        let received = events.recv_timeout(Duration::from_millis(100)).unwrap();
        assert_eq!(received.direction, TrafficDirection::Tx);
        assert_eq!(received.txn_id, 42);
        assert_eq!(received.unit_id_slave_addr, unit);
        assert_eq!(received.frame, vec![0xAA, 0x55]);
        assert_eq!(received.error, None);
    }

    /// A receive error shows up on the channel as an `Rx` event carrying the error.
    #[test]
    fn test_async_app_emits_traffic_error_event_to_channel() {
        let (mut app, events) = app_with_traffic_channel();
        let unit = UnitIdOrSlaveAddr::new(1).unwrap();

        app.on_rx_error(77, unit, MbusError::ChecksumError, &[0xAB]);

        let received = events.recv_timeout(Duration::from_millis(100)).unwrap();
        assert_eq!(received.direction, TrafficDirection::Rx);
        assert_eq!(received.txn_id, 77);
        assert_eq!(received.unit_id_slave_addr, unit);
        assert_eq!(received.frame, vec![0xAB]);
        assert_eq!(received.error, Some(MbusError::ChecksumError));
    }

    /// Setting and clearing the handler through the core is reflected in the shared slot.
    #[test]
    fn test_async_client_core_set_and_clear_traffic_handler() {
        let (sender, _receiver) = mpsc::channel();
        let slot: TrafficHandlerStore = Arc::new(Mutex::new(None));
        let core = AsyncClientCore::new(sender, slot.clone());

        core.set_traffic_handler(|_evt| {});
        assert!(slot.lock().unwrap().is_some());

        core.clear_traffic_handler();
        assert!(slot.lock().unwrap().is_none());
    }
}