1use crate::{
2 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3 ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4 DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5 EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9 AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10 SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{
13 primitives::{decode_unsigned, encode_ctx_unsigned},
14 reader::Reader,
15 tag::Tag,
16 writer::Writer,
17};
18use rustbac_core::npdu::Npdu;
19use rustbac_core::services::acknowledge_alarm::{
20 AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
21};
22use rustbac_core::services::alarm_summary::{
23 AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
24 SERVICE_GET_ALARM_SUMMARY,
25};
26use rustbac_core::services::atomic_read_file::{
27 AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
28};
29use rustbac_core::services::atomic_write_file::{
30 AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
31};
32use rustbac_core::services::cov_notification::{
33 CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
34 SERVICE_UNCONFIRMED_COV_NOTIFICATION,
35};
36use rustbac_core::services::device_management::{
37 DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
38 ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
39};
40use rustbac_core::services::enrollment_summary::{
41 EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
42 GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
43};
44use rustbac_core::services::event_information::{
45 EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
46 SERVICE_GET_EVENT_INFORMATION,
47};
48use rustbac_core::services::event_notification::{
49 EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
50 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
51};
52use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
53use rustbac_core::services::list_element::{
54 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
55 SERVICE_REMOVE_LIST_ELEMENT,
56};
57use rustbac_core::services::object_management::{
58 CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
59 SERVICE_DELETE_OBJECT,
60};
61use rustbac_core::services::private_transfer::{
62 ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
63 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
64};
65use rustbac_core::services::read_property::{
66 ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
67};
68use rustbac_core::services::read_property_multiple::{
69 PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
70 ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
71};
72use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
73use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
74use rustbac_core::services::subscribe_cov_property::{
75 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
76};
77use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
78use rustbac_core::services::value_codec::encode_application_data_value;
79use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
80use rustbac_core::services::who_is::WhoIsRequest;
81use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
82use rustbac_core::services::write_property_multiple::{
83 PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
84 SERVICE_WRITE_PROPERTY_MULTIPLE,
85};
86use rustbac_core::types::{
87 DataValue, Date, ErrorClass, ErrorCode, ObjectId, ObjectType, PropertyId, Time,
88};
89use rustbac_core::EncodeError;
90use rustbac_datalink::bip::transport::{
91 BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
92};
93use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
94use std::collections::{HashMap, HashSet};
95use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
96use std::sync::RwLock;
97use std::time::Duration;
98use tokio::sync::Mutex;
99use tokio::task::JoinHandle;
100use tokio::time::{timeout, Instant};
101
/// Smallest per-segment payload size, in bytes, used when a confirmed request is split into
/// segments; applied as a floor to the size derived from the peer's maximum APDU length.
const MIN_SEGMENT_DATA_LEN: usize = 32;
/// Largest payload, in bytes (1 MiB), that is reassembled from a segmented ComplexAck before the
/// response is rejected as too large.
const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1 << 20;
104
105pub struct BacnetClient<D: DataLink> {
121 datalink: D,
122 invoke_id: Mutex<u8>,
123 request_io_lock: Mutex<()>,
124 response_timeout: Duration,
125 segmented_request_window_size: u8,
126 segmented_request_retries: u8,
127 segment_ack_timeout: Duration,
128 capability_cache: std::sync::Arc<RwLock<HashMap<DataLinkAddress, usize>>>,
130 server_handler: Option<std::sync::Arc<dyn crate::server::ServiceHandler>>,
132 server_device_id: u32,
134 #[allow(unused)]
136 server_vendor_id: u16,
137}
138
139impl<D: DataLink + std::fmt::Debug> std::fmt::Debug for BacnetClient<D> {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("BacnetClient")
142 .field("datalink", &self.datalink)
143 .field("invoke_id", &self.invoke_id)
144 .field("response_timeout", &self.response_timeout)
145 .field(
146 "segmented_request_window_size",
147 &self.segmented_request_window_size,
148 )
149 .field("segmented_request_retries", &self.segmented_request_retries)
150 .field("segment_ack_timeout", &self.segment_ack_timeout)
151 .field(
152 "server_handler",
153 &self.server_handler.as_ref().map(|_| "..."),
154 )
155 .field("server_device_id", &self.server_device_id)
156 .field("server_vendor_id", &self.server_vendor_id)
157 .finish()
158 }
159}
160
161#[derive(Debug)]
167pub struct ForeignDeviceRenewal {
168 task: JoinHandle<()>,
169}
170
171impl ForeignDeviceRenewal {
172 pub fn stop(self) {
174 self.task.abort();
175 }
176}
177
178impl Drop for ForeignDeviceRenewal {
179 fn drop(&mut self) {
180 self.task.abort();
181 }
182}
183
184impl BacnetClient<BacnetIpTransport> {
185 pub async fn new() -> Result<Self, ClientError> {
189 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
190 let datalink = BacnetIpTransport::bind(bind_addr).await?;
191 Ok(Self {
192 datalink,
193 invoke_id: Mutex::new(1),
194 request_io_lock: Mutex::new(()),
195 response_timeout: Duration::from_secs(3),
196 segmented_request_window_size: 16,
197 segmented_request_retries: 2,
198 segment_ack_timeout: Duration::from_millis(500),
199 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
200 server_handler: None,
201 server_device_id: 0,
202 server_vendor_id: 0,
203 })
204 }
205
206 pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
213 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
214 let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
215 datalink.register_foreign_device(ttl_seconds).await?;
216 Ok(Self {
217 datalink,
218 invoke_id: Mutex::new(1),
219 request_io_lock: Mutex::new(()),
220 response_timeout: Duration::from_secs(3),
221 segmented_request_window_size: 16,
222 segmented_request_retries: 2,
223 segment_ack_timeout: Duration::from_millis(500),
224 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
225 server_handler: None,
226 server_device_id: 0,
227 server_vendor_id: 0,
228 })
229 }
230
231 pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
234 let _io = self.request_io_lock.lock().await;
235 self.datalink.register_foreign_device(ttl_seconds).await?;
236 Ok(())
237 }
238
239 pub async fn read_broadcast_distribution_table(
241 &self,
242 ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
243 let _io = self.request_io_lock.lock().await;
244 self.datalink
245 .read_broadcast_distribution_table()
246 .await
247 .map_err(ClientError::from)
248 }
249
250 pub async fn write_broadcast_distribution_table(
252 &self,
253 entries: &[BroadcastDistributionEntry],
254 ) -> Result<(), ClientError> {
255 let _io = self.request_io_lock.lock().await;
256 self.datalink
257 .write_broadcast_distribution_table(entries)
258 .await?;
259 Ok(())
260 }
261
262 pub async fn read_foreign_device_table(
264 &self,
265 ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
266 let _io = self.request_io_lock.lock().await;
267 self.datalink
268 .read_foreign_device_table()
269 .await
270 .map_err(ClientError::from)
271 }
272
273 pub async fn delete_foreign_device_table_entry(
275 &self,
276 address: SocketAddrV4,
277 ) -> Result<(), ClientError> {
278 let _io = self.request_io_lock.lock().await;
279 self.datalink
280 .delete_foreign_device_table_entry(address)
281 .await?;
282 Ok(())
283 }
284
285 pub fn start_foreign_device_renewal(
291 &self,
292 ttl_seconds: u16,
293 ) -> Result<ForeignDeviceRenewal, ClientError> {
294 if ttl_seconds == 0 {
295 return Err(EncodeError::InvalidLength.into());
296 }
297
298 let datalink = self.datalink.clone();
299 let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
300 let interval = Duration::from_secs(refresh_seconds.max(1));
301 let task = tokio::spawn(async move {
302 loop {
303 tokio::time::sleep(interval).await;
304 if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
305 log::warn!("foreign device renewal send failed: {err}");
306 }
307 }
308 });
309 Ok(ForeignDeviceRenewal { task })
310 }
311}
312
313impl BacnetClient<rustbac_datalink::bip6::transport::BacnetIp6Transport> {
314 pub async fn new_ipv6(
320 multicast: std::net::Ipv6Addr,
321 if_index: u32,
322 ) -> Result<Self, ClientError> {
323 use rustbac_datalink::bip6::transport::BacnetIp6Transport;
324 let bind_addr = std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, 47808, 0, 0);
325 let datalink = BacnetIp6Transport::bind(bind_addr, multicast, if_index).await?;
326 Ok(Self::with_datalink(datalink))
327 }
328}
329
330impl BacnetClient<BacnetScTransport> {
331 pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
334 let datalink = BacnetScTransport::connect(endpoint).await?;
335 Ok(Self::with_datalink(datalink))
336 }
337}
338
339impl<D: DataLink> BacnetClient<D> {
340 pub fn with_datalink(datalink: D) -> Self {
344 Self {
345 datalink,
346 invoke_id: Mutex::new(1),
347 request_io_lock: Mutex::new(()),
348 response_timeout: Duration::from_secs(3),
349 segmented_request_window_size: 16,
350 segmented_request_retries: 2,
351 segment_ack_timeout: Duration::from_millis(500),
352 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
353 server_handler: None,
354 server_device_id: 0,
355 server_vendor_id: 0,
356 }
357 }
358
359 pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
361 self.response_timeout = timeout;
362 self
363 }
364
365 pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
368 self.segmented_request_window_size = window_size.max(1);
369 self
370 }
371
372 pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
375 self.segmented_request_retries = retries;
376 self
377 }
378
379 pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
382 self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
383 self
384 }
385
386 pub fn with_server_handler(
393 mut self,
394 handler: std::sync::Arc<dyn crate::server::ServiceHandler>,
395 device_id: u32,
396 vendor_id: u16,
397 ) -> Self {
398 self.server_handler = Some(handler);
399 self.server_device_id = device_id;
400 self.server_vendor_id = vendor_id;
401 self
402 }
403
404 pub async fn poll_server(&self) -> Result<(), ClientError> {
412 let handler = self.server_handler.as_ref().ok_or(ClientError::Timeout)?;
413 let _io_lock = self.request_io_lock.lock().await;
414 let mut buf = [0u8; 1500];
415 match tokio::time::timeout(Duration::from_millis(50), self.datalink.recv(&mut buf)).await {
416 Ok(Ok((n, src))) => {
417 let _ = dispatch_incoming_request(
418 &self.datalink,
419 handler.as_ref(),
420 self.server_device_id,
421 self.server_vendor_id,
422 &buf[..n],
423 src,
424 )
425 .await;
426 Ok(())
427 }
428 _ => Ok(()),
429 }
430 }
431
432 async fn next_invoke_id(&self) -> u8 {
433 let mut lock = self.invoke_id.lock().await;
434 let id = *lock;
435 *lock = lock.wrapping_add(1);
436 if *lock == 0 {
437 *lock = 1;
438 }
439 id
440 }
441
442 async fn send_segment_ack(
443 &self,
444 address: DataLinkAddress,
445 invoke_id: u8,
446 sequence_number: u8,
447 window_size: u8,
448 ) -> Result<(), ClientError> {
449 let mut tx = [0u8; 64];
450 let mut w = Writer::new(&mut tx);
451 Npdu::new(0).encode(&mut w)?;
452 SegmentAck {
453 negative_ack: false,
454 sent_by_server: false,
455 invoke_id,
456 sequence_number,
457 actual_window_size: window_size,
458 }
459 .encode(&mut w)?;
460 self.datalink.send(address, w.as_written()).await?;
461 Ok(())
462 }
463
464 async fn recv_ignoring_invalid_frame(
465 &self,
466 buf: &mut [u8],
467 deadline: Instant,
468 ) -> Result<(usize, DataLinkAddress), ClientError> {
469 loop {
470 let remaining = deadline.saturating_duration_since(Instant::now());
471 if remaining.is_zero() {
472 return Err(ClientError::Timeout);
473 }
474
475 match timeout(remaining, self.datalink.recv(buf)).await {
476 Err(_) => return Err(ClientError::Timeout),
477 Ok(Err(DataLinkError::InvalidFrame)) => continue,
478 Ok(Err(e)) => return Err(e.into()),
479 Ok(Ok(v)) => return Ok(v),
480 }
481 }
482 }
483
484 async fn send_simple_ack(
485 &self,
486 address: DataLinkAddress,
487 invoke_id: u8,
488 service_choice: u8,
489 ) -> Result<(), ClientError> {
490 let mut tx = [0u8; 64];
491 let mut w = Writer::new(&mut tx);
492 Npdu::new(0).encode(&mut w)?;
493 SimpleAck {
494 invoke_id,
495 service_choice,
496 }
497 .encode(&mut w)?;
498 self.datalink.send(address, w.as_written()).await?;
499 Ok(())
500 }
501
502 fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
503 where
504 F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
505 {
506 for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
507 let mut buf = vec![0u8; size];
508 let mut w = Writer::new(&mut buf);
509 match encode(&mut w) {
510 Ok(()) => {
511 let written_len = w.as_written().len();
512 buf.truncate(written_len);
513 return Ok(buf);
514 }
515 Err(EncodeError::BufferTooSmall) => continue,
516 Err(e) => return Err(e.into()),
517 }
518 }
519 Err(ClientError::SegmentedRequestTooLarge)
520 }
521
/// Maps the low nibble of a max-APDU code to a length in octets.
///
/// Codes 0 through 5 map to 50, 128, 206, 480, 1024 and 1476; every other code maps to 480.
const fn max_apdu_octets(max_apdu_code: u8) -> usize {
    const OCTETS_BY_CODE: [usize; 6] = [50, 128, 206, 480, 1024, 1476];
    const FALLBACK_OCTETS: usize = 480;

    let code = (max_apdu_code & 0x0f) as usize;
    if code < OCTETS_BY_CODE.len() {
        OCTETS_BY_CODE[code]
    } else {
        FALLBACK_OCTETS
    }
}
533
534 async fn await_segment_ack(
535 &self,
536 address: DataLinkAddress,
537 invoke_id: u8,
538 service_choice: u8,
539 expected_sequence: u8,
540 deadline: Instant,
541 ) -> Result<SegmentAck, ClientError> {
542 loop {
543 let remaining = deadline.saturating_duration_since(Instant::now());
544 if remaining.is_zero() {
545 return Err(ClientError::Timeout);
546 }
547
548 let mut rx = [0u8; 1500];
549 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
550 let (n, src) = match recv {
551 Err(_) => return Err(ClientError::Timeout),
552 Ok(Err(DataLinkError::InvalidFrame)) => continue,
553 Ok(Err(e)) => return Err(e.into()),
554 Ok(Ok(v)) => v,
555 };
556 if src != address {
557 continue;
558 }
559
560 let Ok(apdu) = extract_apdu(&rx[..n]) else {
561 continue;
562 };
563 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
564 match ApduType::from_u8(first >> 4) {
565 Some(ApduType::SegmentAck) => {
566 let mut r = Reader::new(apdu);
567 let ack = SegmentAck::decode(&mut r)?;
568 if ack.invoke_id != invoke_id || !ack.sent_by_server {
569 continue;
570 }
571 if ack.negative_ack {
572 return Err(ClientError::SegmentNegativeAck {
573 sequence_number: ack.sequence_number,
574 });
575 }
576 if ack.sequence_number == expected_sequence {
577 return Ok(ack);
578 }
579 }
580 Some(ApduType::Error) => {
581 let mut r = Reader::new(apdu);
582 let err = BacnetError::decode(&mut r)?;
583 if err.invoke_id == invoke_id && err.service_choice == service_choice {
584 return Err(remote_service_error(err));
585 }
586 }
587 Some(ApduType::Reject) => {
588 let mut r = Reader::new(apdu);
589 let rej = RejectPdu::decode(&mut r)?;
590 if rej.invoke_id == invoke_id {
591 return Err(ClientError::RemoteReject { reason: rej.reason });
592 }
593 }
594 Some(ApduType::Abort) => {
595 let mut r = Reader::new(apdu);
596 let abort = AbortPdu::decode(&mut r)?;
597 if abort.invoke_id == invoke_id {
598 return Err(ClientError::RemoteAbort {
599 reason: abort.reason,
600 server: abort.server,
601 });
602 }
603 }
604 _ => continue,
605 }
606 }
607 }
608
609 async fn send_confirmed_request(
610 &self,
611 address: DataLinkAddress,
612 frame: &[u8],
613 deadline: Instant,
614 ) -> Result<(), ClientError> {
615 let mut pr = Reader::new(frame);
616 let _npdu = Npdu::decode(&mut pr)?;
617 let npdu_len = frame.len() - pr.remaining();
618 let npdu_bytes = &frame[..npdu_len];
619 let apdu = &frame[npdu_len..];
620
621 let mut ar = Reader::new(apdu);
622 let header = ConfirmedRequestHeader::decode(&mut ar)?;
623 let service_payload = ar.read_exact(ar.remaining())?;
624
625 let peer_max_apdu = self
628 .capability_cache
629 .read()
630 .ok()
631 .and_then(|c| c.get(&address).copied())
632 .unwrap_or_else(|| Self::max_apdu_octets(header.max_apdu));
633 let segment_data_len = peer_max_apdu.saturating_sub(5).max(MIN_SEGMENT_DATA_LEN);
634 let segment_count = service_payload.len().div_ceil(segment_data_len);
635
636 if segment_count <= 1 {
637 self.datalink.send(address, frame).await?;
638 return Ok(());
639 }
640
641 if segment_count > usize::from(u8::MAX) + 1 {
642 return Err(ClientError::SegmentedRequestTooLarge);
643 }
644
645 let configured_window_size = self.segmented_request_window_size.max(1);
646 let mut window_size = configured_window_size;
647 let mut peer_window_ceiling = configured_window_size;
648 let mut batch_start = 0usize;
649 while batch_start < segment_count {
650 let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
651 let expected_sequence = (batch_end - 1) as u8;
652
653 let mut frames = Vec::with_capacity(batch_end - batch_start);
654 for segment_index in batch_start..batch_end {
655 let seq = segment_index as u8;
656 let more_follows = segment_index + 1 < segment_count;
657 let start = segment_index * segment_data_len;
658 let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
659 let segment = &service_payload[start..end];
660
661 let seg_header = ConfirmedRequestHeader {
662 segmented: true,
663 more_follows,
664 segmented_response_accepted: header.segmented_response_accepted,
665 max_segments: header.max_segments,
666 max_apdu: header.max_apdu,
667 invoke_id: header.invoke_id,
668 sequence_number: Some(seq),
669 proposed_window_size: Some(window_size),
670 service_choice: header.service_choice,
671 };
672
673 let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
674 let written_len = {
675 let mut w = Writer::new(&mut tx);
676 w.write_all(npdu_bytes)?;
677 seg_header.encode(&mut w)?;
678 w.write_all(segment)?;
679 w.as_written().len()
680 };
681 tx.truncate(written_len);
682 frames.push(tx);
683 }
684
685 let mut retries_remaining = self.segmented_request_retries;
686 loop {
687 for frame in &frames {
688 self.datalink.send(address, frame).await?;
689 }
690
691 if batch_end == segment_count {
692 break;
693 }
694
695 let remaining = deadline.saturating_duration_since(Instant::now());
696 if remaining.is_zero() {
697 return Err(ClientError::Timeout);
698 }
699 let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
700 match self
701 .await_segment_ack(
702 address,
703 header.invoke_id,
704 header.service_choice,
705 expected_sequence,
706 ack_wait_deadline,
707 )
708 .await
709 {
710 Ok(ack) => {
711 peer_window_ceiling =
712 peer_window_ceiling.min(ack.actual_window_size.max(1));
713 window_size = window_size
714 .saturating_add(1)
715 .min(configured_window_size)
716 .min(peer_window_ceiling)
717 .max(1);
718 break;
719 }
720 Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
721 if retries_remaining > 0 =>
722 {
723 retries_remaining -= 1;
724 window_size = window_size.saturating_div(2).max(1);
725 continue;
726 }
727 Err(e) => return Err(e),
728 }
729 }
730
731 batch_start = batch_end;
732 }
733
734 Ok(())
735 }
736
737 async fn collect_complex_ack_payload(
738 &self,
739 address: DataLinkAddress,
740 invoke_id: u8,
741 service_choice: u8,
742 first_header: ComplexAckHeader,
743 first_payload: &[u8],
744 deadline: Instant,
745 ) -> Result<Vec<u8>, ClientError> {
746 let mut payload = first_payload.to_vec();
747 if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
748 return Err(ClientError::ResponseTooLarge {
749 limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
750 });
751 }
752 if !first_header.segmented {
753 return Ok(payload);
754 }
755
756 let mut last_seq = first_header
757 .sequence_number
758 .ok_or(ClientError::UnsupportedResponse)?;
759 let mut window_size = first_header.proposed_window_size.unwrap_or(1);
760 self.send_segment_ack(address, invoke_id, last_seq, window_size)
761 .await?;
762 let mut more_follows = first_header.more_follows;
763
764 while more_follows {
765 let mut rx = [0u8; 1500];
766 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
767 if src != address {
768 if let Some(ref handler) = self.server_handler {
770 let _ = dispatch_incoming_request(
771 &self.datalink,
772 handler.as_ref(),
773 self.server_device_id,
774 self.server_vendor_id,
775 &rx[..n],
776 src,
777 )
778 .await;
779 }
780 continue;
781 }
782
783 let Ok(apdu) = extract_apdu(&rx[..n]) else {
784 continue;
785 };
786 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
787 match ApduType::from_u8(first >> 4) {
788 Some(ApduType::ComplexAck) => {
789 let mut r = Reader::new(apdu);
790 let seg = ComplexAckHeader::decode(&mut r)?;
791 if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
792 continue;
793 }
794 if !seg.segmented {
795 return Err(ClientError::UnsupportedResponse);
796 }
797 let seq = seg
798 .sequence_number
799 .ok_or(ClientError::UnsupportedResponse)?;
800 if seq == last_seq {
801 self.send_segment_ack(address, invoke_id, last_seq, window_size)
803 .await?;
804 continue;
805 }
806 if seq != last_seq.wrapping_add(1) {
807 continue;
808 }
809
810 let seg_payload = r.read_exact(r.remaining())?;
811 if payload.len().saturating_add(seg_payload.len())
812 > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
813 {
814 return Err(ClientError::ResponseTooLarge {
815 limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
816 });
817 }
818 payload.extend_from_slice(seg_payload);
819
820 last_seq = seq;
821 more_follows = seg.more_follows;
822 window_size = seg.proposed_window_size.unwrap_or(window_size);
823 self.send_segment_ack(address, invoke_id, last_seq, window_size)
824 .await?;
825 }
826 Some(ApduType::Error) => {
827 let mut r = Reader::new(apdu);
828 let err = BacnetError::decode(&mut r)?;
829 if err.invoke_id == invoke_id && err.service_choice == service_choice {
830 return Err(remote_service_error(err));
831 }
832 }
833 Some(ApduType::Reject) => {
834 let mut r = Reader::new(apdu);
835 let rej = RejectPdu::decode(&mut r)?;
836 if rej.invoke_id == invoke_id {
837 return Err(ClientError::RemoteReject { reason: rej.reason });
838 }
839 }
840 Some(ApduType::Abort) => {
841 let mut r = Reader::new(apdu);
842 let abort = AbortPdu::decode(&mut r)?;
843 if abort.invoke_id == invoke_id {
844 return Err(ClientError::RemoteAbort {
845 reason: abort.reason,
846 server: abort.server,
847 });
848 }
849 }
850 _ => {
851 if let Some(ref handler) = self.server_handler {
853 let _ = dispatch_incoming_request(
854 &self.datalink,
855 handler.as_ref(),
856 self.server_device_id,
857 self.server_vendor_id,
858 &rx[..n],
859 src,
860 )
861 .await;
862 }
863 continue;
864 }
865 }
866 }
867
868 Ok(payload)
869 }
870
871 pub async fn who_is(
877 &self,
878 range: Option<(u32, u32)>,
879 wait: Duration,
880 ) -> Result<Vec<DiscoveredDevice>, ClientError> {
881 let req = match range {
884 Some((low, high)) => WhoIsRequest {
885 low_limit: Some(low),
886 high_limit: Some(high),
887 },
888 None => WhoIsRequest::global(),
889 };
890
891 let mut tx = [0u8; 128];
892 let mut w = Writer::new(&mut tx);
893 Npdu::new(0).encode(&mut w)?;
894 req.encode(&mut w)?;
895
896 self.datalink
897 .send(
898 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
899 w.as_written(),
900 )
901 .await?;
902
903 let mut devices = Vec::new();
904 let mut seen = HashSet::new();
905 let deadline = tokio::time::Instant::now() + wait;
906
907 while tokio::time::Instant::now() < deadline {
908 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
909 let mut rx = [0u8; 1500];
910 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
911 match recv {
912 Ok(Ok((n, src))) => {
913 let Ok(apdu) = extract_apdu(&rx[..n]) else {
914 continue;
915 };
916 let mut r = Reader::new(apdu);
917 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
918 continue;
919 };
920 if unconfirmed.service_choice != SERVICE_I_AM {
921 continue;
922 }
923 let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
924 continue;
925 };
926 if seen.insert(i_am.device_id) {
927 devices.push(DiscoveredDevice {
928 address: src,
929 device_id: Some(i_am.device_id),
930 });
931 if let Ok(mut cache) = self.capability_cache.write() {
934 cache.insert(src, i_am.max_apdu as usize);
935 }
936 }
937 }
938 Ok(Err(DataLinkError::InvalidFrame)) => continue,
939 Ok(Err(e)) => return Err(e.into()),
940 Err(_) => break,
941 }
942 }
943
944 Ok(devices)
945 }
946
947 pub async fn who_has_object_id(
952 &self,
953 range: Option<(u32, u32)>,
954 object_id: ObjectId,
955 wait: Duration,
956 ) -> Result<Vec<DiscoveredObject>, ClientError> {
957 let req = WhoHasRequest {
958 low_limit: range.map(|(low, _)| low),
959 high_limit: range.map(|(_, high)| high),
960 object: WhoHasObject::ObjectId(object_id),
961 };
962 self.who_has(req, wait).await
963 }
964
965 pub async fn who_has_object_name(
969 &self,
970 range: Option<(u32, u32)>,
971 object_name: &str,
972 wait: Duration,
973 ) -> Result<Vec<DiscoveredObject>, ClientError> {
974 let req = WhoHasRequest {
975 low_limit: range.map(|(low, _)| low),
976 high_limit: range.map(|(_, high)| high),
977 object: WhoHasObject::ObjectName(object_name),
978 };
979 self.who_has(req, wait).await
980 }
981
982 async fn who_has(
983 &self,
984 request: WhoHasRequest<'_>,
985 wait: Duration,
986 ) -> Result<Vec<DiscoveredObject>, ClientError> {
987 let tx = self.encode_with_growth(|w| {
989 Npdu::new(0).encode(w)?;
990 request.encode(w)
991 })?;
992 self.datalink
993 .send(
994 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
995 &tx,
996 )
997 .await?;
998
999 let mut objects = Vec::new();
1000 let mut seen = HashSet::new();
1001 let deadline = tokio::time::Instant::now() + wait;
1002
1003 while tokio::time::Instant::now() < deadline {
1004 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1005 let mut rx = [0u8; 1500];
1006 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1007 match recv {
1008 Ok(Ok((n, src))) => {
1009 let Ok(apdu) = extract_apdu(&rx[..n]) else {
1010 continue;
1011 };
1012 let mut r = Reader::new(apdu);
1013 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
1014 continue;
1015 };
1016 if unconfirmed.service_choice != SERVICE_I_HAVE {
1017 continue;
1018 }
1019 let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
1020 continue;
1021 };
1022 if !seen.insert((src, i_have.object_id.raw())) {
1023 continue;
1024 }
1025 objects.push(DiscoveredObject {
1026 address: src,
1027 device_id: i_have.device_id,
1028 object_id: i_have.object_id,
1029 object_name: i_have.object_name.to_string(),
1030 });
1031 }
1032 Ok(Err(DataLinkError::InvalidFrame)) => continue,
1033 Ok(Err(e)) => return Err(e.into()),
1034 Err(_) => break,
1035 }
1036 }
1037
1038 Ok(objects)
1039 }
1040
1041 pub async fn device_communication_control(
1047 &self,
1048 address: DataLinkAddress,
1049 time_duration_seconds: Option<u16>,
1050 enable_disable: DeviceCommunicationState,
1051 password: Option<&str>,
1052 ) -> Result<(), ClientError> {
1053 let invoke_id = self.next_invoke_id().await;
1054 let request = DeviceCommunicationControlRequest {
1055 time_duration_seconds,
1056 enable_disable,
1057 password,
1058 invoke_id,
1059 };
1060 let tx = self.encode_with_growth(|w| {
1061 Npdu::new(0).encode(w)?;
1062 request.encode(w)
1063 })?;
1064 self.await_simple_ack_or_error(
1065 address,
1066 &tx,
1067 invoke_id,
1068 SERVICE_DEVICE_COMMUNICATION_CONTROL,
1069 self.response_timeout,
1070 )
1071 .await
1072 }
1073
1074 pub async fn reinitialize_device(
1078 &self,
1079 address: DataLinkAddress,
1080 state: ReinitializeState,
1081 password: Option<&str>,
1082 ) -> Result<(), ClientError> {
1083 let invoke_id = self.next_invoke_id().await;
1084 let request = ReinitializeDeviceRequest {
1085 state,
1086 password,
1087 invoke_id,
1088 };
1089 let tx = self.encode_with_growth(|w| {
1090 Npdu::new(0).encode(w)?;
1091 request.encode(w)
1092 })?;
1093 self.await_simple_ack_or_error(
1094 address,
1095 &tx,
1096 invoke_id,
1097 SERVICE_REINITIALIZE_DEVICE,
1098 self.response_timeout,
1099 )
1100 .await
1101 }
1102
1103 pub async fn time_synchronize(
1108 &self,
1109 address: DataLinkAddress,
1110 date: Date,
1111 time: Time,
1112 utc: bool,
1113 ) -> Result<(), ClientError> {
1114 let request = if utc {
1115 TimeSynchronizationRequest::utc(date, time)
1116 } else {
1117 TimeSynchronizationRequest::local(date, time)
1118 };
1119 let tx = self.encode_with_growth(|w| {
1120 Npdu::new(0).encode(w)?;
1121 request.encode(w)
1122 })?;
1123 self.datalink.send(address, &tx).await?;
1124 Ok(())
1125 }
1126
1127 pub async fn create_object_by_type(
1130 &self,
1131 address: DataLinkAddress,
1132 object_type: rustbac_core::types::ObjectType,
1133 ) -> Result<ObjectId, ClientError> {
1134 self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
1135 .await
1136 }
1137
1138 pub async fn create_object(
1142 &self,
1143 address: DataLinkAddress,
1144 mut request: CreateObjectRequest,
1145 ) -> Result<ObjectId, ClientError> {
1146 request.invoke_id = self.next_invoke_id().await;
1147 let invoke_id = request.invoke_id;
1148 let tx = self.encode_with_growth(|w| {
1149 Npdu::new(0).encode(w)?;
1150 request.encode(w)
1151 })?;
1152 let payload = self
1153 .await_complex_ack_payload_or_error(
1154 address,
1155 &tx,
1156 invoke_id,
1157 SERVICE_CREATE_OBJECT,
1158 self.response_timeout,
1159 )
1160 .await?;
1161 let mut pr = Reader::new(&payload);
1162 let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
1163 Ok(parsed.object_id)
1164 }
1165
1166 pub async fn delete_object(
1168 &self,
1169 address: DataLinkAddress,
1170 object_id: ObjectId,
1171 ) -> Result<(), ClientError> {
1172 let invoke_id = self.next_invoke_id().await;
1173 let request = DeleteObjectRequest {
1174 object_id,
1175 invoke_id,
1176 };
1177 let tx = self.encode_with_growth(|w| {
1178 Npdu::new(0).encode(w)?;
1179 request.encode(w)
1180 })?;
1181 self.await_simple_ack_or_error(
1182 address,
1183 &tx,
1184 invoke_id,
1185 SERVICE_DELETE_OBJECT,
1186 self.response_timeout,
1187 )
1188 .await
1189 }
1190
1191 pub async fn add_list_element(
1193 &self,
1194 address: DataLinkAddress,
1195 mut request: AddListElementRequest<'_>,
1196 ) -> Result<(), ClientError> {
1197 request.invoke_id = self.next_invoke_id().await;
1198 let invoke_id = request.invoke_id;
1199 let tx = self.encode_with_growth(|w| {
1200 Npdu::new(0).encode(w)?;
1201 request.encode(w)
1202 })?;
1203 self.await_simple_ack_or_error(
1204 address,
1205 &tx,
1206 invoke_id,
1207 SERVICE_ADD_LIST_ELEMENT,
1208 self.response_timeout,
1209 )
1210 .await
1211 }
1212
1213 pub async fn remove_list_element(
1215 &self,
1216 address: DataLinkAddress,
1217 mut request: RemoveListElementRequest<'_>,
1218 ) -> Result<(), ClientError> {
1219 request.invoke_id = self.next_invoke_id().await;
1220 let invoke_id = request.invoke_id;
1221 let tx = self.encode_with_growth(|w| {
1222 Npdu::new(0).encode(w)?;
1223 request.encode(w)
1224 })?;
1225 self.await_simple_ack_or_error(
1226 address,
1227 &tx,
1228 invoke_id,
1229 SERVICE_REMOVE_LIST_ELEMENT,
1230 self.response_timeout,
1231 )
1232 .await
1233 }
1234
1235 async fn await_simple_ack_or_error(
1236 &self,
1237 address: DataLinkAddress,
1238 tx: &[u8],
1239 invoke_id: u8,
1240 service_choice: u8,
1241 timeout_window: Duration,
1242 ) -> Result<(), ClientError> {
1243 #[cfg(feature = "tracing")]
1244 tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1245 let _io_lock = self.request_io_lock.lock().await;
1246 let deadline = tokio::time::Instant::now() + timeout_window;
1247 self.send_confirmed_request(address, tx, deadline).await?;
1248 while tokio::time::Instant::now() < deadline {
1249 let mut rx = [0u8; 1500];
1250 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1251 if src != address {
1252 if let Some(ref handler) = self.server_handler {
1254 let _ = dispatch_incoming_request(
1255 &self.datalink,
1256 handler.as_ref(),
1257 self.server_device_id,
1258 self.server_vendor_id,
1259 &rx[..n],
1260 src,
1261 )
1262 .await;
1263 }
1264 continue;
1265 }
1266 let apdu = extract_apdu(&rx[..n])?;
1267 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1268 match ApduType::from_u8(first >> 4) {
1269 Some(ApduType::SimpleAck) => {
1270 let mut r = Reader::new(apdu);
1271 let ack = SimpleAck::decode(&mut r)?;
1272 if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1273 return Ok(());
1274 }
1275 }
1276 Some(ApduType::Error) => {
1277 let mut r = Reader::new(apdu);
1278 let err = BacnetError::decode(&mut r)?;
1279 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1280 return Err(remote_service_error(err));
1281 }
1282 }
1283 Some(ApduType::Reject) => {
1284 let mut r = Reader::new(apdu);
1285 let rej = RejectPdu::decode(&mut r)?;
1286 if rej.invoke_id == invoke_id {
1287 return Err(ClientError::RemoteReject { reason: rej.reason });
1288 }
1289 }
1290 Some(ApduType::Abort) => {
1291 let mut r = Reader::new(apdu);
1292 let abort = AbortPdu::decode(&mut r)?;
1293 if abort.invoke_id == invoke_id {
1294 return Err(ClientError::RemoteAbort {
1295 reason: abort.reason,
1296 server: abort.server,
1297 });
1298 }
1299 }
1300 _ => {
1301 if let Some(ref handler) = self.server_handler {
1303 let _ = dispatch_incoming_request(
1304 &self.datalink,
1305 handler.as_ref(),
1306 self.server_device_id,
1307 self.server_vendor_id,
1308 &rx[..n],
1309 src,
1310 )
1311 .await;
1312 }
1313 continue;
1314 }
1315 }
1316 }
1317 #[cfg(feature = "tracing")]
1318 tracing::warn!(invoke_id = invoke_id, "request timed out");
1319 Err(ClientError::Timeout)
1320 }
1321
1322 async fn await_complex_ack_payload_or_error(
1323 &self,
1324 address: DataLinkAddress,
1325 tx: &[u8],
1326 invoke_id: u8,
1327 service_choice: u8,
1328 timeout_window: Duration,
1329 ) -> Result<Vec<u8>, ClientError> {
1330 #[cfg(feature = "tracing")]
1331 tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1332 let _io_lock = self.request_io_lock.lock().await;
1333 let deadline = tokio::time::Instant::now() + timeout_window;
1334 self.send_confirmed_request(address, tx, deadline).await?;
1335 while tokio::time::Instant::now() < deadline {
1336 let mut rx = [0u8; 1500];
1337 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1338 if src != address {
1339 if let Some(ref handler) = self.server_handler {
1341 let _ = dispatch_incoming_request(
1342 &self.datalink,
1343 handler.as_ref(),
1344 self.server_device_id,
1345 self.server_vendor_id,
1346 &rx[..n],
1347 src,
1348 )
1349 .await;
1350 }
1351 continue;
1352 }
1353
1354 let apdu = extract_apdu(&rx[..n])?;
1355 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1356 match ApduType::from_u8(first >> 4) {
1357 Some(ApduType::ComplexAck) => {
1358 let mut r = Reader::new(apdu);
1359 let ack = ComplexAckHeader::decode(&mut r)?;
1360 if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1361 continue;
1362 }
1363 return self
1364 .collect_complex_ack_payload(
1365 address,
1366 invoke_id,
1367 service_choice,
1368 ack,
1369 r.read_exact(r.remaining())?,
1370 deadline,
1371 )
1372 .await;
1373 }
1374 Some(ApduType::Error) => {
1375 let mut r = Reader::new(apdu);
1376 let err = BacnetError::decode(&mut r)?;
1377 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1378 return Err(remote_service_error(err));
1379 }
1380 }
1381 Some(ApduType::Reject) => {
1382 let mut r = Reader::new(apdu);
1383 let rej = RejectPdu::decode(&mut r)?;
1384 if rej.invoke_id == invoke_id {
1385 return Err(ClientError::RemoteReject { reason: rej.reason });
1386 }
1387 }
1388 Some(ApduType::Abort) => {
1389 let mut r = Reader::new(apdu);
1390 let abort = AbortPdu::decode(&mut r)?;
1391 if abort.invoke_id == invoke_id {
1392 return Err(ClientError::RemoteAbort {
1393 reason: abort.reason,
1394 server: abort.server,
1395 });
1396 }
1397 }
1398 _ => {
1399 if let Some(ref handler) = self.server_handler {
1401 let _ = dispatch_incoming_request(
1402 &self.datalink,
1403 handler.as_ref(),
1404 self.server_device_id,
1405 self.server_vendor_id,
1406 &rx[..n],
1407 src,
1408 )
1409 .await;
1410 }
1411 continue;
1412 }
1413 }
1414 }
1415 #[cfg(feature = "tracing")]
1416 tracing::warn!(invoke_id = invoke_id, "request timed out");
1417 Err(ClientError::Timeout)
1418 }
1419
1420 pub async fn get_alarm_summary(
1422 &self,
1423 address: DataLinkAddress,
1424 ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1425 let invoke_id = self.next_invoke_id().await;
1426 let request = GetAlarmSummaryRequest { invoke_id };
1427 let tx = self.encode_with_growth(|w| {
1428 Npdu::new(0).encode(w)?;
1429 request.encode(w)
1430 })?;
1431 let payload = self
1432 .await_complex_ack_payload_or_error(
1433 address,
1434 &tx,
1435 invoke_id,
1436 SERVICE_GET_ALARM_SUMMARY,
1437 self.response_timeout,
1438 )
1439 .await?;
1440 let mut pr = Reader::new(&payload);
1441 let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1442 Ok(into_client_alarm_summary(parsed.summaries))
1443 }
1444
1445 pub async fn get_enrollment_summary(
1447 &self,
1448 address: DataLinkAddress,
1449 ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1450 let invoke_id = self.next_invoke_id().await;
1451 let request = GetEnrollmentSummaryRequest { invoke_id };
1452 let tx = self.encode_with_growth(|w| {
1453 Npdu::new(0).encode(w)?;
1454 request.encode(w)
1455 })?;
1456 let payload = self
1457 .await_complex_ack_payload_or_error(
1458 address,
1459 &tx,
1460 invoke_id,
1461 SERVICE_GET_ENROLLMENT_SUMMARY,
1462 self.response_timeout,
1463 )
1464 .await?;
1465 let mut pr = Reader::new(&payload);
1466 let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1467 Ok(into_client_enrollment_summary(parsed.summaries))
1468 }
1469
1470 pub async fn get_event_information(
1475 &self,
1476 address: DataLinkAddress,
1477 last_received_object_id: Option<ObjectId>,
1478 ) -> Result<EventInformationResult, ClientError> {
1479 let invoke_id = self.next_invoke_id().await;
1480 let request = GetEventInformationRequest {
1481 last_received_object_id,
1482 invoke_id,
1483 };
1484 let tx = self.encode_with_growth(|w| {
1485 Npdu::new(0).encode(w)?;
1486 request.encode(w)
1487 })?;
1488 let payload = self
1489 .await_complex_ack_payload_or_error(
1490 address,
1491 &tx,
1492 invoke_id,
1493 SERVICE_GET_EVENT_INFORMATION,
1494 self.response_timeout,
1495 )
1496 .await?;
1497 let mut pr = Reader::new(&payload);
1498 let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1499 Ok(EventInformationResult {
1500 summaries: into_client_event_information(parsed.summaries),
1501 more_events: parsed.more_events,
1502 })
1503 }
1504
1505 pub async fn acknowledge_alarm(
1507 &self,
1508 address: DataLinkAddress,
1509 mut request: AcknowledgeAlarmRequest<'_>,
1510 ) -> Result<(), ClientError> {
1511 request.invoke_id = self.next_invoke_id().await;
1512 let invoke_id = request.invoke_id;
1513 let tx = self.encode_with_growth(|w| {
1514 Npdu::new(0).encode(w)?;
1515 request.encode(w)
1516 })?;
1517 self.await_simple_ack_or_error(
1518 address,
1519 &tx,
1520 invoke_id,
1521 SERVICE_ACKNOWLEDGE_ALARM,
1522 self.response_timeout,
1523 )
1524 .await
1525 }
1526
1527 pub async fn atomic_read_file_stream(
1532 &self,
1533 address: DataLinkAddress,
1534 file_object_id: ObjectId,
1535 file_start_position: i32,
1536 requested_octet_count: u32,
1537 ) -> Result<AtomicReadFileResult, ClientError> {
1538 let invoke_id = self.next_invoke_id().await;
1539 let request = AtomicReadFileRequest::stream(
1540 file_object_id,
1541 file_start_position,
1542 requested_octet_count,
1543 invoke_id,
1544 );
1545 self.atomic_read_file(address, request).await
1546 }
1547
1548 pub async fn atomic_read_file_record(
1553 &self,
1554 address: DataLinkAddress,
1555 file_object_id: ObjectId,
1556 file_start_record: i32,
1557 requested_record_count: u32,
1558 ) -> Result<AtomicReadFileResult, ClientError> {
1559 let invoke_id = self.next_invoke_id().await;
1560 let request = AtomicReadFileRequest::record(
1561 file_object_id,
1562 file_start_record,
1563 requested_record_count,
1564 invoke_id,
1565 );
1566 self.atomic_read_file(address, request).await
1567 }
1568
1569 async fn atomic_read_file(
1570 &self,
1571 address: DataLinkAddress,
1572 request: AtomicReadFileRequest,
1573 ) -> Result<AtomicReadFileResult, ClientError> {
1574 let invoke_id = request.invoke_id;
1575 let tx = self.encode_with_growth(|w| {
1576 Npdu::new(0).encode(w)?;
1577 request.encode(w)
1578 })?;
1579 let payload = self
1580 .await_complex_ack_payload_or_error(
1581 address,
1582 &tx,
1583 invoke_id,
1584 SERVICE_ATOMIC_READ_FILE,
1585 self.response_timeout,
1586 )
1587 .await?;
1588 let mut pr = Reader::new(&payload);
1589 let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1590 Ok(into_client_atomic_read_result(parsed))
1591 }
1592
1593 pub async fn atomic_write_file_stream(
1596 &self,
1597 address: DataLinkAddress,
1598 file_object_id: ObjectId,
1599 file_start_position: i32,
1600 file_data: &[u8],
1601 ) -> Result<AtomicWriteFileResult, ClientError> {
1602 let invoke_id = self.next_invoke_id().await;
1603 let request = AtomicWriteFileRequest::stream(
1604 file_object_id,
1605 file_start_position,
1606 file_data,
1607 invoke_id,
1608 );
1609 self.atomic_write_file(address, request).await
1610 }
1611
1612 pub async fn atomic_write_file_record(
1616 &self,
1617 address: DataLinkAddress,
1618 file_object_id: ObjectId,
1619 file_start_record: i32,
1620 file_record_data: &[&[u8]],
1621 ) -> Result<AtomicWriteFileResult, ClientError> {
1622 let invoke_id = self.next_invoke_id().await;
1623 let request = AtomicWriteFileRequest::record(
1624 file_object_id,
1625 file_start_record,
1626 file_record_data,
1627 invoke_id,
1628 );
1629 self.atomic_write_file(address, request).await
1630 }
1631
1632 async fn atomic_write_file(
1633 &self,
1634 address: DataLinkAddress,
1635 request: AtomicWriteFileRequest<'_>,
1636 ) -> Result<AtomicWriteFileResult, ClientError> {
1637 let invoke_id = request.invoke_id;
1638 let tx = self.encode_with_growth(|w| {
1639 Npdu::new(0).encode(w)?;
1640 request.encode(w)
1641 })?;
1642 let payload = self
1643 .await_complex_ack_payload_or_error(
1644 address,
1645 &tx,
1646 invoke_id,
1647 SERVICE_ATOMIC_WRITE_FILE,
1648 self.response_timeout,
1649 )
1650 .await?;
1651 let mut pr = Reader::new(&payload);
1652 let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1653 Ok(into_client_atomic_write_result(parsed))
1654 }
1655
1656 pub async fn subscribe_cov(
1660 &self,
1661 address: DataLinkAddress,
1662 mut request: SubscribeCovRequest,
1663 ) -> Result<(), ClientError> {
1664 request.invoke_id = self.next_invoke_id().await;
1665 let invoke_id = request.invoke_id;
1666 let tx = self.encode_with_growth(|w| {
1667 Npdu::new(0).encode(w)?;
1668 request.encode(w)
1669 })?;
1670 self.await_simple_ack_or_error(
1671 address,
1672 &tx,
1673 invoke_id,
1674 SERVICE_SUBSCRIBE_COV,
1675 self.response_timeout,
1676 )
1677 .await
1678 }
1679
1680 pub async fn cancel_cov_subscription(
1683 &self,
1684 address: DataLinkAddress,
1685 subscriber_process_id: u32,
1686 monitored_object_id: ObjectId,
1687 ) -> Result<(), ClientError> {
1688 self.subscribe_cov(
1689 address,
1690 SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1691 )
1692 .await
1693 }
1694
1695 pub async fn subscribe_cov_property(
1700 &self,
1701 address: DataLinkAddress,
1702 mut request: SubscribeCovPropertyRequest,
1703 ) -> Result<(), ClientError> {
1704 request.invoke_id = self.next_invoke_id().await;
1705 let invoke_id = request.invoke_id;
1706 let tx = self.encode_with_growth(|w| {
1707 Npdu::new(0).encode(w)?;
1708 request.encode(w)
1709 })?;
1710 self.await_simple_ack_or_error(
1711 address,
1712 &tx,
1713 invoke_id,
1714 SERVICE_SUBSCRIBE_COV_PROPERTY,
1715 self.response_timeout,
1716 )
1717 .await
1718 }
1719
1720 pub async fn cancel_cov_property_subscription(
1726 &self,
1727 address: DataLinkAddress,
1728 subscriber_process_id: u32,
1729 monitored_object_id: ObjectId,
1730 monitored_property_id: PropertyId,
1731 monitored_property_array_index: Option<u32>,
1732 ) -> Result<(), ClientError> {
1733 self.subscribe_cov_property(
1734 address,
1735 SubscribeCovPropertyRequest::cancel(
1736 subscriber_process_id,
1737 monitored_object_id,
1738 monitored_property_id,
1739 monitored_property_array_index,
1740 0,
1741 ),
1742 )
1743 .await
1744 }
1745
1746 pub async fn read_range_by_position(
1751 &self,
1752 address: DataLinkAddress,
1753 object_id: ObjectId,
1754 property_id: PropertyId,
1755 array_index: Option<u32>,
1756 reference_index: i32,
1757 count: i16,
1758 ) -> Result<ReadRangeResult, ClientError> {
1759 let invoke_id = self.next_invoke_id().await;
1760 let req = ReadRangeRequest::by_position(
1761 object_id,
1762 property_id,
1763 array_index,
1764 reference_index,
1765 count,
1766 invoke_id,
1767 );
1768 self.read_range_with_request(address, req).await
1769 }
1770
1771 pub async fn read_range_by_sequence_number(
1776 &self,
1777 address: DataLinkAddress,
1778 object_id: ObjectId,
1779 property_id: PropertyId,
1780 array_index: Option<u32>,
1781 reference_sequence: u32,
1782 count: i16,
1783 ) -> Result<ReadRangeResult, ClientError> {
1784 let invoke_id = self.next_invoke_id().await;
1785 let req = ReadRangeRequest::by_sequence_number(
1786 object_id,
1787 property_id,
1788 array_index,
1789 reference_sequence,
1790 count,
1791 invoke_id,
1792 );
1793 self.read_range_with_request(address, req).await
1794 }
1795
1796 pub async fn read_range_by_time(
1801 &self,
1802 address: DataLinkAddress,
1803 object_id: ObjectId,
1804 property_id: PropertyId,
1805 array_index: Option<u32>,
1806 at: (Date, Time),
1807 count: i16,
1808 ) -> Result<ReadRangeResult, ClientError> {
1809 let (date, time) = at;
1810 let invoke_id = self.next_invoke_id().await;
1811 let req = ReadRangeRequest::by_time(
1812 object_id,
1813 property_id,
1814 array_index,
1815 date,
1816 time,
1817 count,
1818 invoke_id,
1819 );
1820 self.read_range_with_request(address, req).await
1821 }
1822
1823 async fn read_range_with_request(
1824 &self,
1825 address: DataLinkAddress,
1826 req: ReadRangeRequest,
1827 ) -> Result<ReadRangeResult, ClientError> {
1828 let invoke_id = req.invoke_id;
1829 let tx = self.encode_with_growth(|w| {
1830 Npdu::new(0).encode(w)?;
1831 req.encode(w)
1832 })?;
1833 let payload = self
1834 .await_complex_ack_payload_or_error(
1835 address,
1836 &tx,
1837 invoke_id,
1838 SERVICE_READ_RANGE,
1839 self.response_timeout,
1840 )
1841 .await?;
1842 let mut pr = Reader::new(&payload);
1843 let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1844 into_client_read_range(parsed)
1845 }
1846
1847 pub async fn recv_cov_notification(
1853 &self,
1854 wait: Duration,
1855 ) -> Result<Option<CovNotification>, ClientError> {
1856 let _io_lock = self.request_io_lock.lock().await;
1857 let deadline = tokio::time::Instant::now() + wait;
1858
1859 while tokio::time::Instant::now() < deadline {
1860 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1861 let mut rx = [0u8; 1500];
1862 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1863 let (n, source) = match recv {
1864 Ok(Ok(v)) => v,
1865 Ok(Err(e)) => return Err(e.into()),
1866 Err(_) => break,
1867 };
1868
1869 let apdu = extract_apdu(&rx[..n])?;
1870 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1871 match ApduType::from_u8(first >> 4) {
1872 Some(ApduType::UnconfirmedRequest) => {
1873 let mut r = Reader::new(apdu);
1874 let header = UnconfirmedRequestHeader::decode(&mut r)?;
1875 if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1876 continue;
1877 }
1878 let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1879 return Ok(Some(into_client_cov_notification(source, false, cov)?));
1880 }
1881 Some(ApduType::ConfirmedRequest) => {
1882 let mut r = Reader::new(apdu);
1883 let header = ConfirmedRequestHeader::decode(&mut r)?;
1884 if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1885 continue;
1886 }
1887 if header.segmented {
1888 return Err(ClientError::UnsupportedResponse);
1889 }
1890
1891 let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1892 self.send_simple_ack(
1893 source,
1894 header.invoke_id,
1895 SERVICE_CONFIRMED_COV_NOTIFICATION,
1896 )
1897 .await?;
1898 return Ok(Some(into_client_cov_notification(source, true, cov)?));
1899 }
1900 _ => continue,
1901 }
1902 }
1903
1904 Ok(None)
1905 }
1906
1907 pub async fn recv_event_notification(
1913 &self,
1914 wait: Duration,
1915 ) -> Result<Option<EventNotification>, ClientError> {
1916 let _io_lock = self.request_io_lock.lock().await;
1917 let deadline = tokio::time::Instant::now() + wait;
1918
1919 while tokio::time::Instant::now() < deadline {
1920 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1921 let mut rx = [0u8; 1500];
1922 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1923 let (n, source) = match recv {
1924 Ok(Ok(v)) => v,
1925 Ok(Err(e)) => return Err(e.into()),
1926 Err(_) => break,
1927 };
1928
1929 let apdu = extract_apdu(&rx[..n])?;
1930 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1931 match ApduType::from_u8(first >> 4) {
1932 Some(ApduType::UnconfirmedRequest) => {
1933 let mut r = Reader::new(apdu);
1934 let header = UnconfirmedRequestHeader::decode(&mut r)?;
1935 if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1936 continue;
1937 }
1938 let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1939 return Ok(Some(into_client_event_notification(
1940 source,
1941 false,
1942 notification,
1943 )));
1944 }
1945 Some(ApduType::ConfirmedRequest) => {
1946 let mut r = Reader::new(apdu);
1947 let header = ConfirmedRequestHeader::decode(&mut r)?;
1948 if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1949 continue;
1950 }
1951 if header.segmented {
1952 return Err(ClientError::UnsupportedResponse);
1953 }
1954 let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1955 self.send_simple_ack(
1956 source,
1957 header.invoke_id,
1958 SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1959 )
1960 .await?;
1961 return Ok(Some(into_client_event_notification(
1962 source,
1963 true,
1964 notification,
1965 )));
1966 }
1967 _ => continue,
1968 }
1969 }
1970
1971 Ok(None)
1972 }
1973
1974 pub async fn read_property(
1979 &self,
1980 address: DataLinkAddress,
1981 object_id: ObjectId,
1982 property_id: PropertyId,
1983 ) -> Result<ClientDataValue, ClientError> {
1984 let invoke_id = self.next_invoke_id().await;
1985 let req = ReadPropertyRequest {
1986 object_id,
1987 property_id,
1988 array_index: None,
1989 invoke_id,
1990 };
1991 let tx = self.encode_with_growth(|w| {
1992 Npdu::new(0).encode(w)?;
1993 req.encode(w)
1994 })?;
1995 let payload = self
1996 .await_complex_ack_payload_or_error(
1997 address,
1998 &tx,
1999 invoke_id,
2000 SERVICE_READ_PROPERTY,
2001 self.response_timeout,
2002 )
2003 .await?;
2004 let mut pr = Reader::new(&payload);
2005 let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
2006 into_client_value(parsed.value)
2007 }
2008
2009 pub async fn write_property(
2011 &self,
2012 address: DataLinkAddress,
2013 mut request: WritePropertyRequest<'_>,
2014 ) -> Result<(), ClientError> {
2015 request.invoke_id = self.next_invoke_id().await;
2016 let invoke_id = request.invoke_id;
2017 let tx = self.encode_with_growth(|w| {
2018 Npdu::new(0).encode(w)?;
2019 request.encode(w)
2020 })?;
2021 self.await_simple_ack_or_error(
2022 address,
2023 &tx,
2024 invoke_id,
2025 SERVICE_WRITE_PROPERTY,
2026 self.response_timeout,
2027 )
2028 .await
2029 }
2030
2031 pub async fn read_property_multiple(
2036 &self,
2037 address: DataLinkAddress,
2038 object_id: ObjectId,
2039 property_ids: &[PropertyId],
2040 ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
2041 let refs: Vec<PropertyReference> = property_ids
2042 .iter()
2043 .copied()
2044 .map(|property_id| PropertyReference {
2045 property_id,
2046 array_index: None,
2047 })
2048 .collect();
2049 let specs = [ReadAccessSpecification {
2050 object_id,
2051 properties: &refs,
2052 }];
2053
2054 let invoke_id = self.next_invoke_id().await;
2055 let req = ReadPropertyMultipleRequest {
2056 specs: &specs,
2057 invoke_id,
2058 };
2059
2060 let tx = self.encode_with_growth(|w| {
2061 Npdu::new(0).encode(w)?;
2062 req.encode(w)
2063 })?;
2064 let payload = self
2065 .await_complex_ack_payload_or_error(
2066 address,
2067 &tx,
2068 invoke_id,
2069 SERVICE_READ_PROPERTY_MULTIPLE,
2070 self.response_timeout,
2071 )
2072 .await?;
2073 let mut pr = Reader::new(&payload);
2074 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2075 let mut out = Vec::new();
2076 for access in parsed.results {
2077 if access.object_id != object_id {
2078 continue;
2079 }
2080 for item in access.results {
2081 out.push((item.property_id, into_client_value(item.value)?));
2082 }
2083 }
2084 Ok(out)
2085 }
2086
2087 pub async fn write_property_multiple(
2090 &self,
2091 address: DataLinkAddress,
2092 object_id: ObjectId,
2093 properties: &[PropertyWriteSpec<'_>],
2094 ) -> Result<(), ClientError> {
2095 let invoke_id = self.next_invoke_id().await;
2096 let specs = [WriteAccessSpecification {
2097 object_id,
2098 properties,
2099 }];
2100 let req = WritePropertyMultipleRequest {
2101 specs: &specs,
2102 invoke_id,
2103 };
2104
2105 let tx = self.encode_with_growth(|w| {
2106 Npdu::new(0).encode(w)?;
2107 req.encode(w)
2108 })?;
2109 self.await_simple_ack_or_error(
2110 address,
2111 &tx,
2112 invoke_id,
2113 SERVICE_WRITE_PROPERTY_MULTIPLE,
2114 self.response_timeout,
2115 )
2116 .await
2117 }
2118
2119 pub async fn private_transfer(
2121 &self,
2122 address: DataLinkAddress,
2123 vendor_id: u32,
2124 service_number: u32,
2125 service_parameters: Option<&[u8]>,
2126 ) -> Result<PrivateTransferAck, ClientError> {
2127 let invoke_id = self.next_invoke_id().await;
2128 let req = ConfirmedPrivateTransferRequest {
2129 vendor_id,
2130 service_number,
2131 service_parameters,
2132 invoke_id,
2133 };
2134
2135 let tx = self.encode_with_growth(|w| {
2136 Npdu::new(0).encode(w)?;
2137 req.encode(w)
2138 })?;
2139 let payload = self
2140 .await_complex_ack_payload_or_error(
2141 address,
2142 &tx,
2143 invoke_id,
2144 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
2145 self.response_timeout,
2146 )
2147 .await?;
2148 let mut r = Reader::new(&payload);
2149 PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
2150 }
2151
2152 pub async fn read_many(
2158 &self,
2159 address: DataLinkAddress,
2160 requests: &[(ObjectId, PropertyId)],
2161 ) -> Result<HashMap<(ObjectId, PropertyId), ClientDataValue>, ClientError> {
2162 let mut grouped: Vec<(ObjectId, Vec<PropertyReference>)> = Vec::new();
2164 for &(oid, pid) in requests {
2165 if let Some(entry) = grouped.iter_mut().find(|(o, _)| *o == oid) {
2166 entry.1.push(PropertyReference {
2167 property_id: pid,
2168 array_index: None,
2169 });
2170 } else {
2171 grouped.push((
2172 oid,
2173 vec![PropertyReference {
2174 property_id: pid,
2175 array_index: None,
2176 }],
2177 ));
2178 }
2179 }
2180
2181 let specs: Vec<ReadAccessSpecification<'_>> = grouped
2182 .iter()
2183 .map(|(oid, props)| ReadAccessSpecification {
2184 object_id: *oid,
2185 properties: props,
2186 })
2187 .collect();
2188
2189 let invoke_id = self.next_invoke_id().await;
2190 let req = ReadPropertyMultipleRequest {
2191 specs: &specs,
2192 invoke_id,
2193 };
2194 let tx = self.encode_with_growth(|w| {
2195 Npdu::new(0).encode(w)?;
2196 req.encode(w)
2197 })?;
2198 let payload = self
2199 .await_complex_ack_payload_or_error(
2200 address,
2201 &tx,
2202 invoke_id,
2203 SERVICE_READ_PROPERTY_MULTIPLE,
2204 self.response_timeout,
2205 )
2206 .await?;
2207
2208 let mut pr = Reader::new(&payload);
2209 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2210 let mut out = HashMap::new();
2211 for access in parsed.results {
2212 for item in access.results {
2213 if let Ok(v) = into_client_value(item.value) {
2214 out.insert((access.object_id, item.property_id), v);
2215 }
2216 }
2217 }
2218 Ok(out)
2219 }
2220
2221 pub async fn write_many(
2228 &self,
2229 address: DataLinkAddress,
2230 writes: &[(ObjectId, PropertyId, ClientDataValue, Option<u8>)],
2231 ) -> Result<(), ClientError> {
2232 use rustbac_core::types::{BitString, DataValue as DV};
2233
2234 fn cv_to_dv(v: &ClientDataValue) -> DV<'_> {
2235 match v {
2236 ClientDataValue::Null => DV::Null,
2237 ClientDataValue::Boolean(b) => DV::Boolean(*b),
2238 ClientDataValue::Unsigned(n) => DV::Unsigned(*n),
2239 ClientDataValue::Signed(n) => DV::Signed(*n),
2240 ClientDataValue::Real(f) => DV::Real(*f),
2241 ClientDataValue::Double(f) => DV::Double(*f),
2242 ClientDataValue::OctetString(b) => DV::OctetString(b),
2243 ClientDataValue::CharacterString(s) => DV::CharacterString(s),
2244 ClientDataValue::BitString { unused_bits, data } => DV::BitString(BitString {
2245 unused_bits: *unused_bits,
2246 data,
2247 }),
2248 ClientDataValue::Enumerated(n) => DV::Enumerated(*n),
2249 ClientDataValue::Date(d) => DV::Date(*d),
2250 ClientDataValue::Time(t) => DV::Time(*t),
2251 ClientDataValue::ObjectId(o) => DV::ObjectId(*o),
2252 ClientDataValue::Constructed { tag_num, values } => DV::Constructed {
2253 tag_num: *tag_num,
2254 values: values.iter().map(cv_to_dv).collect(),
2255 },
2256 }
2257 }
2258
2259 let converted: Vec<(ObjectId, PropertyId, DV<'_>, Option<u8>)> = writes
2261 .iter()
2262 .map(|(oid, pid, val, prio)| (*oid, *pid, cv_to_dv(val), *prio))
2263 .collect();
2264
2265 let mut grouped: Vec<(ObjectId, Vec<PropertyWriteSpec<'_>>)> = Vec::new();
2267 for (oid, pid, val, prio) in &converted {
2268 let spec = PropertyWriteSpec {
2269 property_id: *pid,
2270 array_index: None,
2271 value: val.clone(),
2272 priority: *prio,
2273 };
2274 if let Some(entry) = grouped.iter_mut().find(|(o, _)| o == oid) {
2275 entry.1.push(spec);
2276 } else {
2277 grouped.push((*oid, vec![spec]));
2278 }
2279 }
2280
2281 let specs: Vec<WriteAccessSpecification<'_>> = grouped
2282 .iter()
2283 .map(|(oid, props)| WriteAccessSpecification {
2284 object_id: *oid,
2285 properties: props,
2286 })
2287 .collect();
2288
2289 let invoke_id = self.next_invoke_id().await;
2290 let req = WritePropertyMultipleRequest {
2291 specs: &specs,
2292 invoke_id,
2293 };
2294 let tx = self.encode_with_growth(|w| {
2295 Npdu::new(0).encode(w)?;
2296 req.encode(w)
2297 })?;
2298 self.await_simple_ack_or_error(
2299 address,
2300 &tx,
2301 invoke_id,
2302 SERVICE_WRITE_PROPERTY_MULTIPLE,
2303 self.response_timeout,
2304 )
2305 .await
2306 }
2307}
2308
2309fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
2310 let mut r = Reader::new(payload);
2311 let _npdu = Npdu::decode(&mut r)?;
2312 r.read_exact(r.remaining()).map_err(ClientError::from)
2313}
2314
2315async fn dispatch_incoming_request<D: DataLink>(
2324 datalink: &D,
2325 handler: &dyn crate::server::ServiceHandler,
2326 device_id: u32,
2327 vendor_id: u16,
2328 frame: &[u8],
2329 source: DataLinkAddress,
2330) -> Result<(), ClientError> {
2331 let mut r = Reader::new(frame);
2332 let _npdu = match Npdu::decode(&mut r) {
2333 Ok(n) => n,
2334 Err(_) => return Ok(()),
2335 };
2336
2337 if r.is_empty() {
2338 return Ok(());
2339 }
2340
2341 let first = match r.peek_u8() {
2342 Ok(b) => b,
2343 Err(_) => return Ok(()),
2344 };
2345 let apdu_type = ApduType::from_u8(first >> 4);
2346
2347 match apdu_type {
2348 Some(ApduType::UnconfirmedRequest) => {
2349 let header = match UnconfirmedRequestHeader::decode(&mut r) {
2350 Ok(h) => h,
2351 Err(_) => return Ok(()),
2352 };
2353 if header.service_choice == 0x08 {
2354 let limits = dispatch_decode_who_is_limits(&mut r);
2356 if dispatch_matches_who_is(device_id, limits) {
2357 dispatch_send_i_am(datalink, device_id, vendor_id, source).await;
2358 }
2359 }
2360 }
2362 Some(ApduType::ConfirmedRequest) => {
2363 let header = match ConfirmedRequestHeader::decode(&mut r) {
2364 Ok(h) => h,
2365 Err(_) => return Ok(()),
2366 };
2367 let invoke_id = header.invoke_id;
2368 match header.service_choice {
2369 SERVICE_READ_PROPERTY => {
2370 dispatch_handle_read_property(datalink, handler, &mut r, invoke_id, source)
2371 .await;
2372 }
2373 SERVICE_WRITE_PROPERTY => {
2374 dispatch_handle_write_property(datalink, handler, &mut r, invoke_id, source)
2375 .await;
2376 }
2377 SERVICE_READ_PROPERTY_MULTIPLE => {
2378 dispatch_handle_read_property_multiple(
2379 datalink, handler, &mut r, invoke_id, source,
2380 )
2381 .await;
2382 }
2383 SERVICE_WRITE_PROPERTY_MULTIPLE => {
2384 dispatch_handle_write_property_multiple(
2385 datalink, handler, &mut r, invoke_id, source,
2386 )
2387 .await;
2388 }
2389 SERVICE_SUBSCRIBE_COV => {
2390 dispatch_handle_subscribe_cov(datalink, handler, &mut r, invoke_id, source)
2391 .await;
2392 }
2393 SERVICE_CREATE_OBJECT => {
2394 dispatch_handle_create_object(datalink, handler, &mut r, invoke_id, source)
2395 .await;
2396 }
2397 SERVICE_DELETE_OBJECT => {
2398 dispatch_handle_delete_object(datalink, handler, &mut r, invoke_id, source)
2399 .await;
2400 }
2401 _ => {
2402 dispatch_send_reject(datalink, invoke_id, 0x08, source).await;
2404 }
2405 }
2406 }
2407 _ => {
2408 }
2410 }
2411
2412 Ok(())
2413}
2414
2415async fn dispatch_send_i_am<D: DataLink>(
2418 datalink: &D,
2419 device_id: u32,
2420 vendor_id: u16,
2421 target: DataLinkAddress,
2422) {
2423 let device_oid = ObjectId::new(ObjectType::Device, device_id);
2424 let req = IAmRequest {
2425 device_id: device_oid,
2426 max_apdu: 1476,
2427 segmentation: 3, vendor_id: vendor_id as u32,
2429 };
2430 let mut buf = [0u8; 128];
2431 let mut w = Writer::new(&mut buf);
2432 if Npdu::new(0).encode(&mut w).is_err() {
2433 return;
2434 }
2435 if req.encode(&mut w).is_err() {
2436 return;
2437 }
2438 let _ = datalink.send(target, w.as_written()).await;
2439}
2440
2441async fn dispatch_handle_read_property<D: DataLink>(
2442 datalink: &D,
2443 handler: &dyn crate::server::ServiceHandler,
2444 r: &mut Reader<'_>,
2445 invoke_id: u8,
2446 source: DataLinkAddress,
2447) {
2448 let object_id = match crate::decode_ctx_object_id(r) {
2449 Ok(v) => v,
2450 Err(_) => return,
2451 };
2452 let property_id_raw = match crate::decode_ctx_unsigned(r) {
2453 Ok(v) => v,
2454 Err(_) => return,
2455 };
2456 let property_id = PropertyId::from_u32(property_id_raw);
2457
2458 let array_index = dispatch_decode_optional_array_index(r);
2460
2461 match handler.read_property(object_id, property_id, array_index) {
2462 Ok(value) => {
2463 let borrowed = dispatch_client_value_to_borrowed(&value);
2464 let mut buf = [0u8; 1400];
2465 let mut w = Writer::new(&mut buf);
2466 if Npdu::new(0).encode(&mut w).is_err() {
2467 return;
2468 }
2469 if (ComplexAckHeader {
2470 segmented: false,
2471 more_follows: false,
2472 invoke_id,
2473 sequence_number: None,
2474 proposed_window_size: None,
2475 service_choice: SERVICE_READ_PROPERTY,
2476 })
2477 .encode(&mut w)
2478 .is_err()
2479 {
2480 return;
2481 }
2482 if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
2483 return;
2484 }
2485 if encode_ctx_unsigned(&mut w, 1, property_id.to_u32()).is_err() {
2486 return;
2487 }
2488 if (Tag::Opening { tag_num: 3 }).encode(&mut w).is_err() {
2489 return;
2490 }
2491 if encode_application_data_value(&mut w, &borrowed).is_err() {
2492 return;
2493 }
2494 if (Tag::Closing { tag_num: 3 }).encode(&mut w).is_err() {
2495 return;
2496 }
2497 let _ = datalink.send(source, w.as_written()).await;
2498 }
2499 Err(err) => {
2500 dispatch_send_error(datalink, invoke_id, SERVICE_READ_PROPERTY, err, source).await;
2501 }
2502 }
2503}
2504
2505async fn dispatch_handle_write_property<D: DataLink>(
2506 datalink: &D,
2507 handler: &dyn crate::server::ServiceHandler,
2508 r: &mut Reader<'_>,
2509 invoke_id: u8,
2510 source: DataLinkAddress,
2511) {
2512 let object_id = match crate::decode_ctx_object_id(r) {
2513 Ok(v) => v,
2514 Err(_) => return,
2515 };
2516 let property_id_raw = match crate::decode_ctx_unsigned(r) {
2517 Ok(v) => v,
2518 Err(_) => return,
2519 };
2520 let property_id = PropertyId::from_u32(property_id_raw);
2521
2522 let next_tag = match Tag::decode(r) {
2524 Ok(t) => t,
2525 Err(_) => return,
2526 };
2527 let (array_index, value_start_tag) = match next_tag {
2528 Tag::Context { tag_num: 2, len } => {
2529 let idx = match decode_unsigned(r, len as usize) {
2530 Ok(v) => v,
2531 Err(_) => return,
2532 };
2533 let vt = match Tag::decode(r) {
2534 Ok(t) => t,
2535 Err(_) => return,
2536 };
2537 (Some(idx), vt)
2538 }
2539 other => (None, other),
2540 };
2541
2542 if value_start_tag != (Tag::Opening { tag_num: 3 }) {
2543 return;
2544 }
2545
2546 let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
2547 Ok(v) => v,
2548 Err(_) => return,
2549 };
2550
2551 match Tag::decode(r) {
2552 Ok(Tag::Closing { tag_num: 3 }) => {}
2553 _ => return,
2554 }
2555
2556 let priority = if !r.is_empty() {
2558 match Tag::decode(r) {
2559 Ok(Tag::Context { tag_num: 4, len }) => match decode_unsigned(r, len as usize) {
2560 Ok(p) => Some(p as u8),
2561 Err(_) => return,
2562 },
2563 _ => None,
2564 }
2565 } else {
2566 None
2567 };
2568
2569 let client_val = crate::data_value_to_client(val);
2570
2571 match handler.write_property(object_id, property_id, array_index, client_val, priority) {
2572 Ok(()) => {
2573 let mut buf = [0u8; 32];
2574 let mut w = Writer::new(&mut buf);
2575 if Npdu::new(0).encode(&mut w).is_err() {
2576 return;
2577 }
2578 if (SimpleAck {
2579 invoke_id,
2580 service_choice: SERVICE_WRITE_PROPERTY,
2581 })
2582 .encode(&mut w)
2583 .is_err()
2584 {
2585 return;
2586 }
2587 let _ = datalink.send(source, w.as_written()).await;
2588 }
2589 Err(err) => {
2590 dispatch_send_error(datalink, invoke_id, SERVICE_WRITE_PROPERTY, err, source).await;
2591 }
2592 }
2593}
2594
2595async fn dispatch_handle_read_property_multiple<D: DataLink>(
2596 datalink: &D,
2597 handler: &dyn crate::server::ServiceHandler,
2598 r: &mut Reader<'_>,
2599 invoke_id: u8,
2600 source: DataLinkAddress,
2601) {
2602 type PropRefs = Vec<(PropertyId, Option<u32>)>;
2603 let mut specs: Vec<(ObjectId, PropRefs)> = Vec::new();
2604
2605 while !r.is_empty() {
2606 let object_id = match crate::decode_ctx_object_id(r) {
2607 Ok(v) => v,
2608 Err(_) => return,
2609 };
2610 match Tag::decode(r) {
2611 Ok(Tag::Opening { tag_num: 1 }) => {}
2612 _ => return,
2613 }
2614 let mut props: Vec<(PropertyId, Option<u32>)> = Vec::new();
2615 loop {
2616 let tag = match Tag::decode(r) {
2617 Ok(t) => t,
2618 Err(_) => return,
2619 };
2620 if tag == (Tag::Closing { tag_num: 1 }) {
2621 break;
2622 }
2623 let property_id = match tag {
2624 Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
2625 Ok(v) => PropertyId::from_u32(v),
2626 Err(_) => return,
2627 },
2628 _ => return,
2629 };
2630 let array_index = if !r.is_empty() {
2631 match dispatch_peek_context_tag(r, 1) {
2632 Some(len) => {
2633 match Tag::decode(r) {
2634 Ok(_) => {}
2635 Err(_) => return,
2636 }
2637 match decode_unsigned(r, len as usize) {
2638 Ok(idx) => Some(idx),
2639 Err(_) => return,
2640 }
2641 }
2642 None => None,
2643 }
2644 } else {
2645 None
2646 };
2647 props.push((property_id, array_index));
2648 }
2649 specs.push((object_id, props));
2650 }
2651
2652 let mut buf = [0u8; 1400];
2653 let mut w = Writer::new(&mut buf);
2654 if Npdu::new(0).encode(&mut w).is_err() {
2655 return;
2656 }
2657 if (ComplexAckHeader {
2658 segmented: false,
2659 more_follows: false,
2660 invoke_id,
2661 sequence_number: None,
2662 proposed_window_size: None,
2663 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
2664 })
2665 .encode(&mut w)
2666 .is_err()
2667 {
2668 return;
2669 }
2670
2671 for (object_id, props) in &specs {
2672 if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
2673 return;
2674 }
2675 if (Tag::Opening { tag_num: 1 }).encode(&mut w).is_err() {
2676 return;
2677 }
2678
2679 for (property_id, array_index) in props {
2680 if encode_ctx_unsigned(&mut w, 2, property_id.to_u32()).is_err() {
2681 return;
2682 }
2683 if let Some(idx) = array_index {
2684 if encode_ctx_unsigned(&mut w, 3, *idx).is_err() {
2685 return;
2686 }
2687 }
2688 if (Tag::Opening { tag_num: 4 }).encode(&mut w).is_err() {
2689 return;
2690 }
2691
2692 match handler.read_property(*object_id, *property_id, *array_index) {
2693 Ok(value) => {
2694 let borrowed = dispatch_client_value_to_borrowed(&value);
2695 if encode_application_data_value(&mut w, &borrowed).is_err() {
2696 return;
2697 }
2698 }
2699 Err(err) => {
2700 let (class, code) = dispatch_error_class_code(err);
2701 if (Tag::Opening { tag_num: 5 }).encode(&mut w).is_err() {
2702 return;
2703 }
2704 if encode_ctx_unsigned(&mut w, 0, class as u32).is_err() {
2705 return;
2706 }
2707 if encode_ctx_unsigned(&mut w, 1, code as u32).is_err() {
2708 return;
2709 }
2710 if (Tag::Closing { tag_num: 5 }).encode(&mut w).is_err() {
2711 return;
2712 }
2713 }
2714 }
2715
2716 if (Tag::Closing { tag_num: 4 }).encode(&mut w).is_err() {
2717 return;
2718 }
2719 }
2720
2721 if (Tag::Closing { tag_num: 1 }).encode(&mut w).is_err() {
2722 return;
2723 }
2724 }
2725
2726 let _ = datalink.send(source, w.as_written()).await;
2727}
2728
2729async fn dispatch_handle_write_property_multiple<D: DataLink>(
2730 datalink: &D,
2731 handler: &dyn crate::server::ServiceHandler,
2732 r: &mut Reader<'_>,
2733 invoke_id: u8,
2734 source: DataLinkAddress,
2735) {
2736 while !r.is_empty() {
2737 let object_id = match crate::decode_ctx_object_id(r) {
2738 Ok(v) => v,
2739 Err(_) => return,
2740 };
2741 match Tag::decode(r) {
2742 Ok(Tag::Opening { tag_num: 1 }) => {}
2743 _ => return,
2744 }
2745 loop {
2746 let tag = match Tag::decode(r) {
2747 Ok(t) => t,
2748 Err(_) => return,
2749 };
2750 if tag == (Tag::Closing { tag_num: 1 }) {
2751 break;
2752 }
2753 let property_id = match tag {
2754 Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
2755 Ok(v) => PropertyId::from_u32(v),
2756 Err(_) => return,
2757 },
2758 _ => return,
2759 };
2760 let array_index = if !r.is_empty() {
2761 match dispatch_peek_context_tag(r, 1) {
2762 Some(len) => {
2763 let _ = Tag::decode(r);
2764 decode_unsigned(r, len as usize).ok()
2765 }
2766 None => None,
2767 }
2768 } else {
2769 None
2770 };
2771 match Tag::decode(r) {
2772 Ok(Tag::Opening { tag_num: 2 }) => {}
2773 _ => return,
2774 }
2775 let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
2776 Ok(v) => v,
2777 Err(_) => return,
2778 };
2779 match Tag::decode(r) {
2780 Ok(Tag::Closing { tag_num: 2 }) => {}
2781 _ => return,
2782 }
2783 let priority = if !r.is_empty() {
2784 match dispatch_peek_context_tag(r, 3) {
2785 Some(len) => {
2786 let _ = Tag::decode(r);
2787 decode_unsigned(r, len as usize).ok().map(|p| p as u8)
2788 }
2789 None => None,
2790 }
2791 } else {
2792 None
2793 };
2794 let client_val = crate::data_value_to_client(val);
2795 if let Err(err) =
2796 handler.write_property(object_id, property_id, array_index, client_val, priority)
2797 {
2798 dispatch_send_error(
2799 datalink,
2800 invoke_id,
2801 SERVICE_WRITE_PROPERTY_MULTIPLE,
2802 err,
2803 source,
2804 )
2805 .await;
2806 return;
2807 }
2808 }
2809 }
2810 let mut buf = [0u8; 32];
2812 let mut w = Writer::new(&mut buf);
2813 if Npdu::new(0).encode(&mut w).is_err() {
2814 return;
2815 }
2816 if (SimpleAck {
2817 invoke_id,
2818 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
2819 })
2820 .encode(&mut w)
2821 .is_err()
2822 {
2823 return;
2824 }
2825 let _ = datalink.send(source, w.as_written()).await;
2826}
2827
2828async fn dispatch_handle_subscribe_cov<D: DataLink>(
2829 datalink: &D,
2830 handler: &dyn crate::server::ServiceHandler,
2831 r: &mut Reader<'_>,
2832 invoke_id: u8,
2833 source: DataLinkAddress,
2834) {
2835 let subscriber_process_id = match Tag::decode(r) {
2837 Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
2838 Ok(v) => v,
2839 Err(_) => return,
2840 },
2841 _ => return,
2842 };
2843 let monitored_object_id = match crate::decode_ctx_object_id(r) {
2845 Ok(v) => v,
2846 Err(_) => return,
2847 };
2848 let issue_confirmed = match Tag::decode(r) {
2850 Ok(Tag::Context { tag_num: 2, len }) => match decode_unsigned(r, len as usize) {
2851 Ok(v) => v != 0,
2852 Err(_) => return,
2853 },
2854 _ => return,
2855 };
2856 let lifetime = if !r.is_empty() {
2858 match dispatch_peek_context_tag(r, 3) {
2859 Some(len) => {
2860 let _ = Tag::decode(r);
2861 decode_unsigned(r, len as usize).ok()
2862 }
2863 None => None,
2864 }
2865 } else {
2866 None
2867 };
2868
2869 match handler.subscribe_cov(
2870 subscriber_process_id,
2871 monitored_object_id,
2872 issue_confirmed,
2873 lifetime,
2874 ) {
2875 Ok(()) => {
2876 let mut buf = [0u8; 32];
2877 let mut w = Writer::new(&mut buf);
2878 if Npdu::new(0).encode(&mut w).is_err() {
2879 return;
2880 }
2881 if (SimpleAck {
2882 invoke_id,
2883 service_choice: SERVICE_SUBSCRIBE_COV,
2884 })
2885 .encode(&mut w)
2886 .is_err()
2887 {
2888 return;
2889 }
2890 let _ = datalink.send(source, w.as_written()).await;
2891 }
2892 Err(err) => {
2893 dispatch_send_error(datalink, invoke_id, SERVICE_SUBSCRIBE_COV, err, source).await;
2894 }
2895 }
2896}
2897
2898async fn dispatch_handle_create_object<D: DataLink>(
2899 datalink: &D,
2900 handler: &dyn crate::server::ServiceHandler,
2901 r: &mut Reader<'_>,
2902 invoke_id: u8,
2903 source: DataLinkAddress,
2904) {
2905 match Tag::decode(r) {
2907 Ok(Tag::Opening { tag_num: 0 }) => {}
2908 _ => return,
2909 }
2910 let object_type_raw = match Tag::decode(r) {
2912 Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
2913 Ok(v) => v,
2914 Err(_) => return,
2915 },
2916 _ => return,
2917 };
2918 let object_type = ObjectType::from_u16(object_type_raw as u16);
2919 match Tag::decode(r) {
2921 Ok(Tag::Closing { tag_num: 0 }) => {}
2922 _ => return,
2923 }
2924
2925 match handler.create_object(object_type) {
2926 Ok(created_id) => {
2927 let mut buf = [0u8; 64];
2928 let mut w = Writer::new(&mut buf);
2929 if Npdu::new(0).encode(&mut w).is_err() {
2930 return;
2931 }
2932 if (ComplexAckHeader {
2933 segmented: false,
2934 more_follows: false,
2935 invoke_id,
2936 sequence_number: None,
2937 proposed_window_size: None,
2938 service_choice: SERVICE_CREATE_OBJECT,
2939 })
2940 .encode(&mut w)
2941 .is_err()
2942 {
2943 return;
2944 }
2945 if encode_ctx_unsigned(&mut w, 0, created_id.raw()).is_err() {
2946 return;
2947 }
2948 let _ = datalink.send(source, w.as_written()).await;
2949 }
2950 Err(err) => {
2951 dispatch_send_error(datalink, invoke_id, SERVICE_CREATE_OBJECT, err, source).await;
2952 }
2953 }
2954}
2955
2956async fn dispatch_handle_delete_object<D: DataLink>(
2957 datalink: &D,
2958 handler: &dyn crate::server::ServiceHandler,
2959 r: &mut Reader<'_>,
2960 invoke_id: u8,
2961 source: DataLinkAddress,
2962) {
2963 let object_id = match crate::decode_ctx_object_id(r) {
2964 Ok(v) => v,
2965 Err(_) => return,
2966 };
2967
2968 match handler.delete_object(object_id) {
2969 Ok(()) => {
2970 let mut buf = [0u8; 32];
2971 let mut w = Writer::new(&mut buf);
2972 if Npdu::new(0).encode(&mut w).is_err() {
2973 return;
2974 }
2975 if (SimpleAck {
2976 invoke_id,
2977 service_choice: SERVICE_DELETE_OBJECT,
2978 })
2979 .encode(&mut w)
2980 .is_err()
2981 {
2982 return;
2983 }
2984 let _ = datalink.send(source, w.as_written()).await;
2985 }
2986 Err(err) => {
2987 dispatch_send_error(datalink, invoke_id, SERVICE_DELETE_OBJECT, err, source).await;
2988 }
2989 }
2990}
2991
2992async fn dispatch_send_error<D: DataLink>(
2993 datalink: &D,
2994 invoke_id: u8,
2995 service_choice: u8,
2996 err: crate::server::BacnetServiceError,
2997 target: DataLinkAddress,
2998) {
2999 let (class, code) = dispatch_error_class_code(err);
3000 let mut buf = [0u8; 64];
3001 let mut w = Writer::new(&mut buf);
3002 if Npdu::new(0).encode(&mut w).is_err() {
3003 return;
3004 }
3005 if w.write_u8(0x50).is_err() {
3007 return;
3008 }
3009 if w.write_u8(invoke_id).is_err() {
3010 return;
3011 }
3012 if w.write_u8(service_choice).is_err() {
3013 return;
3014 }
3015 if (Tag::Application {
3017 tag: rustbac_core::encoding::tag::AppTag::Enumerated,
3018 len: 1,
3019 })
3020 .encode(&mut w)
3021 .is_err()
3022 {
3023 return;
3024 }
3025 if w.write_u8(class).is_err() {
3026 return;
3027 }
3028 if (Tag::Application {
3030 tag: rustbac_core::encoding::tag::AppTag::Enumerated,
3031 len: 1,
3032 })
3033 .encode(&mut w)
3034 .is_err()
3035 {
3036 return;
3037 }
3038 if w.write_u8(code).is_err() {
3039 return;
3040 }
3041 let _ = datalink.send(target, w.as_written()).await;
3042}
3043
3044async fn dispatch_send_reject<D: DataLink>(
3045 datalink: &D,
3046 invoke_id: u8,
3047 reason: u8,
3048 target: DataLinkAddress,
3049) {
3050 let mut buf = [0u8; 16];
3051 let mut w = Writer::new(&mut buf);
3052 if Npdu::new(0).encode(&mut w).is_err() {
3053 return;
3054 }
3055 if w.write_u8(0x60).is_err() {
3057 return;
3058 }
3059 if w.write_u8(invoke_id).is_err() {
3060 return;
3061 }
3062 if w.write_u8(reason).is_err() {
3063 return;
3064 }
3065 let _ = datalink.send(target, w.as_written()).await;
3066}
3067
3068fn dispatch_decode_who_is_limits(r: &mut Reader<'_>) -> Option<(u32, u32)> {
3070 if r.is_empty() {
3071 return None;
3072 }
3073 let tag0 = Tag::decode(r).ok()?;
3074 let low = match tag0 {
3075 Tag::Context { tag_num: 0, len } => decode_unsigned(r, len as usize).ok()?,
3076 _ => return None,
3077 };
3078 let tag1 = Tag::decode(r).ok()?;
3079 let high = match tag1 {
3080 Tag::Context { tag_num: 1, len } => decode_unsigned(r, len as usize).ok()?,
3081 _ => return None,
3082 };
3083 Some((low, high))
3084}
3085
3086fn dispatch_error_class_code(err: crate::server::BacnetServiceError) -> (u8, u8) {
3087 use crate::server::BacnetServiceError;
3088 match err {
3089 BacnetServiceError::UnknownObject => (1, 31),
3090 BacnetServiceError::UnknownProperty => (2, 32),
3091 BacnetServiceError::WriteAccessDenied => (2, 40),
3092 BacnetServiceError::InvalidDataType => (2, 9),
3093 BacnetServiceError::ServiceNotSupported => (5, 53),
3094 }
3095}
3096
/// Reports whether `device_id` is addressed by a Who-Is with the given `limits`.
///
/// With no limits every device matches; otherwise the id must lie within the inclusive
/// range `low..=high`.
fn dispatch_matches_who_is(device_id: u32, limits: Option<(u32, u32)>) -> bool {
    limits.map_or(true, |(low, high)| (low..=high).contains(&device_id))
}
3103
3104fn dispatch_decode_optional_array_index(r: &mut Reader<'_>) -> Option<u32> {
3105 if r.is_empty() {
3106 return None;
3107 }
3108 let tag = Tag::decode(r).ok()?;
3109 match tag {
3110 Tag::Context { tag_num: 2, len } => decode_unsigned(r, len as usize).ok(),
3111 _ => None,
3112 }
3113}
3114
3115fn dispatch_peek_context_tag(r: &mut Reader<'_>, tag_num: u8) -> Option<u32> {
3116 let first = r.peek_u8().ok()?;
3117 let is_context = (first & 0x08) != 0 && (first & 0x07) < 6;
3118 if !is_context {
3119 return None;
3120 }
3121 let this_tag_num = first >> 4;
3122 if this_tag_num != tag_num {
3123 return None;
3124 }
3125 let short_len = first & 0x07;
3126 if short_len < 5 {
3127 Some(short_len as u32)
3128 } else {
3129 None
3130 }
3131}
3132
3133fn dispatch_client_value_to_borrowed(val: &ClientDataValue) -> DataValue<'_> {
3134 match val {
3135 ClientDataValue::Null => DataValue::Null,
3136 ClientDataValue::Boolean(v) => DataValue::Boolean(*v),
3137 ClientDataValue::Unsigned(v) => DataValue::Unsigned(*v),
3138 ClientDataValue::Signed(v) => DataValue::Signed(*v),
3139 ClientDataValue::Real(v) => DataValue::Real(*v),
3140 ClientDataValue::Double(v) => DataValue::Double(*v),
3141 ClientDataValue::OctetString(v) => DataValue::OctetString(v),
3142 ClientDataValue::CharacterString(v) => DataValue::CharacterString(v),
3143 ClientDataValue::BitString { unused_bits, data } => {
3144 DataValue::BitString(rustbac_core::types::BitString {
3145 unused_bits: *unused_bits,
3146 data,
3147 })
3148 }
3149 ClientDataValue::Enumerated(v) => DataValue::Enumerated(*v),
3150 ClientDataValue::Date(v) => DataValue::Date(*v),
3151 ClientDataValue::Time(v) => DataValue::Time(*v),
3152 ClientDataValue::ObjectId(v) => DataValue::ObjectId(*v),
3153 ClientDataValue::Constructed { tag_num, values } => DataValue::Constructed {
3154 tag_num: *tag_num,
3155 values: values
3156 .iter()
3157 .map(dispatch_client_value_to_borrowed)
3158 .collect(),
3159 },
3160 }
3161}
3162
3163fn remote_service_error(err: BacnetError) -> ClientError {
3164 ClientError::RemoteServiceError {
3165 service_choice: err.service_choice,
3166 error_class_raw: err.error_class,
3167 error_code_raw: err.error_code,
3168 error_class: err.error_class.and_then(ErrorClass::from_u32),
3169 error_code: err.error_code.and_then(ErrorCode::from_u32),
3170 }
3171}
3172
3173fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
3174 Ok(match value {
3175 DataValue::Null => ClientDataValue::Null,
3176 DataValue::Boolean(v) => ClientDataValue::Boolean(v),
3177 DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
3178 DataValue::Signed(v) => ClientDataValue::Signed(v),
3179 DataValue::Real(v) => ClientDataValue::Real(v),
3180 DataValue::Double(v) => ClientDataValue::Double(v),
3181 DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
3182 DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
3183 DataValue::BitString(v) => ClientDataValue::BitString {
3184 unused_bits: v.unused_bits,
3185 data: v.data.to_vec(),
3186 },
3187 DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
3188 DataValue::Date(v) => ClientDataValue::Date(v),
3189 DataValue::Time(v) => ClientDataValue::Time(v),
3190 DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
3191 DataValue::Constructed { tag_num, values } => {
3192 let mut children = Vec::with_capacity(values.len());
3193 for child in values {
3194 children.push(into_client_value(child)?);
3195 }
3196 ClientDataValue::Constructed {
3197 tag_num,
3198 values: children,
3199 }
3200 }
3201 })
3202}
3203
3204fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
3205 value
3206 .into_iter()
3207 .map(|item| AlarmSummaryItem {
3208 object_id: item.object_id,
3209 alarm_state_raw: item.alarm_state,
3210 alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3211 item.alarm_state,
3212 ),
3213 acknowledged_transitions: ClientBitString {
3214 unused_bits: item.acknowledged_transitions.unused_bits,
3215 data: item.acknowledged_transitions.data.to_vec(),
3216 },
3217 })
3218 .collect()
3219}
3220
3221fn into_client_enrollment_summary(
3222 value: Vec<CoreEnrollmentSummaryItem>,
3223) -> Vec<EnrollmentSummaryItem> {
3224 value
3225 .into_iter()
3226 .map(|item| EnrollmentSummaryItem {
3227 object_id: item.object_id,
3228 event_type: item.event_type,
3229 event_state_raw: item.event_state,
3230 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3231 item.event_state,
3232 ),
3233 priority: item.priority,
3234 notification_class: item.notification_class,
3235 })
3236 .collect()
3237}
3238
3239fn into_client_event_information(
3240 value: Vec<CoreEventSummaryItem<'_>>,
3241) -> Vec<EventInformationItem> {
3242 value
3243 .into_iter()
3244 .map(|item| EventInformationItem {
3245 object_id: item.object_id,
3246 event_state_raw: item.event_state,
3247 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3248 item.event_state,
3249 ),
3250 acknowledged_transitions: ClientBitString {
3251 unused_bits: item.acknowledged_transitions.unused_bits,
3252 data: item.acknowledged_transitions.data.to_vec(),
3253 },
3254 notify_type: item.notify_type,
3255 event_enable: ClientBitString {
3256 unused_bits: item.event_enable.unused_bits,
3257 data: item.event_enable.data.to_vec(),
3258 },
3259 event_priorities: item.event_priorities,
3260 })
3261 .collect()
3262}
3263
3264fn into_client_cov_notification(
3265 source: DataLinkAddress,
3266 confirmed: bool,
3267 value: CovNotificationRequest<'_>,
3268) -> Result<CovNotification, ClientError> {
3269 let mut values = Vec::with_capacity(value.values.len());
3270 for property in value.values {
3271 values.push(CovPropertyValue {
3272 property_id: property.property_id,
3273 array_index: property.array_index,
3274 value: into_client_value(property.value)?,
3275 priority: property.priority,
3276 });
3277 }
3278
3279 Ok(CovNotification {
3280 source,
3281 confirmed,
3282 subscriber_process_id: value.subscriber_process_id,
3283 initiating_device_id: value.initiating_device_id,
3284 monitored_object_id: value.monitored_object_id,
3285 time_remaining_seconds: value.time_remaining_seconds,
3286 values,
3287 })
3288}
3289
3290fn into_client_event_notification(
3291 source: DataLinkAddress,
3292 confirmed: bool,
3293 value: EventNotificationRequest<'_>,
3294) -> EventNotification {
3295 EventNotification {
3296 source,
3297 confirmed,
3298 process_id: value.process_id,
3299 initiating_device_id: value.initiating_device_id,
3300 event_object_id: value.event_object_id,
3301 timestamp: value.timestamp,
3302 notification_class: value.notification_class,
3303 priority: value.priority,
3304 event_type: value.event_type,
3305 message_text: value.message_text.map(str::to_string),
3306 notify_type: value.notify_type,
3307 ack_required: value.ack_required,
3308 from_state_raw: value.from_state,
3309 from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3310 value.from_state,
3311 ),
3312 to_state_raw: value.to_state,
3313 to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
3314 }
3315}
3316
3317fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
3318 let mut items = Vec::with_capacity(value.items.len());
3319 for item in value.items {
3320 items.push(into_client_value(item)?);
3321 }
3322 Ok(ReadRangeResult {
3323 object_id: value.object_id,
3324 property_id: value.property_id,
3325 array_index: value.array_index,
3326 result_flags: ClientBitString {
3327 unused_bits: value.result_flags.unused_bits,
3328 data: value.result_flags.data.to_vec(),
3329 },
3330 item_count: value.item_count,
3331 items,
3332 })
3333}
3334
3335fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
3336 match value.access_method {
3337 AtomicReadFileAckAccess::Stream {
3338 file_start_position,
3339 file_data,
3340 } => AtomicReadFileResult::Stream {
3341 end_of_file: value.end_of_file,
3342 file_start_position,
3343 file_data: file_data.to_vec(),
3344 },
3345 AtomicReadFileAckAccess::Record {
3346 file_start_record,
3347 returned_record_count,
3348 file_record_data,
3349 } => AtomicReadFileResult::Record {
3350 end_of_file: value.end_of_file,
3351 file_start_record,
3352 returned_record_count,
3353 file_record_data: file_record_data
3354 .into_iter()
3355 .map(|record| record.to_vec())
3356 .collect(),
3357 },
3358 }
3359}
3360
3361fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
3362 match value {
3363 AtomicWriteFileAck::Stream {
3364 file_start_position,
3365 } => AtomicWriteFileResult::Stream {
3366 file_start_position,
3367 },
3368 AtomicWriteFileAck::Record { file_start_record } => {
3369 AtomicWriteFileResult::Record { file_start_record }
3370 }
3371 }
3372}
3373
3374#[cfg(test)]
3375mod tests {
3376 use super::BacnetClient;
3377 use crate::{
3378 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
3379 EnrollmentSummaryItem, EventInformationItem, EventNotification,
3380 };
3381 use rustbac_core::apdu::{
3382 ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
3383 UnconfirmedRequestHeader,
3384 };
3385 use rustbac_core::encoding::{
3386 primitives::{
3387 decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
3388 encode_ctx_object_id, encode_ctx_unsigned,
3389 },
3390 reader::Reader,
3391 tag::{AppTag, Tag},
3392 writer::Writer,
3393 };
3394 use rustbac_core::npdu::Npdu;
3395 use rustbac_core::services::acknowledge_alarm::{
3396 AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
3397 };
3398 use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
3399 use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
3400 use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
3401 use rustbac_core::services::cov_notification::{
3402 SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3403 };
3404 use rustbac_core::services::device_management::{
3405 DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
3406 SERVICE_REINITIALIZE_DEVICE,
3407 };
3408 use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
3409 use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
3410 use rustbac_core::services::event_notification::{
3411 SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3412 };
3413 use rustbac_core::services::list_element::{
3414 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
3415 SERVICE_REMOVE_LIST_ELEMENT,
3416 };
3417 use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
3418 use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
3419 use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
3420 use rustbac_core::services::read_range::SERVICE_READ_RANGE;
3421 use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
3422 use rustbac_core::services::subscribe_cov_property::{
3423 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
3424 };
3425 use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
3426 use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
3427 use rustbac_core::services::write_property_multiple::{
3428 PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
3429 };
3430 use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
3431 use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
3432 use std::collections::VecDeque;
3433 use std::sync::Arc;
3434 use std::time::Duration;
3435 use tokio::sync::Mutex;
3436
    /// Shared state behind [`MockDataLink`]: what was sent through it and what it should
    /// hand out on receive.
    #[derive(Debug, Default)]
    struct MockState {
        // Every frame passed to `send`, with its destination, in call order.
        sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
        // Frames queued for `recv`, with the address they claim to come from.
        recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
    }
3442
    /// In-memory [`DataLink`] used by the tests; clones share the same [`MockState`].
    #[derive(Debug, Clone)]
    struct MockDataLink {
        // Shared with the test body so it can queue replies and inspect sent frames.
        state: Arc<MockState>,
    }
3447
3448 impl MockDataLink {
3449 fn new() -> (Self, Arc<MockState>) {
3450 let state = Arc::new(MockState::default());
3451 (
3452 Self {
3453 state: state.clone(),
3454 },
3455 state,
3456 )
3457 }
3458 }
3459
3460 impl DataLink for MockDataLink {
3461 async fn send(
3462 &self,
3463 address: DataLinkAddress,
3464 payload: &[u8],
3465 ) -> Result<(), DataLinkError> {
3466 self.state
3467 .sent
3468 .lock()
3469 .await
3470 .push((address, payload.to_vec()));
3471 Ok(())
3472 }
3473
3474 async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
3475 let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
3476 return Err(DataLinkError::InvalidFrame);
3477 };
3478 if payload.len() > buf.len() {
3479 return Err(DataLinkError::FrameTooLarge);
3480 }
3481 buf[..payload.len()].copy_from_slice(&payload);
3482 Ok((payload.len(), addr))
3483 }
3484 }
3485
3486 fn with_npdu(apdu: &[u8]) -> Vec<u8> {
3487 let mut out = [0u8; 512];
3488 let mut w = Writer::new(&mut out);
3489 Npdu::new(0).encode(&mut w).unwrap();
3490 w.write_all(apdu).unwrap();
3491 w.as_written().to_vec()
3492 }
3493
3494 fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
3495 let mut apdu_buf = [0u8; 256];
3496 let mut w = Writer::new(&mut apdu_buf);
3497 ComplexAckHeader {
3498 segmented: false,
3499 more_follows: false,
3500 invoke_id,
3501 sequence_number: None,
3502 proposed_window_size: None,
3503 service_choice: SERVICE_READ_RANGE,
3504 }
3505 .encode(&mut w)
3506 .unwrap();
3507 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3508 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3509 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3510 w.write_u8(5).unwrap();
3511 w.write_u8(0b1110_0000).unwrap();
3512 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3513 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3514 encode_app_real(&mut w, 42.0).unwrap();
3515 encode_app_real(&mut w, 43.0).unwrap();
3516 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3517 w.as_written().to_vec()
3518 }
3519
3520 fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
3521 let mut apdu_buf = [0u8; 256];
3522 let mut w = Writer::new(&mut apdu_buf);
3523 ComplexAckHeader {
3524 segmented: false,
3525 more_follows: false,
3526 invoke_id,
3527 sequence_number: None,
3528 proposed_window_size: None,
3529 service_choice: SERVICE_ATOMIC_READ_FILE,
3530 }
3531 .encode(&mut w)
3532 .unwrap();
3533 Tag::Application {
3534 tag: AppTag::Boolean,
3535 len: if eof { 1 } else { 0 },
3536 }
3537 .encode(&mut w)
3538 .unwrap();
3539 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3540 Tag::Application {
3541 tag: AppTag::SignedInt,
3542 len: 1,
3543 }
3544 .encode(&mut w)
3545 .unwrap();
3546 w.write_u8(0).unwrap();
3547 Tag::Application {
3548 tag: AppTag::OctetString,
3549 len: data.len() as u32,
3550 }
3551 .encode(&mut w)
3552 .unwrap();
3553 w.write_all(data).unwrap();
3554 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3555 w.as_written().to_vec()
3556 }
3557
3558 fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
3559 let mut apdu_buf = [0u8; 256];
3560 let mut w = Writer::new(&mut apdu_buf);
3561 ComplexAckHeader {
3562 segmented: false,
3563 more_follows: false,
3564 invoke_id,
3565 sequence_number: None,
3566 proposed_window_size: None,
3567 service_choice: SERVICE_ATOMIC_READ_FILE,
3568 }
3569 .encode(&mut w)
3570 .unwrap();
3571 Tag::Application {
3572 tag: AppTag::Boolean,
3573 len: 0,
3574 }
3575 .encode(&mut w)
3576 .unwrap();
3577 Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
3578 Tag::Application {
3579 tag: AppTag::SignedInt,
3580 len: 1,
3581 }
3582 .encode(&mut w)
3583 .unwrap();
3584 w.write_u8(7).unwrap();
3585 Tag::Application {
3586 tag: AppTag::UnsignedInt,
3587 len: 1,
3588 }
3589 .encode(&mut w)
3590 .unwrap();
3591 w.write_u8(2).unwrap();
3592 Tag::Application {
3593 tag: AppTag::OctetString,
3594 len: 2,
3595 }
3596 .encode(&mut w)
3597 .unwrap();
3598 w.write_all(&[0x01, 0x02]).unwrap();
3599 Tag::Application {
3600 tag: AppTag::OctetString,
3601 len: 3,
3602 }
3603 .encode(&mut w)
3604 .unwrap();
3605 w.write_all(&[0x03, 0x04, 0x05]).unwrap();
3606 Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
3607 w.as_written().to_vec()
3608 }
3609
3610 fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
3611 let mut apdu_buf = [0u8; 64];
3612 let mut w = Writer::new(&mut apdu_buf);
3613 ComplexAckHeader {
3614 segmented: false,
3615 more_follows: false,
3616 invoke_id,
3617 sequence_number: None,
3618 proposed_window_size: None,
3619 service_choice: SERVICE_ATOMIC_WRITE_FILE,
3620 }
3621 .encode(&mut w)
3622 .unwrap();
3623 Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
3624 w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
3625 w.as_written().to_vec()
3626 }
3627
3628 fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
3629 let mut apdu_buf = [0u8; 64];
3630 let mut w = Writer::new(&mut apdu_buf);
3631 ComplexAckHeader {
3632 segmented: false,
3633 more_follows: false,
3634 invoke_id,
3635 sequence_number: None,
3636 proposed_window_size: None,
3637 service_choice: SERVICE_ATOMIC_WRITE_FILE,
3638 }
3639 .encode(&mut w)
3640 .unwrap();
3641 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
3642 w.write_u8(start_record as u8).unwrap();
3643 w.as_written().to_vec()
3644 }
3645
3646 fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
3647 let mut apdu_buf = [0u8; 64];
3648 let mut w = Writer::new(&mut apdu_buf);
3649 ComplexAckHeader {
3650 segmented: false,
3651 more_follows: false,
3652 invoke_id,
3653 sequence_number: None,
3654 proposed_window_size: None,
3655 service_choice: SERVICE_CREATE_OBJECT,
3656 }
3657 .encode(&mut w)
3658 .unwrap();
3659 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3660 w.as_written().to_vec()
3661 }
3662
3663 fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
3664 let mut apdu_buf = [0u8; 128];
3665 let mut w = Writer::new(&mut apdu_buf);
3666 ComplexAckHeader {
3667 segmented: false,
3668 more_follows: false,
3669 invoke_id,
3670 sequence_number: None,
3671 proposed_window_size: None,
3672 service_choice: SERVICE_GET_ALARM_SUMMARY,
3673 }
3674 .encode(&mut w)
3675 .unwrap();
3676 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3677 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3678 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3679 w.write_u8(5).unwrap();
3680 w.write_u8(0b1110_0000).unwrap();
3681
3682 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
3683 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
3684 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3685 w.write_u8(5).unwrap();
3686 w.write_u8(0b1100_0000).unwrap();
3687 w.as_written().to_vec()
3688 }
3689
3690 fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
3691 let mut apdu_buf = [0u8; 160];
3692 let mut w = Writer::new(&mut apdu_buf);
3693 ComplexAckHeader {
3694 segmented: false,
3695 more_follows: false,
3696 invoke_id,
3697 sequence_number: None,
3698 proposed_window_size: None,
3699 service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
3700 }
3701 .encode(&mut w)
3702 .unwrap();
3703 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3704 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3705 encode_ctx_unsigned(&mut w, 2, 2).unwrap();
3706 encode_ctx_unsigned(&mut w, 3, 200).unwrap();
3707 encode_ctx_unsigned(&mut w, 4, 10).unwrap();
3708
3709 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
3710 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
3711 encode_ctx_unsigned(&mut w, 2, 0).unwrap();
3712 encode_ctx_unsigned(&mut w, 3, 20).unwrap();
3713 encode_ctx_unsigned(&mut w, 4, 11).unwrap();
3714 w.as_written().to_vec()
3715 }
3716
3717 fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
3718 let mut apdu_buf = [0u8; 256];
3719 let mut w = Writer::new(&mut apdu_buf);
3720 ComplexAckHeader {
3721 segmented: false,
3722 more_follows: false,
3723 invoke_id,
3724 sequence_number: None,
3725 proposed_window_size: None,
3726 service_choice: SERVICE_GET_EVENT_INFORMATION,
3727 }
3728 .encode(&mut w)
3729 .unwrap();
3730 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3731 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3732 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
3733 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3734 w.write_u8(5).unwrap();
3735 w.write_u8(0b1110_0000).unwrap();
3736 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3737 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3738 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3739 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3740 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3741 encode_ctx_unsigned(&mut w, 4, 0).unwrap();
3742 Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
3743 w.write_u8(5).unwrap();
3744 w.write_u8(0b1100_0000).unwrap();
3745 Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
3746 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3747 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
3748 encode_ctx_unsigned(&mut w, 2, 3).unwrap();
3749 Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
3750 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3751 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
3752 w.write_u8(0).unwrap();
3753 w.as_written().to_vec()
3754 }
3755
3756 #[tokio::test]
3757 async fn who_has_object_name_collects_i_have() {
3758 let (dl, state) = MockDataLink::new();
3759 let client = BacnetClient::with_datalink(dl);
3760 let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
3761
3762 let mut apdu = [0u8; 128];
3763 let mut w = Writer::new(&mut apdu);
3764 UnconfirmedRequestHeader {
3765 service_choice: SERVICE_I_HAVE,
3766 }
3767 .encode(&mut w)
3768 .unwrap();
3769 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
3770 encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3771 encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
3772
3773 state
3774 .recv
3775 .lock()
3776 .await
3777 .push_back((with_npdu(w.as_written()), addr));
3778
3779 let results = client
3780 .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
3781 .await
3782 .unwrap();
3783 assert_eq!(results.len(), 1);
3784 assert_eq!(results[0].address, addr);
3785 assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
3786 assert_eq!(
3787 results[0].object_id,
3788 ObjectId::new(ObjectType::AnalogInput, 7)
3789 );
3790 assert_eq!(results[0].object_name, "Zone Temp");
3791
3792 let sent = state.sent.lock().await;
3793 assert_eq!(sent.len(), 1);
3794 let mut r = Reader::new(&sent[0].1);
3795 let _npdu = Npdu::decode(&mut r).unwrap();
3796 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
3797 assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
3798 }
3799
3800 #[tokio::test]
3801 async fn device_communication_control_handles_simple_ack() {
3802 let (dl, state) = MockDataLink::new();
3803 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3804 let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
3805
3806 let mut apdu = [0u8; 32];
3807 let mut w = Writer::new(&mut apdu);
3808 SimpleAck {
3809 invoke_id: 1,
3810 service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
3811 }
3812 .encode(&mut w)
3813 .unwrap();
3814 state
3815 .recv
3816 .lock()
3817 .await
3818 .push_back((with_npdu(w.as_written()), addr));
3819
3820 client
3821 .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
3822 .await
3823 .unwrap();
3824
3825 let sent = state.sent.lock().await;
3826 assert_eq!(sent.len(), 1);
3827 let mut r = Reader::new(&sent[0].1);
3828 let _npdu = Npdu::decode(&mut r).unwrap();
3829 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3830 assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
3831 }
3832
3833 #[tokio::test]
3834 async fn reinitialize_device_handles_simple_ack() {
3835 let (dl, state) = MockDataLink::new();
3836 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3837 let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
3838
3839 let mut apdu = [0u8; 32];
3840 let mut w = Writer::new(&mut apdu);
3841 SimpleAck {
3842 invoke_id: 1,
3843 service_choice: SERVICE_REINITIALIZE_DEVICE,
3844 }
3845 .encode(&mut w)
3846 .unwrap();
3847 state
3848 .recv
3849 .lock()
3850 .await
3851 .push_back((with_npdu(w.as_written()), addr));
3852
3853 client
3854 .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
3855 .await
3856 .unwrap();
3857
3858 let sent = state.sent.lock().await;
3859 assert_eq!(sent.len(), 1);
3860 let mut r = Reader::new(&sent[0].1);
3861 let _npdu = Npdu::decode(&mut r).unwrap();
3862 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3863 assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
3864 }
3865
3866 #[tokio::test]
3867 async fn time_synchronize_sends_unconfirmed_request() {
3868 let (dl, state) = MockDataLink::new();
3869 let client = BacnetClient::with_datalink(dl);
3870 let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
3871
3872 client
3873 .time_synchronize(
3874 addr,
3875 Date {
3876 year_since_1900: 126,
3877 month: 2,
3878 day: 7,
3879 weekday: 6,
3880 },
3881 Time {
3882 hour: 10,
3883 minute: 11,
3884 second: 12,
3885 hundredths: 13,
3886 },
3887 false,
3888 )
3889 .await
3890 .unwrap();
3891
3892 let sent = state.sent.lock().await;
3893 assert_eq!(sent.len(), 1);
3894 let mut r = Reader::new(&sent[0].1);
3895 let _npdu = Npdu::decode(&mut r).unwrap();
3896 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
3897 assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
3898 }
3899
3900 #[tokio::test]
3901 async fn get_alarm_summary_decodes_complex_ack() {
3902 let (dl, state) = MockDataLink::new();
3903 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3904 let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
3905
3906 state
3907 .recv
3908 .lock()
3909 .await
3910 .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
3911
3912 let summaries = client.get_alarm_summary(addr).await.unwrap();
3913 assert_eq!(summaries.len(), 2);
3914 assert_eq!(
3915 summaries[0],
3916 AlarmSummaryItem {
3917 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
3918 alarm_state_raw: 1,
3919 alarm_state: Some(EventState::Fault),
3920 acknowledged_transitions: crate::ClientBitString {
3921 unused_bits: 5,
3922 data: vec![0b1110_0000],
3923 },
3924 }
3925 );
3926 assert_eq!(
3927 summaries[1],
3928 AlarmSummaryItem {
3929 object_id: ObjectId::new(ObjectType::BinaryInput, 2),
3930 alarm_state_raw: 0,
3931 alarm_state: Some(EventState::Normal),
3932 acknowledged_transitions: crate::ClientBitString {
3933 unused_bits: 5,
3934 data: vec![0b1100_0000],
3935 },
3936 }
3937 );
3938
3939 let sent = state.sent.lock().await;
3940 assert_eq!(sent.len(), 1);
3941 let mut r = Reader::new(&sent[0].1);
3942 let _npdu = Npdu::decode(&mut r).unwrap();
3943 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3944 assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
3945 }
3946
3947 #[tokio::test]
3948 async fn get_enrollment_summary_decodes_complex_ack() {
3949 let (dl, state) = MockDataLink::new();
3950 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3951 let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
3952
3953 state
3954 .recv
3955 .lock()
3956 .await
3957 .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
3958
3959 let summaries = client.get_enrollment_summary(addr).await.unwrap();
3960 assert_eq!(summaries.len(), 2);
3961 assert_eq!(
3962 summaries[0],
3963 EnrollmentSummaryItem {
3964 object_id: ObjectId::new(ObjectType::AnalogInput, 7),
3965 event_type: 1,
3966 event_state_raw: 2,
3967 event_state: Some(EventState::Offnormal),
3968 priority: 200,
3969 notification_class: 10,
3970 }
3971 );
3972 assert_eq!(
3973 summaries[1],
3974 EnrollmentSummaryItem {
3975 object_id: ObjectId::new(ObjectType::BinaryInput, 8),
3976 event_type: 0,
3977 event_state_raw: 0,
3978 event_state: Some(EventState::Normal),
3979 priority: 20,
3980 notification_class: 11,
3981 }
3982 );
3983
3984 let sent = state.sent.lock().await;
3985 assert_eq!(sent.len(), 1);
3986 let mut r = Reader::new(&sent[0].1);
3987 let _npdu = Npdu::decode(&mut r).unwrap();
3988 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3989 assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
3990 }
3991
3992 #[tokio::test]
3993 async fn get_event_information_decodes_complex_ack() {
3994 let (dl, state) = MockDataLink::new();
3995 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3996 let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
3997
3998 state
3999 .recv
4000 .lock()
4001 .await
4002 .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
4003
4004 let result = client.get_event_information(addr, None).await.unwrap();
4005 assert!(!result.more_events);
4006 assert_eq!(result.summaries.len(), 1);
4007 assert_eq!(
4008 result.summaries[0],
4009 EventInformationItem {
4010 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
4011 event_state_raw: 2,
4012 event_state: Some(EventState::Offnormal),
4013 acknowledged_transitions: crate::ClientBitString {
4014 unused_bits: 5,
4015 data: vec![0b1110_0000],
4016 },
4017 notify_type: 0,
4018 event_enable: crate::ClientBitString {
4019 unused_bits: 5,
4020 data: vec![0b1100_0000],
4021 },
4022 event_priorities: [1, 2, 3],
4023 }
4024 );
4025 }
4026
4027 #[tokio::test]
4028 async fn acknowledge_alarm_handles_simple_ack() {
4029 let (dl, state) = MockDataLink::new();
4030 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4031 let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
4032
4033 let mut apdu = [0u8; 32];
4034 let mut w = Writer::new(&mut apdu);
4035 SimpleAck {
4036 invoke_id: 1,
4037 service_choice: SERVICE_ACKNOWLEDGE_ALARM,
4038 }
4039 .encode(&mut w)
4040 .unwrap();
4041 state
4042 .recv
4043 .lock()
4044 .await
4045 .push_back((with_npdu(w.as_written()), addr));
4046
4047 client
4048 .acknowledge_alarm(
4049 addr,
4050 AcknowledgeAlarmRequest {
4051 acknowledging_process_id: 10,
4052 event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
4053 event_state_acknowledged: EventState::Offnormal,
4054 event_time_stamp: TimeStamp::SequenceNumber(42),
4055 acknowledgment_source: "operator",
4056 time_of_acknowledgment: TimeStamp::DateTime {
4057 date: Date {
4058 year_since_1900: 126,
4059 month: 2,
4060 day: 7,
4061 weekday: 6,
4062 },
4063 time: Time {
4064 hour: 10,
4065 minute: 11,
4066 second: 12,
4067 hundredths: 13,
4068 },
4069 },
4070 invoke_id: 0,
4071 },
4072 )
4073 .await
4074 .unwrap();
4075
4076 let sent = state.sent.lock().await;
4077 assert_eq!(sent.len(), 1);
4078 let mut r = Reader::new(&sent[0].1);
4079 let _npdu = Npdu::decode(&mut r).unwrap();
4080 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4081 assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
4082 }
4083
4084 #[tokio::test]
4085 async fn create_object_by_type_decodes_complex_ack() {
4086 let (dl, state) = MockDataLink::new();
4087 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4088 let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
4089 let created = ObjectId::new(ObjectType::AnalogValue, 42);
4090
4091 state
4092 .recv
4093 .lock()
4094 .await
4095 .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
4096
4097 let result = client
4098 .create_object_by_type(addr, ObjectType::AnalogValue)
4099 .await
4100 .unwrap();
4101 assert_eq!(result, created);
4102
4103 let sent = state.sent.lock().await;
4104 let mut r = Reader::new(&sent[0].1);
4105 let _npdu = Npdu::decode(&mut r).unwrap();
4106 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4107 assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
4108 }
4109
4110 #[tokio::test]
4111 async fn delete_object_handles_simple_ack() {
4112 let (dl, state) = MockDataLink::new();
4113 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4114 let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
4115
4116 let mut apdu = [0u8; 32];
4117 let mut w = Writer::new(&mut apdu);
4118 SimpleAck {
4119 invoke_id: 1,
4120 service_choice: SERVICE_DELETE_OBJECT,
4121 }
4122 .encode(&mut w)
4123 .unwrap();
4124 state
4125 .recv
4126 .lock()
4127 .await
4128 .push_back((with_npdu(w.as_written()), addr));
4129
4130 client
4131 .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
4132 .await
4133 .unwrap();
4134 }
4135
4136 #[tokio::test]
4137 async fn add_list_element_handles_simple_ack() {
4138 let (dl, state) = MockDataLink::new();
4139 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4140 let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
4141
4142 let mut apdu = [0u8; 32];
4143 let mut w = Writer::new(&mut apdu);
4144 SimpleAck {
4145 invoke_id: 1,
4146 service_choice: SERVICE_ADD_LIST_ELEMENT,
4147 }
4148 .encode(&mut w)
4149 .unwrap();
4150 state
4151 .recv
4152 .lock()
4153 .await
4154 .push_back((with_npdu(w.as_written()), addr));
4155
4156 let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
4157 client
4158 .add_list_element(
4159 addr,
4160 AddListElementRequest {
4161 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
4162 property_id: PropertyId::Proprietary(512),
4163 array_index: None,
4164 elements: &values,
4165 invoke_id: 0,
4166 },
4167 )
4168 .await
4169 .unwrap();
4170 }
4171
4172 #[tokio::test]
4173 async fn remove_list_element_handles_simple_ack() {
4174 let (dl, state) = MockDataLink::new();
4175 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4176 let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
4177
4178 let mut apdu = [0u8; 32];
4179 let mut w = Writer::new(&mut apdu);
4180 SimpleAck {
4181 invoke_id: 1,
4182 service_choice: SERVICE_REMOVE_LIST_ELEMENT,
4183 }
4184 .encode(&mut w)
4185 .unwrap();
4186 state
4187 .recv
4188 .lock()
4189 .await
4190 .push_back((with_npdu(w.as_written()), addr));
4191
4192 let values = [DataValue::Unsigned(1)];
4193 client
4194 .remove_list_element(
4195 addr,
4196 RemoveListElementRequest {
4197 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
4198 property_id: PropertyId::Proprietary(513),
4199 array_index: None,
4200 elements: &values,
4201 invoke_id: 0,
4202 },
4203 )
4204 .await
4205 .unwrap();
4206 }
4207
4208 #[tokio::test]
4209 async fn atomic_read_file_stream_decodes_complex_ack() {
4210 let (dl, state) = MockDataLink::new();
4211 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4212 let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
4213 let file_object = ObjectId::new(ObjectType::File, 2);
4214
4215 state.recv.lock().await.push_back((
4216 with_npdu(&atomic_read_file_stream_ack_apdu(
4217 1,
4218 true,
4219 &[0xAA, 0xBB, 0xCC],
4220 )),
4221 addr,
4222 ));
4223
4224 let result = client
4225 .atomic_read_file_stream(addr, file_object, 0, 3)
4226 .await
4227 .unwrap();
4228
4229 assert_eq!(
4230 result,
4231 AtomicReadFileResult::Stream {
4232 end_of_file: true,
4233 file_start_position: 0,
4234 file_data: vec![0xAA, 0xBB, 0xCC],
4235 }
4236 );
4237
4238 let sent = state.sent.lock().await;
4239 assert_eq!(sent.len(), 1);
4240 let mut r = Reader::new(&sent[0].1);
4241 let _npdu = Npdu::decode(&mut r).unwrap();
4242 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4243 assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
4244 }
4245
4246 #[tokio::test]
4247 async fn atomic_read_file_record_decodes_complex_ack() {
4248 let (dl, state) = MockDataLink::new();
4249 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4250 let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
4251 let file_object = ObjectId::new(ObjectType::File, 5);
4252
4253 state
4254 .recv
4255 .lock()
4256 .await
4257 .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
4258
4259 let result = client
4260 .atomic_read_file_record(addr, file_object, 7, 2)
4261 .await
4262 .unwrap();
4263
4264 assert_eq!(
4265 result,
4266 AtomicReadFileResult::Record {
4267 end_of_file: false,
4268 file_start_record: 7,
4269 returned_record_count: 2,
4270 file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
4271 }
4272 );
4273 }
4274
4275 #[tokio::test]
4276 async fn atomic_write_file_stream_decodes_complex_ack() {
4277 let (dl, state) = MockDataLink::new();
4278 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4279 let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
4280 let file_object = ObjectId::new(ObjectType::File, 3);
4281
4282 state
4283 .recv
4284 .lock()
4285 .await
4286 .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
4287
4288 let result = client
4289 .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
4290 .await
4291 .unwrap();
4292
4293 assert_eq!(
4294 result,
4295 AtomicWriteFileResult::Stream {
4296 file_start_position: 128
4297 }
4298 );
4299 }
4300
4301 #[tokio::test]
4302 async fn atomic_write_file_record_decodes_complex_ack() {
4303 let (dl, state) = MockDataLink::new();
4304 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4305 let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
4306 let file_object = ObjectId::new(ObjectType::File, 9);
4307 let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
4308
4309 state
4310 .recv
4311 .lock()
4312 .await
4313 .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
4314
4315 let result = client
4316 .atomic_write_file_record(addr, file_object, 7, &records)
4317 .await
4318 .unwrap();
4319
4320 assert_eq!(
4321 result,
4322 AtomicWriteFileResult::Record {
4323 file_start_record: 7
4324 }
4325 );
4326 }
4327
4328 #[tokio::test]
4329 async fn read_properties_decodes_complex_ack() {
4330 let (dl, state) = MockDataLink::new();
4331 let client = BacnetClient::with_datalink(dl);
4332 let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
4333 let object_id = ObjectId::new(ObjectType::Device, 1);
4334
4335 let mut apdu_buf = [0u8; 256];
4336 let mut w = Writer::new(&mut apdu_buf);
4337 ComplexAckHeader {
4338 segmented: false,
4339 more_follows: false,
4340 invoke_id: 1,
4341 sequence_number: None,
4342 proposed_window_size: None,
4343 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4344 }
4345 .encode(&mut w)
4346 .unwrap();
4347 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4348 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4349 .encode(&mut w)
4350 .unwrap();
4351 encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
4352 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4353 .encode(&mut w)
4354 .unwrap();
4355 encode_app_real(&mut w, 55.5).unwrap();
4356 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4357 .encode(&mut w)
4358 .unwrap();
4359 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4360 .encode(&mut w)
4361 .unwrap();
4362
4363 state
4364 .recv
4365 .lock()
4366 .await
4367 .push_back((with_npdu(w.as_written()), addr));
4368
4369 let values = client
4370 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4371 .await
4372 .unwrap();
4373 assert_eq!(values.len(), 1);
4374 assert_eq!(values[0].0, PropertyId::PresentValue);
4375 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
4376
4377 let sent = state.sent.lock().await;
4378 assert_eq!(sent.len(), 1);
4379 let mut r = Reader::new(&sent[0].1);
4380 let _npdu = Npdu::decode(&mut r).unwrap();
4381 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4382 assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
4383 }
4384
4385 #[tokio::test]
4386 async fn read_property_multiple_reassembles_segmented_complex_ack() {
4387 let (dl, state) = MockDataLink::new();
4388 let client = BacnetClient::with_datalink(dl);
4389 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
4390 let object_id = ObjectId::new(ObjectType::Device, 1);
4391
4392 let mut payload_buf = [0u8; 256];
4393 let mut pw = Writer::new(&mut payload_buf);
4394 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
4395 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4396 .encode(&mut pw)
4397 .unwrap();
4398 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
4399 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4400 .encode(&mut pw)
4401 .unwrap();
4402 encode_app_real(&mut pw, 66.0).unwrap();
4403 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4404 .encode(&mut pw)
4405 .unwrap();
4406 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4407 .encode(&mut pw)
4408 .unwrap();
4409 let payload = pw.as_written();
4410 let split = payload.len() / 2;
4411
4412 let mut apdu1 = [0u8; 256];
4413 let mut w1 = Writer::new(&mut apdu1);
4414 ComplexAckHeader {
4415 segmented: true,
4416 more_follows: true,
4417 invoke_id: 1,
4418 sequence_number: Some(0),
4419 proposed_window_size: Some(1),
4420 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4421 }
4422 .encode(&mut w1)
4423 .unwrap();
4424 w1.write_all(&payload[..split]).unwrap();
4425
4426 let mut apdu2 = [0u8; 256];
4427 let mut w2 = Writer::new(&mut apdu2);
4428 ComplexAckHeader {
4429 segmented: true,
4430 more_follows: false,
4431 invoke_id: 1,
4432 sequence_number: Some(1),
4433 proposed_window_size: Some(1),
4434 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4435 }
4436 .encode(&mut w2)
4437 .unwrap();
4438 w2.write_all(&payload[split..]).unwrap();
4439
4440 state
4441 .recv
4442 .lock()
4443 .await
4444 .push_back((with_npdu(w1.as_written()), addr));
4445 state
4446 .recv
4447 .lock()
4448 .await
4449 .push_back((with_npdu(w2.as_written()), addr));
4450
4451 let values = client
4452 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4453 .await
4454 .unwrap();
4455 assert_eq!(values.len(), 1);
4456 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
4457
4458 let sent = state.sent.lock().await;
4459 assert!(sent.len() >= 3);
4460
4461 let mut saw_segment_ack = 0usize;
4462 for (_, frame) in sent.iter().skip(1) {
4463 let mut r = Reader::new(frame);
4464 let _npdu = Npdu::decode(&mut r).unwrap();
4465 let apdu = r.read_exact(r.remaining()).unwrap();
4466 if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
4467 let mut sr = Reader::new(apdu);
4468 let sack = SegmentAck::decode(&mut sr).unwrap();
4469 assert_eq!(sack.invoke_id, 1);
4470 saw_segment_ack += 1;
4471 }
4472 }
4473 assert!(saw_segment_ack >= 1);
4474 }
4475
4476 #[tokio::test]
4477 async fn read_property_multiple_tolerates_duplicate_segment() {
4478 let (dl, state) = MockDataLink::new();
4479 let client = BacnetClient::with_datalink(dl);
4480 let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
4481 let object_id = ObjectId::new(ObjectType::Device, 1);
4482
4483 let mut payload_buf = [0u8; 256];
4484 let mut pw = Writer::new(&mut payload_buf);
4485 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
4486 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4487 .encode(&mut pw)
4488 .unwrap();
4489 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
4490 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4491 .encode(&mut pw)
4492 .unwrap();
4493 encode_app_real(&mut pw, 66.0).unwrap();
4494 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4495 .encode(&mut pw)
4496 .unwrap();
4497 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4498 .encode(&mut pw)
4499 .unwrap();
4500 let payload = pw.as_written();
4501 let split = payload.len() / 2;
4502
4503 let mut apdu1 = [0u8; 256];
4504 let mut w1 = Writer::new(&mut apdu1);
4505 ComplexAckHeader {
4506 segmented: true,
4507 more_follows: true,
4508 invoke_id: 1,
4509 sequence_number: Some(0),
4510 proposed_window_size: Some(1),
4511 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4512 }
4513 .encode(&mut w1)
4514 .unwrap();
4515 w1.write_all(&payload[..split]).unwrap();
4516
4517 let mut dup = [0u8; 256];
4518 let mut wd = Writer::new(&mut dup);
4519 ComplexAckHeader {
4520 segmented: true,
4521 more_follows: true,
4522 invoke_id: 1,
4523 sequence_number: Some(0),
4524 proposed_window_size: Some(1),
4525 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4526 }
4527 .encode(&mut wd)
4528 .unwrap();
4529 wd.write_all(&payload[..split]).unwrap();
4530
4531 let mut apdu2 = [0u8; 256];
4532 let mut w2 = Writer::new(&mut apdu2);
4533 ComplexAckHeader {
4534 segmented: true,
4535 more_follows: false,
4536 invoke_id: 1,
4537 sequence_number: Some(1),
4538 proposed_window_size: Some(1),
4539 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4540 }
4541 .encode(&mut w2)
4542 .unwrap();
4543 w2.write_all(&payload[split..]).unwrap();
4544
4545 {
4546 let mut recv = state.recv.lock().await;
4547 recv.push_back((with_npdu(w1.as_written()), addr));
4548 recv.push_back((with_npdu(wd.as_written()), addr));
4549 recv.push_back((with_npdu(w2.as_written()), addr));
4550 }
4551
4552 let values = client
4553 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4554 .await
4555 .unwrap();
4556 assert_eq!(values.len(), 1);
4557 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
4558 }
4559
4560 #[tokio::test]
4561 async fn write_properties_handles_simple_ack() {
4562 let (dl, state) = MockDataLink::new();
4563 let client = BacnetClient::with_datalink(dl);
4564 let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
4565 let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
4566
4567 let mut apdu_buf = [0u8; 32];
4568 let mut w = Writer::new(&mut apdu_buf);
4569 SimpleAck {
4570 invoke_id: 1,
4571 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4572 }
4573 .encode(&mut w)
4574 .unwrap();
4575 state
4576 .recv
4577 .lock()
4578 .await
4579 .push_back((with_npdu(w.as_written()), addr));
4580
4581 let writes = [PropertyWriteSpec {
4582 property_id: PropertyId::PresentValue,
4583 array_index: None,
4584 value: DataValue::Real(12.5),
4585 priority: Some(8),
4586 }];
4587 client
4588 .write_property_multiple(addr, object_id, &writes)
4589 .await
4590 .unwrap();
4591
4592 let sent = state.sent.lock().await;
4593 assert_eq!(sent.len(), 1);
4594 let mut r = Reader::new(&sent[0].1);
4595 let _npdu = Npdu::decode(&mut r).unwrap();
4596 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4597 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4598 }
4599
4600 #[tokio::test]
4601 async fn subscribe_cov_handles_simple_ack() {
4602 let (dl, state) = MockDataLink::new();
4603 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4604 let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
4605
4606 let mut apdu_buf = [0u8; 32];
4607 let mut w = Writer::new(&mut apdu_buf);
4608 SimpleAck {
4609 invoke_id: 1,
4610 service_choice: SERVICE_SUBSCRIBE_COV,
4611 }
4612 .encode(&mut w)
4613 .unwrap();
4614 state
4615 .recv
4616 .lock()
4617 .await
4618 .push_back((with_npdu(w.as_written()), addr));
4619
4620 client
4621 .subscribe_cov(
4622 addr,
4623 SubscribeCovRequest {
4624 subscriber_process_id: 10,
4625 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
4626 issue_confirmed_notifications: Some(false),
4627 lifetime_seconds: Some(300),
4628 invoke_id: 0,
4629 },
4630 )
4631 .await
4632 .unwrap();
4633
4634 let sent = state.sent.lock().await;
4635 assert_eq!(sent.len(), 1);
4636 let mut r = Reader::new(&sent[0].1);
4637 let _npdu = Npdu::decode(&mut r).unwrap();
4638 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4639 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
4640 }
4641
4642 #[tokio::test]
4643 async fn subscribe_cov_property_handles_simple_ack() {
4644 let (dl, state) = MockDataLink::new();
4645 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4646 let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
4647
4648 let mut apdu_buf = [0u8; 32];
4649 let mut w = Writer::new(&mut apdu_buf);
4650 SimpleAck {
4651 invoke_id: 1,
4652 service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
4653 }
4654 .encode(&mut w)
4655 .unwrap();
4656 state
4657 .recv
4658 .lock()
4659 .await
4660 .push_back((with_npdu(w.as_written()), addr));
4661
4662 client
4663 .subscribe_cov_property(
4664 addr,
4665 SubscribeCovPropertyRequest {
4666 subscriber_process_id: 22,
4667 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
4668 issue_confirmed_notifications: Some(true),
4669 lifetime_seconds: Some(120),
4670 monitored_property_id: PropertyId::PresentValue,
4671 monitored_property_array_index: None,
4672 cov_increment: Some(0.1),
4673 invoke_id: 0,
4674 },
4675 )
4676 .await
4677 .unwrap();
4678
4679 let sent = state.sent.lock().await;
4680 assert_eq!(sent.len(), 1);
4681 let mut r = Reader::new(&sent[0].1);
4682 let _npdu = Npdu::decode(&mut r).unwrap();
4683 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4684 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
4685 }
4686
4687 #[tokio::test]
4688 async fn read_range_by_position_decodes_complex_ack() {
4689 let (dl, state) = MockDataLink::new();
4690 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4691 let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
4692 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4693
4694 let mut apdu_buf = [0u8; 256];
4695 let mut w = Writer::new(&mut apdu_buf);
4696 ComplexAckHeader {
4697 segmented: false,
4698 more_follows: false,
4699 invoke_id: 1,
4700 sequence_number: None,
4701 proposed_window_size: None,
4702 service_choice: SERVICE_READ_RANGE,
4703 }
4704 .encode(&mut w)
4705 .unwrap();
4706 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
4707 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4708 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
4709 w.write_u8(5).unwrap();
4710 w.write_u8(0b1110_0000).unwrap();
4711 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
4712 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
4713 encode_app_real(&mut w, 42.0).unwrap();
4714 encode_app_real(&mut w, 43.0).unwrap();
4715 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
4716
4717 state
4718 .recv
4719 .lock()
4720 .await
4721 .push_back((with_npdu(w.as_written()), addr));
4722
4723 let result = client
4724 .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
4725 .await
4726 .unwrap();
4727 assert_eq!(result.object_id, object_id);
4728 assert_eq!(result.item_count, 2);
4729 assert_eq!(result.items.len(), 2);
4730 assert!(matches!(
4731 result.items[0],
4732 ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
4733 ));
4734 }
4735
4736 #[tokio::test]
4737 async fn read_range_by_sequence_number_encodes_range_selector() {
4738 let (dl, state) = MockDataLink::new();
4739 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4740 let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
4741 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4742
4743 state
4744 .recv
4745 .lock()
4746 .await
4747 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
4748
4749 let _ = client
4750 .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
4751 .await
4752 .unwrap();
4753
4754 let sent = state.sent.lock().await;
4755 let mut r = Reader::new(&sent[0].1);
4756 let _npdu = Npdu::decode(&mut r).unwrap();
4757 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4758 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
4759 match Tag::decode(&mut r).unwrap() {
4760 Tag::Context { tag_num: 0, len: 4 } => {
4761 let _ = r.read_exact(4).unwrap();
4762 }
4763 other => panic!("unexpected object id tag: {other:?}"),
4764 }
4765 match Tag::decode(&mut r).unwrap() {
4766 Tag::Context { tag_num: 1, len } => {
4767 let _ = decode_unsigned(&mut r, len as usize).unwrap();
4768 }
4769 other => panic!("unexpected property tag: {other:?}"),
4770 }
4771 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
4772 match Tag::decode(&mut r).unwrap() {
4773 Tag::Application {
4774 tag: AppTag::UnsignedInt,
4775 len,
4776 } => {
4777 assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
4778 }
4779 other => panic!("unexpected ref seq tag: {other:?}"),
4780 }
4781 match Tag::decode(&mut r).unwrap() {
4782 Tag::Application {
4783 tag: AppTag::SignedInt,
4784 len,
4785 } => {
4786 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
4787 }
4788 other => panic!("unexpected count tag: {other:?}"),
4789 }
4790 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
4791 }
4792
4793 #[tokio::test]
4794 async fn read_range_by_time_encodes_range_selector() {
4795 let (dl, state) = MockDataLink::new();
4796 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4797 let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
4798 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4799 let date = Date {
4800 year_since_1900: 126,
4801 month: 2,
4802 day: 7,
4803 weekday: 6,
4804 };
4805 let time = Time {
4806 hour: 10,
4807 minute: 11,
4808 second: 12,
4809 hundredths: 13,
4810 };
4811
4812 state
4813 .recv
4814 .lock()
4815 .await
4816 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
4817
4818 let _ = client
4819 .read_range_by_time(
4820 addr,
4821 object_id,
4822 PropertyId::PresentValue,
4823 None,
4824 (date, time),
4825 2,
4826 )
4827 .await
4828 .unwrap();
4829
4830 let sent = state.sent.lock().await;
4831 let mut r = Reader::new(&sent[0].1);
4832 let _npdu = Npdu::decode(&mut r).unwrap();
4833 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4834 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
4835 match Tag::decode(&mut r).unwrap() {
4836 Tag::Context { tag_num: 0, len: 4 } => {
4837 let _ = r.read_exact(4).unwrap();
4838 }
4839 other => panic!("unexpected object id tag: {other:?}"),
4840 }
4841 match Tag::decode(&mut r).unwrap() {
4842 Tag::Context { tag_num: 1, len } => {
4843 let _ = decode_unsigned(&mut r, len as usize).unwrap();
4844 }
4845 other => panic!("unexpected property tag: {other:?}"),
4846 }
4847 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
4848 match Tag::decode(&mut r).unwrap() {
4849 Tag::Application {
4850 tag: AppTag::Date,
4851 len: 4,
4852 } => {
4853 let raw = r.read_exact(4).unwrap();
4854 assert_eq!(
4855 raw,
4856 &[date.year_since_1900, date.month, date.day, date.weekday]
4857 );
4858 }
4859 other => panic!("unexpected date tag: {other:?}"),
4860 }
4861 match Tag::decode(&mut r).unwrap() {
4862 Tag::Application {
4863 tag: AppTag::Time,
4864 len: 4,
4865 } => {
4866 let raw = r.read_exact(4).unwrap();
4867 assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
4868 }
4869 other => panic!("unexpected time tag: {other:?}"),
4870 }
4871 match Tag::decode(&mut r).unwrap() {
4872 Tag::Application {
4873 tag: AppTag::SignedInt,
4874 len,
4875 } => {
4876 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
4877 }
4878 other => panic!("unexpected count tag: {other:?}"),
4879 }
4880 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
4881 }
4882
4883 #[tokio::test]
4884 async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
4885 let (dl, state) = MockDataLink::new();
4886 let client = BacnetClient::with_datalink(dl);
4887 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
4888
4889 let mut apdu = [0u8; 256];
4890 let mut w = Writer::new(&mut apdu);
4891 UnconfirmedRequestHeader {
4892 service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
4893 }
4894 .encode(&mut w)
4895 .unwrap();
4896 encode_ctx_unsigned(&mut w, 0, 17).unwrap();
4897 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4898 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
4899 encode_ctx_unsigned(&mut w, 3, 60).unwrap();
4900 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
4901 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
4902 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
4903 encode_app_real(&mut w, 73.25).unwrap();
4904 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
4905 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
4906
4907 state
4908 .recv
4909 .lock()
4910 .await
4911 .push_back((with_npdu(w.as_written()), addr));
4912
4913 let notification = client
4914 .recv_cov_notification(Duration::from_secs(1))
4915 .await
4916 .unwrap()
4917 .unwrap();
4918 assert!(!notification.confirmed);
4919 assert_eq!(notification.subscriber_process_id, 17);
4920 assert_eq!(notification.values.len(), 1);
4921 assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
4922 assert!(matches!(
4923 notification.values[0].value,
4924 ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
4925 ));
4926
4927 let sent = state.sent.lock().await;
4928 assert!(sent.is_empty());
4929 }
4930
4931 #[tokio::test]
4932 async fn recv_confirmed_cov_notification_sends_simple_ack() {
4933 let (dl, state) = MockDataLink::new();
4934 let client = BacnetClient::with_datalink(dl);
4935 let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
4936
4937 let mut apdu = [0u8; 256];
4938 let mut w = Writer::new(&mut apdu);
4939 ConfirmedRequestHeader {
4940 segmented: false,
4941 more_follows: false,
4942 segmented_response_accepted: false,
4943 max_segments: 0,
4944 max_apdu: 5,
4945 invoke_id: 9,
4946 sequence_number: None,
4947 proposed_window_size: None,
4948 service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
4949 }
4950 .encode(&mut w)
4951 .unwrap();
4952 encode_ctx_unsigned(&mut w, 0, 18).unwrap();
4953 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4954 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
4955 encode_ctx_unsigned(&mut w, 3, 120).unwrap();
4956 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
4957 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
4958 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
4959 encode_app_real(&mut w, 55.0).unwrap();
4960 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
4961 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
4962
4963 state
4964 .recv
4965 .lock()
4966 .await
4967 .push_back((with_npdu(w.as_written()), addr));
4968
4969 let notification = client
4970 .recv_cov_notification(Duration::from_secs(1))
4971 .await
4972 .unwrap()
4973 .unwrap();
4974 assert!(notification.confirmed);
4975 assert_eq!(notification.values.len(), 1);
4976
4977 let sent = state.sent.lock().await;
4978 assert_eq!(sent.len(), 1);
4979 let mut r = Reader::new(&sent[0].1);
4980 let _npdu = Npdu::decode(&mut r).unwrap();
4981 let ack = SimpleAck::decode(&mut r).unwrap();
4982 assert_eq!(ack.invoke_id, 9);
4983 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
4984 }
4985
4986 #[tokio::test]
4987 async fn recv_unconfirmed_event_notification_returns_decoded_value() {
4988 let (dl, state) = MockDataLink::new();
4989 let client = BacnetClient::with_datalink(dl);
4990 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4991
4992 let mut apdu = [0u8; 256];
4993 let mut w = Writer::new(&mut apdu);
4994 UnconfirmedRequestHeader {
4995 service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
4996 }
4997 .encode(&mut w)
4998 .unwrap();
4999 encode_ctx_unsigned(&mut w, 0, 99).unwrap();
5000 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5001 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
5002 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5003 encode_ctx_unsigned(&mut w, 1, 55).unwrap();
5004 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5005 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
5006 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
5007 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
5008 encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
5009 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
5010 Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
5011 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
5012 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
5013 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
5014 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
5015 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
5016 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
5017 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
5018
5019 state
5020 .recv
5021 .lock()
5022 .await
5023 .push_back((with_npdu(w.as_written()), addr));
5024
5025 let notification: EventNotification = client
5026 .recv_event_notification(Duration::from_secs(1))
5027 .await
5028 .unwrap()
5029 .unwrap();
5030 assert!(!notification.confirmed);
5031 assert_eq!(notification.process_id, 99);
5032 assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
5033 assert_eq!(notification.ack_required, Some(true));
5034 assert_eq!(notification.from_state, Some(EventState::Offnormal));
5035 assert_eq!(notification.to_state, Some(EventState::Normal));
5036 assert_eq!(notification.notify_type, 0);
5037
5038 let sent = state.sent.lock().await;
5039 assert!(sent.is_empty());
5040 }
5041
5042 #[tokio::test]
5043 async fn recv_confirmed_event_notification_sends_simple_ack() {
5044 let (dl, state) = MockDataLink::new();
5045 let client = BacnetClient::with_datalink(dl);
5046 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
5047
5048 let mut apdu = [0u8; 256];
5049 let mut w = Writer::new(&mut apdu);
5050 ConfirmedRequestHeader {
5051 segmented: false,
5052 more_follows: false,
5053 segmented_response_accepted: false,
5054 max_segments: 0,
5055 max_apdu: 5,
5056 invoke_id: 11,
5057 sequence_number: None,
5058 proposed_window_size: None,
5059 service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
5060 }
5061 .encode(&mut w)
5062 .unwrap();
5063 encode_ctx_unsigned(&mut w, 0, 100).unwrap();
5064 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5065 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
5066 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5067 encode_ctx_unsigned(&mut w, 1, 56).unwrap();
5068 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5069 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
5070 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
5071 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
5072 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
5073 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
5074 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
5075 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
5076 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
5077 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
5078 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
5079 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
5080
5081 state
5082 .recv
5083 .lock()
5084 .await
5085 .push_back((with_npdu(w.as_written()), addr));
5086
5087 let notification = client
5088 .recv_event_notification(Duration::from_secs(1))
5089 .await
5090 .unwrap()
5091 .unwrap();
5092 assert!(notification.confirmed);
5093
5094 let sent = state.sent.lock().await;
5095 assert_eq!(sent.len(), 1);
5096 let mut r = Reader::new(&sent[0].1);
5097 let _npdu = Npdu::decode(&mut r).unwrap();
5098 let ack = SimpleAck::decode(&mut r).unwrap();
5099 assert_eq!(ack.invoke_id, 11);
5100 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
5101 }
5102
5103 #[tokio::test]
5104 async fn write_property_multiple_segments_large_request() {
5105 let (dl, state) = MockDataLink::new();
5106 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
5107 let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
5108 let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
5109
5110 {
5111 let mut recv = state.recv.lock().await;
5112 for seq in 0u8..=254 {
5113 let mut apdu = [0u8; 16];
5114 let mut w = Writer::new(&mut apdu);
5115 SegmentAck {
5116 negative_ack: false,
5117 sent_by_server: true,
5118 invoke_id: 1,
5119 sequence_number: seq,
5120 actual_window_size: 1,
5121 }
5122 .encode(&mut w)
5123 .unwrap();
5124 recv.push_back((with_npdu(w.as_written()), addr));
5125 }
5126
5127 let mut apdu = [0u8; 16];
5128 let mut w = Writer::new(&mut apdu);
5129 SimpleAck {
5130 invoke_id: 1,
5131 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5132 }
5133 .encode(&mut w)
5134 .unwrap();
5135 recv.push_back((with_npdu(w.as_written()), addr));
5136 }
5137
5138 let writes: Vec<PropertyWriteSpec> = (0..180)
5139 .map(|_| PropertyWriteSpec {
5140 property_id: PropertyId::Description,
5141 array_index: None,
5142 value: DataValue::CharacterString(
5143 "rustbac segmented write test payload................................................................",
5144 ),
5145 priority: None,
5146 })
5147 .collect();
5148
5149 client
5150 .write_property_multiple(addr, object_id, &writes)
5151 .await
5152 .unwrap();
5153
5154 let sent = state.sent.lock().await;
5155 assert!(sent.len() > 1);
5156
5157 let mut seqs = Vec::new();
5158 let mut saw_more_follows = false;
5159 let mut saw_last = false;
5160 for (_, frame) in sent.iter() {
5161 let mut r = Reader::new(frame);
5162 let _npdu = Npdu::decode(&mut r).unwrap();
5163 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5164 assert!(hdr.segmented);
5165 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
5166 if hdr.more_follows {
5167 saw_more_follows = true;
5168 } else {
5169 saw_last = true;
5170 }
5171 seqs.push(hdr.sequence_number.unwrap());
5172 }
5173
5174 assert!(saw_more_follows);
5175 assert!(saw_last);
5176 for (idx, seq) in seqs.iter().enumerate() {
5177 assert_eq!(*seq as usize, idx);
5178 }
5179 }
5180
5181 #[tokio::test]
5182 async fn write_property_multiple_uses_configured_segment_window() {
5183 let (dl, state) = MockDataLink::new();
5184 let client = BacnetClient::with_datalink(dl)
5185 .with_response_timeout(Duration::from_secs(1))
5186 .with_segmented_request_window_size(4);
5187 let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
5188 let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
5189
5190 {
5191 let mut recv = state.recv.lock().await;
5192 for seq in 0u8..=254 {
5193 let mut apdu = [0u8; 16];
5194 let mut w = Writer::new(&mut apdu);
5195 SegmentAck {
5196 negative_ack: false,
5197 sent_by_server: true,
5198 invoke_id: 1,
5199 sequence_number: seq,
5200 actual_window_size: 4,
5201 }
5202 .encode(&mut w)
5203 .unwrap();
5204 recv.push_back((with_npdu(w.as_written()), addr));
5205 }
5206
5207 let mut apdu = [0u8; 16];
5208 let mut w = Writer::new(&mut apdu);
5209 SimpleAck {
5210 invoke_id: 1,
5211 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5212 }
5213 .encode(&mut w)
5214 .unwrap();
5215 recv.push_back((with_npdu(w.as_written()), addr));
5216 }
5217
5218 let writes: Vec<PropertyWriteSpec> = (0..180)
5219 .map(|_| PropertyWriteSpec {
5220 property_id: PropertyId::Description,
5221 array_index: None,
5222 value: DataValue::CharacterString(
5223 "rustbac segmented write test payload................................................................",
5224 ),
5225 priority: None,
5226 })
5227 .collect();
5228
5229 client
5230 .write_property_multiple(addr, object_id, &writes)
5231 .await
5232 .unwrap();
5233
5234 let sent = state.sent.lock().await;
5235 assert!(sent.len() > 4);
5236 for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
5237 let mut r = Reader::new(frame);
5238 let _npdu = Npdu::decode(&mut r).unwrap();
5239 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5240 assert!(hdr.segmented);
5241 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
5242 assert_eq!(hdr.sequence_number, Some(idx as u8));
5243 assert_eq!(hdr.proposed_window_size, Some(4));
5244 }
5245 }
5246
5247 #[tokio::test]
5248 async fn write_property_multiple_adapts_window_to_peer_ack_window() {
5249 let (dl, state) = MockDataLink::new();
5250 let client = BacnetClient::with_datalink(dl)
5251 .with_response_timeout(Duration::from_secs(1))
5252 .with_segmented_request_window_size(4);
5253 let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
5254 let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
5255
5256 {
5257 let mut recv = state.recv.lock().await;
5258 for seq in 0u8..=254 {
5259 let mut apdu = [0u8; 16];
5260 let mut w = Writer::new(&mut apdu);
5261 SegmentAck {
5262 negative_ack: false,
5263 sent_by_server: true,
5264 invoke_id: 1,
5265 sequence_number: seq,
5266 actual_window_size: 2,
5267 }
5268 .encode(&mut w)
5269 .unwrap();
5270 recv.push_back((with_npdu(w.as_written()), addr));
5271 }
5272
5273 let mut apdu = [0u8; 16];
5274 let mut w = Writer::new(&mut apdu);
5275 SimpleAck {
5276 invoke_id: 1,
5277 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5278 }
5279 .encode(&mut w)
5280 .unwrap();
5281 recv.push_back((with_npdu(w.as_written()), addr));
5282 }
5283
5284 let writes: Vec<PropertyWriteSpec> = (0..180)
5285 .map(|_| PropertyWriteSpec {
5286 property_id: PropertyId::Description,
5287 array_index: None,
5288 value: DataValue::CharacterString(
5289 "rustbac segmented write test payload................................................................",
5290 ),
5291 priority: None,
5292 })
5293 .collect();
5294
5295 client
5296 .write_property_multiple(addr, object_id, &writes)
5297 .await
5298 .unwrap();
5299
5300 let sent = state.sent.lock().await;
5301 let mut saw_adapted_window = false;
5302 for (_, frame) in sent.iter() {
5303 let mut r = Reader::new(frame);
5304 let _npdu = Npdu::decode(&mut r).unwrap();
5305 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5306 if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
5307 saw_adapted_window = true;
5308 break;
5309 }
5310 }
5311 assert!(saw_adapted_window);
5312 }
5313
5314 #[tokio::test]
5315 async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
5316 let (dl, state) = MockDataLink::new();
5317 let client = BacnetClient::with_datalink(dl)
5318 .with_response_timeout(Duration::from_secs(1))
5319 .with_segmented_request_window_size(1)
5320 .with_segmented_request_retries(1);
5321 let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
5322 let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
5323
5324 {
5325 let mut recv = state.recv.lock().await;
5326
5327 let mut nack_apdu = [0u8; 16];
5328 let mut nack_w = Writer::new(&mut nack_apdu);
5329 SegmentAck {
5330 negative_ack: true,
5331 sent_by_server: true,
5332 invoke_id: 1,
5333 sequence_number: 0,
5334 actual_window_size: 1,
5335 }
5336 .encode(&mut nack_w)
5337 .unwrap();
5338 recv.push_back((with_npdu(nack_w.as_written()), addr));
5339
5340 for seq in 0u8..=254 {
5341 let mut apdu = [0u8; 16];
5342 let mut w = Writer::new(&mut apdu);
5343 SegmentAck {
5344 negative_ack: false,
5345 sent_by_server: true,
5346 invoke_id: 1,
5347 sequence_number: seq,
5348 actual_window_size: 1,
5349 }
5350 .encode(&mut w)
5351 .unwrap();
5352 recv.push_back((with_npdu(w.as_written()), addr));
5353 }
5354
5355 let mut apdu = [0u8; 16];
5356 let mut w = Writer::new(&mut apdu);
5357 SimpleAck {
5358 invoke_id: 1,
5359 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5360 }
5361 .encode(&mut w)
5362 .unwrap();
5363 recv.push_back((with_npdu(w.as_written()), addr));
5364 }
5365
5366 let writes: Vec<PropertyWriteSpec> = (0..180)
5367 .map(|_| PropertyWriteSpec {
5368 property_id: PropertyId::Description,
5369 array_index: None,
5370 value: DataValue::CharacterString(
5371 "rustbac segmented write test payload................................................................",
5372 ),
5373 priority: None,
5374 })
5375 .collect();
5376
5377 client
5378 .write_property_multiple(addr, object_id, &writes)
5379 .await
5380 .unwrap();
5381
5382 let sent = state.sent.lock().await;
5383 let mut seq0_frames = 0usize;
5384 for (_, frame) in sent.iter() {
5385 let mut r = Reader::new(frame);
5386 let _npdu = Npdu::decode(&mut r).unwrap();
5387 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5388 if hdr.sequence_number == Some(0) {
5389 seq0_frames += 1;
5390 }
5391 }
5392 assert!(seq0_frames >= 2);
5393 }
5394
5395 #[tokio::test]
5396 async fn read_property_ignores_invalid_frames_until_valid_response() {
5397 let (dl, state) = MockDataLink::new();
5398 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
5399 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
5400 let state_for_task = state.clone();
5401
5402 tokio::spawn(async move {
5403 tokio::time::sleep(Duration::from_millis(20)).await;
5404 let mut apdu = [0u8; 128];
5405 let mut w = Writer::new(&mut apdu);
5406 ComplexAckHeader {
5407 segmented: false,
5408 more_follows: false,
5409 invoke_id: 1,
5410 sequence_number: None,
5411 proposed_window_size: None,
5412 service_choice: SERVICE_READ_PROPERTY,
5413 }
5414 .encode(&mut w)
5415 .unwrap();
5416 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5417 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
5418 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5419 encode_app_real(&mut w, 77.0).unwrap();
5420 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5421 state_for_task
5422 .recv
5423 .lock()
5424 .await
5425 .push_back((with_npdu(w.as_written()), addr));
5426 });
5427
5428 let value = client
5429 .read_property(
5430 addr,
5431 ObjectId::new(ObjectType::Device, 1),
5432 PropertyId::PresentValue,
5433 )
5434 .await
5435 .unwrap();
5436 assert!(matches!(
5437 value,
5438 ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
5439 ));
5440 }
5441
5442 #[tokio::test]
5443 async fn read_property_maps_reject() {
5444 let (dl, state) = MockDataLink::new();
5445 let client = BacnetClient::with_datalink(dl);
5446 let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
5447
5448 let mut apdu = [0u8; 8];
5449 let mut w = Writer::new(&mut apdu);
5450 w.write_u8((ApduType::Reject as u8) << 4).unwrap();
5451 w.write_u8(1).unwrap(); w.write_u8(2).unwrap(); state
5454 .recv
5455 .lock()
5456 .await
5457 .push_back((with_npdu(w.as_written()), addr));
5458
5459 let err = client
5460 .read_property(
5461 addr,
5462 ObjectId::new(ObjectType::Device, 1),
5463 PropertyId::ObjectName,
5464 )
5465 .await
5466 .unwrap_err();
5467 assert!(matches!(
5468 err,
5469 crate::ClientError::RemoteReject { reason: 2 }
5470 ));
5471 }
5472
5473 #[tokio::test]
5474 async fn read_property_maps_remote_error_details() {
5475 let (dl, state) = MockDataLink::new();
5476 let client = BacnetClient::with_datalink(dl);
5477 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
5478
5479 let mut apdu = [0u8; 16];
5480 let mut w = Writer::new(&mut apdu);
5481 w.write_u8((ApduType::Error as u8) << 4).unwrap();
5482 w.write_u8(1).unwrap(); w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
5484 .unwrap();
5485 Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
5486 w.write_u8(2).unwrap(); Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
5488 w.write_u8(32).unwrap(); state
5491 .recv
5492 .lock()
5493 .await
5494 .push_back((with_npdu(w.as_written()), addr));
5495
5496 let err = client
5497 .read_property(
5498 addr,
5499 ObjectId::new(ObjectType::Device, 1),
5500 PropertyId::ObjectName,
5501 )
5502 .await
5503 .unwrap_err();
5504 assert!(matches!(
5505 err,
5506 crate::ClientError::RemoteServiceError {
5507 service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
5508 error_class_raw: Some(2),
5509 error_code_raw: Some(32),
5510 error_class: Some(rustbac_core::types::ErrorClass::Property),
5511 error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
5512 }
5513 ));
5514 }
5515
5516 #[tokio::test]
5517 async fn write_property_maps_abort() {
5518 let (dl, state) = MockDataLink::new();
5519 let client = BacnetClient::with_datalink(dl);
5520 let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
5521
5522 let mut apdu = [0u8; 8];
5523 let mut w = Writer::new(&mut apdu);
5524 w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); w.write_u8(1).unwrap(); w.write_u8(9).unwrap(); state
5528 .recv
5529 .lock()
5530 .await
5531 .push_back((with_npdu(w.as_written()), addr));
5532
5533 let req = rustbac_core::services::write_property::WritePropertyRequest {
5534 object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
5535 property_id: PropertyId::PresentValue,
5536 value: DataValue::Real(10.0),
5537 priority: Some(8),
5538 ..Default::default()
5539 };
5540 let err = client.write_property(addr, req).await.unwrap_err();
5541 assert!(matches!(
5542 err,
5543 crate::ClientError::RemoteAbort {
5544 reason: 9,
5545 server: true
5546 }
5547 ));
5548 }
5549
5550 #[tokio::test]
5551 async fn read_property_multiple_returns_owned_string() {
5552 let (dl, state) = MockDataLink::new();
5553 let client = BacnetClient::with_datalink(dl);
5554 let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
5555 let object_id = ObjectId::new(ObjectType::Device, 1);
5556
5557 let mut apdu_buf = [0u8; 256];
5558 let mut w = Writer::new(&mut apdu_buf);
5559 ComplexAckHeader {
5560 segmented: false,
5561 more_follows: false,
5562 invoke_id: 1,
5563 sequence_number: None,
5564 proposed_window_size: None,
5565 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
5566 }
5567 .encode(&mut w)
5568 .unwrap();
5569 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
5570 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
5571 .encode(&mut w)
5572 .unwrap();
5573 encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
5574 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
5575 .encode(&mut w)
5576 .unwrap();
5577 rustbac_core::services::value_codec::encode_application_data_value(
5578 &mut w,
5579 &DataValue::CharacterString("AHU-1"),
5580 )
5581 .unwrap();
5582 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
5583 .encode(&mut w)
5584 .unwrap();
5585 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
5586 .encode(&mut w)
5587 .unwrap();
5588
5589 state
5590 .recv
5591 .lock()
5592 .await
5593 .push_back((with_npdu(w.as_written()), addr));
5594
5595 let values = client
5596 .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
5597 .await
5598 .unwrap();
5599 assert_eq!(values.len(), 1);
5600 assert_eq!(values[0].0, PropertyId::ObjectName);
5601 assert!(matches!(
5602 &values[0].1,
5603 ClientDataValue::CharacterString(s) if s == "AHU-1"
5604 ));
5605 }
5606
5607 #[tokio::test]
5608 async fn new_sc_rejects_invalid_endpoint() {
5609 let err = BacnetClient::new_sc("not a url").await.unwrap_err();
5610 assert!(matches!(err, crate::ClientError::DataLink(_)));
5611 }
5612}