Skip to main content

mbus_async/runtime/
mod.rs

1//! Internal runtime for the async facade.
2//!
3//! This module contains the worker-thread bridge between async callers and the
4//! synchronous `ClientServices` state machine.
5//!
6//! Public entry points are re-exported from the crate root:
7//! - [`AsyncTcpClient`] (TCP)
8//! - [`AsyncSerialClient`] (RTU/ASCII)
9//!
10//! Internal/shared building blocks:
11//! - [`AsyncClientCore`] stores worker channel state and implements request methods.
12//! - [`WorkerCommand`] and [`WorkerResponse`] carry typed request/response payloads.
13//! - [`run_worker`] drives polling and response routing.
14
15use std::collections::HashMap;
16#[cfg(feature = "traffic")]
17use std::panic::{AssertUnwindSafe, catch_unwind};
18use std::sync::atomic::{AtomicU16, Ordering};
19use std::sync::mpsc::{Receiver, Sender};
20use std::sync::{Arc, Mutex, mpsc};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24#[cfg(feature = "coils")]
25use mbus_client::app::CoilResponse;
26#[cfg(feature = "diagnostics")]
27use mbus_client::app::DiagnosticsResponse;
28#[cfg(feature = "discrete-inputs")]
29use mbus_client::app::DiscreteInputResponse;
30#[cfg(feature = "fifo")]
31use mbus_client::app::FifoQueueResponse;
32#[cfg(feature = "file-record")]
33use mbus_client::app::FileRecordResponse;
34#[cfg(feature = "registers")]
35use mbus_client::app::RegisterResponse;
36use mbus_client::app::RequestErrorNotifier;
37#[cfg(feature = "traffic")]
38use mbus_client::app::{TrafficDirection, TrafficNotifier};
39use mbus_client::services::ClientServices;
40#[cfg(feature = "coils")]
41use mbus_client::services::coil::Coils;
42#[cfg(feature = "diagnostics")]
43use mbus_client::services::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
44#[cfg(feature = "discrete-inputs")]
45use mbus_client::services::discrete_input::DiscreteInputs;
46#[cfg(feature = "fifo")]
47use mbus_client::services::fifo_queue::FifoQueue;
48#[cfg(feature = "file-record")]
49use mbus_client::services::file_record::{SubRequest, SubRequestParams};
50#[cfg(feature = "registers")]
51use mbus_client::services::register::Registers;
52use mbus_core::errors::MbusError;
53#[cfg(feature = "diagnostics")]
54use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
55#[cfg(feature = "tcp")]
56use mbus_core::transport::ModbusTcpConfig;
57use mbus_core::transport::{ModbusConfig, TimeKeeper, Transport, UnitIdOrSlaveAddr};
58#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
59use mbus_core::transport::{ModbusSerialConfig, SerialMode};
60#[cfg(feature = "tcp")]
61use mbus_network::StdTcpTransport;
62#[cfg(feature = "serial-ascii")]
63use mbus_serial::StdAsciiTransport;
64#[cfg(feature = "serial-rtu")]
65use mbus_serial::StdRtuTransport;
66#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
67use mbus_serial::StdSerialTransport;
68use tokio::sync::oneshot;
69
/// Payload of a diagnostics reply (FC 08).
///
/// Filled in by the `diagnostics_response` callback from the sub-function and
/// data words the client stack reports.
#[cfg(feature = "diagnostics")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiagnosticsDataResponse {
    /// Diagnostic sub-function code echoed in the reply.
    pub sub_function: DiagnosticSubFunction,
    /// Diagnostic data words echoed in the reply.
    pub data: Vec<u16>,
}
/// Communication event log reply (FC 12), laid out as
/// `(status, event_count, message_count, events)`.
#[cfg(feature = "diagnostics")]
pub type CommEventLogResponse = (u16, u16, u16, Vec<u8>);
82
83/// Async facade error type.
84#[derive(Debug, PartialEq, Eq)]
85pub enum AsyncError {
86    /// Error propagated from the underlying Modbus client stack.
87    Mbus(MbusError),
88    /// Background worker channel is closed or worker thread has stopped.
89    WorkerClosed,
90    /// Internal response routing mismatch between request and callback payload type.
91    UnexpectedResponseType,
92}
93
94impl From<MbusError> for AsyncError {
95    fn from(value: MbusError) -> Self {
96        Self::Mbus(value)
97    }
98}
99
100impl std::fmt::Display for AsyncError {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        match self {
103            Self::Mbus(err) => write!(f, "Modbus error: {err}"),
104            Self::WorkerClosed => write!(f, "async worker channel closed"),
105            Self::UnexpectedResponseType => write!(f, "unexpected response type from worker"),
106        }
107    }
108}
109
110impl std::error::Error for AsyncError {
111    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
112        match self {
113            Self::Mbus(err) => Some(err),
114            _ => None,
115        }
116    }
117}
118
119type PendingSender = oneshot::Sender<Result<WorkerResponse, MbusError>>;
120type PendingStore = Arc<Mutex<HashMap<u16, PendingSender>>>;
121#[cfg(feature = "traffic")]
122type TrafficHandler = Box<dyn FnMut(&TrafficEvent) + Send + 'static>;
123#[cfg(feature = "traffic")]
124type TrafficHandlerStore = Arc<Mutex<Option<TrafficHandler>>>;
125#[cfg(feature = "traffic")]
126type TrafficSender = Sender<TrafficEvent>;
127
/// Frame-level traffic notification emitted from the worker thread.
///
/// One event is produced for each TX/RX callback of the traffic notifier,
/// including the error variants of those callbacks.
#[cfg(feature = "traffic")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrafficEvent {
    /// Whether the frame was sent (`Tx`) or received (`Rx`).
    pub direction: TrafficDirection,
    /// Transaction identifier of the request this frame belongs to.
    pub txn_id: u16,
    /// Unit id (TCP) or slave address (serial) the frame is associated with.
    pub unit_id_slave_addr: UnitIdOrSlaveAddr,
    /// Copy of the raw ADU bytes passed to the notifier.
    pub frame: Vec<u8>,
    /// `Some` when the event comes from a failed TX/RX path, `None` otherwise.
    pub error: Option<MbusError>,
}
143
144enum WorkerCommand {
145    Connect {
146        sender: PendingSender,
147    },
148    HasPendingRequests {
149        sender: PendingSender,
150    },
151    #[cfg(feature = "coils")]
152    ReadMultipleCoils {
153        txn_id: u16,
154        unit: UnitIdOrSlaveAddr,
155        address: u16,
156        quantity: u16,
157        sender: PendingSender,
158    },
159    #[cfg(feature = "registers")]
160    ReadHoldingRegisters {
161        txn_id: u16,
162        unit: UnitIdOrSlaveAddr,
163        address: u16,
164        quantity: u16,
165        sender: PendingSender,
166    },
167    #[cfg(feature = "registers")]
168    ReadInputRegisters {
169        txn_id: u16,
170        unit: UnitIdOrSlaveAddr,
171        address: u16,
172        quantity: u16,
173        sender: PendingSender,
174    },
175    #[cfg(feature = "registers")]
176    WriteSingleRegister {
177        txn_id: u16,
178        unit: UnitIdOrSlaveAddr,
179        address: u16,
180        value: u16,
181        sender: PendingSender,
182    },
183    #[cfg(feature = "coils")]
184    WriteSingleCoil {
185        txn_id: u16,
186        unit: UnitIdOrSlaveAddr,
187        address: u16,
188        value: bool,
189        sender: PendingSender,
190    },
191    #[cfg(feature = "coils")]
192    WriteMultipleCoils {
193        txn_id: u16,
194        unit: UnitIdOrSlaveAddr,
195        address: u16,
196        coils: Coils,
197        sender: PendingSender,
198    },
199    #[cfg(feature = "registers")]
200    WriteMultipleRegisters {
201        txn_id: u16,
202        unit: UnitIdOrSlaveAddr,
203        address: u16,
204        values: Vec<u16>,
205        sender: PendingSender,
206    },
207    #[cfg(feature = "registers")]
208    ReadWriteMultipleRegisters {
209        txn_id: u16,
210        unit: UnitIdOrSlaveAddr,
211        read_address: u16,
212        read_quantity: u16,
213        write_address: u16,
214        write_values: Vec<u16>,
215        sender: PendingSender,
216    },
217    #[cfg(feature = "registers")]
218    MaskWriteRegister {
219        txn_id: u16,
220        unit: UnitIdOrSlaveAddr,
221        address: u16,
222        and_mask: u16,
223        or_mask: u16,
224        sender: PendingSender,
225    },
226    #[cfg(feature = "discrete-inputs")]
227    ReadDiscreteInputs {
228        txn_id: u16,
229        unit: UnitIdOrSlaveAddr,
230        address: u16,
231        quantity: u16,
232        sender: PendingSender,
233    },
234    #[cfg(feature = "fifo")]
235    ReadFifoQueue {
236        txn_id: u16,
237        unit: UnitIdOrSlaveAddr,
238        address: u16,
239        sender: PendingSender,
240    },
241    #[cfg(feature = "file-record")]
242    ReadFileRecord {
243        txn_id: u16,
244        unit: UnitIdOrSlaveAddr,
245        sub_request: SubRequest,
246        sender: PendingSender,
247    },
248    #[cfg(feature = "file-record")]
249    WriteFileRecord {
250        txn_id: u16,
251        unit: UnitIdOrSlaveAddr,
252        sub_request: SubRequest,
253        sender: PendingSender,
254    },
255    #[cfg(feature = "diagnostics")]
256    ReadDeviceIdentification {
257        txn_id: u16,
258        unit: UnitIdOrSlaveAddr,
259        read_device_id_code: ReadDeviceIdCode,
260        object_id: ObjectId,
261        sender: PendingSender,
262    },
263    #[cfg(feature = "diagnostics")]
264    EncapsulatedInterfaceTransport {
265        txn_id: u16,
266        unit: UnitIdOrSlaveAddr,
267        mei_type: EncapsulatedInterfaceType,
268        data: Vec<u8>,
269        sender: PendingSender,
270    },
271    #[cfg(feature = "diagnostics")]
272    ReadExceptionStatus {
273        txn_id: u16,
274        unit: UnitIdOrSlaveAddr,
275        sender: PendingSender,
276    },
277    #[cfg(feature = "diagnostics")]
278    Diagnostics {
279        txn_id: u16,
280        unit: UnitIdOrSlaveAddr,
281        sub_function: DiagnosticSubFunction,
282        data: Vec<u16>,
283        sender: PendingSender,
284    },
285    #[cfg(feature = "diagnostics")]
286    GetCommEventCounter {
287        txn_id: u16,
288        unit: UnitIdOrSlaveAddr,
289        sender: PendingSender,
290    },
291    #[cfg(feature = "diagnostics")]
292    GetCommEventLog {
293        txn_id: u16,
294        unit: UnitIdOrSlaveAddr,
295        sender: PendingSender,
296    },
297    #[cfg(feature = "diagnostics")]
298    ReportServerId {
299        txn_id: u16,
300        unit: UnitIdOrSlaveAddr,
301        sender: PendingSender,
302    },
303    Shutdown,
304}
305
/// Typed success payloads delivered to a waiting caller through its
/// `PendingSender`.
enum WorkerResponse {
    /// Sent when a `Connect` command succeeds.
    Ack,
    /// Answer to `HasPendingRequests`.
    HasPendingRequests(bool),
    /// Coil data produced by the coil read and write callbacks.
    #[cfg(feature = "coils")]
    Coils(Coils),
    /// Register data produced by the register read callbacks and by the
    /// multiple-register write callback.
    #[cfg(feature = "registers")]
    Registers(Registers),
    /// Address and value reported by the single-register write callback.
    #[cfg(feature = "registers")]
    SingleRegisterWrite {
        address: u16,
        value: u16,
    },
    /// Completion marker for a mask-write-register request.
    #[cfg(feature = "registers")]
    MaskWriteRegister,
    /// Discrete input data produced by the discrete input read callbacks.
    #[cfg(feature = "discrete-inputs")]
    DiscreteInputs(DiscreteInputs),
    /// FIFO queue contents produced by the FIFO read callback.
    #[cfg(feature = "fifo")]
    FifoQueue(FifoQueue),
    /// Sub-request results produced by the file record read callback.
    #[cfg(feature = "file-record")]
    FileRecordRead(Vec<SubRequestParams>),
    /// Completion marker for a file record write.
    #[cfg(feature = "file-record")]
    FileRecordWrite,
    /// Device identification data.
    #[cfg(feature = "diagnostics")]
    DeviceIdentification(DeviceIdentificationResponse),
    /// MEI type and raw data of an encapsulated interface transport reply.
    #[cfg(feature = "diagnostics")]
    EncapsulatedInterfaceTransport {
        mei_type: EncapsulatedInterfaceType,
        data: Vec<u8>,
    },
    /// Exception status byte.
    #[cfg(feature = "diagnostics")]
    ExceptionStatus(u8),
    /// Sub-function and data words of a diagnostics reply.
    #[cfg(feature = "diagnostics")]
    DiagnosticsData(DiagnosticsDataResponse),
    /// Status and event count of a comm-event-counter reply.
    #[cfg(feature = "diagnostics")]
    CommEventCounter {
        status: u16,
        event_count: u16,
    },
    /// `(status, event_count, message_count, events)` of a comm-event-log reply.
    #[cfg(feature = "diagnostics")]
    CommEventLog(CommEventLogResponse),
    /// Raw bytes of a report-server-id reply.
    #[cfg(feature = "diagnostics")]
    ReportServerId(Vec<u8>),
}
349
350struct AsyncApp {
351    pending: PendingStore,
352    #[cfg(feature = "traffic")]
353    traffic_sender: TrafficSender,
354}
355
356impl AsyncApp {
357    fn complete(&self, txn_id: u16, response: Result<WorkerResponse, MbusError>) {
358        if let Ok(mut pending) = self.pending.lock()
359            && let Some(sender) = pending.remove(&txn_id)
360        {
361            let _ = sender.send(response);
362        }
363    }
364
365    fn resolve(&self, txn_id: u16, response: WorkerResponse) {
366        self.complete(txn_id, Ok(response));
367    }
368
369    fn reject(&self, txn_id: u16, error: MbusError) {
370        self.complete(txn_id, Err(error));
371    }
372
373    #[cfg(feature = "traffic")]
374    fn emit_traffic(
375        &self,
376        direction: TrafficDirection,
377        txn_id: u16,
378        unit_id_slave_addr: UnitIdOrSlaveAddr,
379        error: Option<MbusError>,
380        frame_bytes: &[u8],
381    ) {
382        let event = TrafficEvent {
383            direction,
384            txn_id,
385            unit_id_slave_addr,
386            frame: frame_bytes.to_vec(),
387            error,
388        };
389
390        let _ = self.traffic_sender.send(event);
391    }
392}
393
394impl TimeKeeper for AsyncApp {
395    fn current_millis(&self) -> u64 {
396        SystemTime::now()
397            .duration_since(UNIX_EPOCH)
398            .map(|d| d.as_millis() as u64)
399            .unwrap_or(0)
400    }
401}
402
403impl RequestErrorNotifier for AsyncApp {
404    fn request_failed(
405        &mut self,
406        txn_id: u16,
407        _unit_id_slave_addr: UnitIdOrSlaveAddr,
408        error: MbusError,
409    ) {
410        self.reject(txn_id, error);
411    }
412}
413
/// Turns every traffic callback into a [`TrafficEvent`] via `emit_traffic`.
#[cfg(feature = "traffic")]
impl TrafficNotifier for AsyncApp {
    /// Outbound frame, no error attached.
    fn on_tx_frame(
        &mut self,
        txn_id: u16,
        unit_id_slave_addr: UnitIdOrSlaveAddr,
        frame_bytes: &[u8],
    ) {
        let direction = TrafficDirection::Tx;
        self.emit_traffic(direction, txn_id, unit_id_slave_addr, None, frame_bytes);
    }

