Skip to main content

mbus_async/runtime/
mod.rs

1
2//! Internal runtime for the async facade.
3//!
4//! This module contains the worker-thread bridge between async callers and the
5//! synchronous `ClientServices` state machine.
6//!
7//! Public entry points are re-exported from the crate root:
8//! - [`AsyncTcpClient`] (TCP)
9//! - [`AsyncSerialClient`] (RTU/ASCII)
10//!
11//! Internal/shared building blocks:
12//! - [`AsyncClientCore`] stores worker channel state and implements request methods.
13//! - [`WorkerCommand`] and [`WorkerResponse`] carry typed request/response payloads.
14//! - [`run_worker`] drives polling and response routing.
15
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU16, Ordering};
18use std::sync::{mpsc, Arc, Mutex};
19use std::sync::mpsc::{Receiver, Sender};
20use std::thread;
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use mbus_client::app::RequestErrorNotifier;
24#[cfg(feature = "coils")]
25use mbus_client::app::CoilResponse;
26#[cfg(feature = "diagnostics")]
27use mbus_client::app::DiagnosticsResponse;
28#[cfg(feature = "discrete-inputs")]
29use mbus_client::app::DiscreteInputResponse;
30#[cfg(feature = "fifo")]
31use mbus_client::app::FifoQueueResponse;
32#[cfg(feature = "file-record")]
33use mbus_client::app::FileRecordResponse;
34#[cfg(feature = "registers")]
35use mbus_client::app::RegisterResponse;
36use mbus_client::services::ClientServices;
37#[cfg(feature = "coils")]
38use mbus_client::services::coil::Coils;
39#[cfg(feature = "diagnostics")]
40use mbus_client::services::diagnostic::{
41	DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode,
42};
43#[cfg(feature = "discrete-inputs")]
44use mbus_client::services::discrete_input::DiscreteInputs;
45#[cfg(feature = "fifo")]
46use mbus_client::services::fifo_queue::FifoQueue;
47#[cfg(feature = "file-record")]
48use mbus_client::services::file_record::{SubRequest, SubRequestParams};
49#[cfg(feature = "registers")]
50use mbus_client::services::register::Registers;
51use mbus_core::errors::MbusError;
52#[cfg(feature = "diagnostics")]
53use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
54use mbus_core::transport::{ModbusConfig, TimeKeeper, Transport, UnitIdOrSlaveAddr};
55#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
56use mbus_core::transport::{ModbusSerialConfig, SerialMode};
57#[cfg(feature = "tcp")]
58use mbus_core::transport::ModbusTcpConfig;
59#[cfg(any(feature = "serial-rtu", feature = "serial-ascii"))]
60use mbus_serial::StdSerialTransport;
61#[cfg(feature = "tcp")]
62use mbus_network::StdTcpTransport;
63use tokio::sync::oneshot;
64
#[cfg(feature = "diagnostics")]
/// Diagnostics response payload returned by FC 08.
///
/// Built by the worker's `diagnostics_response` callback from the values the
/// client stack reports. Only compiled with the `diagnostics` feature.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiagnosticsDataResponse {
	/// Echoed diagnostic sub-function code.
	pub sub_function: DiagnosticSubFunction,
	/// Echoed diagnostic data words.
	pub data: Vec<u16>,
}
#[cfg(feature = "diagnostics")]
/// Communication event log payload `(status, event_count, message_count, events)` returned by FC 12.
///
/// The tuple fields follow the argument order of the
/// `get_comm_event_log_response` callback that produces the value.
pub type CommEventLogResponse = (u16, u16, u16, Vec<u8>);
77
/// Async facade error type.
///
/// Wraps errors coming from the Modbus client stack and adds the failure
/// modes that belong to the worker-thread bridge itself.
#[derive(Debug, PartialEq, Eq)]
pub enum AsyncError {
	/// Error propagated from the underlying Modbus client stack.
	Mbus(MbusError),
	/// Background worker channel is closed or worker thread has stopped.
	WorkerClosed,
	/// Internal response routing mismatch between request and callback payload type.
	UnexpectedResponseType,
}
88
89impl From<MbusError> for AsyncError {
90	fn from(value: MbusError) -> Self {
91		Self::Mbus(value)
92	}
93}
94
95impl std::fmt::Display for AsyncError {
96	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97		match self {
98			Self::Mbus(err) => write!(f, "Modbus error: {err}"),
99			Self::WorkerClosed => write!(f, "async worker channel closed"),
100			Self::UnexpectedResponseType => write!(f, "unexpected response type from worker"),
101		}
102	}
103}
104
105impl std::error::Error for AsyncError {
106	fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
107		match self {
108			Self::Mbus(err) => Some(err),
109			_ => None,
110		}
111	}
112}
113
/// One-shot channel end on which the worker delivers a request's final outcome.
type PendingSender = oneshot::Sender<Result<WorkerResponse, MbusError>>;
/// Shared map from transaction id to the sender of the request still awaiting its outcome.
type PendingStore = Arc<Mutex<HashMap<u16, PendingSender>>>;
116
/// Message sent to the worker thread.
///
/// Every request variant carries the same three bookkeeping fields:
/// - `txn_id`: key under which `sender` is stored in the pending map and the
///   id handed to the client call.
/// - `unit`: target unit id / slave address, passed through to the client call.
/// - `sender`: one-shot channel on which the request's outcome is delivered.
///
/// The remaining fields are the arguments of the client method that
/// `handle_command` invokes for the variant.
enum WorkerCommand {
	/// Handled by calling `read_multiple_coils` on the client.
	#[cfg(feature = "coils")]
	ReadMultipleCoils {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		quantity: u16,
		sender: PendingSender,
	},
	/// Handled by calling `read_holding_registers` on the client.
	#[cfg(feature = "registers")]
	ReadHoldingRegisters {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		quantity: u16,
		sender: PendingSender,
	},
	/// Handled by calling `read_input_registers` on the client.
	#[cfg(feature = "registers")]
	ReadInputRegisters {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		quantity: u16,
		sender: PendingSender,
	},
	/// Handled by calling `write_single_register` on the client.
	#[cfg(feature = "registers")]
	WriteSingleRegister {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		value: u16,
		sender: PendingSender,
	},
	/// Handled by calling `write_single_coil` on the client.
	#[cfg(feature = "coils")]
	WriteSingleCoil {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		value: bool,
		sender: PendingSender,
	},
	/// Handled by calling `write_multiple_coils` on the client with `coils` borrowed.
	#[cfg(feature = "coils")]
	WriteMultipleCoils {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		coils: Coils,
		sender: PendingSender,
	},
	/// Handled by calling `write_multiple_registers` on the client; the
	/// quantity argument is derived from `values.len()`.
	#[cfg(feature = "registers")]
	WriteMultipleRegisters {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		values: Vec<u16>,
		sender: PendingSender,
	},
	/// Handled by calling `read_write_multiple_registers` on the client.
	#[cfg(feature = "registers")]
	ReadWriteMultipleRegisters {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		read_address: u16,
		read_quantity: u16,
		write_address: u16,
		write_values: Vec<u16>,
		sender: PendingSender,
	},
	/// Handled by calling `mask_write_register` on the client.
	#[cfg(feature = "registers")]
	MaskWriteRegister {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		and_mask: u16,
		or_mask: u16,
		sender: PendingSender,
	},
	/// Handled by calling `read_discrete_inputs` on the client.
	#[cfg(feature = "discrete-inputs")]
	ReadDiscreteInputs {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		quantity: u16,
		sender: PendingSender,
	},
	/// Handled by calling `read_fifo_queue` on the client.
	#[cfg(feature = "fifo")]
	ReadFifoQueue {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		address: u16,
		sender: PendingSender,
	},
	/// Handled by calling `read_file_record` on the client with `sub_request` borrowed.
	#[cfg(feature = "file-record")]
	ReadFileRecord {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		sub_request: SubRequest,
		sender: PendingSender,
	},
	/// Handled by calling `write_file_record` on the client with `sub_request` borrowed.
	#[cfg(feature = "file-record")]
	WriteFileRecord {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		sub_request: SubRequest,
		sender: PendingSender,
	},
	/// Handled by calling `read_device_identification` on the client.
	#[cfg(feature = "diagnostics")]
	ReadDeviceIdentification {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		read_device_id_code: ReadDeviceIdCode,
		object_id: ObjectId,
		sender: PendingSender,
	},
	/// Handled by calling `encapsulated_interface_transport` on the client with `data` borrowed.
	#[cfg(feature = "diagnostics")]
	EncapsulatedInterfaceTransport {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		mei_type: EncapsulatedInterfaceType,
		data: Vec<u8>,
		sender: PendingSender,
	},
	/// Handled by calling `read_exception_status` on the client.
	#[cfg(feature = "diagnostics")]
	ReadExceptionStatus {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		sender: PendingSender,
	},
	/// Handled by calling `diagnostics` on the client with `data` borrowed.
	#[cfg(feature = "diagnostics")]
	Diagnostics {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		sub_function: DiagnosticSubFunction,
		data: Vec<u16>,
		sender: PendingSender,
	},
	/// Handled by calling `get_comm_event_counter` on the client.
	#[cfg(feature = "diagnostics")]
	GetCommEventCounter {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		sender: PendingSender,
	},
	/// Handled by calling `get_comm_event_log` on the client.
	#[cfg(feature = "diagnostics")]
	GetCommEventLog {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		sender: PendingSender,
	},
	/// Handled by calling `report_server_id` on the client.
	#[cfg(feature = "diagnostics")]
	ReportServerId {
		txn_id: u16,
		unit: UnitIdOrSlaveAddr,
		sender: PendingSender,
	},
	/// Stops the worker: `run_worker` returns as soon as it receives this, and
	/// `handle_command` treats it as a no-op.
	Shutdown,
}
272
/// Typed success payload delivered through a [`PendingSender`].
///
/// Each variant is produced by one or more of the response callbacks that
/// `AsyncApp` implements for the client stack.
enum WorkerResponse {
	/// Coil block: a clone of the coils read, a one-coil block for single-coil
	/// callbacks, or a freshly constructed block describing a multi-coil write.
	#[cfg(feature = "coils")]
	Coils(Coils),
	/// Register block: a clone of the registers read, a one-register block for
	/// single-register reads, or a freshly constructed block describing a
	/// multi-register write.
	#[cfg(feature = "registers")]
	Registers(Registers),
	/// Address and value reported by the single-register write callback.
	#[cfg(feature = "registers")]
	SingleRegisterWrite { address: u16, value: u16 },
	/// Mask-write completed; the callback carries no payload.
	#[cfg(feature = "registers")]
	MaskWriteRegister,
	/// Discrete inputs reported by the read callbacks.
	#[cfg(feature = "discrete-inputs")]
	DiscreteInputs(DiscreteInputs),
	/// Clone of the FIFO queue reported by the read callback.
	#[cfg(feature = "fifo")]
	FifoQueue(FifoQueue),
	/// Owned copy of the sub-request results reported by the file-record read callback.
	#[cfg(feature = "file-record")]
	FileRecordRead(Vec<SubRequestParams>),
	/// File-record write completed; the callback carries no payload.
	#[cfg(feature = "file-record")]
	FileRecordWrite,
	/// Clone of the device identification response.
	#[cfg(feature = "diagnostics")]
	DeviceIdentification(DeviceIdentificationResponse),
	/// MEI type and an owned copy of the data bytes reported by the callback.
	#[cfg(feature = "diagnostics")]
	EncapsulatedInterfaceTransport {
		mei_type: EncapsulatedInterfaceType,
		data: Vec<u8>,
	},
	/// Status byte reported by the read-exception-status callback.
	#[cfg(feature = "diagnostics")]
	ExceptionStatus(u8),
	/// Sub-function and data words reported by the diagnostics callback.
	#[cfg(feature = "diagnostics")]
	DiagnosticsData(DiagnosticsDataResponse),
	/// Status and event count reported by the comm-event-counter callback.
	#[cfg(feature = "diagnostics")]
	CommEventCounter { status: u16, event_count: u16 },
	/// `(status, event_count, message_count, events)` reported by the comm-event-log callback.
	#[cfg(feature = "diagnostics")]
	CommEventLog(CommEventLogResponse),
	/// Owned copy of the bytes reported by the report-server-id callback.
	#[cfg(feature = "diagnostics")]
	ReportServerId(Vec<u8>),
}
308
/// Application-side callback sink handed to `ClientServices`.
///
/// Implements the client stack's response/error callback traits and routes
/// each callback to the waiter registered under the same transaction id.
struct AsyncApp {
	/// Senders of requests that have not been completed yet, keyed by transaction id.
	pending: PendingStore,
}
312
313impl AsyncApp {
314	fn complete(&self, txn_id: u16, response: Result<WorkerResponse, MbusError>) {
315		if let Ok(mut pending) = self.pending.lock() {
316			if let Some(sender) = pending.remove(&txn_id) {
317				let _ = sender.send(response);
318			}
319		}
320	}
321
322	fn resolve(&self, txn_id: u16, response: WorkerResponse) {
323		self.complete(txn_id, Ok(response));
324	}
325
326	fn reject(&self, txn_id: u16, error: MbusError) {
327		self.complete(txn_id, Err(error));
328	}
329}
330
331impl TimeKeeper for AsyncApp {
332	fn current_millis(&self) -> u64 {
333		SystemTime::now()
334			.duration_since(UNIX_EPOCH)
335			.map(|d| d.as_millis() as u64)
336			.unwrap_or(0)
337	}
338}
339
340impl RequestErrorNotifier for AsyncApp {
341	fn request_failed(
342		&mut self,
343		txn_id: u16,
344		_unit_id_slave_addr: UnitIdOrSlaveAddr,
345		error: MbusError,
346	) {
347		self.reject(txn_id, error);
348	}
349}
350
#[cfg(feature = "coils")]
impl CoilResponse for AsyncApp {
	/// Completes the request with a clone of the coils that were read.
	fn read_coils_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr, coils: &Coils) {
		let snapshot = coils.clone();
		self.complete(txn_id, Ok(WorkerResponse::Coils(snapshot)));
	}

	/// Completes the request with a one-coil block holding the value read,
	/// or with the error raised while building that block.
	fn read_single_coil_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		address: u16,
		value: bool,
	) {
		let built = single_coil_block(address, value);
		self.complete(txn_id, built.map(WorkerResponse::Coils));
	}

	/// Completes the request with a one-coil block holding the value written,
	/// or with the error raised while building that block.
	fn write_single_coil_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		address: u16,
		value: bool,
	) {
		let built = single_coil_block(address, value);
		self.complete(txn_id, built.map(WorkerResponse::Coils));
	}

	/// Completes the request with a freshly constructed coil block covering
	/// `quantity` coils from `address`; no values are set on it here.
	fn write_multiple_coils_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		address: u16,
		quantity: u16,
	) {
		let built = Coils::new(address, quantity);
		self.complete(txn_id, built.map(WorkerResponse::Coils));
	}
}

