1
2use std::collections::HashMap;
17use std::sync::atomic::{AtomicU16, Ordering};
18use std::sync::{mpsc, Arc, Mutex};
19use std::sync::mpsc::{Receiver, Sender};
20use std::thread;
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use mbus_client::app::RequestErrorNotifier;
24#[cfg(feature = "coils")]
25use mbus_client::app::CoilResponse;
26#[cfg(feature = "diagnostics")]
27use mbus_client::app::DiagnosticsResponse;
28#[cfg(feature = "discrete-inputs")]
29use mbus_client::app::DiscreteInputResponse;
30#[cfg(feature = "fifo")]
31use mbus_client::app::FifoQueueResponse;
32#[cfg(feature = "file-record")]
33use mbus_client::app::FileRecordResponse;
34#[cfg(feature = "registers")]
35use mbus_client::app::RegisterResponse;
36use mbus_client::services::ClientServices;
37#[cfg(feature = "coils")]
38use mbus_client::services::coil::Coils;
39#[cfg(feature = "diagnostics")]
40use mbus_client::services::diagnostic::{
41 DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode,
42};
43#[cfg(feature = "discrete-inputs")]
44use mbus_client::services::discrete_input::DiscreteInputs;
45#[cfg(feature = "fifo")]
46use mbus_client::services::fifo_queue::FifoQueue;
47#[cfg(feature = "file-record")]
48use mbus_client::services::file_record::{SubRequest, SubRequestParams};
49#[cfg(feature = "registers")]
50use mbus_client::services::register::Registers;
51use mbus_core::errors::MbusError;
52#[cfg(feature = "diagnostics")]
53use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
54use mbus_core::transport::{ModbusConfig, TimeKeeper, Transport, UnitIdOrSlaveAddr};
55#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
56use mbus_core::transport::{ModbusSerialConfig, SerialMode};
57#[cfg(feature = "tcp")]
58use mbus_core::transport::ModbusTcpConfig;
59#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
60use mbus_serial::StdSerialTransport;
61#[cfg(feature = "tcp")]
62use mbus_network::StdTcpTransport;
63use tokio::sync::oneshot;
64
#[cfg(feature = "diagnostics")]
#[derive(Debug, Clone, PartialEq, Eq)]
/// Owned copy of a diagnostics reply.
///
/// Built by the worker's `diagnostics_response` callback from the sub-function
/// and data slice it is handed, and carried inside
/// `WorkerResponse::DiagnosticsData`.
pub struct DiagnosticsDataResponse {
    /// Sub-function passed to the `diagnostics_response` callback.
    pub sub_function: DiagnosticSubFunction,
    /// Data words copied from the callback's `data` slice.
    pub data: Vec<u16>,
}
#[cfg(feature = "diagnostics")]
/// Comm event log reply as a tuple, in the order the
/// `get_comm_event_log_response` callback packs it:
/// `(status, event_count, message_count, events)`.
pub type CommEventLogResponse = (u16, u16, u16, Vec<u8>);
77
/// Error type of the async client layer defined in this module.
///
/// NOTE(review): the sites that construct `WorkerClosed` and
/// `UnexpectedResponseType` are not in this file (presumably the client
/// submodules) — confirm the exact conditions there.
#[derive(Debug, PartialEq, Eq)]
pub enum AsyncError {
    /// An error reported by the Modbus stack; exposed as the error `source()`.
    Mbus(MbusError),
    /// Displayed as "async worker channel closed".
    WorkerClosed,
    /// Displayed as "unexpected response type from worker".
    UnexpectedResponseType,
}
88
89impl From<MbusError> for AsyncError {
90 fn from(value: MbusError) -> Self {
91 Self::Mbus(value)
92 }
93}
94
95impl std::fmt::Display for AsyncError {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 match self {
98 Self::Mbus(err) => write!(f, "Modbus error: {err}"),
99 Self::WorkerClosed => write!(f, "async worker channel closed"),
100 Self::UnexpectedResponseType => write!(f, "unexpected response type from worker"),
101 }
102 }
103}
104
105impl std::error::Error for AsyncError {
106 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
107 match self {
108 Self::Mbus(err) => Some(err),
109 _ => None,
110 }
111 }
112}
113
/// Sending half of the one-shot channel on which a single request's outcome
/// (a `WorkerResponse` or an `MbusError`) is delivered.
type PendingSender = oneshot::Sender<Result<WorkerResponse, MbusError>>;
/// Shared, mutex-guarded table of in-flight requests: transaction id mapped to
/// the sender that will receive that request's outcome.
type PendingStore = Arc<Mutex<HashMap<u16, PendingSender>>>;
116
/// Messages consumed by the worker loop (`run_worker` / `handle_command`).
///
/// Every request variant carries:
/// - `txn_id`: the key under which `sender` is stored in the pending table and
///   the transaction id passed to the `ClientServices` call;
/// - `unit`: the target unit id / slave address passed to the call;
/// - `sender`: the one-shot channel on which the outcome is delivered;
/// - the remaining fields, which `handle_command` forwards as call arguments.
enum WorkerCommand {
    /// Submitted through `read_multiple_coils`.
    #[cfg(feature = "coils")]
    ReadMultipleCoils {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
        sender: PendingSender,
    },
    /// Submitted through `read_holding_registers`.
    #[cfg(feature = "registers")]
    ReadHoldingRegisters {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
        sender: PendingSender,
    },
    /// Submitted through `read_input_registers`.
    #[cfg(feature = "registers")]
    ReadInputRegisters {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
        sender: PendingSender,
    },
    /// Submitted through `write_single_register`.
    #[cfg(feature = "registers")]
    WriteSingleRegister {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
        sender: PendingSender,
    },
    /// Submitted through `write_single_coil`.
    #[cfg(feature = "coils")]
    WriteSingleCoil {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
        sender: PendingSender,
    },
    /// Submitted through `write_multiple_coils`; `coils` is passed by reference.
    #[cfg(feature = "coils")]
    WriteMultipleCoils {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        coils: Coils,
        sender: PendingSender,
    },
    /// Submitted through `write_multiple_registers`; the quantity argument is
    /// derived from `values.len()`.
    #[cfg(feature = "registers")]
    WriteMultipleRegisters {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        values: Vec<u16>,
        sender: PendingSender,
    },
    /// Submitted through `read_write_multiple_registers`.
    #[cfg(feature = "registers")]
    ReadWriteMultipleRegisters {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        read_address: u16,
        read_quantity: u16,
        write_address: u16,
        write_values: Vec<u16>,
        sender: PendingSender,
    },
    /// Submitted through `mask_write_register`.
    #[cfg(feature = "registers")]
    MaskWriteRegister {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        and_mask: u16,
        or_mask: u16,
        sender: PendingSender,
    },
    /// Submitted through `read_discrete_inputs`.
    #[cfg(feature = "discrete-inputs")]
    ReadDiscreteInputs {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
        sender: PendingSender,
    },
    /// Submitted through `read_fifo_queue`.
    #[cfg(feature = "fifo")]
    ReadFifoQueue {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        address: u16,
        sender: PendingSender,
    },
    /// Submitted through `read_file_record`; `sub_request` is passed by reference.
    #[cfg(feature = "file-record")]
    ReadFileRecord {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sub_request: SubRequest,
        sender: PendingSender,
    },
    /// Submitted through `write_file_record`; `sub_request` is passed by reference.
    #[cfg(feature = "file-record")]
    WriteFileRecord {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sub_request: SubRequest,
        sender: PendingSender,
    },
    /// Submitted through `read_device_identification`.
    #[cfg(feature = "diagnostics")]
    ReadDeviceIdentification {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        read_device_id_code: ReadDeviceIdCode,
        object_id: ObjectId,
        sender: PendingSender,
    },
    /// Submitted through `encapsulated_interface_transport`; `data` is passed
    /// as a slice.
    #[cfg(feature = "diagnostics")]
    EncapsulatedInterfaceTransport {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        mei_type: EncapsulatedInterfaceType,
        data: Vec<u8>,
        sender: PendingSender,
    },
    /// Submitted through `read_exception_status`.
    #[cfg(feature = "diagnostics")]
    ReadExceptionStatus {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sender: PendingSender,
    },
    /// Submitted through `diagnostics`; `data` is passed as a slice.
    #[cfg(feature = "diagnostics")]
    Diagnostics {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sub_function: DiagnosticSubFunction,
        data: Vec<u16>,
        sender: PendingSender,
    },
    /// Submitted through `get_comm_event_counter`.
    #[cfg(feature = "diagnostics")]
    GetCommEventCounter {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sender: PendingSender,
    },
    /// Submitted through `get_comm_event_log`.
    #[cfg(feature = "diagnostics")]
    GetCommEventLog {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sender: PendingSender,
    },
    /// Submitted through `report_server_id`.
    #[cfg(feature = "diagnostics")]
    ReportServerId {
        txn_id: u16,
        unit: UnitIdOrSlaveAddr,
        sender: PendingSender,
    },
    /// Makes `run_worker` return; `handle_command` treats it as a no-op.
    Shutdown,
}
272
/// Successful outcomes the `AsyncApp` callbacks deliver to waiting callers.
///
/// Each variant holds an owned copy of what the corresponding response
/// callback received.
enum WorkerResponse {
    /// Produced by all coil callbacks. Single-coil callbacks build a
    /// one-element block holding the reported value; the multi-coil write
    /// callback builds a block from the acknowledged address and quantity
    /// without setting any values.
    #[cfg(feature = "coils")]
    Coils(Coils),
    /// Produced by the register read callbacks and by the multi-register
    /// write callback (which builds a block from the acknowledged start
    /// address and quantity without setting any values).
    #[cfg(feature = "registers")]
    Registers(Registers),
    /// Produced by `write_single_register_response`.
    #[cfg(feature = "registers")]
    SingleRegisterWrite { address: u16, value: u16 },
    /// Produced by `mask_write_register_response`; carries no data.
    #[cfg(feature = "registers")]
    MaskWriteRegister,
    /// Produced by the discrete-input read callbacks.
    #[cfg(feature = "discrete-inputs")]
    DiscreteInputs(DiscreteInputs),
    /// Produced by `read_fifo_queue_response`.
    #[cfg(feature = "fifo")]
    FifoQueue(FifoQueue),
    /// Produced by `read_file_record_response`.
    #[cfg(feature = "file-record")]
    FileRecordRead(Vec<SubRequestParams>),
    /// Produced by `write_file_record_response`; carries no data.
    #[cfg(feature = "file-record")]
    FileRecordWrite,
    /// Produced by `read_device_identification_response`.
    #[cfg(feature = "diagnostics")]
    DeviceIdentification(DeviceIdentificationResponse),
    /// Produced by `encapsulated_interface_transport_response`.
    #[cfg(feature = "diagnostics")]
    EncapsulatedInterfaceTransport {
        mei_type: EncapsulatedInterfaceType,
        data: Vec<u8>,
    },
    /// Produced by `read_exception_status_response`.
    #[cfg(feature = "diagnostics")]
    ExceptionStatus(u8),
    /// Produced by `diagnostics_response`.
    #[cfg(feature = "diagnostics")]
    DiagnosticsData(DiagnosticsDataResponse),
    /// Produced by `get_comm_event_counter_response`.
    #[cfg(feature = "diagnostics")]
    CommEventCounter { status: u16, event_count: u16 },
    /// Produced by `get_comm_event_log_response`.
    #[cfg(feature = "diagnostics")]
    CommEventLog(CommEventLogResponse),
    /// Produced by `report_server_id_response`.
    #[cfg(feature = "diagnostics")]
    ReportServerId(Vec<u8>),
}
308
/// Application object plugged into `ClientServices`.
///
/// It implements the response/error callback traits and routes each callback
/// to the waiter registered under the callback's transaction id.
struct AsyncApp {
    /// Table of in-flight requests, shared with the worker's command handling.
    pending: PendingStore,
}
312
313impl AsyncApp {
314 fn complete(&self, txn_id: u16, response: Result<WorkerResponse, MbusError>) {
315 if let Ok(mut pending) = self.pending.lock() {
316 if let Some(sender) = pending.remove(&txn_id) {
317 let _ = sender.send(response);
318 }
319 }
320 }
321
322 fn resolve(&self, txn_id: u16, response: WorkerResponse) {
323 self.complete(txn_id, Ok(response));
324 }
325
326 fn reject(&self, txn_id: u16, error: MbusError) {
327 self.complete(txn_id, Err(error));
328 }
329}
330
331impl TimeKeeper for AsyncApp {
332 fn current_millis(&self) -> u64 {
333 SystemTime::now()
334 .duration_since(UNIX_EPOCH)
335 .map(|d| d.as_millis() as u64)
336 .unwrap_or(0)
337 }
338}
339
340impl RequestErrorNotifier for AsyncApp {
341 fn request_failed(
342 &mut self,
343 txn_id: u16,
344 _unit_id_slave_addr: UnitIdOrSlaveAddr,
345 error: MbusError,
346 ) {
347 self.reject(txn_id, error);
348 }
349}
350
/// Coil callbacks: every outcome is delivered as `WorkerResponse::Coils`.
#[cfg(feature = "coils")]
impl CoilResponse for AsyncApp {
    /// Delivers a clone of the received coil block.
    fn read_coils_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, coils: &Coils) {
        let owned = coils.clone();
        self.resolve(txn_id, WorkerResponse::Coils(owned));
    }

    /// Delivers a one-coil block holding `value` at `address`, or the
    /// construction error if the block cannot be built.
    fn read_single_coil_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        let built = single_coil_block(address, value);
        self.complete(txn_id, built.map(WorkerResponse::Coils));
    }

    /// Delivers a one-coil block holding the acknowledged `value` at
    /// `address`, or the construction error if the block cannot be built.
    fn write_single_coil_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        let built = single_coil_block(address, value);
        self.complete(txn_id, built.map(WorkerResponse::Coils));
    }

    /// Delivers a block constructed from the acknowledged `address` and
    /// `quantity`; no coil values are set on it here.
    fn write_multiple_coils_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
    ) {
        let built = Coils::new(address, quantity);
        self.complete(txn_id, built.map(WorkerResponse::Coils));
    }
}