    /// Inbound frame, no error attached.
    fn on_rx_frame(
        &mut self,
        txn_id: u16,
        unit_id_slave_addr: UnitIdOrSlaveAddr,
        frame_bytes: &[u8],
    ) {
        let direction = TrafficDirection::Rx;
        self.emit_traffic(direction, txn_id, unit_id_slave_addr, None, frame_bytes);
    }

    /// Outbound frame whose transmission failed with `error`.
    fn on_tx_error(
        &mut self,
        txn_id: u16,
        unit_id_slave_addr: UnitIdOrSlaveAddr,
        error: MbusError,
        frame_bytes: &[u8],
    ) {
        let failure = Some(error);
        self.emit_traffic(
            TrafficDirection::Tx,
            txn_id,
            unit_id_slave_addr,
            failure,
            frame_bytes,
        );
    }

    /// Inbound frame whose reception failed with `error`.
    fn on_rx_error(
        &mut self,
        txn_id: u16,
        unit_id_slave_addr: UnitIdOrSlaveAddr,
        error: MbusError,
        frame_bytes: &[u8],
    ) {
        let failure = Some(error);
        self.emit_traffic(
            TrafficDirection::Rx,
            txn_id,
            unit_id_slave_addr,
            failure,
            frame_bytes,
        );
    }
}
478
/// Drains `receiver` and passes each event to the installed traffic handler.
///
/// Blocks until every sender has been dropped. Events that arrive while no
/// handler is installed, or while the handler lock cannot be taken, are
/// discarded. A panic inside the handler is caught so the loop keeps running.
#[cfg(feature = "traffic")]
fn run_traffic_dispatcher(receiver: Receiver<TrafficEvent>, traffic_handler: TrafficHandlerStore) {
    for event in receiver.iter() {
        let Ok(mut slot) = traffic_handler.lock() else {
            continue;
        };
        if let Some(callback) = slot.as_mut() {
            let _ = catch_unwind(AssertUnwindSafe(|| callback(&event)));
        }
    }
}
489
/// Routes coil callbacks to the caller waiting on the transaction id.
#[cfg(feature = "coils")]
impl CoilResponse for AsyncApp {
    /// Completes the request with a copy of the coils that were read.
    fn read_coils_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, coils: &Coils) {
        let snapshot = coils.clone();
        self.complete(txn_id, Ok(WorkerResponse::Coils(snapshot)));
    }

    /// Wraps the single coil in a one-element `Coils`; a construction failure
    /// is delivered as the request's error.
    fn read_single_coil_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        let built = Coils::new(address, 1).and_then(|mut single| {
            single.set_value(address, value)?;
            Ok(single)
        });
        self.complete(txn_id, built.map(WorkerResponse::Coils));
    }

    /// Reports the written coil as a one-element `Coils`; a construction
    /// failure is delivered as the request's error.
    fn write_single_coil_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        let built = Coils::new(address, 1).and_then(|mut single| {
            single.set_value(address, value)?;
            Ok(single)
        });
        self.complete(txn_id, built.map(WorkerResponse::Coils));
    }

    /// Reports the written range as a freshly constructed `Coils` covering
    /// `address` / `quantity`; no coil values are set on it here.
    fn write_multiple_coils_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        quantity: u16,
    ) {
        let written_range = Coils::new(address, quantity);
        self.complete(txn_id, written_range.map(WorkerResponse::Coils));
    }
}
541
/// Routes register callbacks to the caller waiting on the transaction id.
///
/// Single-value callbacks are normalised into a one-element `Registers`; if
/// that construction fails, the failure is delivered as the request's error.
#[cfg(feature = "registers")]
impl RegisterResponse for AsyncApp {
    /// Completes the request with a copy of the input registers read.
    fn read_multiple_input_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let snapshot = registers.clone();
        self.complete(txn_id, Ok(WorkerResponse::Registers(snapshot)));
    }

    /// Completes the request with a one-element `Registers` for the input
    /// register read.
    fn read_single_input_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let built = Registers::new(address, 1).and_then(|mut single| {
            single.set_value(address, value)?;
            Ok(single)
        });
        self.complete(txn_id, built.map(WorkerResponse::Registers));
    }

    /// Completes the request with a copy of the holding registers read.
    fn read_multiple_holding_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let snapshot = registers.clone();
        self.complete(txn_id, Ok(WorkerResponse::Registers(snapshot)));
    }

    /// Completes the request with the address and value that were written.
    fn write_single_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let written = WorkerResponse::SingleRegisterWrite { address, value };
        self.complete(txn_id, Ok(written));
    }

    /// Reports the written range as a freshly constructed `Registers` covering
    /// `starting_address` / `quantity`; no register values are set on it here.
    fn write_multiple_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        starting_address: u16,
        quantity: u16,
    ) {
        let written_range = Registers::new(starting_address, quantity);
        self.complete(txn_id, written_range.map(WorkerResponse::Registers));
    }

    /// Completes the request with a copy of the registers read back.
    fn read_write_multiple_registers_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        registers: &Registers,
    ) {
        let snapshot = registers.clone();
        self.complete(txn_id, Ok(WorkerResponse::Registers(snapshot)));
    }

    /// Completes the request with a one-element `Registers` for the holding
    /// register read.
    fn read_single_holding_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let built = Registers::new(address, 1).and_then(|mut single| {
            single.set_value(address, value)?;
            Ok(single)
        });
        self.complete(txn_id, built.map(WorkerResponse::Registers));
    }

    /// Completes the request with a one-element `Registers` for the register
    /// read.
    fn read_single_register_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: u16,
    ) {
        let built = Registers::new(address, 1).and_then(|mut single| {
            single.set_value(address, value)?;
            Ok(single)
        });
        self.complete(txn_id, built.map(WorkerResponse::Registers));
    }

    /// Signals that the mask-write request finished; no payload is attached.
    fn mask_write_register_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
        self.complete(txn_id, Ok(WorkerResponse::MaskWriteRegister));
    }
}
649
/// Routes discrete-input callbacks to the caller waiting on the transaction id.
#[cfg(feature = "discrete-inputs")]
impl DiscreteInputResponse for AsyncApp {
    /// Completes the request with a copy of the discrete inputs read.
    fn read_multiple_discrete_inputs_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        discrete_inputs: &DiscreteInputs,
    ) {
        let snapshot = discrete_inputs.clone();
        self.complete(txn_id, Ok(WorkerResponse::DiscreteInputs(snapshot)));
    }

    /// Wraps the single input in a one-element `DiscreteInputs`; a
    /// construction failure is delivered as the request's error.
    fn read_single_discrete_input_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        address: u16,
        value: bool,
    ) {
        // The input state is passed on as the lowest bit of a single element.
        let bit = match value {
            true => 0b0000_0001,
            false => 0,
        };
        let built =
            DiscreteInputs::new(address, 1).and_then(|inputs| inputs.with_values(&[bit], 1));
        self.complete(txn_id, built.map(WorkerResponse::DiscreteInputs));
    }
}
680
#[cfg(feature = "fifo")]
impl FifoQueueResponse for AsyncApp {
    /// Completes the request with a copy of the FIFO queue that was read.
    fn read_fifo_queue_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        fifo_queue: &FifoQueue,
    ) {
        let snapshot = fifo_queue.clone();
        self.complete(txn_id, Ok(WorkerResponse::FifoQueue(snapshot)));
    }
}
692
#[cfg(feature = "file-record")]
impl FileRecordResponse for AsyncApp {
    /// Completes the request with an owned copy of the sub-request results.
    fn read_file_record_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        data: &[SubRequestParams],
    ) {
        let records = data.to_vec();
        self.complete(txn_id, Ok(WorkerResponse::FileRecordRead(records)));
    }

    /// Signals that the file record write finished; no payload is attached.
    fn write_file_record_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
        self.complete(txn_id, Ok(WorkerResponse::FileRecordWrite));
    }
}
708
/// Routes diagnostics callbacks to the caller waiting on the transaction id.
///
/// Borrowed payloads are copied into owned values before being sent.
#[cfg(feature = "diagnostics")]
impl DiagnosticsResponse for AsyncApp {
    /// Completes the request with a copy of the device identification data.
    fn read_device_identification_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        response: &DeviceIdentificationResponse,
    ) {
        let identification = response.clone();
        self.complete(
            txn_id,
            Ok(WorkerResponse::DeviceIdentification(identification)),
        );
    }

    /// Completes the request with the MEI type and a copy of the raw data.
    fn encapsulated_interface_transport_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        mei_type: EncapsulatedInterfaceType,
        data: &[u8],
    ) {
        let payload = WorkerResponse::EncapsulatedInterfaceTransport {
            mei_type,
            data: data.to_vec(),
        };
        self.complete(txn_id, Ok(payload));
    }

    /// Completes the request with the exception status byte.
    fn read_exception_status_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u8,
    ) {
        self.complete(txn_id, Ok(WorkerResponse::ExceptionStatus(status)));
    }

    /// Completes the request with the sub-function and a copy of the data words.
    fn diagnostics_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        sub_function: DiagnosticSubFunction,
        data: &[u16],
    ) {
        let payload = DiagnosticsDataResponse {
            sub_function,
            data: data.to_vec(),
        };
        self.complete(txn_id, Ok(WorkerResponse::DiagnosticsData(payload)));
    }

    /// Completes the request with the reported status and event count.
    fn get_comm_event_counter_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u16,
        event_count: u16,
    ) {
        let payload = WorkerResponse::CommEventCounter {
            status,
            event_count,
        };
        self.complete(txn_id, Ok(payload));
    }

    /// Completes the request with `(status, event_count, message_count, events)`.
    fn get_comm_event_log_response(
        &mut self,
        txn_id: u16,
        _unit: UnitIdOrSlaveAddr,
        status: u16,
        event_count: u16,
        message_count: u16,
        events: &[u8],
    ) {
        let log: CommEventLogResponse = (status, event_count, message_count, events.to_vec());
        self.complete(txn_id, Ok(WorkerResponse::CommEventLog(log)));
    }

    /// Completes the request with a copy of the server id bytes.
    fn report_server_id_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, data: &[u8]) {
        let server_id = data.to_vec();
        self.complete(txn_id, Ok(WorkerResponse::ReportServerId(server_id)));
    }
}
799
800fn register_pending(
801    pending: &PendingStore,
802    txn_id: u16,
803    sender: PendingSender,
804) -> Result<(), MbusError> {
805    let mut guard = pending.lock().map_err(|_| MbusError::Unexpected)?;
806    guard.insert(txn_id, sender);
807    Ok(())
808}
809
810fn reject_pending(pending: &PendingStore, txn_id: u16, error: MbusError) {
811    if let Ok(mut guard) = pending.lock()
812        && let Some(sender) = guard.remove(&txn_id)
813    {
814        let _ = sender.send(Err(error));
815    }
816}
817
818fn submit_or_reject(pending: &PendingStore, txn_id: u16, result: Result<(), MbusError>) {
819    if let Err(err) = result {
820        reject_pending(pending, txn_id, err);
821    }
822}
823
824fn handle_command<TRANSPORT, const N: usize>(
825    client: &mut ClientServices<TRANSPORT, AsyncApp, N>,
826    pending: &PendingStore,
827    command: WorkerCommand,
828) where
829    TRANSPORT: Transport,
830{
831    match command {
832        WorkerCommand::Connect { sender } => {
833            let _ = sender.send(client.connect().map(|_| WorkerResponse::Ack));
834        }
835        WorkerCommand::HasPendingRequests { sender } => {
836            let _ = sender.send(Ok(WorkerResponse::HasPendingRequests(
837                client.has_pending_requests(),
838            )));
839        }
840        #[cfg(feature = "coils")]
841        WorkerCommand::ReadMultipleCoils {
842            txn_id,
843            unit,
844            address,
845            quantity,
846            sender,
847        } => {
848            if register_pending(pending, txn_id, sender).is_ok() {
849                let result = client.read_multiple_coils(txn_id, unit, address, quantity);
850                submit_or_reject(pending, txn_id, result);
851            }
852        }
853        #[cfg(feature = "registers")]
854        WorkerCommand::ReadHoldingRegisters {
855            txn_id,
856            unit,
857            address,
858            quantity,
859            sender,
860        } => {
861            if register_pending(pending, txn_id, sender).is_ok() {
862                let result = client.read_holding_registers(txn_id, unit, address, quantity);
863                submit_or_reject(pending, txn_id, result);
864            }
865        }
866        #[cfg(feature = "registers")]
867        WorkerCommand::ReadInputRegisters {
868            txn_id,
869            unit,
870            address,
871            quantity,
872            sender,
873        } => {
874            if register_pending(pending, txn_id, sender).is_ok() {
875                let result = client.read_input_registers(txn_id, unit, address, quantity);
876                submit_or_reject(pending, txn_id, result);
877            }
878        }
879        #[cfg(feature = "registers")]
880        WorkerCommand::WriteSingleRegister {
881            txn_id,
882            unit,
883            address,
884            value,
885            sender,
886        } => {
887            if register_pending(pending, txn_id, sender).is_ok() {
888                let result = client.write_single_register(txn_id, unit, address, value);
889                submit_or_reject(pending, txn_id, result);
890            }
891        }
892        #[cfg(feature = "coils")]
893        WorkerCommand::WriteSingleCoil {
894            txn_id,
895            unit,
896            address,
897            value,
898            sender,
899        } => {
900            if register_pending(pending, txn_id, sender).is_ok() {
901                let result = client.write_single_coil(txn_id, unit, address, value);
902                submit_or_reject(pending, txn_id, result);
903            }
904        }
905        #[cfg(feature = "coils")]
906        WorkerCommand::WriteMultipleCoils {
907            txn_id,
908            unit,
909            address,
910            coils,
911            sender,
912        } => {
913            if register_pending(pending, txn_id, sender).is_ok() {
914                let result = client.write_multiple_coils(txn_id, unit, address, &coils);
915                submit_or_reject(pending, txn_id, result);
916            }
917        }
918        #[cfg(feature = "registers")]
919        WorkerCommand::WriteMultipleRegisters {
920            txn_id,
921            unit,
922            address,
923            values,
924            sender,
925        } => {
926            if register_pending(pending, txn_id, sender).is_ok() {
927                let result = client.write_multiple_registers(
928                    txn_id,
929                    unit,
930                    address,
931                    values.len() as u16,
932                    &values,
933                );
934                submit_or_reject(pending, txn_id, result);
935            }
936        }
937        #[cfg(feature = "registers")]
938        WorkerCommand::ReadWriteMultipleRegisters {
939            txn_id,
940            unit,
941            read_address,
942            read_quantity,
943            write_address,
944            write_values,
945            sender,
946        } => {
947            if register_pending(pending, txn_id, sender).is_ok() {
948                let result = client.read_write_multiple_registers(
949                    txn_id,
950                    unit,
951                    read_address,
952                    read_quantity,
953                    write_address,
954                    &write_values,
955                );
956                submit_or_reject(pending, txn_id, result);
957            }
958        }
959        #[cfg(feature = "registers")]
960        WorkerCommand::MaskWriteRegister {
961            txn_id,
962            unit,
963            address,
964            and_mask,
965            or_mask,
966            sender,
967        } => {
968            if register_pending(pending, txn_id, sender).is_ok() {
969                let result = client.mask_write_register(txn_id, unit, address, and_mask, or_mask);
970                submit_or_reject(pending, txn_id, result);
971            }
972        }
973        #[cfg(feature = "discrete-inputs")]
974        WorkerCommand::ReadDiscreteInputs {
975            txn_id,
976            unit,
977            address,
978            quantity,
979            sender,
980        } => {
981            if register_pending(pending, txn_id, sender).is_ok() {
982                let result = client.read_discrete_inputs(txn_id, unit, address, quantity);
983                submit_or_reject(pending, txn_id, result);
984            }
985        }
986        #[cfg(feature = "fifo")]
987        WorkerCommand::ReadFifoQueue {
988            txn_id,
989            unit,
990            address,
991            sender,
992        } => {
993            if register_pending(pending, txn_id, sender).is_ok() {
994                let result = client.read_fifo_queue(txn_id, unit, address);
995                submit_or_reject(pending, txn_id, result);
996            }
997        }
998        #[cfg(feature = "file-record")]
999        WorkerCommand::ReadFileRecord {
1000            txn_id,
1001            unit,
1002            sub_request,
1003            sender,
1004        } => {
1005            if register_pending(pending, txn_id, sender).is_ok() {
1006                let result = client.read_file_record(txn_id, unit, &sub_request);
1007                submit_or_reject(pending, txn_id, result);
1008            }
1009        }
1010        #[cfg(feature = "file-record")]
1011        WorkerCommand::WriteFileRecord {
1012            txn_id,
1013            unit,
1014            sub_request,
1015            sender,
1016        } => {
1017            if register_pending(pending, txn_id, sender).is_ok() {
1018                let result = client.write_file_record(txn_id, unit, &sub_request);
1019                submit_or_reject(pending, txn_id, result);
1020            }
1021        }
1022        #[cfg(feature = "diagnostics")]
1023        WorkerCommand::ReadDeviceIdentification {
1024            txn_id,
1025            unit,
1026            read_device_id_code,
1027            object_id,
1028            sender,
1029        } => {
1030            if register_pending(pending, txn_id, sender).is_ok() {
1031                let result =
1032                    client.read_device_identification(txn_id, unit, read_device_id_code, object_id);
1033                submit_or_reject(pending, txn_id, result);
1034            }
1035        }
1036        #[cfg(feature = "diagnostics")]
1037        WorkerCommand::EncapsulatedInterfaceTransport {
1038            txn_id,
1039            unit,
1040            mei_type,
1041            data,
1042            sender,
1043        } => {
1044            if register_pending(pending, txn_id, sender).is_ok() {
1045                let result = client.encapsulated_interface_transport(txn_id, unit, mei_type, &data);
1046                submit_or_reject(pending, txn_id, result);
1047            }
1048        }
1049        #[cfg(feature = "diagnostics")]
1050        WorkerCommand::ReadExceptionStatus {
1051            txn_id,
1052            unit,
1053            sender,
1054        } => {
1055            if register_pending(pending, txn_id, sender).is_ok() {
1056                let result = client.read_exception_status(txn_id, unit);
1057                submit_or_reject(pending, txn_id, result);
1058            }
1059        }
1060        #[cfg(feature = "diagnostics")]
1061        WorkerCommand::Diagnostics {
1062            txn_id,
1063            unit,
1064            sub_function,
1065            data,
1066            sender,
1067        } => {
1068            if register_pending(pending, txn_id, sender).is_ok() {
1069                let result = client.diagnostics(txn_id, unit, sub_function, &data);
1070                submit_or_reject(pending, txn_id, result);
1071            }
1072        }
1073        #[cfg(feature = "diagnostics")]
1074        WorkerCommand::GetCommEventCounter {
1075            txn_id,
1076            unit,
1077            sender,
1078        } => {
1079            if register_pending(pending, txn_id, sender).is_ok() {
1080                let result = client.get_comm_event_counter(txn_id, unit);
1081                submit_or_reject(pending, txn_id, result);
1082            }
1083        }
1084        #[cfg(feature = "diagnostics")]
1085        WorkerCommand::GetCommEventLog {
1086            txn_id,
1087            unit,
1088            sender,
1089        } => {
1090            if register_pending(pending, txn_id, sender).is_ok() {
1091                let result = client.get_comm_event_log(txn_id, unit);
1092                submit_or_reject(pending, txn_id, result);
1093            }
1094        }
1095        #[cfg(feature = "diagnostics")]
1096        WorkerCommand::ReportServerId {
1097            txn_id,
1098            unit,
1099            sender,
1100        } => {
1101            if register_pending(pending, txn_id, sender).is_ok() {
1102                let result = client.report_server_id(txn_id, unit);
1103                submit_or_reject(pending, txn_id, result);
1104            }
1105        }
1106        WorkerCommand::Shutdown => {}
1107    }
1108}
1109
1110fn run_worker<TRANSPORT, const N: usize>(
1111    mut client: ClientServices<TRANSPORT, AsyncApp, N>,
1112    pending: PendingStore,
1113    receiver: Receiver<WorkerCommand>,
1114    poll_interval: Duration,
1115) where
1116    TRANSPORT: Transport,
1117{
1118    loop {
1119        // Drain all queued commands first so newly enqueued requests are
1120        // visible before we decide whether to poll or sleep.
1121        loop {
1122            match receiver.try_recv() {
1123                Ok(WorkerCommand::Shutdown) => return,
1124                Ok(command) => handle_command(&mut client, &pending, command),
1125                Err(mpsc::TryRecvError::Empty) => break,
1126                Err(mpsc::TryRecvError::Disconnected) => return,
1127            }
1128        }
1129
1130        let should_poll = client.is_connected() && client.has_pending_requests();
1131
1132        if should_poll {
1133            client.poll();
1134
1135            // Active mode: sleep up to `poll_interval`, but wake early when a
1136            // new command arrives.
1137            match receiver.recv_timeout(poll_interval) {
1138                Ok(WorkerCommand::Shutdown) => return,
1139                Ok(command) => handle_command(&mut client, &pending, command),
1140                Err(mpsc::RecvTimeoutError::Timeout) => {}
1141                Err(mpsc::RecvTimeoutError::Disconnected) => return,
1142            }
1143
1144            continue;
1145        }
1146
1147        // Idle mode: block until a command arrives (event-driven wakeup).
1148        match receiver.recv() {
1149            Ok(WorkerCommand::Shutdown) => return,
1150            Ok(command) => handle_command(&mut client, &pending, command),
1151            Err(_) => return,
1152        }
1153    }
1154}
1155
1156// ── Submodules ───────────────────────────────────────────────────────────────
1157
1158mod client_core;
1159mod network_client;
1160mod serial_client;
1161
1162pub(crate) use client_core::AsyncClientCore;
1163pub use network_client::AsyncTcpClient;
1164pub use serial_client::AsyncSerialClient;
1165
#[cfg(all(test, feature = "traffic"))]
mod tests {
    use super::*;

