1use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::time::Duration;
28
29use tokio::sync::{mpsc, oneshot};
30
31use mbus_core::errors::MbusError;
32use mbus_core::transport::UnitIdOrSlaveAddr;
33
34#[cfg(feature = "diagnostics")]
35use mbus_core::function_codes::public::{DiagnosticSubFunction, EncapsulatedInterfaceType};
36#[cfg(feature = "coils")]
37use mbus_core::models::coil::Coils;
38#[cfg(feature = "diagnostics")]
39use mbus_core::models::diagnostic::{DeviceIdentificationResponse, ObjectId, ReadDeviceIdCode};
40#[cfg(feature = "discrete-inputs")]
41use mbus_core::models::discrete_input::DiscreteInputs;
42#[cfg(feature = "fifo")]
43use mbus_core::models::fifo_queue::FifoQueue;
44#[cfg(feature = "file-record")]
45use mbus_core::models::file_record::{SubRequest, SubRequestParams};
46#[cfg(feature = "registers")]
47use mbus_core::models::register::Registers;
48
49use crate::client::command::{ClientRequest, TaskCommand};
50use crate::client::response::ClientResponse;
51use crate::client::task::PendingCountReceiver;
52
53#[cfg(feature = "traffic")]
54use crate::client::notifier::{AsyncClientNotifier, NotifierStore};
55
56use super::AsyncError;
57#[cfg(feature = "diagnostics")]
58use super::{CommEventLogResponse, DiagnosticsDataResponse};
59
/// Handle that submits commands over an mpsc channel and awaits the replies.
///
/// The request methods on this type package their arguments into a
/// [`ClientRequest`], send it through `cmd_tx` together with a one-shot reply
/// sender, and then wait for the answer.
pub struct AsyncClientCore {
    /// Sending half of the command channel; carries [`TaskCommand`] values.
    cmd_tx: mpsc::Sender<TaskCommand>,
    /// Source of the pending-request count read by `has_pending_requests`.
    pending_count_rx: PendingCountReceiver,
    /// Per-request timeout in nanoseconds; `0` means no timeout is applied.
    request_timeout_ns: Arc<AtomicU64>,
    /// Slot holding the optional traffic notifier (`traffic` feature only).
    #[cfg(feature = "traffic")]
    notifier: NotifierStore,
}
79
80impl AsyncClientCore {
81 pub(super) fn new(
85 cmd_tx: mpsc::Sender<TaskCommand>,
86 pending_count_rx: PendingCountReceiver,
87 #[cfg(feature = "traffic")] notifier: NotifierStore,
88 ) -> Self {
89 Self {
90 cmd_tx,
91 pending_count_rx,
92 request_timeout_ns: Arc::new(AtomicU64::new(0)),
93 #[cfg(feature = "traffic")]
94 notifier,
95 }
96 }
97
98 async fn send_request(&self, params: ClientRequest) -> Result<ClientResponse, AsyncError> {
105 let (resp_tx, rx) = oneshot::channel();
106 self.cmd_tx
107 .send(TaskCommand::Request { params, resp_tx })
108 .await
109 .map_err(|_| AsyncError::WorkerClosed)?;
110
111 let timeout_ns = self.request_timeout_ns.load(Ordering::Relaxed);
112 if timeout_ns > 0 {
113 let outcome = tokio::time::timeout(Duration::from_nanos(timeout_ns), rx).await;
114 if outcome.is_err() {
115 let _ = self.cmd_tx.try_send(TaskCommand::Disconnect);
119 return Err(AsyncError::Timeout);
120 }
121 outcome
122 .unwrap()
123 .map_err(|_| AsyncError::WorkerClosed)?
124 .map_err(AsyncError::Mbus)
125 } else {
126 rx.await
127 .map_err(|_| AsyncError::WorkerClosed)?
128 .map_err(AsyncError::Mbus)
129 }
130 }
131
132 pub async fn connect(&self) -> Result<(), AsyncError> {
139 let (resp_tx, rx) = oneshot::channel();
140 self.cmd_tx
141 .send(TaskCommand::Connect { resp_tx })
142 .await
143 .map_err(|_| AsyncError::WorkerClosed)?;
144 rx.await
145 .map_err(|_| AsyncError::WorkerClosed)?
146 .map_err(AsyncError::Mbus)
147 }
148
149 pub fn has_pending_requests(&self) -> bool {
153 *self.pending_count_rx.borrow() > 0
154 }
155 pub fn set_request_timeout(&self, timeout: Duration) {
168 self.request_timeout_ns.store(
169 u64::try_from(timeout.as_nanos()).unwrap_or(u64::MAX),
170 Ordering::Relaxed,
171 );
172 }
173
174 pub fn clear_request_timeout(&self) {
178 self.request_timeout_ns.store(0, Ordering::Relaxed);
179 }
180 #[cfg(feature = "traffic")]
187 pub fn set_traffic_notifier<N: AsyncClientNotifier + Send + 'static>(&self, notifier: N) {
188 if let Ok(mut g) = self.notifier.try_lock() {
189 *g = Some(Box::new(notifier));
190 }
191 }
192
193 #[cfg(feature = "traffic")]
195 pub fn clear_traffic_notifier(&self) {
196 if let Ok(mut g) = self.notifier.try_lock() {
197 *g = None;
198 }
199 }
200
201 #[cfg(feature = "coils")]
207 pub async fn read_multiple_coils(
208 &self,
209 unit_id: u8,
210 address: u16,
211 quantity: u16,
212 ) -> Result<Coils, AsyncError> {
213 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
214 match self
215 .send_request(ClientRequest::ReadMultipleCoils {
216 unit,
217 address,
218 quantity,
219 })
220 .await?
221 {
222 ClientResponse::Coils(coils) => Ok(coils),
223 _ => Err(AsyncError::UnexpectedResponseType),
224 }
225 }
226
227 #[cfg(feature = "coils")]
231 pub async fn write_single_coil(
232 &self,
233 unit_id: u8,
234 address: u16,
235 value: bool,
236 ) -> Result<(u16, bool), AsyncError> {
237 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
238 match self
239 .send_request(ClientRequest::WriteSingleCoil {
240 unit,
241 address,
242 value,
243 })
244 .await?
245 {
246 ClientResponse::Coils(coils) => {
247 let v = coils.value(coils.from_address()).unwrap_or(false);
248 Ok((coils.from_address(), v))
249 }
250 _ => Err(AsyncError::UnexpectedResponseType),
251 }
252 }
253
254 #[cfg(feature = "coils")]
258 pub async fn write_multiple_coils(
259 &self,
260 unit_id: u8,
261 address: u16,
262 coils: &Coils,
263 ) -> Result<(u16, u16), AsyncError> {
264 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
265 match self
266 .send_request(ClientRequest::WriteMultipleCoils {
267 unit,
268 address,
269 coils: coils.clone(),
270 })
271 .await?
272 {
273 ClientResponse::Coils(coils) => Ok((coils.from_address(), coils.quantity())),
274 _ => Err(AsyncError::UnexpectedResponseType),
275 }
276 }
277
278 #[cfg(feature = "registers")]
284 pub async fn read_holding_registers(
285 &self,
286 unit_id: u8,
287 address: u16,
288 quantity: u16,
289 ) -> Result<Registers, AsyncError> {
290 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
291 match self
292 .send_request(ClientRequest::ReadHoldingRegisters {
293 unit,
294 address,
295 quantity,
296 })
297 .await?
298 {
299 ClientResponse::Registers(regs) => Ok(regs),
300 _ => Err(AsyncError::UnexpectedResponseType),
301 }
302 }
303
304 #[cfg(feature = "registers")]
308 pub async fn read_input_registers(
309 &self,
310 unit_id: u8,
311 address: u16,
312 quantity: u16,
313 ) -> Result<Registers, AsyncError> {
314 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
315 match self
316 .send_request(ClientRequest::ReadInputRegisters {
317 unit,
318 address,
319 quantity,
320 })
321 .await?
322 {
323 ClientResponse::Registers(regs) => Ok(regs),
324 _ => Err(AsyncError::UnexpectedResponseType),
325 }
326 }
327
328 #[cfg(feature = "registers")]
332 pub async fn write_single_register(
333 &self,
334 unit_id: u8,
335 address: u16,
336 value: u16,
337 ) -> Result<(u16, u16), AsyncError> {
338 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
339 match self
340 .send_request(ClientRequest::WriteSingleRegister {
341 unit,
342 address,
343 value,
344 })
345 .await?
346 {
347 ClientResponse::SingleRegisterWrite { address, value } => Ok((address, value)),
348 _ => Err(AsyncError::UnexpectedResponseType),
349 }
350 }
351
352 #[cfg(feature = "registers")]
356 pub async fn write_multiple_registers(
357 &self,
358 unit_id: u8,
359 address: u16,
360 values: &[u16],
361 ) -> Result<(u16, u16), AsyncError> {
362 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
363 let hv =
364 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
365 values,
366 )
367 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
368 match self
369 .send_request(ClientRequest::WriteMultipleRegisters {
370 unit,
371 address,
372 values: hv,
373 })
374 .await?
375 {
376 ClientResponse::Registers(regs) => Ok((regs.from_address(), regs.quantity())),
377 _ => Err(AsyncError::UnexpectedResponseType),
378 }
379 }
380
381 #[cfg(feature = "registers")]
387 pub async fn read_write_multiple_registers(
388 &self,
389 unit_id: u8,
390 read_address: u16,
391 read_quantity: u16,
392 write_address: u16,
393 write_values: &[u16],
394 ) -> Result<Registers, AsyncError> {
395 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
396 let hv =
397 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
398 write_values,
399 )
400 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
401 match self
402 .send_request(ClientRequest::ReadWriteMultipleRegisters {
403 unit,
404 read_address,
405 read_quantity,
406 write_address,
407 write_values: hv,
408 })
409 .await?
410 {
411 ClientResponse::Registers(regs) => Ok(regs),
412 _ => Err(AsyncError::UnexpectedResponseType),
413 }
414 }
415
416 #[cfg(feature = "registers")]
420 pub async fn mask_write_register(
421 &self,
422 unit_id: u8,
423 address: u16,
424 and_mask: u16,
425 or_mask: u16,
426 ) -> Result<(), AsyncError> {
427 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
428 match self
429 .send_request(ClientRequest::MaskWriteRegister {
430 unit,
431 address,
432 and_mask,
433 or_mask,
434 })
435 .await?
436 {
437 ClientResponse::MaskWriteRegister => Ok(()),
438 _ => Err(AsyncError::UnexpectedResponseType),
439 }
440 }
441
442 #[cfg(feature = "discrete-inputs")]
448 pub async fn read_discrete_inputs(
449 &self,
450 unit_id: u8,
451 address: u16,
452 quantity: u16,
453 ) -> Result<DiscreteInputs, AsyncError> {
454 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
455 match self
456 .send_request(ClientRequest::ReadDiscreteInputs {
457 unit,
458 address,
459 quantity,
460 })
461 .await?
462 {
463 ClientResponse::DiscreteInputs(di) => Ok(di),
464 _ => Err(AsyncError::UnexpectedResponseType),
465 }
466 }
467
468 #[cfg(feature = "fifo")]
474 pub async fn read_fifo_queue(
475 &self,
476 unit_id: u8,
477 address: u16,
478 ) -> Result<FifoQueue, AsyncError> {
479 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
480 match self
481 .send_request(ClientRequest::ReadFifoQueue { unit, address })
482 .await?
483 {
484 ClientResponse::FifoQueue(queue) => Ok(queue),
485 _ => Err(AsyncError::UnexpectedResponseType),
486 }
487 }
488
489 #[cfg(feature = "file-record")]
495 pub async fn read_file_record(
496 &self,
497 unit_id: u8,
498 sub_request: &SubRequest,
499 ) -> Result<Vec<SubRequestParams>, AsyncError> {
500 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
501 match self
502 .send_request(ClientRequest::ReadFileRecord {
503 unit,
504 sub_request: sub_request.clone(),
505 })
506 .await?
507 {
508 ClientResponse::FileRecordRead(data) => Ok(data.into_iter().collect()),
509 _ => Err(AsyncError::UnexpectedResponseType),
510 }
511 }
512
513 #[cfg(feature = "file-record")]
515 pub async fn write_file_record(
516 &self,
517 unit_id: u8,
518 sub_request: &SubRequest,
519 ) -> Result<(), AsyncError> {
520 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
521 match self
522 .send_request(ClientRequest::WriteFileRecord {
523 unit,
524 sub_request: sub_request.clone(),
525 })
526 .await?
527 {
528 ClientResponse::FileRecordWrite => Ok(()),
529 _ => Err(AsyncError::UnexpectedResponseType),
530 }
531 }
532
533 #[cfg(feature = "diagnostics")]
537 pub async fn read_device_identification(
538 &self,
539 unit_id: u8,
540 read_device_id_code: ReadDeviceIdCode,
541 object_id: ObjectId,
542 ) -> Result<DeviceIdentificationResponse, AsyncError> {
543 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
544 match self
545 .send_request(ClientRequest::ReadDeviceIdentification {
546 unit,
547 read_device_id_code,
548 object_id,
549 })
550 .await?
551 {
552 ClientResponse::DeviceIdentification(resp) => Ok(resp),
553 _ => Err(AsyncError::UnexpectedResponseType),
554 }
555 }
556
557 #[cfg(feature = "diagnostics")]
561 pub async fn encapsulated_interface_transport(
562 &self,
563 unit_id: u8,
564 mei_type: EncapsulatedInterfaceType,
565 data: &[u8],
566 ) -> Result<(EncapsulatedInterfaceType, Vec<u8>), AsyncError> {
567 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
568 let hv =
569 heapless::Vec::<u8, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
570 data,
571 )
572 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
573 match self
574 .send_request(ClientRequest::EncapsulatedInterfaceTransport {
575 unit,
576 mei_type,
577 data: hv,
578 })
579 .await?
580 {
581 ClientResponse::EncapsulatedInterfaceTransport { mei_type, data } => {
582 Ok((mei_type, data.as_slice().to_vec()))
583 }
584 _ => Err(AsyncError::UnexpectedResponseType),
585 }
586 }
587
588 #[cfg(feature = "diagnostics")]
590 pub async fn read_exception_status(&self, unit_id: u8) -> Result<u8, AsyncError> {
591 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
592 match self
593 .send_request(ClientRequest::ReadExceptionStatus { unit })
594 .await?
595 {
596 ClientResponse::ExceptionStatus(status) => Ok(status),
597 _ => Err(AsyncError::UnexpectedResponseType),
598 }
599 }
600
601 #[cfg(feature = "diagnostics")]
605 pub async fn diagnostics(
606 &self,
607 unit_id: u8,
608 sub_function: DiagnosticSubFunction,
609 data: &[u16],
610 ) -> Result<DiagnosticsDataResponse, AsyncError> {
611 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
612 let hv =
613 heapless::Vec::<u16, { mbus_core::data_unit::common::MAX_PDU_DATA_LEN }>::from_slice(
614 data,
615 )
616 .map_err(|_| AsyncError::Mbus(MbusError::BufferTooSmall))?;
617 match self
618 .send_request(ClientRequest::Diagnostics {
619 unit,
620 sub_function,
621 data: hv,
622 })
623 .await?
624 {
625 ClientResponse::DiagnosticsData { sub_function, data } => Ok(DiagnosticsDataResponse {
626 sub_function,
627 data: data.as_slice().to_vec(),
628 }),
629 _ => Err(AsyncError::UnexpectedResponseType),
630 }
631 }
632
633 #[cfg(feature = "diagnostics")]
637 pub async fn get_comm_event_counter(&self, unit_id: u8) -> Result<(u16, u16), AsyncError> {
638 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
639 match self
640 .send_request(ClientRequest::GetCommEventCounter { unit })
641 .await?
642 {
643 ClientResponse::CommEventCounter {
644 status,
645 event_count,
646 } => Ok((status, event_count)),
647 _ => Err(AsyncError::UnexpectedResponseType),
648 }
649 }
650
651 #[cfg(feature = "diagnostics")]
655 pub async fn get_comm_event_log(
656 &self,
657 unit_id: u8,
658 ) -> Result<CommEventLogResponse, AsyncError> {
659 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
660 match self
661 .send_request(ClientRequest::GetCommEventLog { unit })
662 .await?
663 {
664 ClientResponse::CommEventLog {
665 status,
666 event_count,
667 message_count,
668 events,
669 } => Ok((
670 status,
671 event_count,
672 message_count,
673 events.as_slice().to_vec(),
674 )),
675 _ => Err(AsyncError::UnexpectedResponseType),
676 }
677 }
678
679 #[cfg(feature = "diagnostics")]
683 pub async fn report_server_id(&self, unit_id: u8) -> Result<Vec<u8>, AsyncError> {
684 let unit = UnitIdOrSlaveAddr::new(unit_id).map_err(AsyncError::Mbus)?;
685 match self
686 .send_request(ClientRequest::ReportServerId { unit })
687 .await?
688 {
689 ClientResponse::ReportServerId(data) => Ok(data.as_slice().to_vec()),
690 _ => Err(AsyncError::UnexpectedResponseType),
691 }
692 }
693}