1use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::Duration;
27
28use tokio::sync::{mpsc, oneshot};
29
30#[cfg(any(
31 feature = "holding-registers",
32 feature = "input-registers",
33 feature = "diagnostics"
34))]
35use mbus_core::errors::MbusError;
36use mbus_core::transport::UnitIdOrSlaveAddr;
37
38#[cfg(feature = "diagnostics")]
39use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
40#[cfg(feature = "coils")]
41use mbus_core::models::coil::Coils;
42#[cfg(feature = "diagnostics")]
43use mbus_core::models::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
44#[cfg(feature = "discrete-inputs")]
45use mbus_core::models::discrete_input::DiscreteInputs;
46#[cfg(feature = "fifo")]
47use mbus_core::models::fifo_queue::FifoQueue;
48#[cfg(feature = "file-record")]
49use mbus_core::models::file_record::{SubRequest, SubRequestParams};
50#[cfg(feature = "holding-registers")]
51use mbus_core::models::register::HoldingRegisters;
52#[cfg(feature = "input-registers")]
53use mbus_core::models::register::InputRegisters;
54
55use crate::client::command::{ClientRequest, TaskCommand};
56use crate::client::response::ClientResponse;
57use crate::client::task::PendingCountReceiver;
58
59#[cfg(feature = "traffic")]
60use crate::client::notifier::{AsyncClientTrafficNotifier, NotifierStore};
61
62use super::AsyncError;
63#[cfg(feature = "diagnostics")]
64use super::{CommEventLogResponse, DiagnosticsDataResponse};
65
66pub struct AsyncClientCore {
76 cmd_tx: mpsc::Sender<TaskCommand>,
77 pending_count_rx: PendingCountReceiver,
78 request_timeout_ns: Arc<AtomicU64>,
80 #[cfg(feature = "traffic")]
81 notifier: NotifierStore,
82}
83
84impl AsyncClientCore {
85 pub(super) fn new(
87 cmd_tx: mpsc::Sender<TaskCommand>,
88 pending_count_rx: PendingCountReceiver,
89 #[cfg(feature = "traffic")] notifier: NotifierStore,
90 ) -> Self {
91 Self {
92 cmd_tx,
93 pending_count_rx,
94 request_timeout_ns: Arc::new(AtomicU64::new(0)),
95 #[cfg(feature = "traffic")]
96 notifier,
97 }
98 }
99
100 async fn send_request(&self, params: ClientRequest) -> Result<ClientResponse, AsyncError> {
107 let (resp_tx, rx) = oneshot::channel();
108 self.cmd_tx
109 .send(TaskCommand::Request { params, resp_tx })
110 .await
111 .map_err(|_| AsyncError::WorkerClosed)?;
112
113 let timeout_ns = self.request_timeout_ns.load(Ordering::Relaxed);
114 if timeout_ns > 0 {
115 let outcome = tokio::time::timeout(Duration::from_nanos(timeout_ns), rx).await;
116 if outcome.is_err() {
117 let _ = self.cmd_tx.try_send(TaskCommand::Disconnect);
121 return Err(AsyncError::Timeout);
122 }
123 outcome
124 .unwrap()
125 .map_err(|_| AsyncError::WorkerClosed)?
126 .map_err(AsyncError::Mbus)
127 } else {
128 rx.await
129 .map_err(|_| AsyncError::WorkerClosed)?
130 .map_err(AsyncError::Mbus)
131 }
132 }
133
134 pub async fn connect(&self) -> Result<(), AsyncError> {
141 let (resp_tx, rx) = oneshot::channel();
142 self.cmd_tx
143 .send(TaskCommand::Connect { resp_tx })
144 .await
145 .map_err(|_| AsyncError::WorkerClosed)?;
146 rx.await
147 .map_err(|_| AsyncError::WorkerClosed)?
148 .map_err(AsyncError::Mbus)
149 }
150
151 pub async fn disconnect(&self) -> Result<(), AsyncError> {
161 self.cmd_tx
162 .send(TaskCommand::Disconnect)
163 .await
164 .map_err(|_| AsyncError::WorkerClosed)
165 }
166
167 pub fn has_pending_requests(&self) -> bool {
171 *self.pending_count_rx.borrow() > 0
172 }
173 pub fn set_request_timeout(&self, timeout: Duration) {
186 self.request_timeout_ns.store(
187 u64::try_from(timeout.as_nanos()).unwrap_or(u64::MAX),
188 Ordering::Relaxed,
189 );
190 }
191
192 pub fn clear_request_timeout(&self) {
196 self.request_timeout_ns.store(0, Ordering::Relaxed);
197 }
198 #[cfg(feature = "traffic")]
205 pub fn set_traffic_notifier<N: AsyncClientTrafficNotifier + Send + 'static>(
206 &self,
207 notifier: N,
208 ) {
209 if let Ok(mut g) = self.notifier.try_lock() {
210 *g = Some(Box::new(notifier));
211 }
212 }
213
214 #[cfg(feature = "traffic")]
216 pub fn clear_traffic_notifier(&self) {
217 if let Ok(mut g) = self.notifier.try_lock() {
218 *g = None;
219 }
220 }
221
222 #[cfg(feature = "coils")]
228 pub async fn read_multiple_coils(
229 &self,
230 unit_id: u8,
231 address: u16,
232 quantity: u16,
233 ) -> Result<Coils, AsyncError> {
234 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
235 #[allow(unreachable_patterns)]
236 match self
237 .send_request(ClientRequest::ReadMultipleCoils {
238 unit,
239 address,
240 quantity,
241 })
242 .await?
243 {
244 ClientResponse::Coils(coils) => Ok(coils),
245 _ => Err(AsyncError::UnexpectedResponseType),
246 }
247 }
248
249 #[cfg(feature = "coils")]
253 pub async fn write_single_coil(
254 &self,
255 unit_id: u8,
256 address: u16,
257 value: bool,
258 ) -> Result<(u16, bool), AsyncError> {
259 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
260 #[allow(unreachable_patterns)]
261 match self
262 .send_request(ClientRequest::WriteSingleCoil {
263 unit,
264 address,
265 value,
266 })
267 .await?
268 {
269 ClientResponse::Coils(coils) => {
270 let v = coils.value(coils.from_address()).unwrap_or(false);
271 Ok((coils.from_address(), v))
272 }
273 _ => Err(AsyncError::UnexpectedResponseType),
274 }
275 }
276
277 #[cfg(feature = "coils")]
281 pub async fn write_multiple_coils(
282 &self,
283 unit_id: u8,
284 address: u16,
285 coils: &Coils,
286 ) -> Result<(u16, u16), AsyncError> {
287 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
288 #[allow(unreachable_patterns)]
289 match self
290 .send_request(ClientRequest::WriteMultipleCoils {
291 unit,
292 address,
293 coils: coils.clone(),
294 })
295 .await?
296 {
297 ClientResponse::Coils(coils) => Ok((coils.from_address(), coils.quantity())),
298 _ => Err(AsyncError::UnexpectedResponseType),
299 }
300 }
301
302 #[cfg(feature = "holding-registers")]
308 pub async fn read_holding_registers(
309 &self,
310 unit_id: u8,
311 address: u16,
312 quantity: u16,
313 ) -> Result<HoldingRegisters, AsyncError> {
314 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
315 match self
316 .send_request(ClientRequest::ReadHoldingRegisters {
317 unit,
318 address,
319 quantity,
320 })
321 .await?
322 {
323 ClientResponse::HoldingRegisters(regs) => Ok(regs),
324 _ => Err(AsyncError::UnexpectedResponseType),
325 }
326 }
327
328 #[cfg(feature = "input-registers")]
332 pub async fn read_input_registers(
333 &self,
334 unit_id: u8,
335 address: u16,
336 quantity: u16,
337 ) -> Result<InputRegisters, AsyncError> {
338 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
339 match self
340 .send_request(ClientRequest::ReadInputRegisters {
341 unit,
342 address,
343 quantity,
344 })
345 .await?
346 {
347 ClientResponse::InputRegisters(regs) => Ok(regs),
348 _ => Err(AsyncError::UnexpectedResponseType),
349 }
350 }
351
352 #[cfg(feature = "holding-registers")]
356 pub async fn write_single_register(
357 &self,
358 unit_id: u8,
359 address: u16,
360 value: u16,
361 ) -> Result<(u16, u16), AsyncError> {
362 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
363 match self
364 .send_request(ClientRequest::WriteSingleRegister {
365 unit,
366 address,
367 value,
368 })
369 .await?
370 {
371 ClientResponse::SingleRegisterWrite { address, value } => Ok((address, value)),
372 _ => Err(AsyncError::UnexpectedResponseType),
373 }
374 }
375
376 #[cfg(feature = "holding-registers")]
380 pub async fn write_multiple_registers(
381 &self,
382 unit_id: u8,
383 address: u16,
384 values: &[u16],
385 ) -> Result<(u16, u16), AsyncError> {
386 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
387 let hv =
388 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
389 values,
390 )
391 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
392 match self
393 .send_request(ClientRequest::WriteMultipleRegisters {
394 unit,
395 address,
396 values: hv,
397 })
398 .await?
399 {
400 ClientResponse::HoldingRegisters(regs) => Ok((regs.from_address(), regs.quantity())),
401 _ => Err(AsyncError::UnexpectedResponseType),
402 }
403 }
404
405 #[cfg(feature = "holding-registers")]
411 pub async fn read_write_multiple_registers(
412 &self,
413 unit_id: u8,
414 read_address: u16,
415 read_quantity: u16,
416 write_address: u16,
417 write_values: &[u16],
418 ) -> Result<HoldingRegisters, AsyncError> {
419 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
420 let hv =
421 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
422 write_values,
423 )
424 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
425 match self
426 .send_request(ClientRequest::ReadWriteMultipleRegisters {
427 unit,
428 read_address,
429 read_quantity,
430 write_address,
431 write_values: hv,
432 })
433 .await?
434 {
435 ClientResponse::HoldingRegisters(regs) => Ok(regs),
436 _ => Err(AsyncError::UnexpectedResponseType),
437 }
438 }
439
440 #[cfg(feature = "holding-registers")]
444 pub async fn mask_write_register(
445 &self,
446 unit_id: u8,
447 address: u16,
448 and_mask: u16,
449 or_mask: u16,
450 ) -> Result<(), AsyncError> {
451 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
452 match self
453 .send_request(ClientRequest::MaskWriteRegister {
454 unit,
455 address,
456 and_mask,
457 or_mask,
458 })
459 .await?
460 {
461 ClientResponse::MaskWriteRegister => Ok(()),
462 _ => Err(AsyncError::UnexpectedResponseType),
463 }
464 }
465
466 #[cfg(feature = "discrete-inputs")]
472 pub async fn read_discrete_inputs(
473 &self,
474 unit_id: u8,
475 address: u16,
476 quantity: u16,
477 ) -> Result<DiscreteInputs, AsyncError> {
478 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
479 match self
480 .send_request(ClientRequest::ReadDiscreteInputs {
481 unit,
482 address,
483 quantity,
484 })
485 .await?
486 {
487 ClientResponse::DiscreteInputs(di) => Ok(di),
488 _ => Err(AsyncError::UnexpectedResponseType),
489 }
490 }
491
492 #[cfg(feature = "fifo")]
498 pub async fn read_fifo_queue(
499 &self,
500 unit_id: u8,
501 address: u16,
502 ) -> Result<FifoQueue, AsyncError> {
503 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
504 match self
505 .send_request(ClientRequest::ReadFifoQueue { unit, address })
506 .await?
507 {
508 ClientResponse::FifoQueue(queue) => Ok(queue),
509 _ => Err(AsyncError::UnexpectedResponseType),
510 }
511 }
512
513 #[cfg(feature = "file-record")]
519 pub async fn read_file_record(
520 &self,
521 unit_id: u8,
522 sub_request: &SubRequest,
523 ) -> Result<Vec<SubRequestParams>, AsyncError> {
524 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
525 match self
526 .send_request(ClientRequest::ReadFileRecord {
527 unit,
528 sub_request: sub_request.clone(),
529 })
530 .await?
531 {
532 ClientResponse::FileRecordRead(data) => Ok(data.into_iter().collect()),
533 _ => Err(AsyncError::UnexpectedResponseType),
534 }
535 }
536
537 #[cfg(feature = "file-record")]
539 pub async fn write_file_record(
540 &self,
541 unit_id: u8,
542 sub_request: &SubRequest,
543 ) -> Result<(), AsyncError> {
544 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
545 match self
546 .send_request(ClientRequest::WriteFileRecord {
547 unit,
548 sub_request: sub_request.clone(),
549 })
550 .await?
551 {
552 ClientResponse::FileRecordWrite => Ok(()),
553 _ => Err(AsyncError::UnexpectedResponseType),
554 }
555 }
556
557 #[cfg(feature = "diagnostics")]
561 pub async fn read_device_identification(
562 &self,
563 unit_id: u8,
564 read_device_id_code: ReadDeviceIdCode,
565 object_id: ObjectId,
566 ) -> Result<DeviceIdentificationResponse, AsyncError> {
567 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
568 match self
569 .send_request(ClientRequest::ReadDeviceIdentification {
570 unit,
571 read_device_id_code,
572 object_id,
573 })
574 .await?
575 {
576 ClientResponse::DeviceIdentification(resp) => Ok(resp),
577 _ => Err(AsyncError::UnexpectedResponseType),
578 }
579 }
580
581 #[cfg(feature = "diagnostics")]
585 pub async fn encapsulated_interface_transport(
586 &self,
587 unit_id: u8,
588 mei_type: EncapsulatedInterfaceType,
589 data: &[u8],
590 ) -> Result<(EncapsulatedInterfaceType, Vec<u8>), AsyncError> {
591 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
592 let hv =
593 heapless::Vec::<u8, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
594 data,
595 )
596 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
597 match self
598 .send_request(ClientRequest::EncapsulatedInterfaceTransport {
599 unit,
600 mei_type,
601 data: hv,
602 })
603 .await?
604 {
605 ClientResponse::EncapsulatedInterfaceTransport { mei_type, data } => {
606 Ok((mei_type, data.as_slice().to_vec()))
607 }
608 _ => Err(AsyncError::UnexpectedResponseType),
609 }
610 }
611
612 #[cfg(feature = "diagnostics")]
614 pub async fn read_exception_status(&self, unit_id: u8) -> Result<u8, AsyncError> {
615 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
616 match self
617 .send_request(ClientRequest::ReadExceptionStatus { unit })
618 .await?
619 {
620 ClientResponse::ExceptionStatus(status) => Ok(status),
621 _ => Err(AsyncError::UnexpectedResponseType),
622 }
623 }
624
625 #[cfg(feature = "diagnostics")]
629 pub async fn diagnostics(
630 &self,
631 unit_id: u8,
632 sub_function: DiagnosticSubFunction,
633 data: &[u16],
634 ) -> Result<DiagnosticsDataResponse, AsyncError> {
635 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
636 let hv =
637 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
638 data,
639 )
640 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
641 match self
642 .send_request(ClientRequest::Diagnostics {
643 unit,
644 sub_function,
645 data: hv,
646 })
647 .await?
648 {
649 ClientResponse::DiagnosticsData { sub_function, data } => Ok(DiagnosticsDataResponse {
650 sub_function,
651 data: data.as_slice().to_vec(),
652 }),
653 _ => Err(AsyncError::UnexpectedResponseType),
654 }
655 }
656
657 #[cfg(feature = "diagnostics")]
661 pub async fn get_comm_event_counter(&self, unit_id: u8) -> Result<(u16, u16), AsyncError> {
662 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
663 match self
664 .send_request(ClientRequest::GetCommEventCounter { unit })
665 .await?
666 {
667 ClientResponse::CommEventCounter {
668 status,
669 event_count,
670 } => Ok((status, event_count)),
671 _ => Err(AsyncError::UnexpectedResponseType),
672 }
673 }
674
675 #[cfg(feature = "diagnostics")]
679 pub async fn get_comm_event_log(
680 &self,
681 unit_id: u8,
682 ) -> Result<CommEventLogResponse, AsyncError> {
683 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
684 match self
685 .send_request(ClientRequest::GetCommEventLog { unit })
686 .await?
687 {
688 ClientResponse::CommEventLog {
689 status,
690 event_count,
691 message_count,
692 events,
693 } => Ok((
694 status,
695 event_count,
696 message_count,
697 events.as_slice().to_vec(),
698 )),
699 _ => Err(AsyncError::UnexpectedResponseType),
700 }
701 }
702
703 #[cfg(feature = "diagnostics")]
707 pub async fn report_server_id(&self, unit_id: u8) -> Result<Vec<u8>, AsyncError> {
708 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
709 match self
710 .send_request(ClientRequest::ReportServerId { unit })
711 .await?
712 {
713 ClientResponse::ReportServerId(data) => Ok(data.as_slice().to_vec()),
714 _ => Err(AsyncError::UnexpectedResponseType),
715 }
716 }
717}