1use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::Duration;
27
28use tokio::sync::{mpsc, oneshot};
29
30#[cfg(any(feature = "registers", feature = "diagnostics"))]
31use mbus_core::errors::MbusError;
32use mbus_core::transport::UnitIdOrSlaveAddr;
33
34#[cfg(feature = "diagnostics")]
35use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
36#[cfg(feature = "coils")]
37use mbus_core::models::coil::Coils;
38#[cfg(feature = "diagnostics")]
39use mbus_core::models::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
40#[cfg(feature = "discrete-inputs")]
41use mbus_core::models::discrete_input::DiscreteInputs;
42#[cfg(feature = "fifo")]
43use mbus_core::models::fifo_queue::FifoQueue;
44#[cfg(feature = "file-record")]
45use mbus_core::models::file_record::{SubRequest, SubRequestParams};
46#[cfg(feature = "registers")]
47use mbus_core::models::register::Registers;
48
49use crate::client::command::{ClientRequest, TaskCommand};
50use crate::client::response::ClientResponse;
51use crate::client::task::PendingCountReceiver;
52
53#[cfg(feature = "traffic")]
54use crate::client::notifier::{AsyncClientNotifier, NotifierStore};
55
56use super::AsyncError;
57#[cfg(feature = "diagnostics")]
58use super::{CommEventLogResponse, DiagnosticsDataResponse};
59
60pub struct AsyncClientCore {
70 cmd_tx: mpsc::Sender<TaskCommand>,
71 pending_count_rx: PendingCountReceiver,
72 request_timeout_ns: Arc<AtomicU64>,
74 #[cfg(feature = "traffic")]
75 notifier: NotifierStore,
76}
77
78impl AsyncClientCore {
79 pub(super) fn new(
81 cmd_tx: mpsc::Sender<TaskCommand>,
82 pending_count_rx: PendingCountReceiver,
83 #[cfg(feature = "traffic")] notifier: NotifierStore,
84 ) -> Self {
85 Self {
86 cmd_tx,
87 pending_count_rx,
88 request_timeout_ns: Arc::new(AtomicU64::new(0)),
89 #[cfg(feature = "traffic")]
90 notifier,
91 }
92 }
93
94 async fn send_request(&self, params: ClientRequest) -> Result<ClientResponse, AsyncError> {
101 let (resp_tx, rx) = oneshot::channel();
102 self.cmd_tx
103 .send(TaskCommand::Request { params, resp_tx })
104 .await
105 .map_err(|_| AsyncError::WorkerClosed)?;
106
107 let timeout_ns = self.request_timeout_ns.load(Ordering::Relaxed);
108 if timeout_ns > 0 {
109 let outcome = tokio::time::timeout(Duration::from_nanos(timeout_ns), rx).await;
110 if outcome.is_err() {
111 let _ = self.cmd_tx.try_send(TaskCommand::Disconnect);
115 return Err(AsyncError::Timeout);
116 }
117 outcome
118 .unwrap()
119 .map_err(|_| AsyncError::WorkerClosed)?
120 .map_err(AsyncError::Mbus)
121 } else {
122 rx.await
123 .map_err(|_| AsyncError::WorkerClosed)?
124 .map_err(AsyncError::Mbus)
125 }
126 }
127
128 pub async fn connect(&self) -> Result<(), AsyncError> {
135 let (resp_tx, rx) = oneshot::channel();
136 self.cmd_tx
137 .send(TaskCommand::Connect { resp_tx })
138 .await
139 .map_err(|_| AsyncError::WorkerClosed)?;
140 rx.await
141 .map_err(|_| AsyncError::WorkerClosed)?
142 .map_err(AsyncError::Mbus)
143 }
144
145 pub async fn disconnect(&self) -> Result<(), AsyncError> {
155 self.cmd_tx
156 .send(TaskCommand::Disconnect)
157 .await
158 .map_err(|_| AsyncError::WorkerClosed)
159 }
160
161 pub fn has_pending_requests(&self) -> bool {
165 *self.pending_count_rx.borrow() > 0
166 }
167 pub fn set_request_timeout(&self, timeout: Duration) {
180 self.request_timeout_ns.store(
181 u64::try_from(timeout.as_nanos()).unwrap_or(u64::MAX),
182 Ordering::Relaxed,
183 );
184 }
185
186 pub fn clear_request_timeout(&self) {
190 self.request_timeout_ns.store(0, Ordering::Relaxed);
191 }
192 #[cfg(feature = "traffic")]
199 pub fn set_traffic_notifier<N: AsyncClientNotifier + Send + 'static>(&self, notifier: N) {
200 if let Ok(mut g) = self.notifier.try_lock() {
201 *g = Some(Box::new(notifier));
202 }
203 }
204
205 #[cfg(feature = "traffic")]
207 pub fn clear_traffic_notifier(&self) {
208 if let Ok(mut g) = self.notifier.try_lock() {
209 *g = None;
210 }
211 }
212
213 #[cfg(feature = "coils")]
219 pub async fn read_multiple_coils(
220 &self,
221 unit_id: u8,
222 address: u16,
223 quantity: u16,
224 ) -> Result<Coils, AsyncError> {
225 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
226 #[allow(unreachable_patterns)]
227 match self
228 .send_request(ClientRequest::ReadMultipleCoils {
229 unit,
230 address,
231 quantity,
232 })
233 .await?
234 {
235 ClientResponse::Coils(coils) => Ok(coils),
236 _ => Err(AsyncError::UnexpectedResponseType),
237 }
238 }
239
240 #[cfg(feature = "coils")]
244 pub async fn write_single_coil(
245 &self,
246 unit_id: u8,
247 address: u16,
248 value: bool,
249 ) -> Result<(u16, bool), AsyncError> {
250 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
251 #[allow(unreachable_patterns)]
252 match self
253 .send_request(ClientRequest::WriteSingleCoil {
254 unit,
255 address,
256 value,
257 })
258 .await?
259 {
260 ClientResponse::Coils(coils) => {
261 let v = coils.value(coils.from_address()).unwrap_or(false);
262 Ok((coils.from_address(), v))
263 }
264 _ => Err(AsyncError::UnexpectedResponseType),
265 }
266 }
267
268 #[cfg(feature = "coils")]
272 pub async fn write_multiple_coils(
273 &self,
274 unit_id: u8,
275 address: u16,
276 coils: &Coils,
277 ) -> Result<(u16, u16), AsyncError> {
278 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
279 #[allow(unreachable_patterns)]
280 match self
281 .send_request(ClientRequest::WriteMultipleCoils {
282 unit,
283 address,
284 coils: coils.clone(),
285 })
286 .await?
287 {
288 ClientResponse::Coils(coils) => Ok((coils.from_address(), coils.quantity())),
289 _ => Err(AsyncError::UnexpectedResponseType),
290 }
291 }
292
293 #[cfg(feature = "registers")]
299 pub async fn read_holding_registers(
300 &self,
301 unit_id: u8,
302 address: u16,
303 quantity: u16,
304 ) -> Result<Registers, AsyncError> {
305 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
306 match self
307 .send_request(ClientRequest::ReadHoldingRegisters {
308 unit,
309 address,
310 quantity,
311 })
312 .await?
313 {
314 ClientResponse::Registers(regs) => Ok(regs),
315 _ => Err(AsyncError::UnexpectedResponseType),
316 }
317 }
318
319 #[cfg(feature = "registers")]
323 pub async fn read_input_registers(
324 &self,
325 unit_id: u8,
326 address: u16,
327 quantity: u16,
328 ) -> Result<Registers, AsyncError> {
329 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
330 match self
331 .send_request(ClientRequest::ReadInputRegisters {
332 unit,
333 address,
334 quantity,
335 })
336 .await?
337 {
338 ClientResponse::Registers(regs) => Ok(regs),
339 _ => Err(AsyncError::UnexpectedResponseType),
340 }
341 }
342
343 #[cfg(feature = "registers")]
347 pub async fn write_single_register(
348 &self,
349 unit_id: u8,
350 address: u16,
351 value: u16,
352 ) -> Result<(u16, u16), AsyncError> {
353 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
354 match self
355 .send_request(ClientRequest::WriteSingleRegister {
356 unit,
357 address,
358 value,
359 })
360 .await?
361 {
362 ClientResponse::SingleRegisterWrite { address, value } => Ok((address, value)),
363 _ => Err(AsyncError::UnexpectedResponseType),
364 }
365 }
366
367 #[cfg(feature = "registers")]
371 pub async fn write_multiple_registers(
372 &self,
373 unit_id: u8,
374 address: u16,
375 values: &[u16],
376 ) -> Result<(u16, u16), AsyncError> {
377 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
378 let hv =
379 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
380 values,
381 )
382 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
383 match self
384 .send_request(ClientRequest::WriteMultipleRegisters {
385 unit,
386 address,
387 values: hv,
388 })
389 .await?
390 {
391 ClientResponse::Registers(regs) => Ok((regs.from_address(), regs.quantity())),
392 _ => Err(AsyncError::UnexpectedResponseType),
393 }
394 }
395
396 #[cfg(feature = "registers")]
402 pub async fn read_write_multiple_registers(
403 &self,
404 unit_id: u8,
405 read_address: u16,
406 read_quantity: u16,
407 write_address: u16,
408 write_values: &[u16],
409 ) -> Result<Registers, AsyncError> {
410 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
411 let hv =
412 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
413 write_values,
414 )
415 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
416 match self
417 .send_request(ClientRequest::ReadWriteMultipleRegisters {
418 unit,
419 read_address,
420 read_quantity,
421 write_address,
422 write_values: hv,
423 })
424 .await?
425 {
426 ClientResponse::Registers(regs) => Ok(regs),
427 _ => Err(AsyncError::UnexpectedResponseType),
428 }
429 }
430
431 #[cfg(feature = "registers")]
435 pub async fn mask_write_register(
436 &self,
437 unit_id: u8,
438 address: u16,
439 and_mask: u16,
440 or_mask: u16,
441 ) -> Result<(), AsyncError> {
442 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
443 match self
444 .send_request(ClientRequest::MaskWriteRegister {
445 unit,
446 address,
447 and_mask,
448 or_mask,
449 })
450 .await?
451 {
452 ClientResponse::MaskWriteRegister => Ok(()),
453 _ => Err(AsyncError::UnexpectedResponseType),
454 }
455 }
456
457 #[cfg(feature = "discrete-inputs")]
463 pub async fn read_discrete_inputs(
464 &self,
465 unit_id: u8,
466 address: u16,
467 quantity: u16,
468 ) -> Result<DiscreteInputs, AsyncError> {
469 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
470 match self
471 .send_request(ClientRequest::ReadDiscreteInputs {
472 unit,
473 address,
474 quantity,
475 })
476 .await?
477 {
478 ClientResponse::DiscreteInputs(di) => Ok(di),
479 _ => Err(AsyncError::UnexpectedResponseType),
480 }
481 }
482
483 #[cfg(feature = "fifo")]
489 pub async fn read_fifo_queue(
490 &self,
491 unit_id: u8,
492 address: u16,
493 ) -> Result<FifoQueue, AsyncError> {
494 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
495 match self
496 .send_request(ClientRequest::ReadFifoQueue { unit, address })
497 .await?
498 {
499 ClientResponse::FifoQueue(queue) => Ok(queue),
500 _ => Err(AsyncError::UnexpectedResponseType),
501 }
502 }
503
504 #[cfg(feature = "file-record")]
510 pub async fn read_file_record(
511 &self,
512 unit_id: u8,
513 sub_request: &SubRequest,
514 ) -> Result<Vec<SubRequestParams>, AsyncError> {
515 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
516 match self
517 .send_request(ClientRequest::ReadFileRecord {
518 unit,
519 sub_request: sub_request.clone(),
520 })
521 .await?
522 {
523 ClientResponse::FileRecordRead(data) => Ok(data.into_iter().collect()),
524 _ => Err(AsyncError::UnexpectedResponseType),
525 }
526 }
527
528 #[cfg(feature = "file-record")]
530 pub async fn write_file_record(
531 &self,
532 unit_id: u8,
533 sub_request: &SubRequest,
534 ) -> Result<(), AsyncError> {
535 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
536 match self
537 .send_request(ClientRequest::WriteFileRecord {
538 unit,
539 sub_request: sub_request.clone(),
540 })
541 .await?
542 {
543 ClientResponse::FileRecordWrite => Ok(()),
544 _ => Err(AsyncError::UnexpectedResponseType),
545 }
546 }
547
548 #[cfg(feature = "diagnostics")]
552 pub async fn read_device_identification(
553 &self,
554 unit_id: u8,
555 read_device_id_code: ReadDeviceIdCode,
556 object_id: ObjectId,
557 ) -> Result<DeviceIdentificationResponse, AsyncError> {
558 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
559 match self
560 .send_request(ClientRequest::ReadDeviceIdentification {
561 unit,
562 read_device_id_code,
563 object_id,
564 })
565 .await?
566 {
567 ClientResponse::DeviceIdentification(resp) => Ok(resp),
568 _ => Err(AsyncError::UnexpectedResponseType),
569 }
570 }
571
572 #[cfg(feature = "diagnostics")]
576 pub async fn encapsulated_interface_transport(
577 &self,
578 unit_id: u8,
579 mei_type: EncapsulatedInterfaceType,
580 data: &[u8],
581 ) -> Result<(EncapsulatedInterfaceType, Vec<u8>), AsyncError> {
582 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
583 let hv =
584 heapless::Vec::<u8, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
585 data,
586 )
587 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
588 match self
589 .send_request(ClientRequest::EncapsulatedInterfaceTransport {
590 unit,
591 mei_type,
592 data: hv,
593 })
594 .await?
595 {
596 ClientResponse::EncapsulatedInterfaceTransport { mei_type, data } => {
597 Ok((mei_type, data.as_slice().to_vec()))
598 }
599 _ => Err(AsyncError::UnexpectedResponseType),
600 }
601 }
602
603 #[cfg(feature = "diagnostics")]
605 pub async fn read_exception_status(&self, unit_id: u8) -> Result<u8, AsyncError> {
606 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
607 match self
608 .send_request(ClientRequest::ReadExceptionStatus { unit })
609 .await?
610 {
611 ClientResponse::ExceptionStatus(status) => Ok(status),
612 _ => Err(AsyncError::UnexpectedResponseType),
613 }
614 }
615
616 #[cfg(feature = "diagnostics")]
620 pub async fn diagnostics(
621 &self,
622 unit_id: u8,
623 sub_function: DiagnosticSubFunction,
624 data: &[u16],
625 ) -> Result<DiagnosticsDataResponse, AsyncError> {
626 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
627 let hv =
628 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
629 data,
630 )
631 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
632 match self
633 .send_request(ClientRequest::Diagnostics {
634 unit,
635 sub_function,
636 data: hv,
637 })
638 .await?
639 {
640 ClientResponse::DiagnosticsData { sub_function, data } => Ok(DiagnosticsDataResponse {
641 sub_function,
642 data: data.as_slice().to_vec(),
643 }),
644 _ => Err(AsyncError::UnexpectedResponseType),
645 }
646 }
647
648 #[cfg(feature = "diagnostics")]
652 pub async fn get_comm_event_counter(&self, unit_id: u8) -> Result<(u16, u16), AsyncError> {
653 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
654 match self
655 .send_request(ClientRequest::GetCommEventCounter { unit })
656 .await?
657 {
658 ClientResponse::CommEventCounter {
659 status,
660 event_count,
661 } => Ok((status, event_count)),
662 _ => Err(AsyncError::UnexpectedResponseType),
663 }
664 }
665
666 #[cfg(feature = "diagnostics")]
670 pub async fn get_comm_event_log(
671 &self,
672 unit_id: u8,
673 ) -> Result<CommEventLogResponse, AsyncError> {
674 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
675 match self
676 .send_request(ClientRequest::GetCommEventLog { unit })
677 .await?
678 {
679 ClientResponse::CommEventLog {
680 status,
681 event_count,
682 message_count,
683 events,
684 } => Ok((
685 status,
686 event_count,
687 message_count,
688 events.as_slice().to_vec(),
689 )),
690 _ => Err(AsyncError::UnexpectedResponseType),
691 }
692 }
693
694 #[cfg(feature = "diagnostics")]
698 pub async fn report_server_id(&self, unit_id: u8) -> Result<Vec<u8>, AsyncError> {
699 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
700 match self
701 .send_request(ClientRequest::ReportServerId { unit })
702 .await?
703 {
704 ClientResponse::ReportServerId(data) => Ok(data.as_slice().to_vec()),
705 _ => Err(AsyncError::UnexpectedResponseType),
706 }
707 }
708}