/// Builds a one-coil block at `address` with that coil set to `value`.
#[cfg(feature = "coils")]
fn single_coil_block(address: u16, value: bool) -> Result<Coils, MbusError> {
	let mut block = Coils::new(address, 1)?;
	block.set_value(address, value)?;
	Ok(block)
}
402
#[cfg(feature = "registers")]
impl RegisterResponse for AsyncApp {
	/// Completes the request with a clone of the input registers that were read.
	fn read_multiple_input_registers_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		registers: &Registers,
	) {
		let snapshot = registers.clone();
		self.complete(txn_id, Ok(WorkerResponse::Registers(snapshot)));
	}

	/// Completes the request with a one-register block holding the value read,
	/// or with the error raised while building that block.
	fn read_single_input_register_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		address: u16,
		value: u16,
	) {
		let built = single_register_block(address, value);
		self.complete(txn_id, built.map(WorkerResponse::Registers));
	}

	/// Completes the request with a clone of the holding registers that were read.
	fn read_multiple_holding_registers_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		registers: &Registers,
	) {
		let snapshot = registers.clone();
		self.complete(txn_id, Ok(WorkerResponse::Registers(snapshot)));
	}

	/// Completes the request with the address/value pair reported for the write.
	fn write_single_register_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		address: u16,
		value: u16,
	) {
		let written = WorkerResponse::SingleRegisterWrite { address, value };
		self.complete(txn_id, Ok(written));
	}

	/// Completes the request with a freshly constructed register block covering
	/// `quantity` registers from `starting_address`; no values are set on it here.
	fn write_multiple_registers_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		starting_address: u16,
		quantity: u16,
	) {
		let built = Registers::new(starting_address, quantity);
		self.complete(txn_id, built.map(WorkerResponse::Registers));
	}

	/// Completes the request with a clone of the registers reported by the
	/// combined read/write operation.
	fn read_write_multiple_registers_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		registers: &Registers,
	) {
		let snapshot = registers.clone();
		self.complete(txn_id, Ok(WorkerResponse::Registers(snapshot)));
	}

	/// Completes the request with a one-register block holding the value read,
	/// or with the error raised while building that block.
	fn read_single_holding_register_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		address: u16,
		value: u16,
	) {
		let built = single_register_block(address, value);
		self.complete(txn_id, built.map(WorkerResponse::Registers));
	}

	/// Completes the request with a one-register block holding the value read,
	/// or with the error raised while building that block.
	fn read_single_register_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		address: u16,
		value: u16,
	) {
		let built = single_register_block(address, value);
		self.complete(txn_id, built.map(WorkerResponse::Registers));
	}

	/// Completes the request with the payload-free mask-write acknowledgement.
	fn mask_write_register_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
		self.complete(txn_id, Ok(WorkerResponse::MaskWriteRegister));
	}
}

