1use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::Duration;
27
28use tokio::sync::{mpsc, oneshot};
29
30#[cfg(any(
31 feature = "holding-registers",
32 feature = "input-registers",
33 feature = "diagnostics"
34))]
35use mbus_core::errors::MbusError;
36use mbus_core::transport::UnitIdOrSlaveAddr;
37
38#[cfg(feature = "diagnostics")]
39use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
40#[cfg(feature = "coils")]
41use mbus_core::models::coil::Coils;
42#[cfg(feature = "diagnostics")]
43use mbus_core::models::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
44#[cfg(feature = "discrete-inputs")]
45use mbus_core::models::discrete_input::DiscreteInputs;
46#[cfg(feature = "fifo")]
47use mbus_core::models::fifo_queue::FifoQueue;
48#[cfg(feature = "file-record")]
49use mbus_core::models::file_record::{SubRequest, SubRequestParams};
50#[cfg(any(feature = "holding-registers", feature = "input-registers"))]
51use mbus_core::models::register::Registers;
52
53use crate::client::command::{ClientRequest, TaskCommand};
54use crate::client::response::ClientResponse;
55use crate::client::task::PendingCountReceiver;
56
57#[cfg(feature = "traffic")]
58use crate::client::notifier::{AsyncClientTrafficNotifier, NotifierStore};
59
60use super::AsyncError;
61#[cfg(feature = "diagnostics")]
62use super::{CommEventLogResponse, DiagnosticsDataResponse};
63
64pub struct AsyncClientCore {
74 cmd_tx: mpsc::Sender<TaskCommand>,
75 pending_count_rx: PendingCountReceiver,
76 request_timeout_ns: Arc<AtomicU64>,
78 #[cfg(feature = "traffic")]
79 notifier: NotifierStore,
80}
81
82impl AsyncClientCore {
83 pub(super) fn new(
85 cmd_tx: mpsc::Sender<TaskCommand>,
86 pending_count_rx: PendingCountReceiver,
87 #[cfg(feature = "traffic")] notifier: NotifierStore,
88 ) -> Self {
89 Self {
90 cmd_tx,
91 pending_count_rx,
92 request_timeout_ns: Arc::new(AtomicU64::new(0)),
93 #[cfg(feature = "traffic")]
94 notifier,
95 }
96 }
97
98 async fn send_request(&self, params: ClientRequest) -> Result<ClientResponse, AsyncError> {
105 let (resp_tx, rx) = oneshot::channel();
106 self.cmd_tx
107 .send(TaskCommand::Request { params, resp_tx })
108 .await
109 .map_err(|_| AsyncError::WorkerClosed)?;
110
111 let timeout_ns = self.request_timeout_ns.load(Ordering::Relaxed);
112 if timeout_ns > 0 {
113 let outcome = tokio::time::timeout(Duration::from_nanos(timeout_ns), rx).await;
114 if outcome.is_err() {
115 let _ = self.cmd_tx.try_send(TaskCommand::Disconnect);
119 return Err(AsyncError::Timeout);
120 }
121 outcome
122 .unwrap()
123 .map_err(|_| AsyncError::WorkerClosed)?
124 .map_err(AsyncError::Mbus)
125 } else {
126 rx.await
127 .map_err(|_| AsyncError::WorkerClosed)?
128 .map_err(AsyncError::Mbus)
129 }
130 }
131
132 pub async fn connect(&self) -> Result<(), AsyncError> {
139 let (resp_tx, rx) = oneshot::channel();
140 self.cmd_tx
141 .send(TaskCommand::Connect { resp_tx })
142 .await
143 .map_err(|_| AsyncError::WorkerClosed)?;
144 rx.await
145 .map_err(|_| AsyncError::WorkerClosed)?
146 .map_err(AsyncError::Mbus)
147 }
148
149 pub async fn disconnect(&self) -> Result<(), AsyncError> {
159 self.cmd_tx
160 .send(TaskCommand::Disconnect)
161 .await
162 .map_err(|_| AsyncError::WorkerClosed)
163 }
164
165 pub fn has_pending_requests(&self) -> bool {
169 *self.pending_count_rx.borrow() > 0
170 }
171 pub fn set_request_timeout(&self, timeout: Duration) {
184 self.request_timeout_ns.store(
185 u64::try_from(timeout.as_nanos()).unwrap_or(u64::MAX),
186 Ordering::Relaxed,
187 );
188 }
189
190 pub fn clear_request_timeout(&self) {
194 self.request_timeout_ns.store(0, Ordering::Relaxed);
195 }
196 #[cfg(feature = "traffic")]
203 pub fn set_traffic_notifier<N: AsyncClientTrafficNotifier + Send + 'static>(
204 &self,
205 notifier: N,
206 ) {
207 if let Ok(mut g) = self.notifier.try_lock() {
208 *g = Some(Box::new(notifier));
209 }
210 }
211
212 #[cfg(feature = "traffic")]
214 pub fn clear_traffic_notifier(&self) {
215 if let Ok(mut g) = self.notifier.try_lock() {
216 *g = None;
217 }
218 }
219
220 #[cfg(feature = "coils")]
226 pub async fn read_multiple_coils(
227 &self,
228 unit_id: u8,
229 address: u16,
230 quantity: u16,
231 ) -> Result<Coils, AsyncError> {
232 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
233 #[allow(unreachable_patterns)]
234 match self
235 .send_request(ClientRequest::ReadMultipleCoils {
236 unit,
237 address,
238 quantity,
239 })
240 .await?
241 {
242 ClientResponse::Coils(coils) => Ok(coils),
243 _ => Err(AsyncError::UnexpectedResponseType),
244 }
245 }
246
247 #[cfg(feature = "coils")]
251 pub async fn write_single_coil(
252 &self,
253 unit_id: u8,
254 address: u16,
255 value: bool,
256 ) -> Result<(u16, bool), AsyncError> {
257 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
258 #[allow(unreachable_patterns)]
259 match self
260 .send_request(ClientRequest::WriteSingleCoil {
261 unit,
262 address,
263 value,
264 })
265 .await?
266 {
267 ClientResponse::Coils(coils) => {
268 let v = coils.value(coils.from_address()).unwrap_or(false);
269 Ok((coils.from_address(), v))
270 }
271 _ => Err(AsyncError::UnexpectedResponseType),
272 }
273 }
274
275 #[cfg(feature = "coils")]
279 pub async fn write_multiple_coils(
280 &self,
281 unit_id: u8,
282 address: u16,
283 coils: &Coils,
284 ) -> Result<(u16, u16), AsyncError> {
285 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
286 #[allow(unreachable_patterns)]
287 match self
288 .send_request(ClientRequest::WriteMultipleCoils {
289 unit,
290 address,
291 coils: coils.clone(),
292 })
293 .await?
294 {
295 ClientResponse::Coils(coils) => Ok((coils.from_address(), coils.quantity())),
296 _ => Err(AsyncError::UnexpectedResponseType),
297 }
298 }
299
300 #[cfg(feature = "holding-registers")]
306 pub async fn read_holding_registers(
307 &self,
308 unit_id: u8,
309 address: u16,
310 quantity: u16,
311 ) -> Result<Registers, AsyncError> {
312 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
313 match self
314 .send_request(ClientRequest::ReadHoldingRegisters {
315 unit,
316 address,
317 quantity,
318 })
319 .await?
320 {
321 ClientResponse::Registers(regs) => Ok(regs),
322 _ => Err(AsyncError::UnexpectedResponseType),
323 }
324 }
325
326 #[cfg(feature = "input-registers")]
330 pub async fn read_input_registers(
331 &self,
332 unit_id: u8,
333 address: u16,
334 quantity: u16,
335 ) -> Result<Registers, AsyncError> {
336 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
337 match self
338 .send_request(ClientRequest::ReadInputRegisters {
339 unit,
340 address,
341 quantity,
342 })
343 .await?
344 {
345 ClientResponse::Registers(regs) => Ok(regs),
346 _ => Err(AsyncError::UnexpectedResponseType),
347 }
348 }
349
350 #[cfg(feature = "holding-registers")]
354 pub async fn write_single_register(
355 &self,
356 unit_id: u8,
357 address: u16,
358 value: u16,
359 ) -> Result<(u16, u16), AsyncError> {
360 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
361 match self
362 .send_request(ClientRequest::WriteSingleRegister {
363 unit,
364 address,
365 value,
366 })
367 .await?
368 {
369 ClientResponse::SingleRegisterWrite { address, value } => Ok((address, value)),
370 _ => Err(AsyncError::UnexpectedResponseType),
371 }
372 }
373
374 #[cfg(feature = "holding-registers")]
378 pub async fn write_multiple_registers(
379 &self,
380 unit_id: u8,
381 address: u16,
382 values: &[u16],
383 ) -> Result<(u16, u16), AsyncError> {
384 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
385 let hv =
386 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
387 values,
388 )
389 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
390 match self
391 .send_request(ClientRequest::WriteMultipleRegisters {
392 unit,
393 address,
394 values: hv,
395 })
396 .await?
397 {
398 ClientResponse::Registers(regs) => Ok((regs.from_address(), regs.quantity())),
399 _ => Err(AsyncError::UnexpectedResponseType),
400 }
401 }
402
403 #[cfg(feature = "holding-registers")]
409 pub async fn read_write_multiple_registers(
410 &self,
411 unit_id: u8,
412 read_address: u16,
413 read_quantity: u16,
414 write_address: u16,
415 write_values: &[u16],
416 ) -> Result<Registers, AsyncError> {
417 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
418 let hv =
419 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
420 write_values,
421 )
422 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
423 match self
424 .send_request(ClientRequest::ReadWriteMultipleRegisters {
425 unit,
426 read_address,
427 read_quantity,
428 write_address,
429 write_values: hv,
430 })
431 .await?
432 {
433 ClientResponse::Registers(regs) => Ok(regs),
434 _ => Err(AsyncError::UnexpectedResponseType),
435 }
436 }
437
438 #[cfg(feature = "holding-registers")]
442 pub async fn mask_write_register(
443 &self,
444 unit_id: u8,
445 address: u16,
446 and_mask: u16,
447 or_mask: u16,
448 ) -> Result<(), AsyncError> {
449 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
450 match self
451 .send_request(ClientRequest::MaskWriteRegister {
452 unit,
453 address,
454 and_mask,
455 or_mask,
456 })
457 .await?
458 {
459 ClientResponse::MaskWriteRegister => Ok(()),
460 _ => Err(AsyncError::UnexpectedResponseType),
461 }
462 }
463
464 #[cfg(feature = "discrete-inputs")]
470 pub async fn read_discrete_inputs(
471 &self,
472 unit_id: u8,
473 address: u16,
474 quantity: u16,
475 ) -> Result<DiscreteInputs, AsyncError> {
476 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
477 match self
478 .send_request(ClientRequest::ReadDiscreteInputs {
479 unit,
480 address,
481 quantity,
482 })
483 .await?
484 {
485 ClientResponse::DiscreteInputs(di) => Ok(di),
486 _ => Err(AsyncError::UnexpectedResponseType),
487 }
488 }
489
490 #[cfg(feature = "fifo")]
496 pub async fn read_fifo_queue(
497 &self,
498 unit_id: u8,
499 address: u16,
500 ) -> Result<FifoQueue, AsyncError> {
501 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
502 match self
503 .send_request(ClientRequest::ReadFifoQueue { unit, address })
504 .await?
505 {
506 ClientResponse::FifoQueue(queue) => Ok(queue),
507 _ => Err(AsyncError::UnexpectedResponseType),
508 }
509 }
510
511 #[cfg(feature = "file-record")]
517 pub async fn read_file_record(
518 &self,
519 unit_id: u8,
520 sub_request: &SubRequest,
521 ) -> Result<Vec<SubRequestParams>, AsyncError> {
522 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
523 match self
524 .send_request(ClientRequest::ReadFileRecord {
525 unit,
526 sub_request: sub_request.clone(),
527 })
528 .await?
529 {
530 ClientResponse::FileRecordRead(data) => Ok(data.into_iter().collect()),
531 _ => Err(AsyncError::UnexpectedResponseType),
532 }
533 }
534
535 #[cfg(feature = "file-record")]
537 pub async fn write_file_record(
538 &self,
539 unit_id: u8,
540 sub_request: &SubRequest,
541 ) -> Result<(), AsyncError> {
542 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
543 match self
544 .send_request(ClientRequest::WriteFileRecord {
545 unit,
546 sub_request: sub_request.clone(),
547 })
548 .await?
549 {
550 ClientResponse::FileRecordWrite => Ok(()),
551 _ => Err(AsyncError::UnexpectedResponseType),
552 }
553 }
554
555 #[cfg(feature = "diagnostics")]
559 pub async fn read_device_identification(
560 &self,
561 unit_id: u8,
562 read_device_id_code: ReadDeviceIdCode,
563 object_id: ObjectId,
564 ) -> Result<DeviceIdentificationResponse, AsyncError> {
565 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
566 match self
567 .send_request(ClientRequest::ReadDeviceIdentification {
568 unit,
569 read_device_id_code,
570 object_id,
571 })
572 .await?
573 {
574 ClientResponse::DeviceIdentification(resp) => Ok(resp),
575 _ => Err(AsyncError::UnexpectedResponseType),
576 }
577 }
578
579 #[cfg(feature = "diagnostics")]
583 pub async fn encapsulated_interface_transport(
584 &self,
585 unit_id: u8,
586 mei_type: EncapsulatedInterfaceType,
587 data: &[u8],
588 ) -> Result<(EncapsulatedInterfaceType, Vec<u8>), AsyncError> {
589 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
590 let hv =
591 heapless::Vec::<u8, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
592 data,
593 )
594 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
595 match self
596 .send_request(ClientRequest::EncapsulatedInterfaceTransport {
597 unit,
598 mei_type,
599 data: hv,
600 })
601 .await?
602 {
603 ClientResponse::EncapsulatedInterfaceTransport { mei_type, data } => {
604 Ok((mei_type, data.as_slice().to_vec()))
605 }
606 _ => Err(AsyncError::UnexpectedResponseType),
607 }
608 }
609
610 #[cfg(feature = "diagnostics")]
612 pub async fn read_exception_status(&self, unit_id: u8) -> Result<u8, AsyncError> {
613 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
614 match self
615 .send_request(ClientRequest::ReadExceptionStatus { unit })
616 .await?
617 {
618 ClientResponse::ExceptionStatus(status) => Ok(status),
619 _ => Err(AsyncError::UnexpectedResponseType),
620 }
621 }
622
623 #[cfg(feature = "diagnostics")]
627 pub async fn diagnostics(
628 &self,
629 unit_id: u8,
630 sub_function: DiagnosticSubFunction,
631 data: &[u16],
632 ) -> Result<DiagnosticsDataResponse, AsyncError> {
633 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
634 let hv =
635 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
636 data,
637 )
638 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
639 match self
640 .send_request(ClientRequest::Diagnostics {
641 unit,
642 sub_function,
643 data: hv,
644 })
645 .await?
646 {
647 ClientResponse::DiagnosticsData { sub_function, data } => Ok(DiagnosticsDataResponse {
648 sub_function,
649 data: data.as_slice().to_vec(),
650 }),
651 _ => Err(AsyncError::UnexpectedResponseType),
652 }
653 }
654
655 #[cfg(feature = "diagnostics")]
659 pub async fn get_comm_event_counter(&self, unit_id: u8) -> Result<(u16, u16), AsyncError> {
660 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
661 match self
662 .send_request(ClientRequest::GetCommEventCounter { unit })
663 .await?
664 {
665 ClientResponse::CommEventCounter {
666 status,
667 event_count,
668 } => Ok((status, event_count)),
669 _ => Err(AsyncError::UnexpectedResponseType),
670 }
671 }
672
673 #[cfg(feature = "diagnostics")]
677 pub async fn get_comm_event_log(
678 &self,
679 unit_id: u8,
680 ) -> Result<CommEventLogResponse, AsyncError> {
681 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
682 match self
683 .send_request(ClientRequest::GetCommEventLog { unit })
684 .await?
685 {
686 ClientResponse::CommEventLog {
687 status,
688 event_count,
689 message_count,
690 events,
691 } => Ok((
692 status,
693 event_count,
694 message_count,
695 events.as_slice().to_vec(),
696 )),
697 _ => Err(AsyncError::UnexpectedResponseType),
698 }
699 }
700
701 #[cfg(feature = "diagnostics")]
705 pub async fn report_server_id(&self, unit_id: u8) -> Result<Vec<u8>, AsyncError> {
706 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
707 match self
708 .send_request(ClientRequest::ReportServerId { unit })
709 .await?
710 {
711 ClientResponse::ReportServerId(data) => Ok(data.as_slice().to_vec()),
712 _ => Err(AsyncError::UnexpectedResponseType),
713 }
714 }
715}