/// Builds a `Coils` block of quantity 1 at `address` and stores `value` in it.
#[cfg(feature = "coils")]
fn single_coil_block(address: u16, value: bool) -> Result<Coils, MbusError> {
    let mut coils = Coils::new(address, 1)?;
    coils.set_value(address, value)?;
    Ok(coils)
}
402
/// Register callbacks: each outcome is delivered to the waiter of `txn_id`.
#[cfg(feature = "registers")]
impl RegisterResponse for AsyncApp {
    /// Delivers a clone of the received input-register block.
    fn read_multiple_input_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let owned = registers.clone();
        self.resolve(txn_id, WorkerResponse::Registers(owned));
    }

    /// Delivers a one-register block holding `value` at `address`, or the
    /// construction error if the block cannot be built.
    fn read_single_input_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let built = single_register_block(address, value);
        self.complete(txn_id, built.map(WorkerResponse::Registers));
    }

    /// Delivers a clone of the received holding-register block.
    fn read_multiple_holding_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let owned = registers.clone();
        self.resolve(txn_id, WorkerResponse::Registers(owned));
    }

    /// Delivers the acknowledged `address`/`value` pair.
    fn write_single_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let ack = WorkerResponse::SingleRegisterWrite { address, value };
        self.resolve(txn_id, ack);
    }

    /// Delivers a block constructed from the acknowledged start address and
    /// quantity; no register values are set on it here.
    fn write_multiple_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        starting_address: u16,
        quantity: u16,
    ) {
        let built = Registers::new(starting_address, quantity);
        self.complete(txn_id, built.map(WorkerResponse::Registers));
    }

    /// Delivers a clone of the registers returned by a read/write request.
    fn read_write_multiple_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let owned = registers.clone();
        self.resolve(txn_id, WorkerResponse::Registers(owned));
    }

    /// Delivers a one-register block holding `value` at `address`, or the
    /// construction error if the block cannot be built.
    fn read_single_holding_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let built = single_register_block(address, value);
        self.complete(txn_id, built.map(WorkerResponse::Registers));
    }

    /// Delivers a one-register block holding `value` at `address`, or the
    /// construction error if the block cannot be built.
    fn read_single_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let built = single_register_block(address, value);
        self.complete(txn_id, built.map(WorkerResponse::Registers));
    }

    /// Delivers the data-less mask-write acknowledgement.
    fn mask_write_register_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
        self.resolve(txn_id, WorkerResponse::MaskWriteRegister);
    }
}