/// Builds a one-register block at `address` with that register set to `value`.
#[cfg(feature = "registers")]
fn single_register_block(address: u16, value: u16) -> Result<Registers, MbusError> {
	let mut block = Registers::new(address, 1)?;
	block.set_value(address, value)?;
	Ok(block)
}
510
#[cfg(feature = "discrete-inputs")]
impl DiscreteInputResponse for AsyncApp {
	/// Completes the request with a clone of the discrete inputs that were read.
	fn read_multiple_discrete_inputs_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		discrete_inputs: &DiscreteInputs,
	) {
		let snapshot = discrete_inputs.clone();
		self.complete(txn_id, Ok(WorkerResponse::DiscreteInputs(snapshot)));
	}

	/// Completes the request with a one-input block built from `value`, or
	/// with the error raised while building that block.
	fn read_single_discrete_input_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		address: u16,
		value: bool,
	) {
		// The single input is encoded as the lowest bit of a one-element buffer.
		let packed = [if value { 0b0000_0001 } else { 0 }];
		let built =
			DiscreteInputs::new(address, 1).and_then(|inputs| inputs.with_values(&packed, 1));
		self.complete(txn_id, built.map(WorkerResponse::DiscreteInputs));
	}
}
543
#[cfg(feature = "fifo")]
impl FifoQueueResponse for AsyncApp {
	/// Completes the request with a clone of the FIFO queue that was read.
	fn read_fifo_queue_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		fifo_queue: &FifoQueue,
	) {
		let snapshot = fifo_queue.clone();
		self.complete(txn_id, Ok(WorkerResponse::FifoQueue(snapshot)));
	}
}
555
#[cfg(feature = "file-record")]
impl FileRecordResponse for AsyncApp {
	/// Completes the request with an owned copy of the sub-request results.
	fn read_file_record_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		data: &[SubRequestParams],
	) {
		let records = data.to_vec();
		self.complete(txn_id, Ok(WorkerResponse::FileRecordRead(records)));
	}

	/// Completes the request with the payload-free write acknowledgement.
	fn write_file_record_response(&mut self, txn_id: u16, _unit: UnitIdOrSlaveAddr) {
		self.complete(txn_id, Ok(WorkerResponse::FileRecordWrite));
	}
}
571
#[cfg(feature = "diagnostics")]
impl DiagnosticsResponse for AsyncApp {
	/// Completes the request with a clone of the device identification response.
	fn read_device_identification_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		response: &DeviceIdentificationResponse,
	) {
		let payload = WorkerResponse::DeviceIdentification(response.clone());
		self.complete(txn_id, Ok(payload));
	}

	/// Completes the request with the MEI type and an owned copy of `data`.
	fn encapsulated_interface_transport_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		mei_type: EncapsulatedInterfaceType,
		data: &[u8],
	) {
		let payload = WorkerResponse::EncapsulatedInterfaceTransport {
			mei_type,
			data: data.to_vec(),
		};
		self.complete(txn_id, Ok(payload));
	}

	/// Completes the request with the reported exception status byte.
	fn read_exception_status_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		status: u8,
	) {
		self.complete(txn_id, Ok(WorkerResponse::ExceptionStatus(status)));
	}

	/// Completes the request with the sub-function and an owned copy of the data words.
	fn diagnostics_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		sub_function: DiagnosticSubFunction,
		data: &[u16],
	) {
		let echoed = DiagnosticsDataResponse {
			sub_function,
			data: data.to_vec(),
		};
		self.complete(txn_id, Ok(WorkerResponse::DiagnosticsData(echoed)));
	}

	/// Completes the request with the reported status and event count.
	fn get_comm_event_counter_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		status: u16,
		event_count: u16,
	) {
		let payload = WorkerResponse::CommEventCounter {
			status,
			event_count,
		};
		self.complete(txn_id, Ok(payload));
	}

	/// Completes the request with `(status, event_count, message_count, events)`,
	/// copying `events` into an owned buffer.
	fn get_comm_event_log_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		status: u16,
		event_count: u16,
		message_count: u16,
		events: &[u8],
	) {
		let log: CommEventLogResponse = (status, event_count, message_count, events.to_vec());
		self.complete(txn_id, Ok(WorkerResponse::CommEventLog(log)));
	}

	/// Completes the request with an owned copy of the reported bytes.
	fn report_server_id_response(
		&mut self,
		txn_id: u16,
		_unit: UnitIdOrSlaveAddr,
		data: &[u8],
	) {
		let bytes = data.to_vec();
		self.complete(txn_id, Ok(WorkerResponse::ReportServerId(bytes)));
	}
}
667
668fn register_pending(pending: &PendingStore, txn_id: u16, sender: PendingSender) -> Result<(), MbusError> {
669	let mut guard = pending.lock().map_err(|_| MbusError::Unexpected)?;
670	guard.insert(txn_id, sender);
671	Ok(())
672}
673
674fn reject_pending(pending: &PendingStore, txn_id: u16, error: MbusError) {
675	if let Ok(mut guard) = pending.lock() {
676		if let Some(sender) = guard.remove(&txn_id) {
677			let _ = sender.send(Err(error));
678		}
679	}
680}
681
682fn submit_or_reject(pending: &PendingStore, txn_id: u16, result: Result<(), MbusError>) {
683	if let Err(err) = result {
684		reject_pending(pending, txn_id, err);
685	}
686}
687
688fn handle_command<TRANSPORT, const N: usize>(
689	client: &mut ClientServices<TRANSPORT, AsyncApp, N>,
690	pending: &PendingStore,
691	command: WorkerCommand,
692) where
693	TRANSPORT: Transport,
694{
695	match command {
696		#[cfg(feature = "coils")]
697		WorkerCommand::ReadMultipleCoils {
698			txn_id,
699			unit,
700			address,
701			quantity,
702			sender,
703		} => {
704			if register_pending(pending, txn_id, sender).is_ok() {
705				let result = client.read_multiple_coils(txn_id, unit, address, quantity);
706				submit_or_reject(pending, txn_id, result);
707			}
708		}
709		#[cfg(feature = "registers")]
710		WorkerCommand::ReadHoldingRegisters {
711			txn_id,
712			unit,
713			address,
714			quantity,
715			sender,
716		} => {
717			if register_pending(pending, txn_id, sender).is_ok() {
718				let result = client.read_holding_registers(txn_id, unit, address, quantity);
719				submit_or_reject(pending, txn_id, result);
720			}
721		}
722		#[cfg(feature = "registers")]
723		WorkerCommand::ReadInputRegisters {
724			txn_id,
725			unit,
726			address,
727			quantity,
728			sender,
729		} => {
730			if register_pending(pending, txn_id, sender).is_ok() {
731				let result = client.read_input_registers(txn_id, unit, address, quantity);
732				submit_or_reject(pending, txn_id, result);
733			}
734		}
735		#[cfg(feature = "registers")]
736		WorkerCommand::WriteSingleRegister {
737			txn_id,
738			unit,
739			address,
740			value,
741			sender,
742		} => {
743			if register_pending(pending, txn_id, sender).is_ok() {
744				let result = client.write_single_register(txn_id, unit, address, value);
745				submit_or_reject(pending, txn_id, result);
746			}
747		}
748		#[cfg(feature = "coils")]
749		WorkerCommand::WriteSingleCoil {
750			txn_id,
751			unit,
752			address,
753			value,
754			sender,
755		} => {
756			if register_pending(pending, txn_id, sender).is_ok() {
757				let result = client.write_single_coil(txn_id, unit, address, value);
758				submit_or_reject(pending, txn_id, result);
759			}
760		}
761		#[cfg(feature = "coils")]
762		WorkerCommand::WriteMultipleCoils {
763			txn_id,
764			unit,
765			address,
766			coils,
767			sender,
768		} => {
769			if register_pending(pending, txn_id, sender).is_ok() {
770				let result = client.write_multiple_coils(txn_id, unit, address, &coils);
771				submit_or_reject(pending, txn_id, result);
772			}
773		}
774		#[cfg(feature = "registers")]
775		WorkerCommand::WriteMultipleRegisters {
776			txn_id,
777			unit,
778			address,
779			values,
780			sender,
781		} => {
782			if register_pending(pending, txn_id, sender).is_ok() {
783				let result =
784					client.write_multiple_registers(txn_id, unit, address, values.len() as u16, &values);
785				submit_or_reject(pending, txn_id, result);
786			}
787		}
788		#[cfg(feature = "registers")]
789		WorkerCommand::ReadWriteMultipleRegisters {
790			txn_id,
791			unit,
792			read_address,
793			read_quantity,
794			write_address,
795			write_values,
796			sender,
797		} => {
798			if register_pending(pending, txn_id, sender).is_ok() {
799				let result = client.read_write_multiple_registers(
800					txn_id,
801					unit,
802					read_address,
803					read_quantity,
804					write_address,
805					&write_values,
806				);
807				submit_or_reject(pending, txn_id, result);
808			}
809		}
810		#[cfg(feature = "registers")]
811		WorkerCommand::MaskWriteRegister {
812			txn_id,
813			unit,
814			address,
815			and_mask,
816			or_mask,
817			sender,
818		} => {
819			if register_pending(pending, txn_id, sender).is_ok() {
820				let result = client.mask_write_register(txn_id, unit, address, and_mask, or_mask);
821				submit_or_reject(pending, txn_id, result);
822			}
823		}
824		#[cfg(feature = "discrete-inputs")]
825		WorkerCommand::ReadDiscreteInputs {
826			txn_id,
827			unit,
828			address,
829			quantity,
830			sender,
831		} => {
832			if register_pending(pending, txn_id, sender).is_ok() {
833				let result = client.read_discrete_inputs(txn_id, unit, address, quantity);
834				submit_or_reject(pending, txn_id, result);
835			}
836		}
837		#[cfg(feature = "fifo")]
838		WorkerCommand::ReadFifoQueue {
839			txn_id,
840			unit,
841			address,
842			sender,
843		} => {
844			if register_pending(pending, txn_id, sender).is_ok() {
845				let result = client.read_fifo_queue(txn_id, unit, address);
846				submit_or_reject(pending, txn_id, result);
847			}
848		}
849		#[cfg(feature = "file-record")]
850		WorkerCommand::ReadFileRecord {
851			txn_id,
852			unit,
853			sub_request,
854			sender,
855		} => {
856			if register_pending(pending, txn_id, sender).is_ok() {
857				let result = client.read_file_record(txn_id, unit, &sub_request);
858				submit_or_reject(pending, txn_id, result);
859			}
860		}
861		#[cfg(feature = "file-record")]
862		WorkerCommand::WriteFileRecord {
863			txn_id,
864			unit,
865			sub_request,
866			sender,
867		} => {
868			if register_pending(pending, txn_id, sender).is_ok() {
869				let result = client.write_file_record(txn_id, unit, &sub_request);
870				submit_or_reject(pending, txn_id, result);
871			}
872		}
873		#[cfg(feature = "diagnostics")]
874		WorkerCommand::ReadDeviceIdentification {
875			txn_id,
876			unit,
877			read_device_id_code,
878			object_id,
879			sender,
880		} => {
881			if register_pending(pending, txn_id, sender).is_ok() {
882				let result =
883					client.read_device_identification(txn_id, unit, read_device_id_code, object_id);
884				submit_or_reject(pending, txn_id, result);
885			}
886		}
887		#[cfg(feature = "diagnostics")]
888		WorkerCommand::EncapsulatedInterfaceTransport {
889			txn_id,
890			unit,
891			mei_type,
892			data,
893			sender,
894		} => {
895			if register_pending(pending, txn_id, sender).is_ok() {
896				let result = client.encapsulated_interface_transport(txn_id, unit, mei_type, &data);
897				submit_or_reject(pending, txn_id, result);
898			}
899		}
900		#[cfg(feature = "diagnostics")]
901		WorkerCommand::ReadExceptionStatus {
902			txn_id,
903			unit,
904			sender,
905		} => {
906			if register_pending(pending, txn_id, sender).is_ok() {
907				let result = client.read_exception_status(txn_id, unit);
908				submit_or_reject(pending, txn_id, result);
909			}
910		}
911		#[cfg(feature = "diagnostics")]
912		WorkerCommand::Diagnostics {
913			txn_id,
914			unit,
915			sub_function,
916			data,
917			sender,
918		} => {
919			if register_pending(pending, txn_id, sender).is_ok() {
920				let result = client.diagnostics(txn_id, unit, sub_function, &data);
921				submit_or_reject(pending, txn_id, result);
922			}
923		}
924		#[cfg(feature = "diagnostics")]
925		WorkerCommand::GetCommEventCounter {
926			txn_id,
927			unit,
928			sender,
929		} => {
930			if register_pending(pending, txn_id, sender).is_ok() {
931				let result = client.get_comm_event_counter(txn_id, unit);
932				submit_or_reject(pending, txn_id, result);
933			}
934		}
935		#[cfg(feature = "diagnostics")]
936		WorkerCommand::GetCommEventLog {
937			txn_id,
938			unit,
939			sender,
940		} => {
941			if register_pending(pending, txn_id, sender).is_ok() {
942				let result = client.get_comm_event_log(txn_id, unit);
943				submit_or_reject(pending, txn_id, result);
944			}
945		}
946		#[cfg(feature = "diagnostics")]
947		WorkerCommand::ReportServerId {
948			txn_id,
949			unit,
950			sender,
951		} => {
952			if register_pending(pending, txn_id, sender).is_ok() {
953				let result = client.report_server_id(txn_id, unit);
954				submit_or_reject(pending, txn_id, result);
955			}
956		}
957		WorkerCommand::Shutdown => {}
958	}
959}
960
961fn run_worker<TRANSPORT, const N: usize>(
962	mut client: ClientServices<TRANSPORT, AsyncApp, N>,
963	pending: PendingStore,
964	receiver: Receiver<WorkerCommand>,
965	poll_interval: Duration,
966) where
967	TRANSPORT: Transport,
968{
969	loop {
970		// Wait briefly for work first so we do not poll the transport before
971		// any request has been registered (important for deterministic tests
972		// with preloaded mock responses).
973		match receiver.recv_timeout(poll_interval) {
974			Ok(WorkerCommand::Shutdown) => return,
975			Ok(command) => {
976				handle_command(&mut client, &pending, command);
977				loop {
978					match receiver.try_recv() {
979						Ok(WorkerCommand::Shutdown) => return,
980						Ok(command) => handle_command(&mut client, &pending, command),
981						Err(mpsc::TryRecvError::Empty) => break,
982						Err(mpsc::TryRecvError::Disconnected) => return,
983					}
984				}
985			}
986			Err(mpsc::RecvTimeoutError::Timeout) => {}
987			Err(mpsc::RecvTimeoutError::Disconnected) => return,
988		}
989
990		let _ = client.poll();
991	}
992}
993
994// ── Submodules ───────────────────────────────────────────────────────────────
995
996mod client_core;
997mod network_client;
998mod serial_client;
999
1000pub(crate) use client_core::AsyncClientCore;
1001pub use network_client::AsyncTcpClient;
1002pub use serial_client::AsyncSerialClient;