1use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::Duration;
27
28use tokio::sync::{mpsc, oneshot};
29
30#[cfg(any(
31 feature = "holding-registers",
32 feature = "input-registers",
33 feature = "diagnostics"
34))]
35use mbus_core::errors::MbusError;
36use mbus_core::transport::UnitIdOrSlaveAddr;
37
38#[cfg(feature = "diagnostics")]
39use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
40#[cfg(feature = "coils")]
41use mbus_core::models::coil::Coils;
42#[cfg(feature = "diagnostics")]
43use mbus_core::models::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
44#[cfg(feature = "discrete-inputs")]
45use mbus_core::models::discrete_input::DiscreteInputs;
46#[cfg(feature = "fifo")]
47use mbus_core::models::fifo_queue::FifoQueue;
48#[cfg(feature = "file-record")]
49use mbus_core::models::file_record::{SubRequest, SubRequestParams};
50#[cfg(feature = "holding-registers")]
51use mbus_core::models::register::HoldingRegisters;
52#[cfg(feature = "input-registers")]
53use mbus_core::models::register::InputRegisters;
54
55use crate::client::command::{ClientRequest, TaskCommand};
56use crate::client::response::ClientResponse;
57use crate::client::task::PendingCountReceiver;
58
59#[cfg(feature = "traffic")]
60use crate::client::notifier::{AsyncClientTrafficNotifier, NotifierStore};
61
62use super::AsyncError;
63#[cfg(feature = "diagnostics")]
64use super::{CommEventLogResponse, DiagnosticsDataResponse};
65
66#[derive(Clone)]
76pub struct AsyncClientCore {
77 cmd_tx: mpsc::Sender<TaskCommand>,
78 pending_count_rx: PendingCountReceiver,
79 request_timeout_ns: Arc<AtomicU64>,
81 #[cfg(feature = "traffic")]
82 notifier: NotifierStore,
83}
84
85impl AsyncClientCore {
86 pub(super) fn new(
88 cmd_tx: mpsc::Sender<TaskCommand>,
89 pending_count_rx: PendingCountReceiver,
90 #[cfg(feature = "traffic")] notifier: NotifierStore,
91 ) -> Self {
92 Self {
93 cmd_tx,
94 pending_count_rx,
95 request_timeout_ns: Arc::new(AtomicU64::new(0)),
96 #[cfg(feature = "traffic")]
97 notifier,
98 }
99 }
100
101 async fn send_request(&self, params: ClientRequest) -> Result<ClientResponse, AsyncError> {
108 let (resp_tx, rx) = oneshot::channel();
109 self.cmd_tx
110 .send(TaskCommand::Request { params, resp_tx })
111 .await
112 .map_err(|_| AsyncError::WorkerClosed)?;
113
114 let timeout_ns = self.request_timeout_ns.load(Ordering::Relaxed);
115 if timeout_ns > 0 {
116 let outcome = tokio::time::timeout(Duration::from_nanos(timeout_ns), rx).await;
117 if outcome.is_err() {
118 let _ = self.cmd_tx.try_send(TaskCommand::Disconnect);
122 return Err(AsyncError::Timeout);
123 }
124 outcome
125 .unwrap()
126 .map_err(|_| AsyncError::WorkerClosed)?
127 .map_err(AsyncError::Mbus)
128 } else {
129 rx.await
130 .map_err(|_| AsyncError::WorkerClosed)?
131 .map_err(AsyncError::Mbus)
132 }
133 }
134
135 pub async fn connect(&self) -> Result<(), AsyncError> {
142 let (resp_tx, rx) = oneshot::channel();
143 self.cmd_tx
144 .send(TaskCommand::Connect { resp_tx })
145 .await
146 .map_err(|_| AsyncError::WorkerClosed)?;
147 rx.await
148 .map_err(|_| AsyncError::WorkerClosed)?
149 .map_err(AsyncError::Mbus)
150 }
151
152 pub async fn disconnect(&self) -> Result<(), AsyncError> {
162 self.cmd_tx
163 .send(TaskCommand::Disconnect)
164 .await
165 .map_err(|_| AsyncError::WorkerClosed)
166 }
167
168 pub fn has_pending_requests(&self) -> bool {
172 *self.pending_count_rx.borrow() > 0
173 }
174 pub fn set_request_timeout(&self, timeout: Duration) {
187 self.request_timeout_ns.store(
188 u64::try_from(timeout.as_nanos()).unwrap_or(u64::MAX),
189 Ordering::Relaxed,
190 );
191 }
192
193 pub fn clear_request_timeout(&self) {
197 self.request_timeout_ns.store(0, Ordering::Relaxed);
198 }
199 #[cfg(feature = "traffic")]
206 pub fn set_traffic_notifier<N: AsyncClientTrafficNotifier + Send + 'static>(
207 &self,
208 notifier: N,
209 ) {
210 if let Ok(mut g) = self.notifier.try_lock() {
211 *g = Some(Box::new(notifier));
212 }
213 }
214
215 #[cfg(feature = "traffic")]
217 pub fn clear_traffic_notifier(&self) {
218 if let Ok(mut g) = self.notifier.try_lock() {
219 *g = None;
220 }
221 }
222
223 #[cfg(feature = "coils")]
229 pub async fn read_multiple_coils(
230 &self,
231 unit_id: u8,
232 address: u16,
233 quantity: u16,
234 ) -> Result<Coils, AsyncError> {
235 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
236 #[allow(unreachable_patterns)]
237 match self
238 .send_request(ClientRequest::ReadMultipleCoils {
239 unit,
240 address,
241 quantity,
242 })
243 .await?
244 {
245 ClientResponse::Coils(coils) => Ok(coils),
246 _ => Err(AsyncError::UnexpectedResponseType),
247 }
248 }
249
250 #[cfg(feature = "coils")]
254 pub async fn write_single_coil(
255 &self,
256 unit_id: u8,
257 address: u16,
258 value: bool,
259 ) -> Result<(u16, bool), AsyncError> {
260 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
261 #[allow(unreachable_patterns)]
262 match self
263 .send_request(ClientRequest::WriteSingleCoil {
264 unit,
265 address,
266 value,
267 })
268 .await?
269 {
270 ClientResponse::Coils(coils) => {
271 let v = coils.value(coils.from_address()).unwrap_or(false);
272 Ok((coils.from_address(), v))
273 }
274 _ => Err(AsyncError::UnexpectedResponseType),
275 }
276 }
277
278 #[cfg(feature = "coils")]
282 pub async fn write_multiple_coils(
283 &self,
284 unit_id: u8,
285 address: u16,
286 coils: &Coils,
287 ) -> Result<(u16, u16), AsyncError> {
288 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
289 #[allow(unreachable_patterns)]
290 match self
291 .send_request(ClientRequest::WriteMultipleCoils {
292 unit,
293 address,
294 coils: coils.clone(),
295 })
296 .await?
297 {
298 ClientResponse::Coils(coils) => Ok((coils.from_address(), coils.quantity())),
299 _ => Err(AsyncError::UnexpectedResponseType),
300 }
301 }
302
303 #[cfg(feature = "holding-registers")]
309 pub async fn read_holding_registers(
310 &self,
311 unit_id: u8,
312 address: u16,
313 quantity: u16,
314 ) -> Result<HoldingRegisters, AsyncError> {
315 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
316 match self
317 .send_request(ClientRequest::ReadHoldingRegisters {
318 unit,
319 address,
320 quantity,
321 })
322 .await?
323 {
324 ClientResponse::HoldingRegisters(regs) => Ok(regs),
325 _ => Err(AsyncError::UnexpectedResponseType),
326 }
327 }
328
329 #[cfg(feature = "input-registers")]
333 pub async fn read_input_registers(
334 &self,
335 unit_id: u8,
336 address: u16,
337 quantity: u16,
338 ) -> Result<InputRegisters, AsyncError> {
339 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
340 match self
341 .send_request(ClientRequest::ReadInputRegisters {
342 unit,
343 address,
344 quantity,
345 })
346 .await?
347 {
348 ClientResponse::InputRegisters(regs) => Ok(regs),
349 _ => Err(AsyncError::UnexpectedResponseType),
350 }
351 }
352
353 #[cfg(feature = "holding-registers")]
357 pub async fn write_single_register(
358 &self,
359 unit_id: u8,
360 address: u16,
361 value: u16,
362 ) -> Result<(u16, u16), AsyncError> {
363 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
364 match self
365 .send_request(ClientRequest::WriteSingleRegister {
366 unit,
367 address,
368 value,
369 })
370 .await?
371 {
372 ClientResponse::SingleRegisterWrite { address, value } => Ok((address, value)),
373 _ => Err(AsyncError::UnexpectedResponseType),
374 }
375 }
376
377 #[cfg(feature = "holding-registers")]
381 pub async fn write_multiple_registers(
382 &self,
383 unit_id: u8,
384 address: u16,
385 values: &[u16],
386 ) -> Result<(u16, u16), AsyncError> {
387 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
388 let hv =
389 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
390 values,
391 )
392 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
393 match self
394 .send_request(ClientRequest::WriteMultipleRegisters {
395 unit,
396 address,
397 values: hv,
398 })
399 .await?
400 {
401 ClientResponse::HoldingRegisters(regs) => Ok((regs.from_address(), regs.quantity())),
402 _ => Err(AsyncError::UnexpectedResponseType),
403 }
404 }
405
406 #[cfg(feature = "holding-registers")]
412 pub async fn read_write_multiple_registers(
413 &self,
414 unit_id: u8,
415 read_address: u16,
416 read_quantity: u16,
417 write_address: u16,
418 write_values: &[u16],
419 ) -> Result<HoldingRegisters, AsyncError> {
420 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
421 let hv =
422 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
423 write_values,
424 )
425 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
426 match self
427 .send_request(ClientRequest::ReadWriteMultipleRegisters {
428 unit,
429 read_address,
430 read_quantity,
431 write_address,
432 write_values: hv,
433 })
434 .await?
435 {
436 ClientResponse::HoldingRegisters(regs) => Ok(regs),
437 _ => Err(AsyncError::UnexpectedResponseType),
438 }
439 }
440
441 #[cfg(feature = "holding-registers")]
445 pub async fn mask_write_register(
446 &self,
447 unit_id: u8,
448 address: u16,
449 and_mask: u16,
450 or_mask: u16,
451 ) -> Result<(), AsyncError> {
452 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
453 match self
454 .send_request(ClientRequest::MaskWriteRegister {
455 unit,
456 address,
457 and_mask,
458 or_mask,
459 })
460 .await?
461 {
462 ClientResponse::MaskWriteRegister => Ok(()),
463 _ => Err(AsyncError::UnexpectedResponseType),
464 }
465 }
466
467 #[cfg(feature = "discrete-inputs")]
473 pub async fn read_discrete_inputs(
474 &self,
475 unit_id: u8,
476 address: u16,
477 quantity: u16,
478 ) -> Result<DiscreteInputs, AsyncError> {
479 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
480 match self
481 .send_request(ClientRequest::ReadDiscreteInputs {
482 unit,
483 address,
484 quantity,
485 })
486 .await?
487 {
488 ClientResponse::DiscreteInputs(di) => Ok(di),
489 _ => Err(AsyncError::UnexpectedResponseType),
490 }
491 }
492
493 #[cfg(feature = "fifo")]
499 pub async fn read_fifo_queue(
500 &self,
501 unit_id: u8,
502 address: u16,
503 ) -> Result<FifoQueue, AsyncError> {
504 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
505 match self
506 .send_request(ClientRequest::ReadFifoQueue { unit, address })
507 .await?
508 {
509 ClientResponse::FifoQueue(queue) => Ok(queue),
510 _ => Err(AsyncError::UnexpectedResponseType),
511 }
512 }
513
514 #[cfg(feature = "file-record")]
520 pub async fn read_file_record(
521 &self,
522 unit_id: u8,
523 sub_request: &SubRequest,
524 ) -> Result<Vec<SubRequestParams>, AsyncError> {
525 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
526 match self
527 .send_request(ClientRequest::ReadFileRecord {
528 unit,
529 sub_request: sub_request.clone(),
530 })
531 .await?
532 {
533 ClientResponse::FileRecordRead(data) => Ok(data.into_iter().collect()),
534 _ => Err(AsyncError::UnexpectedResponseType),
535 }
536 }
537
538 #[cfg(feature = "file-record")]
540 pub async fn write_file_record(
541 &self,
542 unit_id: u8,
543 sub_request: &SubRequest,
544 ) -> Result<(), AsyncError> {
545 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
546 match self
547 .send_request(ClientRequest::WriteFileRecord {
548 unit,
549 sub_request: sub_request.clone(),
550 })
551 .await?
552 {
553 ClientResponse::FileRecordWrite => Ok(()),
554 _ => Err(AsyncError::UnexpectedResponseType),
555 }
556 }
557
558 #[cfg(feature = "diagnostics")]
562 pub async fn read_device_identification(
563 &self,
564 unit_id: u8,
565 read_device_id_code: ReadDeviceIdCode,
566 object_id: ObjectId,
567 ) -> Result<DeviceIdentificationResponse, AsyncError> {
568 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
569 match self
570 .send_request(ClientRequest::ReadDeviceIdentification {
571 unit,
572 read_device_id_code,
573 object_id,
574 })
575 .await?
576 {
577 ClientResponse::DeviceIdentification(resp) => Ok(resp),
578 _ => Err(AsyncError::UnexpectedResponseType),
579 }
580 }
581
582 #[cfg(feature = "diagnostics")]
586 pub async fn encapsulated_interface_transport(
587 &self,
588 unit_id: u8,
589 mei_type: EncapsulatedInterfaceType,
590 data: &[u8],
591 ) -> Result<(EncapsulatedInterfaceType, Vec<u8>), AsyncError> {
592 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
593 let hv =
594 heapless::Vec::<u8, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
595 data,
596 )
597 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
598 match self
599 .send_request(ClientRequest::EncapsulatedInterfaceTransport {
600 unit,
601 mei_type,
602 data: hv,
603 })
604 .await?
605 {
606 ClientResponse::EncapsulatedInterfaceTransport { mei_type, data } => {
607 Ok((mei_type, data.as_slice().to_vec()))
608 }
609 _ => Err(AsyncError::UnexpectedResponseType),
610 }
611 }
612
613 #[cfg(feature = "diagnostics")]
615 pub async fn read_exception_status(&self, unit_id: u8) -> Result<u8, AsyncError> {
616 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
617 match self
618 .send_request(ClientRequest::ReadExceptionStatus { unit })
619 .await?
620 {
621 ClientResponse::ExceptionStatus(status) => Ok(status),
622 _ => Err(AsyncError::UnexpectedResponseType),
623 }
624 }
625
626 #[cfg(feature = "diagnostics")]
630 pub async fn diagnostics(
631 &self,
632 unit_id: u8,
633 sub_function: DiagnosticSubFunction,
634 data: &[u16],
635 ) -> Result<DiagnosticsDataResponse, AsyncError> {
636 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
637 let hv =
638 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
639 data,
640 )
641 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
642 match self
643 .send_request(ClientRequest::Diagnostics {
644 unit,
645 sub_function,
646 data: hv,
647 })
648 .await?
649 {
650 ClientResponse::DiagnosticsData { sub_function, data } => Ok(DiagnosticsDataResponse {
651 sub_function,
652 data: data.as_slice().to_vec(),
653 }),
654 _ => Err(AsyncError::UnexpectedResponseType),
655 }
656 }
657
658 #[cfg(feature = "diagnostics")]
662 pub async fn get_comm_event_counter(&self, unit_id: u8) -> Result<(u16, u16), AsyncError> {
663 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
664 match self
665 .send_request(ClientRequest::GetCommEventCounter { unit })
666 .await?
667 {
668 ClientResponse::CommEventCounter {
669 status,
670 event_count,
671 } => Ok((status, event_count)),
672 _ => Err(AsyncError::UnexpectedResponseType),
673 }
674 }
675
676 #[cfg(feature = "diagnostics")]
680 pub async fn get_comm_event_log(
681 &self,
682 unit_id: u8,
683 ) -> Result<CommEventLogResponse, AsyncError> {
684 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
685 match self
686 .send_request(ClientRequest::GetCommEventLog { unit })
687 .await?
688 {
689 ClientResponse::CommEventLog {
690 status,
691 event_count,
692 message_count,
693 events,
694 } => Ok((
695 status,
696 event_count,
697 message_count,
698 events.as_slice().to_vec(),
699 )),
700 _ => Err(AsyncError::UnexpectedResponseType),
701 }
702 }
703
704 #[cfg(feature = "diagnostics")]
708 pub async fn report_server_id(&self, unit_id: u8) -> Result<Vec<u8>, AsyncError> {
709 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
710 match self
711 .send_request(ClientRequest::ReportServerId { unit })
712 .await?
713 {
714 ClientResponse::ReportServerId(data) => Ok(data.as_slice().to_vec()),
715 _ => Err(AsyncError::UnexpectedResponseType),
716 }
717 }
718}