    /// Upper bound on how long a test waits for a traffic event.
    const RECV_TIMEOUT: Duration = Duration::from_millis(100);

    /// A transmitted frame reported through `on_tx_frame` must arrive on the
    /// traffic channel as a `Tx` event carrying no error.
    #[test]
    fn test_async_app_emits_traffic_event_to_channel() {
        const TXN_ID: u16 = 42;
        const PAYLOAD: [u8; 2] = [0xAA, 0x55];

        let (traffic_sender, events) = mpsc::channel();
        let mut app = AsyncApp {
            pending: Arc::new(Mutex::new(HashMap::new())),
            traffic_sender,
        };
        let unit = UnitIdOrSlaveAddr::new(1).unwrap();

        app.on_tx_frame(TXN_ID, unit, &PAYLOAD);

        let observed = events.recv_timeout(RECV_TIMEOUT).unwrap();
        assert_eq!(observed.direction, TrafficDirection::Tx);
        assert_eq!(observed.txn_id, TXN_ID);
        assert_eq!(observed.unit_id_slave_addr, unit);
        assert_eq!(observed.frame, vec![0xAA, 0x55]);
        assert_eq!(observed.error, None);
    }

    /// A receive error reported through `on_rx_error` must arrive on the
    /// traffic channel as an `Rx` event carrying that error.
    #[test]
    fn test_async_app_emits_traffic_error_event_to_channel() {
        const TXN_ID: u16 = 77;
        const PAYLOAD: [u8; 1] = [0xAB];

        let (traffic_sender, events) = mpsc::channel();
        let mut app = AsyncApp {
            pending: Arc::new(Mutex::new(HashMap::new())),
            traffic_sender,
        };
        let unit = UnitIdOrSlaveAddr::new(1).unwrap();

        app.on_rx_error(TXN_ID, unit, MbusError::ChecksumError, &PAYLOAD);

        let observed = events.recv_timeout(RECV_TIMEOUT).unwrap();
        assert_eq!(observed.direction, TrafficDirection::Rx);
        assert_eq!(observed.txn_id, TXN_ID);
        assert_eq!(observed.unit_id_slave_addr, unit);
        assert_eq!(observed.frame, vec![0xAB]);
        assert_eq!(observed.error, Some(MbusError::ChecksumError));
    }

    /// Installing a handler fills the shared store; clearing it empties the
    /// store again.
    #[test]
    fn test_async_client_core_set_and_clear_traffic_handler() {
        // Keep the receiving half bound for the duration of the test, as the
        // original test did.
        let (command_sender, _command_receiver) = mpsc::channel();
        let shared_store: TrafficHandlerStore = Arc::new(Mutex::new(None));
        let core = AsyncClientCore::new(command_sender, Arc::clone(&shared_store));

        core.set_traffic_handler(|_evt| {});
        let installed = shared_store.lock().unwrap().is_some();
        assert!(installed);

        core.clear_traffic_handler();
        let cleared = shared_store.lock().unwrap().is_none();
        assert!(cleared);
    }
}