Skip to main content

mbus_async/runtime/
mod.rs

1//! Internal runtime for the async facade.
2//!
3//! This module contains the worker-thread bridge between async callers and the
4//! synchronous `ClientServices` state machine.
5//!
6//! Public entry points are re-exported from the crate root:
7//! - [`AsyncTcpClient`] (TCP)
8//! - [`AsyncSerialClient`] (RTU/ASCII)
9//!
10//! Internal/shared building blocks:
11//! - [`AsyncClientCore`] stores worker channel state and implements request methods.
12//! - [`WorkerCommand`] and [`WorkerResponse`] carry typed request/response payloads.
13//! - [`run_worker`] drives polling and response routing.
14
15use std::collections::HashMap;
16#[cfg(feature = "traffic")]
17use std::panic::{AssertUnwindSafe, catch_unwind};
18use std::sync::atomic::{AtomicU16, Ordering};
19use std::sync::mpsc::{Receiver, Sender};
20use std::sync::{Arc, Mutex, mpsc};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24#[cfg(feature = "coils")]
25use mbus_client::app::CoilResponse;
26#[cfg(feature = "diagnostics")]
27use mbus_client::app::DiagnosticsResponse;
28#[cfg(feature = "discrete-inputs")]
29use mbus_client::app::DiscreteInputResponse;
30#[cfg(feature = "fifo")]
31use mbus_client::app::FifoQueueResponse;
32#[cfg(feature = "file-record")]
33use mbus_client::app::FileRecordResponse;
34#[cfg(feature = "registers")]
35use mbus_client::app::RegisterResponse;
36use mbus_client::app::RequestErrorNotifier;
37#[cfg(feature = "traffic")]
38use mbus_client::app::{TrafficDirection, TrafficNotifier};
39use mbus_client::services::ClientServices;
40#[cfg(feature = "coils")]
41use mbus_client::services::coil::Coils;
42#[cfg(feature = "diagnostics")]
43use mbus_client::services::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
44#[cfg(feature = "discrete-inputs")]
45use mbus_client::services::discrete_input::DiscreteInputs;
46#[cfg(feature = "fifo")]
47use mbus_client::services::fifo_queue::FifoQueue;
48#[cfg(feature = "file-record")]
49use mbus_client::services::file_record::{SubRequest, SubRequestParams};
50#[cfg(feature = "registers")]
51use mbus_client::services::register::Registers;
52use mbus_core::errors::MbusError;
53#[cfg(feature = "diagnostics")]
54use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
55#[cfg(feature = "tcp")]
56use mbus_core::transport::ModbusTcpConfig;
57use mbus_core::transport::{ModbusConfig, TimeKeeper, Transport, UnitIdOrSlaveAddr};
58#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
59use mbus_core::transport::{ModbusSerialConfig, SerialMode};
60#[cfg(feature = "tcp")]
61use mbus_network::StdTcpTransport;
62#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
63use mbus_serial::StdSerialTransport;
64use tokio::sync::oneshot;
65
66#[cfg(feature = "diagnostics")]
67/// Diagnostics response payload returned by FC 08.
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct DiagnosticsDataResponse {
70    /// Echoed diagnostic sub-function code.
71    pub sub_function: DiagnosticSubFunction,
72    /// Echoed diagnostic data words.
73    pub data: Vec<u16>,
74}
75#[cfg(feature = "diagnostics")]
76/// Communication event log payload `(status, event_count, message_count, events)` returned by FC 12.
77pub type CommEventLogResponse = (u16, u16, u16, Vec<u8>);
78
79/// Async facade error type.
80#[derive(Debug, PartialEq, Eq)]
81pub enum AsyncError {
82    /// Error propagated from the underlying Modbus client stack.
83    Mbus(MbusError),
84    /// Background worker channel is closed or worker thread has stopped.
85    WorkerClosed,
86    /// Internal response routing mismatch between request and callback payload type.
87    UnexpectedResponseType,
88}
89
90impl From<MbusError> for AsyncError {
91    fn from(value: MbusError) -> Self {
92        Self::Mbus(value)
93    }
94}
95
96impl std::fmt::Display for AsyncError {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        match self {
99            Self::Mbus(err) => write!(f, "Modbus error: {err}"),
100            Self::WorkerClosed => write!(f, "async worker channel closed"),
101            Self::UnexpectedResponseType => write!(f, "unexpected response type from worker"),
102        }
103    }
104}
105
106impl std::error::Error for AsyncError {
107    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
108        match self {
109            Self::Mbus(err) => Some(err),
110            _ => None,
111        }
112    }
113}
114
115type PendingSender = oneshot::Sender<Result<WorkerResponse, MbusError>>;
116type PendingStore = Arc<Mutex<HashMap<u16, PendingSender>>>;
117#[cfg(feature = "traffic")]
118type TrafficHandler = Box<dyn FnMut(&TrafficEvent) + Send + 'static>;
119#[cfg(feature = "traffic")]
120type TrafficHandlerStore = Arc<Mutex<Option<TrafficHandler>>>;
121#[cfg(feature = "traffic")]
122type TrafficSender = Sender<TrafficEvent>;
123
124#[cfg(feature = "traffic")]
125/// Async traffic event emitted from the worker thread.
126#[derive(Debug, Clone, PartialEq, Eq)]
127pub struct TrafficEvent {
128    /// Outbound or inbound frame direction.
129    pub direction: TrafficDirection,
130    /// Transaction identifier associated with the request lifecycle.
131    pub txn_id: u16,
132    /// Unit id (TCP) or slave address (Serial) for this frame.
133    pub unit_id_slave_addr: UnitIdOrSlaveAddr,
134    /// Raw ADU bytes observed on the wire.
135    pub frame: Vec<u8>,
136    /// Error details when the traffic event corresponds to a failed TX/RX path.
137    pub error: Option<MbusError>,
138}
139
140enum WorkerCommand {
141    Connect {
142        sender: PendingSender,
143    },
144    #[cfg(feature = "coils")]
145    ReadMultipleCoils {
146        txn_id: u16,
147        unit: UnitIdOrSlaveAddr,
148        address: u16,
149        quantity: u16,
150        sender: PendingSender,
151    },
152    #[cfg(feature = "registers")]
153    ReadHoldingRegisters {
154        txn_id: u16,
155        unit: UnitIdOrSlaveAddr,
156        address: u16,
157        quantity: u16,
158        sender: PendingSender,
159    },
160    #[cfg(feature = "registers")]
161    ReadInputRegisters {
162        txn_id: u16,
163        unit: UnitIdOrSlaveAddr,
164        address: u16,
165        quantity: u16,
166        sender: PendingSender,
167    },
168    #[cfg(feature = "registers")]
169    WriteSingleRegister {
170        txn_id: u16,
171        unit: UnitIdOrSlaveAddr,
172        address: u16,
173        value: u16,
174        sender: PendingSender,
175    },
176    #[cfg(feature = "coils")]
177    WriteSingleCoil {
178        txn_id: u16,
179        unit: UnitIdOrSlaveAddr,
180        address: u16,
181        value: bool,
182        sender: PendingSender,
183    },
184    #[cfg(feature = "coils")]
185    WriteMultipleCoils {
186        txn_id: u16,
187        unit: UnitIdOrSlaveAddr,
188        address: u16,
189        coils: Coils,
190        sender: PendingSender,
191    },
192    #[cfg(feature = "registers")]
193    WriteMultipleRegisters {
194        txn_id: u16,
195        unit: UnitIdOrSlaveAddr,
196        address: u16,
197        values: Vec<u16>,
198        sender: PendingSender,
199    },
200    #[cfg(feature = "registers")]
201    ReadWriteMultipleRegisters {
202        txn_id: u16,
203        unit: UnitIdOrSlaveAddr,
204        read_address: u16,
205        read_quantity: u16,
206        write_address: u16,
207        write_values: Vec<u16>,
208        sender: PendingSender,
209    },
210    #[cfg(feature = "registers")]
211    MaskWriteRegister {
212        txn_id: u16,
213        unit: UnitIdOrSlaveAddr,
214        address: u16,
215        and_mask: u16,
216        or_mask: u16,
217        sender: PendingSender,
218    },
219    #[cfg(feature = "discrete-inputs")]
220    ReadDiscreteInputs {
221        txn_id: u16,
222        unit: UnitIdOrSlaveAddr,
223        address: u16,
224        quantity: u16,
225        sender: PendingSender,
226    },
227    #[cfg(feature = "fifo")]
228    ReadFifoQueue {
229        txn_id: u16,
230        unit: UnitIdOrSlaveAddr,
231        address: u16,
232        sender: PendingSender,
233    },
234    #[cfg(feature = "file-record")]
235    ReadFileRecord {
236        txn_id: u16,
237        unit: UnitIdOrSlaveAddr,
238        sub_request: SubRequest,
239        sender: PendingSender,
240    },
241    #[cfg(feature = "file-record")]
242    WriteFileRecord {
243        txn_id: u16,
244        unit: UnitIdOrSlaveAddr,
245        sub_request: SubRequest,
246        sender: PendingSender,
247    },
248    #[cfg(feature = "diagnostics")]
249    ReadDeviceIdentification {
250        txn_id: u16,
251        unit: UnitIdOrSlaveAddr,
252        read_device_id_code: ReadDeviceIdCode,
253        object_id: ObjectId,
254        sender: PendingSender,
255    },
256    #[cfg(feature = "diagnostics")]
257    EncapsulatedInterfaceTransport {
258        txn_id: u16,
259        unit: UnitIdOrSlaveAddr,
260        mei_type: EncapsulatedInterfaceType,
261        data: Vec<u8>,
262        sender: PendingSender,
263    },
264    #[cfg(feature = "diagnostics")]
265    ReadExceptionStatus {
266        txn_id: u16,
267        unit: UnitIdOrSlaveAddr,
268        sender: PendingSender,
269    },
270    #[cfg(feature = "diagnostics")]
271    Diagnostics {
272        txn_id: u16,
273        unit: UnitIdOrSlaveAddr,
274        sub_function: DiagnosticSubFunction,
275        data: Vec<u16>,
276        sender: PendingSender,
277    },
278    #[cfg(feature = "diagnostics")]
279    GetCommEventCounter {
280        txn_id: u16,
281        unit: UnitIdOrSlaveAddr,
282        sender: PendingSender,
283    },
284    #[cfg(feature = "diagnostics")]
285    GetCommEventLog {
286        txn_id: u16,
287        unit: UnitIdOrSlaveAddr,
288        sender: PendingSender,
289    },
290    #[cfg(feature = "diagnostics")]
291    ReportServerId {
292        txn_id: u16,
293        unit: UnitIdOrSlaveAddr,
294        sender: PendingSender,
295    },
296    Shutdown,
297}
298
299enum WorkerResponse {
300    Ack,
301    #[cfg(feature = "coils")]
302    Coils(Coils),
303    #[cfg(feature = "registers")]
304    Registers(Registers),
305    #[cfg(feature = "registers")]
306    SingleRegisterWrite {
307        address: u16,
308        value: u16,
309    },
310    #[cfg(feature = "registers")]
311    MaskWriteRegister,
312    #[cfg(feature = "discrete-inputs")]
313    DiscreteInputs(DiscreteInputs),
314    #[cfg(feature = "fifo")]
315    FifoQueue(FifoQueue),
316    #[cfg(feature = "file-record")]
317    FileRecordRead(Vec<SubRequestParams>),
318    #[cfg(feature = "file-record")]
319    FileRecordWrite,
320    #[cfg(feature = "diagnostics")]
321    DeviceIdentification(DeviceIdentificationResponse),
322    #[cfg(feature = "diagnostics")]
323    EncapsulatedInterfaceTransport {
324        mei_type: EncapsulatedInterfaceType,
325        data: Vec<u8>,
326    },
327    #[cfg(feature = "diagnostics")]
328    ExceptionStatus(u8),
329    #[cfg(feature = "diagnostics")]
330    DiagnosticsData(DiagnosticsDataResponse),
331    #[cfg(feature = "diagnostics")]
332    CommEventCounter {
333        status: u16,
334        event_count: u16,
335    },
336    #[cfg(feature = "diagnostics")]
337    CommEventLog(CommEventLogResponse),
338    #[cfg(feature = "diagnostics")]
339    ReportServerId(Vec<u8>),
340}
341
342struct AsyncApp {
343    pending: PendingStore,
344    #[cfg(feature = "traffic")]
345    traffic_sender: TrafficSender,
346}
347
348impl AsyncApp {
349    fn complete(&self, txn_id: u16, response: Result<WorkerResponse, MbusError>) {
350        if let Ok(mut pending) = self.pending.lock()
351            && let Some(sender) = pending.remove(&txn_id)
352        {
353            let _ = sender.send(response);
354        }
355    }
356
357    fn resolve(&self, txn_id: u16, response: WorkerResponse) {
358        self.complete(txn_id, Ok(response));
359    }
360
361    fn reject(&self, txn_id: u16, error: MbusError) {
362        self.complete(txn_id, Err(error));
363    }
364
365    #[cfg(feature = "traffic")]
366    fn emit_traffic(
367        &self,
368        direction: TrafficDirection,
369        txn_id: u16,
370        unit_id_slave_addr: UnitIdOrSlaveAddr,
371        error: Option<MbusError>,
372        frame_bytes: &[u8],
373    ) {
374        let event = TrafficEvent {
375            direction,
376            txn_id,
377            unit_id_slave_addr,
378            frame: frame_bytes.to_vec(),
379            error,
380        };
381
382        let _ = self.traffic_sender.send(event);
383    }
384}
385
386impl TimeKeeper for AsyncApp {
387    fn current_millis(&self) -> u64 {
388        SystemTime::now()
389            .duration_since(UNIX_EPOCH)
390            .map(|d| d.as_millis() as u64)
391            .unwrap_or(0)
392    }
393}
394
395impl RequestErrorNotifier for AsyncApp {
396    fn request_failed(
397        &mut self,
398        txn_id: u16,
399        _unit_id_slave_addr: UnitIdOrSlaveAddr,
400        error: MbusError,
401    ) {
402        self.reject(txn_id, error);
403    }
404}
405
406#[cfg(feature = "traffic")]
407impl TrafficNotifier for AsyncApp {
408    fn on_tx_frame(
409        &mut self,
410        txn_id: u16,
411        unit_id_slave_addr: UnitIdOrSlaveAddr,
412        frame_bytes: &[u8],
413    ) {
414        self.emit_traffic(
415            TrafficDirection::Tx,
416            txn_id,
417            unit_id_slave_addr,
418            None,
419            frame_bytes,
420        );
421    }
422
423    fn on_rx_frame(
424        &mut self,
425        txn_id: u16,
426        unit_id_slave_addr: UnitIdOrSlaveAddr,
427        frame_bytes: &[u8],
428    ) {
429        self.emit_traffic(
430            TrafficDirection::Rx,
431            txn_id,
432            unit_id_slave_addr,
433            None,
434            frame_bytes,
435        );
436    }
437
438    fn on_tx_error(
439        &mut self,
440        txn_id: u16,
441        unit_id_slave_addr: UnitIdOrSlaveAddr,
442        error: MbusError,
443        frame_bytes: &[u8],
444    ) {
445        self.emit_traffic(
446            TrafficDirection::Tx,
447            txn_id,
448            unit_id_slave_addr,
449            Some(error),
450            frame_bytes,
451        );
452    }
453
454    fn on_rx_error(
455        &mut self,
456        txn_id: u16,
457        unit_id_slave_addr: UnitIdOrSlaveAddr,
458        error: MbusError,
459        frame_bytes: &[u8],
460    ) {
461        self.emit_traffic(
462            TrafficDirection::Rx,
463            txn_id,
464            unit_id_slave_addr,
465            Some(error),
466            frame_bytes,
467        );
468    }
469}
470
471#[cfg(feature = "traffic")]
472fn run_traffic_dispatcher(receiver: Receiver<TrafficEvent>, traffic_handler: TrafficHandlerStore) {
473    while let Ok(event) = receiver.recv() {
474        if let Ok(mut handler_slot) = traffic_handler.lock()
475            && let Some(handler) = handler_slot.as_mut()
476        {
477            let _ = catch_unwind(AssertUnwindSafe(|| handler(&event)));
478        }
479    }
480}
481
482#[cfg(feature = "coils")]
483impl CoilResponse for AsyncApp {
484    fn read_coils_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, coils: &Coils) {
485        self.resolve(txn_id, WorkerResponse::Coils(coils.clone()));
486    }
487
488    fn read_single_coil_response(
489        &mut self,
490        txn_id: u16,
491        _unit: UnitIdOrSlaveAddr,
492        address: u16,
493        value: bool,
494    ) {
495        match Coils::new(address, 1).and_then(|mut c| {
496            c.set_value(address, value)?;
497            Ok(c)
498        }) {
499            Ok(coils) => self.resolve(txn_id, WorkerResponse::Coils(coils)),
500            Err(err) => self.reject(txn_id, err),
501        }
502    }
503
504    fn write_single_coil_response(
505        &mut self,
506        txn_id: u16,
507        _unit: UnitIdOrSlaveAddr,
508        address: u16,
509        value: bool,
510    ) {
511        match Coils::new(address, 1).and_then(|mut c| {
512            c.set_value(address, value)?;
513            Ok(c)
514        }) {
515            Ok(coils) => self.resolve(txn_id, WorkerResponse::Coils(coils)),
516            Err(err) => self.reject(txn_id, err),
517        }
518    }
519
520    fn write_multiple_coils_response(
521        &mut self,
522        txn_id: u16,
523        _unit: UnitIdOrSlaveAddr,
524        address: u16,
525        quantity: u16,
526    ) {
527        match Coils::new(address, quantity) {
528            Ok(coils) => self.resolve(txn_id, WorkerResponse::Coils(coils)),
529            Err(err) => self.reject(txn_id, err),
530        }
531    }
532}
533
534#[cfg(feature = "registers")]
535impl RegisterResponse for AsyncApp {
536    fn read_multiple_input_registers_response(
537        &mut self,
538        txn_id: u16,
539        _unit: UnitIdOrSlaveAddr,
540        registers: &Registers,
541    ) {
542        self.resolve(txn_id, WorkerResponse::Registers(registers.clone()));
543    }
544
545    fn read_single_input_register_response(
546        &mut self,
547        txn_id: u16,
548        _unit: UnitIdOrSlaveAddr,
549        address: u16,
550        value: u16,
551    ) {
552        match Registers::new(address, 1).and_then(|mut r| {
553            r.set_value(address, value)?;
554            Ok(r)
555        }) {
556            Ok(registers) => self.resolve(txn_id, WorkerResponse::Registers(registers)),
557            Err(err) => self.reject(txn_id, err),
558        }
559    }
560
561    fn read_multiple_holding_registers_response(
562        &mut self,
563        txn_id: u16,
564        _unit: UnitIdOrSlaveAddr,
565        registers: &Registers,
566    ) {
567        self.resolve(txn_id, WorkerResponse::Registers(registers.clone()));
568    }
569
570    fn write_single_register_response(
571        &mut self,
572        txn_id: u16,
573        _unit: UnitIdOrSlaveAddr,
574        address: u16,
575        value: u16,
576    ) {
577        self.resolve(
578            txn_id,
579            WorkerResponse::SingleRegisterWrite { address, value },
580        );
581    }
582
583    fn write_multiple_registers_response(
584        &mut self,
585        txn_id: u16,
586        _unit: UnitIdOrSlaveAddr,
587        starting_address: u16,
588        quantity: u16,
589    ) {
590        match Registers::new(starting_address, quantity) {
591            Ok(registers) => self.resolve(txn_id, WorkerResponse::Registers(registers)),
592            Err(err) => self.reject(txn_id, err),
593        }
594    }
595
596    fn read_write_multiple_registers_response(
597        &mut self,
598        txn_id: u16,
599        _unit: UnitIdOrSlaveAddr,
600        registers: &Registers,
601    ) {
602        self.resolve(txn_id, WorkerResponse::Registers(registers.clone()));
603    }
604
605    fn read_single_holding_register_response(
606        &mut self,
607        txn_id: u16,
608        _unit: UnitIdOrSlaveAddr,
609        address: u16,
610        value: u16,
611    ) {
612        match Registers::new(address, 1).and_then(|mut r| {
613            r.set_value(address, value)?;
614            Ok(r)
615        }) {
616            Ok(registers) => self.resolve(txn_id, WorkerResponse::Registers(registers)),
617            Err(err) => self.reject(txn_id, err),
618        }
619    }
620
621    fn read_single_register_response(
622        &mut self,
623        txn_id: u16,
624        _unit: UnitIdOrSlaveAddr,
625        address: u16,
626        value: u16,
627    ) {
628        match Registers::new(address, 1).and_then(|mut r| {
629            r.set_value(address, value)?;
630            Ok(r)
631        }) {
632            Ok(registers) => self.resolve(txn_id, WorkerResponse::Registers(registers)),
633            Err(err) => self.reject(txn_id, err),
634        }
635    }
636
637    fn mask_write_register_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
638        self.resolve(txn_id, WorkerResponse::MaskWriteRegister);
639    }
640}
641
642#[cfg(feature = "discrete-inputs")]
643impl DiscreteInputResponse for AsyncApp {
644    fn read_multiple_discrete_inputs_response(
645        &mut self,
646        txn_id: u16,
647        _unit: UnitIdOrSlaveAddr,
648        discrete_inputs: &DiscreteInputs,
649    ) {
650        self.resolve(
651            txn_id,
652            WorkerResponse::DiscreteInputs(discrete_inputs.clone()),
653        );
654    }
655
656    fn read_single_discrete_input_response(
657        &mut self,
658        txn_id: u16,
659        _unit: UnitIdOrSlaveAddr,
660        address: u16,
661        value: bool,
662    ) {
663        let bit = if value { 0b0000_0001 } else { 0 };
664        match DiscreteInputs::new(address, 1).and_then(|d| d.with_values(&[bit], 1)) {
665            Ok(discrete_inputs) => {
666                self.resolve(txn_id, WorkerResponse::DiscreteInputs(discrete_inputs))
667            }
668            Err(err) => self.reject(txn_id, err),
669        }
670    }
671}
672
673#[cfg(feature = "fifo")]
674impl FifoQueueResponse for AsyncApp {
675    fn read_fifo_queue_response(
676        &mut self,
677        txn_id: u16,
678        _unit: UnitIdOrSlaveAddr,
679        fifo_queue: &FifoQueue,
680    ) {
681        self.resolve(txn_id, WorkerResponse::FifoQueue(fifo_queue.clone()));
682    }
683}
684
685#[cfg(feature = "file-record")]
686impl FileRecordResponse for AsyncApp {
687    fn read_file_record_response(
688        &mut self,
689        txn_id: u16,
690        _unit: UnitIdOrSlaveAddr,
691        data: &[SubRequestParams],
692    ) {
693        self.resolve(txn_id, WorkerResponse::FileRecordRead(data.to_vec()));
694    }
695
696    fn write_file_record_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
697        self.resolve(txn_id, WorkerResponse::FileRecordWrite);
698    }
699}
700
701#[cfg(feature = "diagnostics")]
702impl DiagnosticsResponse for AsyncApp {
703    fn read_device_identification_response(
704        &mut self,
705        txn_id: u16,
706        _unit: UnitIdOrSlaveAddr,
707        response: &DeviceIdentificationResponse,
708    ) {
709        self.resolve(
710            txn_id,
711            WorkerResponse::DeviceIdentification(response.clone()),
712        );
713    }
714
715    fn encapsulated_interface_transport_response(
716        &mut self,
717        txn_id: u16,
718        _unit: UnitIdOrSlaveAddr,
719        mei_type: EncapsulatedInterfaceType,
720        data: &[u8],
721    ) {
722        self.resolve(
723            txn_id,
724            WorkerResponse::EncapsulatedInterfaceTransport {
725                mei_type,
726                data: data.to_vec(),
727            },
728        );
729    }
730
731    fn read_exception_status_response(
732        &mut self,
733        txn_id: u16,
734        _unit: UnitIdOrSlaveAddr,
735        status: u8,
736    ) {
737        self.resolve(txn_id, WorkerResponse::ExceptionStatus(status));
738    }
739
740    fn diagnostics_response(
741        &mut self,
742        txn_id: u16,
743        _unit: UnitIdOrSlaveAddr,
744        sub_function: DiagnosticSubFunction,
745        data: &[u16],
746    ) {
747        self.resolve(
748            txn_id,
749            WorkerResponse::DiagnosticsData(DiagnosticsDataResponse {
750                sub_function,
751                data: data.to_vec(),
752            }),
753        );
754    }
755
756    fn get_comm_event_counter_response(
757        &mut self,
758        txn_id: u16,
759        _unit: UnitIdOrSlaveAddr,
760        status: u16,
761        event_count: u16,
762    ) {
763        self.resolve(
764            txn_id,
765            WorkerResponse::CommEventCounter {
766                status,
767                event_count,
768            },
769        );
770    }
771
772    fn get_comm_event_log_response(
773        &mut self,
774        txn_id: u16,
775        _unit: UnitIdOrSlaveAddr,
776        status: u16,
777        event_count: u16,
778        message_count: u16,
779        events: &[u8],
780    ) {
781        self.resolve(
782            txn_id,
783            WorkerResponse::CommEventLog((status, event_count, message_count, events.to_vec())),
784        );
785    }
786
787    fn report_server_id_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, data: &[u8]) {
788        self.resolve(txn_id, WorkerResponse::ReportServerId(data.to_vec()));
789    }
790}
791
792fn register_pending(
793    pending: &PendingStore,
794    txn_id: u16,
795    sender: PendingSender,
796) -> Result<(), MbusError> {
797    let mut guard = pending.lock().map_err(|_| MbusError::Unexpected)?;
798    guard.insert(txn_id, sender);
799    Ok(())
800}
801
802fn reject_pending(pending: &PendingStore, txn_id: u16, error: MbusError) {
803    if let Ok(mut guard) = pending.lock()
804        && let Some(sender) = guard.remove(&txn_id)
805    {
806        let _ = sender.send(Err(error));
807    }
808}
809
810fn submit_or_reject(pending: &PendingStore, txn_id: u16, result: Result<(), MbusError>) {
811    if let Err(err) = result {
812        reject_pending(pending, txn_id, err);
813    }
814}
815
816fn handle_command<TRANSPORT, const N: usize>(
817    client: &mut ClientServices<TRANSPORT, AsyncApp, N>,
818    pending: &PendingStore,
819    command: WorkerCommand,
820) where
821    TRANSPORT: Transport,
822{
823    match command {
824        WorkerCommand::Connect { sender } => {
825            let _ = sender.send(client.connect().map(|_| WorkerResponse::Ack));
826        }
827        #[cfg(feature = "coils")]
828        WorkerCommand::ReadMultipleCoils {
829            txn_id,
830            unit,
831            address,
832            quantity,
833            sender,
834        } => {
835            if register_pending(pending, txn_id, sender).is_ok() {
836                let result = client.read_multiple_coils(txn_id, unit, address, quantity);
837                submit_or_reject(pending, txn_id, result);
838            }
839        }
840        #[cfg(feature = "registers")]
841        WorkerCommand::ReadHoldingRegisters {
842            txn_id,
843            unit,
844            address,
845            quantity,
846            sender,
847        } => {
848            if register_pending(pending, txn_id, sender).is_ok() {
849                let result = client.read_holding_registers(txn_id, unit, address, quantity);
850                submit_or_reject(pending, txn_id, result);
851            }
852        }
853        #[cfg(feature = "registers")]
854        WorkerCommand::ReadInputRegisters {
855            txn_id,
856            unit,
857            address,
858            quantity,
859            sender,
860        } => {
861            if register_pending(pending, txn_id, sender).is_ok() {
862                let result = client.read_input_registers(txn_id, unit, address, quantity);
863                submit_or_reject(pending, txn_id, result);
864            }
865        }
866        #[cfg(feature = "registers")]
867        WorkerCommand::WriteSingleRegister {
868            txn_id,
869            unit,
870            address,
871            value,
872            sender,
873        } => {
874            if register_pending(pending, txn_id, sender).is_ok() {
875                let result = client.write_single_register(txn_id, unit, address, value);
876                submit_or_reject(pending, txn_id, result);
877            }
878        }
879        #[cfg(feature = "coils")]
880        WorkerCommand::WriteSingleCoil {
881            txn_id,
882            unit,
883            address,
884            value,
885            sender,
886        } => {
887            if register_pending(pending, txn_id, sender).is_ok() {
888                let result = client.write_single_coil(txn_id, unit, address, value);
889                submit_or_reject(pending, txn_id, result);
890            }
891        }
892        #[cfg(feature = "coils")]
893        WorkerCommand::WriteMultipleCoils {
894            txn_id,
895            unit,
896            address,
897            coils,
898            sender,
899        } => {
900            if register_pending(pending, txn_id, sender).is_ok() {
901                let result = client.write_multiple_coils(txn_id, unit, address, &coils);
902                submit_or_reject(pending, txn_id, result);
903            }
904        }
905        #[cfg(feature = "registers")]
906        WorkerCommand::WriteMultipleRegisters {
907            txn_id,
908            unit,
909            address,
910            values,
911            sender,
912        } => {
913            if register_pending(pending, txn_id, sender).is_ok() {
914                let result = client.write_multiple_registers(
915                    txn_id,
916                    unit,
917                    address,
918                    values.len() as u16,
919                    &values,
920                );
921                submit_or_reject(pending, txn_id, result);
922            }
923        }
924        #[cfg(feature = "registers")]
925        WorkerCommand::ReadWriteMultipleRegisters {
926            txn_id,
927            unit,
928            read_address,
929            read_quantity,
930            write_address,
931            write_values,
932            sender,
933        } => {
934            if register_pending(pending, txn_id, sender).is_ok() {
935                let result = client.read_write_multiple_registers(
936                    txn_id,
937                    unit,
938                    read_address,
939                    read_quantity,
940                    write_address,
941                    &write_values,
942                );
943                submit_or_reject(pending, txn_id, result);
944            }
945        }
946        #[cfg(feature = "registers")]
947        WorkerCommand::MaskWriteRegister {
948            txn_id,
949            unit,
950            address,
951            and_mask,
952            or_mask,
953            sender,
954        } => {
955            if register_pending(pending, txn_id, sender).is_ok() {
956                let result = client.mask_write_register(txn_id, unit, address, and_mask, or_mask);
957                submit_or_reject(pending, txn_id, result);
958            }
959        }
960        #[cfg(feature = "discrete-inputs")]
961        WorkerCommand::ReadDiscreteInputs {
962            txn_id,
963            unit,
964            address,
965            quantity,
966            sender,
967        } => {
968            if register_pending(pending, txn_id, sender).is_ok() {
969                let result = client.read_discrete_inputs(txn_id, unit, address, quantity);
970                submit_or_reject(pending, txn_id, result);
971            }
972        }
973        #[cfg(feature = "fifo")]
974        WorkerCommand::ReadFifoQueue {
975            txn_id,
976            unit,
977            address,
978            sender,
979        } => {
980            if register_pending(pending, txn_id, sender).is_ok() {
981                let result = client.read_fifo_queue(txn_id, unit, address);
982                submit_or_reject(pending, txn_id, result);
983            }
984        }
985        #[cfg(feature = "file-record")]
986        WorkerCommand::ReadFileRecord {
987            txn_id,
988            unit,
989            sub_request,
990            sender,
991        } => {
992            if register_pending(pending, txn_id, sender).is_ok() {
993                let result = client.read_file_record(txn_id, unit, &sub_request);
994                submit_or_reject(pending, txn_id, result);
995            }
996        }
997        #[cfg(feature = "file-record")]
998        WorkerCommand::WriteFileRecord {
999            txn_id,
1000            unit,
1001            sub_request,
1002            sender,
1003        } => {
1004            if register_pending(pending, txn_id, sender).is_ok() {
1005                let result = client.write_file_record(txn_id, unit, &sub_request);
1006                submit_or_reject(pending, txn_id, result);
1007            }
1008        }
1009        #[cfg(feature = "diagnostics")]
1010        WorkerCommand::ReadDeviceIdentification {
1011            txn_id,
1012            unit,
1013            read_device_id_code,
1014            object_id,
1015            sender,
1016        } => {
1017            if register_pending(pending, txn_id, sender).is_ok() {
1018                let result =
1019                    client.read_device_identification(txn_id, unit, read_device_id_code, object_id);
1020                submit_or_reject(pending, txn_id, result);
1021            }
1022        }
1023        #[cfg(feature = "diagnostics")]
1024        WorkerCommand::EncapsulatedInterfaceTransport {
1025            txn_id,
1026            unit,
1027            mei_type,
1028            data,
1029            sender,
1030        } => {
1031            if register_pending(pending, txn_id, sender).is_ok() {
1032                let result = client.encapsulated_interface_transport(txn_id, unit, mei_type, &data);
1033                submit_or_reject(pending, txn_id, result);
1034            }
1035        }
1036        #[cfg(feature = "diagnostics")]
1037        WorkerCommand::ReadExceptionStatus {
1038            txn_id,
1039            unit,
1040            sender,
1041        } => {
1042            if register_pending(pending, txn_id, sender).is_ok() {
1043                let result = client.read_exception_status(txn_id, unit);
1044                submit_or_reject(pending, txn_id, result);
1045            }
1046        }
1047        #[cfg(feature = "diagnostics")]
1048        WorkerCommand::Diagnostics {
1049            txn_id,
1050            unit,
1051            sub_function,
1052            data,
1053            sender,
1054        } => {
1055            if register_pending(pending, txn_id, sender).is_ok() {
1056                let result = client.diagnostics(txn_id, unit, sub_function, &data);
1057                submit_or_reject(pending, txn_id, result);
1058            }
1059        }
1060        #[cfg(feature = "diagnostics")]
1061        WorkerCommand::GetCommEventCounter {
1062            txn_id,
1063            unit,
1064            sender,
1065        } => {
1066            if register_pending(pending, txn_id, sender).is_ok() {
1067                let result = client.get_comm_event_counter(txn_id, unit);
1068                submit_or_reject(pending, txn_id, result);
1069            }
1070        }
1071        #[cfg(feature = "diagnostics")]
1072        WorkerCommand::GetCommEventLog {
1073            txn_id,
1074            unit,
1075            sender,
1076        } => {
1077            if register_pending(pending, txn_id, sender).is_ok() {
1078                let result = client.get_comm_event_log(txn_id, unit);
1079                submit_or_reject(pending, txn_id, result);
1080            }
1081        }
1082        #[cfg(feature = "diagnostics")]
1083        WorkerCommand::ReportServerId {
1084            txn_id,
1085            unit,
1086            sender,
1087        } => {
1088            if register_pending(pending, txn_id, sender).is_ok() {
1089                let result = client.report_server_id(txn_id, unit);
1090                submit_or_reject(pending, txn_id, result);
1091            }
1092        }
1093        WorkerCommand::Shutdown => {}
1094    }
1095}
1096
1097fn run_worker<TRANSPORT, const N: usize>(
1098    mut client: ClientServices<TRANSPORT, AsyncApp, N>,
1099    pending: PendingStore,
1100    receiver: Receiver<WorkerCommand>,
1101    poll_interval: Duration,
1102) where
1103    TRANSPORT: Transport,
1104{
1105    loop {
1106        // Wait briefly for work first so we do not poll the transport before
1107        // any request has been registered (important for deterministic tests
1108        // with preloaded mock responses).
1109        match receiver.recv_timeout(poll_interval) {
1110            Ok(WorkerCommand::Shutdown) => return,
1111            Ok(command) => {
1112                handle_command(&mut client, &pending, command);
1113                loop {
1114                    match receiver.try_recv() {
1115                        Ok(WorkerCommand::Shutdown) => return,
1116                        Ok(command) => handle_command(&mut client, &pending, command),
1117                        Err(mpsc::TryRecvError::Empty) => break,
1118                        Err(mpsc::TryRecvError::Disconnected) => return,
1119                    }
1120                }
1121            }
1122            Err(mpsc::RecvTimeoutError::Timeout) => {}
1123            Err(mpsc::RecvTimeoutError::Disconnected) => return,
1124        }
1125
1126        let has_pending = pending
1127            .lock()
1128            .map(|store| !store.is_empty())
1129            .unwrap_or(false);
1130
1131        if client.is_connected() && has_pending {
1132            client.poll();
1133        }
1134    }
1135}
1136
1137// ── Submodules ───────────────────────────────────────────────────────────────
1138
1139mod client_core;
1140mod network_client;
1141mod serial_client;
1142
1143pub(crate) use client_core::AsyncClientCore;
1144pub use network_client::AsyncTcpClient;
1145pub use serial_client::AsyncSerialClient;
1146
1147#[cfg(all(test, feature = "traffic"))]
1148mod tests {
1149    use super::*;
1150
1151    #[test]
1152    fn test_async_app_emits_traffic_event_to_channel() {
1153        let pending = Arc::new(Mutex::new(HashMap::new()));
1154        let (traffic_sender, traffic_receiver) = mpsc::channel();
1155
1156        let mut app = AsyncApp {
1157            pending,
1158            traffic_sender,
1159        };
1160
1161        let unit = UnitIdOrSlaveAddr::new(1).unwrap();
1162        app.on_tx_frame(42, unit, &[0xAA, 0x55]);
1163
1164        let event = traffic_receiver
1165            .recv_timeout(Duration::from_millis(100))
1166            .unwrap();
1167        assert_eq!(event.direction, TrafficDirection::Tx);
1168        assert_eq!(event.txn_id, 42);
1169        assert_eq!(event.unit_id_slave_addr, unit);
1170        assert_eq!(event.frame, vec![0xAA, 0x55]);
1171        assert_eq!(event.error, None);
1172    }
1173
1174    #[test]
1175    fn test_async_app_emits_traffic_error_event_to_channel() {
1176        let pending = Arc::new(Mutex::new(HashMap::new()));
1177        let (traffic_sender, traffic_receiver) = mpsc::channel();
1178
1179        let mut app = AsyncApp {
1180            pending,
1181            traffic_sender,
1182        };
1183
1184        let unit = UnitIdOrSlaveAddr::new(1).unwrap();
1185        app.on_rx_error(77, unit, MbusError::ChecksumError, &[0xAB]);
1186
1187        let event = traffic_receiver
1188            .recv_timeout(Duration::from_millis(100))
1189            .unwrap();
1190        assert_eq!(event.direction, TrafficDirection::Rx);
1191        assert_eq!(event.txn_id, 77);
1192        assert_eq!(event.unit_id_slave_addr, unit);
1193        assert_eq!(event.frame, vec![0xAB]);
1194        assert_eq!(event.error, Some(MbusError::ChecksumError));
1195    }
1196
1197    #[test]
1198    fn test_async_client_core_set_and_clear_traffic_handler() {
1199        let (sender, _receiver) = mpsc::channel();
1200        let traffic_handler: TrafficHandlerStore = Arc::new(Mutex::new(None));
1201        let core = AsyncClientCore::new(sender, traffic_handler.clone());
1202
1203        core.set_traffic_handler(|_evt| {});
1204        assert!(traffic_handler.lock().unwrap().is_some());
1205
1206        core.clear_traffic_handler();
1207        assert!(traffic_handler.lock().unwrap().is_none());
1208    }
1209}