/// Builds a `Registers` block of quantity 1 at `address` and stores `value` in it.
#[cfg(feature = "registers")]
fn single_register_block(address: u16, value: u16) -> Result<Registers, MbusError> {
    let mut registers = Registers::new(address, 1)?;
    registers.set_value(address, value)?;
    Ok(registers)
}
510
/// Discrete-input callbacks: outcomes are delivered as
/// `WorkerResponse::DiscreteInputs`.
#[cfg(feature = "discrete-inputs")]
impl DiscreteInputResponse for AsyncApp {
    /// Delivers a clone of the received discrete-input block.
    fn read_multiple_discrete_inputs_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        discrete_inputs: &DiscreteInputs,
    ) {
        let owned = discrete_inputs.clone();
        self.resolve(txn_id, WorkerResponse::DiscreteInputs(owned));
    }

    /// Delivers a one-input block whose single packed entry is `1` when
    /// `value` is true and `0` otherwise, or the construction error if the
    /// block cannot be built.
    fn read_single_discrete_input_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        let bit = if value { 0b0000_0001 } else { 0 };
        let built =
            DiscreteInputs::new(address, 1).and_then(|inputs| inputs.with_values(&[bit], 1));
        self.complete(txn_id, built.map(WorkerResponse::DiscreteInputs));
    }
}
543
/// FIFO queue callback.
#[cfg(feature = "fifo")]
impl FifoQueueResponse for AsyncApp {
    /// Delivers a clone of the received FIFO queue to the waiter of `txn_id`.
    fn read_fifo_queue_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        fifo_queue: &FifoQueue,
    ) {
        let owned = fifo_queue.clone();
        self.resolve(txn_id, WorkerResponse::FifoQueue(owned));
    }
}
555
/// File-record callbacks.
#[cfg(feature = "file-record")]
impl FileRecordResponse for AsyncApp {
    /// Delivers an owned copy of the returned sub-request records.
    fn read_file_record_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        data: &[SubRequestParams],
    ) {
        let records = data.to_vec();
        self.resolve(txn_id, WorkerResponse::FileRecordRead(records));
    }

    /// Delivers the data-less write acknowledgement.
    fn write_file_record_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
        let ack = WorkerResponse::FileRecordWrite;
        self.resolve(txn_id, ack);
    }
}
571
/// Diagnostics callbacks: each copies the borrowed payload into an owned
/// `WorkerResponse` and delivers it to the waiter of `txn_id`.
#[cfg(feature = "diagnostics")]
impl DiagnosticsResponse for AsyncApp {
    /// Delivers a clone of the device identification response.
    fn read_device_identification_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        response: &DeviceIdentificationResponse,
    ) {
        let owned = response.clone();
        self.resolve(txn_id, WorkerResponse::DeviceIdentification(owned));
    }

    /// Delivers the MEI type together with an owned copy of the payload bytes.
    fn encapsulated_interface_transport_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        mei_type: EncapsulatedInterfaceType,
        data: &[u8],
    ) {
        let payload = WorkerResponse::EncapsulatedInterfaceTransport {
            mei_type,
            data: data.to_vec(),
        };
        self.resolve(txn_id, payload);
    }

    /// Delivers the exception status byte.
    fn read_exception_status_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u8,
    ) {
        let payload = WorkerResponse::ExceptionStatus(status);
        self.resolve(txn_id, payload);
    }

    /// Delivers the sub-function together with an owned copy of its data words.
    fn diagnostics_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        sub_function: DiagnosticSubFunction,
        data: &[u16],
    ) {
        let reply = DiagnosticsDataResponse {
            sub_function,
            data: data.to_vec(),
        };
        self.resolve(txn_id, WorkerResponse::DiagnosticsData(reply));
    }

    /// Delivers the comm event counter's status and event count.
    fn get_comm_event_counter_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u16,
        event_count: u16,
    ) {
        let payload = WorkerResponse::CommEventCounter {
            status,
            event_count,
        };
        self.resolve(txn_id, payload);
    }

    /// Delivers the comm event log as
    /// `(status, event_count, message_count, events)`.
    fn get_comm_event_log_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u16,
        event_count: u16,
        message_count: u16,
        events: &[u8],
    ) {
        let log: CommEventLogResponse = (status, event_count, message_count, events.to_vec());
        self.resolve(txn_id, WorkerResponse::CommEventLog(log));
    }

    /// Delivers an owned copy of the server id bytes.
    fn report_server_id_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        data: &[u8],
    ) {
        let bytes = data.to_vec();
        self.resolve(txn_id, WorkerResponse::ReportServerId(bytes));
    }
}
667
668fn register_pending(pending: &PendingStore, txn_id: u16, sender: PendingSender) -> Result<(), MbusError> {
669 let mut guard = pending.lock().map_err(|_| MbusError::Unexpected)?;
670 guard.insert(txn_id, sender);
671 Ok(())
672}
673
674fn reject_pending(pending: &PendingStore, txn_id: u16, error: MbusError) {
675 if let Ok(mut guard) = pending.lock() {
676 if let Some(sender) = guard.remove(&txn_id) {
677 let _ = sender.send(Err(error));
678 }
679 }
680}
681
682fn submit_or_reject(pending: &PendingStore, txn_id: u16, result: Result<(), MbusError>) {
683 if let Err(err) = result {
684 reject_pending(pending, txn_id, err);
685 }
686}
687
688fn handle_command<TRANSPORT, const N: usize>(
689 client: &mut ClientServices<TRANSPORT, AsyncApp, N>,
690 pending: &PendingStore,
691 command: WorkerCommand,
692) where
693 TRANSPORT: Transport,
694{
695 match command {
696 #[cfg(feature = "coils")]
697 WorkerCommand::ReadMultipleCoils {
698 txn_id,
699 unit,
700 address,
701 quantity,
702 sender,
703 } => {
704 if register_pending(pending, txn_id, sender).is_ok() {
705 let result = client.read_multiple_coils(txn_id, unit, address, quantity);
706 submit_or_reject(pending, txn_id, result);
707 }
708 }
709 #[cfg(feature = "registers")]
710 WorkerCommand::ReadHoldingRegisters {
711 txn_id,
712 unit,
713 address,
714 quantity,
715 sender,
716 } => {
717 if register_pending(pending, txn_id, sender).is_ok() {
718 let result = client.read_holding_registers(txn_id, unit, address, quantity);
719 submit_or_reject(pending, txn_id, result);
720 }
721 }
722 #[cfg(feature = "registers")]
723 WorkerCommand::ReadInputRegisters {
724 txn_id,
725 unit,
726 address,
727 quantity,
728 sender,
729 } => {
730 if register_pending(pending, txn_id, sender).is_ok() {
731 let result = client.read_input_registers(txn_id, unit, address, quantity);
732 submit_or_reject(pending, txn_id, result);
733 }
734 }
735 #[cfg(feature = "registers")]
736 WorkerCommand::WriteSingleRegister {
737 txn_id,
738 unit,
739 address,
740 value,
741 sender,
742 } => {
743 if register_pending(pending, txn_id, sender).is_ok() {
744 let result = client.write_single_register(txn_id, unit, address, value);
745 submit_or_reject(pending, txn_id, result);
746 }
747 }
748 #[cfg(feature = "coils")]
749 WorkerCommand::WriteSingleCoil {
750 txn_id,
751 unit,
752 address,
753 value,
754 sender,
755 } => {
756 if register_pending(pending, txn_id, sender).is_ok() {
757 let result = client.write_single_coil(txn_id, unit, address, value);
758 submit_or_reject(pending, txn_id, result);
759 }
760 }
761 #[cfg(feature = "coils")]
762 WorkerCommand::WriteMultipleCoils {
763 txn_id,
764 unit,
765 address,
766 coils,
767 sender,
768 } => {
769 if register_pending(pending, txn_id, sender).is_ok() {
770 let result = client.write_multiple_coils(txn_id, unit, address, &coils);
771 submit_or_reject(pending, txn_id, result);
772 }
773 }
774 #[cfg(feature = "registers")]
775 WorkerCommand::WriteMultipleRegisters {
776 txn_id,
777 unit,
778 address,
779 values,
780 sender,
781 } => {
782 if register_pending(pending, txn_id, sender).is_ok() {
783 let result =
784 client.write_multiple_registers(txn_id, unit, address, values.len() as u16, &values);
785 submit_or_reject(pending, txn_id, result);
786 }
787 }
788 #[cfg(feature = "registers")]
789 WorkerCommand::ReadWriteMultipleRegisters {
790 txn_id,
791 unit,
792 read_address,
793 read_quantity,
794 write_address,
795 write_values,
796 sender,
797 } => {
798 if register_pending(pending, txn_id, sender).is_ok() {
799 let result = client.read_write_multiple_registers(
800 txn_id,
801 unit,
802 read_address,
803 read_quantity,
804 write_address,
805 &write_values,
806 );
807 submit_or_reject(pending, txn_id, result);
808 }
809 }
810 #[cfg(feature = "registers")]
811 WorkerCommand::MaskWriteRegister {
812 txn_id,
813 unit,
814 address,
815 and_mask,
816 or_mask,
817 sender,
818 } => {
819 if register_pending(pending, txn_id, sender).is_ok() {
820 let result = client.mask_write_register(txn_id, unit, address, and_mask, or_mask);
821 submit_or_reject(pending, txn_id, result);
822 }
823 }
824 #[cfg(feature = "discrete-inputs")]
825 WorkerCommand::ReadDiscreteInputs {
826 txn_id,
827 unit,
828 address,
829 quantity,
830 sender,
831 } => {
832 if register_pending(pending, txn_id, sender).is_ok() {
833 let result = client.read_discrete_inputs(txn_id, unit, address, quantity);
834 submit_or_reject(pending, txn_id, result);
835 }
836 }
837 #[cfg(feature = "fifo")]
838 WorkerCommand::ReadFifoQueue {
839 txn_id,
840 unit,
841 address,
842 sender,
843 } => {
844 if register_pending(pending, txn_id, sender).is_ok() {
845 let result = client.read_fifo_queue(txn_id, unit, address);
846 submit_or_reject(pending, txn_id, result);
847 }
848 }
849 #[cfg(feature = "file-record")]
850 WorkerCommand::ReadFileRecord {
851 txn_id,
852 unit,
853 sub_request,
854 sender,
855 } => {
856 if register_pending(pending, txn_id, sender).is_ok() {
857 let result = client.read_file_record(txn_id, unit, &sub_request);
858 submit_or_reject(pending, txn_id, result);
859 }
860 }
861 #[cfg(feature = "file-record")]
862 WorkerCommand::WriteFileRecord {
863 txn_id,
864 unit,
865 sub_request,
866 sender,
867 } => {
868 if register_pending(pending, txn_id, sender).is_ok() {
869 let result = client.write_file_record(txn_id, unit, &sub_request);
870 submit_or_reject(pending, txn_id, result);
871 }
872 }
873 #[cfg(feature = "diagnostics")]
874 WorkerCommand::ReadDeviceIdentification {
875 txn_id,
876 unit,
877 read_device_id_code,
878 object_id,
879 sender,
880 } => {
881 if register_pending(pending, txn_id, sender).is_ok() {
882 let result =
883 client.read_device_identification(txn_id, unit, read_device_id_code, object_id);
884 submit_or_reject(pending, txn_id, result);
885 }
886 }
887 #[cfg(feature = "diagnostics")]
888 WorkerCommand::EncapsulatedInterfaceTransport {
889 txn_id,
890 unit,
891 mei_type,
892 data,
893 sender,
894 } => {
895 if register_pending(pending, txn_id, sender).is_ok() {
896 let result = client.encapsulated_interface_transport(txn_id, unit, mei_type, &data);
897 submit_or_reject(pending, txn_id, result);
898 }
899 }
900 #[cfg(feature = "diagnostics")]
901 WorkerCommand::ReadExceptionStatus {
902 txn_id,
903 unit,
904 sender,
905 } => {
906 if register_pending(pending, txn_id, sender).is_ok() {
907 let result = client.read_exception_status(txn_id, unit);
908 submit_or_reject(pending, txn_id, result);
909 }
910 }
911 #[cfg(feature = "diagnostics")]
912 WorkerCommand::Diagnostics {
913 txn_id,
914 unit,
915 sub_function,
916 data,
917 sender,
918 } => {
919 if register_pending(pending, txn_id, sender).is_ok() {
920 let result = client.diagnostics(txn_id, unit, sub_function, &data);
921 submit_or_reject(pending, txn_id, result);
922 }
923 }
924 #[cfg(feature = "diagnostics")]
925 WorkerCommand::GetCommEventCounter {
926 txn_id,
927 unit,
928 sender,
929 } => {
930 if register_pending(pending, txn_id, sender).is_ok() {
931 let result = client.get_comm_event_counter(txn_id, unit);
932 submit_or_reject(pending, txn_id, result);
933 }
934 }
935 #[cfg(feature = "diagnostics")]
936 WorkerCommand::GetCommEventLog {
937 txn_id,
938 unit,
939 sender,
940 } => {
941 if register_pending(pending, txn_id, sender).is_ok() {
942 let result = client.get_comm_event_log(txn_id, unit);
943 submit_or_reject(pending, txn_id, result);
944 }
945 }
946 #[cfg(feature = "diagnostics")]
947 WorkerCommand::ReportServerId {
948 txn_id,
949 unit,
950 sender,
951 } => {
952 if register_pending(pending, txn_id, sender).is_ok() {
953 let result = client.report_server_id(txn_id, unit);
954 submit_or_reject(pending, txn_id, result);
955 }
956 }
957 WorkerCommand::Shutdown => {}
958 }
959}
960
961fn run_worker<TRANSPORT, const N: usize>(
962 mut client: ClientServices<TRANSPORT, AsyncApp, N>,
963 pending: PendingStore,
964 receiver: Receiver<WorkerCommand>,
965 poll_interval: Duration,
966) where
967 TRANSPORT: Transport,
968{
969 loop {
970 match receiver.recv_timeout(poll_interval) {
974 Ok(WorkerCommand::Shutdown) => return,
975 Ok(command) => {
976 handle_command(&mut client, &pending, command);
977 loop {
978 match receiver.try_recv() {
979 Ok(WorkerCommand::Shutdown) => return,
980 Ok(command) => handle_command(&mut client, &pending, command),
981 Err(mpsc::TryRecvError::Empty) => break,
982 Err(mpsc::TryRecvError::Disconnected) => return,
983 }
984 }
985 }
986 Err(mpsc::RecvTimeoutError::Timeout) => {}
987 Err(mpsc::RecvTimeoutError::Disconnected) => return,
988 }
989
990 let _ = client.poll();
991 }
992}
993
994mod client_core;
997mod network_client;
998mod serial_client;
999
1000pub(crate) use client_core::AsyncClientCore;
1001pub use network_client::AsyncTcpClient;
1002pub use serial_client::AsyncSerialClient;