1use crate::{
2 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3 ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4 DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5 EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9 AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10 SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{
13 primitives::{decode_unsigned, encode_ctx_unsigned},
14 reader::Reader,
15 tag::Tag,
16 writer::Writer,
17};
18use rustbac_core::npdu::Npdu;
19use rustbac_core::services::acknowledge_alarm::{
20 AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
21};
22use rustbac_core::services::alarm_summary::{
23 AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
24 SERVICE_GET_ALARM_SUMMARY,
25};
26use rustbac_core::services::atomic_read_file::{
27 AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
28};
29use rustbac_core::services::atomic_write_file::{
30 AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
31};
32use rustbac_core::services::cov_notification::{
33 CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
34 SERVICE_UNCONFIRMED_COV_NOTIFICATION,
35};
36use rustbac_core::services::device_management::{
37 DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
38 ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
39};
40use rustbac_core::services::enrollment_summary::{
41 EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
42 GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
43};
44use rustbac_core::services::event_information::{
45 EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
46 SERVICE_GET_EVENT_INFORMATION,
47};
48use rustbac_core::services::event_notification::{
49 EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
50 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
51};
52use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
53use rustbac_core::services::list_element::{
54 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
55 SERVICE_REMOVE_LIST_ELEMENT,
56};
57use rustbac_core::services::object_management::{
58 CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
59 SERVICE_DELETE_OBJECT,
60};
61use rustbac_core::services::private_transfer::{
62 ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
63 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
64};
65use rustbac_core::services::read_property::{
66 ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
67};
68use rustbac_core::services::read_property_multiple::{
69 PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
70 ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
71};
72use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
73use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
74use rustbac_core::services::subscribe_cov_property::{
75 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
76};
77use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
78use rustbac_core::services::value_codec::encode_application_data_value;
79use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
80use rustbac_core::services::who_is::WhoIsRequest;
81use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
82use rustbac_core::services::write_property_multiple::{
83 PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
84 SERVICE_WRITE_PROPERTY_MULTIPLE,
85};
86use rustbac_core::types::{
87 DataValue, Date, ErrorClass, ErrorCode, ObjectId, ObjectType, PropertyId, Time,
88};
89use rustbac_core::EncodeError;
90use rustbac_datalink::bip::transport::{
91 BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
92};
93use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
94use std::collections::{HashMap, HashSet};
95use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
96use std::sync::RwLock;
97use std::time::Duration;
98use tokio::sync::Mutex;
99use tokio::task::JoinHandle;
100use tokio::time::{timeout, Instant};
101
/// Floor, in bytes, applied to the per-segment payload size when a confirmed request is split.
const MIN_SEGMENT_DATA_LEN: usize = 32;
/// Largest number of payload bytes accepted while reassembling a segmented ComplexACK (1 MiB).
const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1 << 20;
104
105pub struct BacnetClient<D: DataLink> {
121 datalink: D,
122 invoke_id: Mutex<u8>,
123 request_io_lock: Mutex<()>,
124 response_timeout: Duration,
125 segmented_request_window_size: u8,
126 segmented_request_retries: u8,
127 segment_ack_timeout: Duration,
128 capability_cache: std::sync::Arc<RwLock<HashMap<DataLinkAddress, usize>>>,
130 server_handler: Option<std::sync::Arc<dyn crate::server::ServiceHandler>>,
132 server_device_id: u32,
134 #[allow(unused)]
136 server_vendor_id: u16,
137}
138
139impl<D: DataLink + std::fmt::Debug> std::fmt::Debug for BacnetClient<D> {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("BacnetClient")
142 .field("datalink", &self.datalink)
143 .field("invoke_id", &self.invoke_id)
144 .field("response_timeout", &self.response_timeout)
145 .field(
146 "segmented_request_window_size",
147 &self.segmented_request_window_size,
148 )
149 .field("segmented_request_retries", &self.segmented_request_retries)
150 .field("segment_ack_timeout", &self.segment_ack_timeout)
151 .field(
152 "server_handler",
153 &self.server_handler.as_ref().map(|_| "..."),
154 )
155 .field("server_device_id", &self.server_device_id)
156 .field("server_vendor_id", &self.server_vendor_id)
157 .finish()
158 }
159}
160
161#[derive(Debug)]
167pub struct ForeignDeviceRenewal {
168 task: JoinHandle<()>,
169}
170
171impl ForeignDeviceRenewal {
172 pub fn stop(self) {
174 self.task.abort();
175 }
176}
177
178impl Drop for ForeignDeviceRenewal {
179 fn drop(&mut self) {
180 self.task.abort();
181 }
182}
183
184impl BacnetClient<BacnetIpTransport> {
185 pub async fn new() -> Result<Self, ClientError> {
189 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
190 let datalink = BacnetIpTransport::bind(bind_addr).await?;
191 Ok(Self {
192 datalink,
193 invoke_id: Mutex::new(1),
194 request_io_lock: Mutex::new(()),
195 response_timeout: Duration::from_secs(3),
196 segmented_request_window_size: 16,
197 segmented_request_retries: 2,
198 segment_ack_timeout: Duration::from_millis(500),
199 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
200 server_handler: None,
201 server_device_id: 0,
202 server_vendor_id: 0,
203 })
204 }
205
206 pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
213 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
214 let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
215 datalink.register_foreign_device(ttl_seconds).await?;
216 Ok(Self {
217 datalink,
218 invoke_id: Mutex::new(1),
219 request_io_lock: Mutex::new(()),
220 response_timeout: Duration::from_secs(3),
221 segmented_request_window_size: 16,
222 segmented_request_retries: 2,
223 segment_ack_timeout: Duration::from_millis(500),
224 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
225 server_handler: None,
226 server_device_id: 0,
227 server_vendor_id: 0,
228 })
229 }
230
231 pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
234 let _io = self.request_io_lock.lock().await;
235 self.datalink.register_foreign_device(ttl_seconds).await?;
236 Ok(())
237 }
238
239 pub async fn read_broadcast_distribution_table(
241 &self,
242 ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
243 let _io = self.request_io_lock.lock().await;
244 self.datalink
245 .read_broadcast_distribution_table()
246 .await
247 .map_err(ClientError::from)
248 }
249
250 pub async fn write_broadcast_distribution_table(
252 &self,
253 entries: &[BroadcastDistributionEntry],
254 ) -> Result<(), ClientError> {
255 let _io = self.request_io_lock.lock().await;
256 self.datalink
257 .write_broadcast_distribution_table(entries)
258 .await?;
259 Ok(())
260 }
261
262 pub async fn read_foreign_device_table(
264 &self,
265 ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
266 let _io = self.request_io_lock.lock().await;
267 self.datalink
268 .read_foreign_device_table()
269 .await
270 .map_err(ClientError::from)
271 }
272
273 pub async fn delete_foreign_device_table_entry(
275 &self,
276 address: SocketAddrV4,
277 ) -> Result<(), ClientError> {
278 let _io = self.request_io_lock.lock().await;
279 self.datalink
280 .delete_foreign_device_table_entry(address)
281 .await?;
282 Ok(())
283 }
284
285 pub fn start_foreign_device_renewal(
291 &self,
292 ttl_seconds: u16,
293 ) -> Result<ForeignDeviceRenewal, ClientError> {
294 if ttl_seconds == 0 {
295 return Err(EncodeError::InvalidLength.into());
296 }
297
298 let datalink = self.datalink.clone();
299 let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
300 let interval = Duration::from_secs(refresh_seconds.max(1));
301 let task = tokio::spawn(async move {
302 loop {
303 tokio::time::sleep(interval).await;
304 if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
305 log::warn!("foreign device renewal send failed: {err}");
306 }
307 }
308 });
309 Ok(ForeignDeviceRenewal { task })
310 }
311}
312
313impl BacnetClient<BacnetScTransport> {
314 pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
317 let datalink = BacnetScTransport::connect(endpoint).await?;
318 Ok(Self::with_datalink(datalink))
319 }
320}
321
322impl<D: DataLink> BacnetClient<D> {
323 pub fn with_datalink(datalink: D) -> Self {
327 Self {
328 datalink,
329 invoke_id: Mutex::new(1),
330 request_io_lock: Mutex::new(()),
331 response_timeout: Duration::from_secs(3),
332 segmented_request_window_size: 16,
333 segmented_request_retries: 2,
334 segment_ack_timeout: Duration::from_millis(500),
335 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
336 server_handler: None,
337 server_device_id: 0,
338 server_vendor_id: 0,
339 }
340 }
341
342 pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
344 self.response_timeout = timeout;
345 self
346 }
347
348 pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
351 self.segmented_request_window_size = window_size.max(1);
352 self
353 }
354
355 pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
358 self.segmented_request_retries = retries;
359 self
360 }
361
362 pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
365 self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
366 self
367 }
368
369 pub fn with_server_handler(
376 mut self,
377 handler: std::sync::Arc<dyn crate::server::ServiceHandler>,
378 device_id: u32,
379 vendor_id: u16,
380 ) -> Self {
381 self.server_handler = Some(handler);
382 self.server_device_id = device_id;
383 self.server_vendor_id = vendor_id;
384 self
385 }
386
387 pub async fn poll_server(&self) -> Result<(), ClientError> {
395 let handler = self.server_handler.as_ref().ok_or(ClientError::Timeout)?;
396 let _io_lock = self.request_io_lock.lock().await;
397 let mut buf = [0u8; 1500];
398 match tokio::time::timeout(Duration::from_millis(50), self.datalink.recv(&mut buf)).await {
399 Ok(Ok((n, src))) => {
400 let _ = dispatch_incoming_request(
401 &self.datalink,
402 handler.as_ref(),
403 self.server_device_id,
404 self.server_vendor_id,
405 &buf[..n],
406 src,
407 )
408 .await;
409 Ok(())
410 }
411 _ => Ok(()),
412 }
413 }
414
415 async fn next_invoke_id(&self) -> u8 {
416 let mut lock = self.invoke_id.lock().await;
417 let id = *lock;
418 *lock = lock.wrapping_add(1);
419 if *lock == 0 {
420 *lock = 1;
421 }
422 id
423 }
424
425 async fn send_segment_ack(
426 &self,
427 address: DataLinkAddress,
428 invoke_id: u8,
429 sequence_number: u8,
430 window_size: u8,
431 ) -> Result<(), ClientError> {
432 let mut tx = [0u8; 64];
433 let mut w = Writer::new(&mut tx);
434 Npdu::new(0).encode(&mut w)?;
435 SegmentAck {
436 negative_ack: false,
437 sent_by_server: false,
438 invoke_id,
439 sequence_number,
440 actual_window_size: window_size,
441 }
442 .encode(&mut w)?;
443 self.datalink.send(address, w.as_written()).await?;
444 Ok(())
445 }
446
447 async fn recv_ignoring_invalid_frame(
448 &self,
449 buf: &mut [u8],
450 deadline: Instant,
451 ) -> Result<(usize, DataLinkAddress), ClientError> {
452 loop {
453 let remaining = deadline.saturating_duration_since(Instant::now());
454 if remaining.is_zero() {
455 return Err(ClientError::Timeout);
456 }
457
458 match timeout(remaining, self.datalink.recv(buf)).await {
459 Err(_) => return Err(ClientError::Timeout),
460 Ok(Err(DataLinkError::InvalidFrame)) => continue,
461 Ok(Err(e)) => return Err(e.into()),
462 Ok(Ok(v)) => return Ok(v),
463 }
464 }
465 }
466
467 async fn send_simple_ack(
468 &self,
469 address: DataLinkAddress,
470 invoke_id: u8,
471 service_choice: u8,
472 ) -> Result<(), ClientError> {
473 let mut tx = [0u8; 64];
474 let mut w = Writer::new(&mut tx);
475 Npdu::new(0).encode(&mut w)?;
476 SimpleAck {
477 invoke_id,
478 service_choice,
479 }
480 .encode(&mut w)?;
481 self.datalink.send(address, w.as_written()).await?;
482 Ok(())
483 }
484
485 fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
486 where
487 F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
488 {
489 for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
490 let mut buf = vec![0u8; size];
491 let mut w = Writer::new(&mut buf);
492 match encode(&mut w) {
493 Ok(()) => {
494 let written_len = w.as_written().len();
495 buf.truncate(written_len);
496 return Ok(buf);
497 }
498 Err(EncodeError::BufferTooSmall) => continue,
499 Err(e) => return Err(e.into()),
500 }
501 }
502 Err(ClientError::SegmentedRequestTooLarge)
503 }
504
/// Translates the low four bits of a max-APDU code into a size in octets.
///
/// Codes 0 to 5 map to 50, 128, 206, 480, 1024 and 1476 octets; every other code falls back
/// to 480.
const fn max_apdu_octets(max_apdu_code: u8) -> usize {
    const OCTETS_BY_CODE: [usize; 6] = [50, 128, 206, 480, 1024, 1476];
    const FALLBACK_OCTETS: usize = 480;

    // Only the low nibble carries the code.
    let code = (max_apdu_code & 0x0f) as usize;
    if code < OCTETS_BY_CODE.len() {
        OCTETS_BY_CODE[code]
    } else {
        FALLBACK_OCTETS
    }
}
516
517 async fn await_segment_ack(
518 &self,
519 address: DataLinkAddress,
520 invoke_id: u8,
521 service_choice: u8,
522 expected_sequence: u8,
523 deadline: Instant,
524 ) -> Result<SegmentAck, ClientError> {
525 loop {
526 let remaining = deadline.saturating_duration_since(Instant::now());
527 if remaining.is_zero() {
528 return Err(ClientError::Timeout);
529 }
530
531 let mut rx = [0u8; 1500];
532 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
533 let (n, src) = match recv {
534 Err(_) => return Err(ClientError::Timeout),
535 Ok(Err(DataLinkError::InvalidFrame)) => continue,
536 Ok(Err(e)) => return Err(e.into()),
537 Ok(Ok(v)) => v,
538 };
539 if src != address {
540 continue;
541 }
542
543 let Ok(apdu) = extract_apdu(&rx[..n]) else {
544 continue;
545 };
546 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
547 match ApduType::from_u8(first >> 4) {
548 Some(ApduType::SegmentAck) => {
549 let mut r = Reader::new(apdu);
550 let ack = SegmentAck::decode(&mut r)?;
551 if ack.invoke_id != invoke_id || !ack.sent_by_server {
552 continue;
553 }
554 if ack.negative_ack {
555 return Err(ClientError::SegmentNegativeAck {
556 sequence_number: ack.sequence_number,
557 });
558 }
559 if ack.sequence_number == expected_sequence {
560 return Ok(ack);
561 }
562 }
563 Some(ApduType::Error) => {
564 let mut r = Reader::new(apdu);
565 let err = BacnetError::decode(&mut r)?;
566 if err.invoke_id == invoke_id && err.service_choice == service_choice {
567 return Err(remote_service_error(err));
568 }
569 }
570 Some(ApduType::Reject) => {
571 let mut r = Reader::new(apdu);
572 let rej = RejectPdu::decode(&mut r)?;
573 if rej.invoke_id == invoke_id {
574 return Err(ClientError::RemoteReject { reason: rej.reason });
575 }
576 }
577 Some(ApduType::Abort) => {
578 let mut r = Reader::new(apdu);
579 let abort = AbortPdu::decode(&mut r)?;
580 if abort.invoke_id == invoke_id {
581 return Err(ClientError::RemoteAbort {
582 reason: abort.reason,
583 server: abort.server,
584 });
585 }
586 }
587 _ => continue,
588 }
589 }
590 }
591
592 async fn send_confirmed_request(
593 &self,
594 address: DataLinkAddress,
595 frame: &[u8],
596 deadline: Instant,
597 ) -> Result<(), ClientError> {
598 let mut pr = Reader::new(frame);
599 let _npdu = Npdu::decode(&mut pr)?;
600 let npdu_len = frame.len() - pr.remaining();
601 let npdu_bytes = &frame[..npdu_len];
602 let apdu = &frame[npdu_len..];
603
604 let mut ar = Reader::new(apdu);
605 let header = ConfirmedRequestHeader::decode(&mut ar)?;
606 let service_payload = ar.read_exact(ar.remaining())?;
607
608 let peer_max_apdu = self
611 .capability_cache
612 .read()
613 .ok()
614 .and_then(|c| c.get(&address).copied())
615 .unwrap_or_else(|| Self::max_apdu_octets(header.max_apdu));
616 let segment_data_len = peer_max_apdu.saturating_sub(5).max(MIN_SEGMENT_DATA_LEN);
617 let segment_count = service_payload.len().div_ceil(segment_data_len);
618
619 if segment_count <= 1 {
620 self.datalink.send(address, frame).await?;
621 return Ok(());
622 }
623
624 if segment_count > usize::from(u8::MAX) + 1 {
625 return Err(ClientError::SegmentedRequestTooLarge);
626 }
627
628 let configured_window_size = self.segmented_request_window_size.max(1);
629 let mut window_size = configured_window_size;
630 let mut peer_window_ceiling = configured_window_size;
631 let mut batch_start = 0usize;
632 while batch_start < segment_count {
633 let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
634 let expected_sequence = (batch_end - 1) as u8;
635
636 let mut frames = Vec::with_capacity(batch_end - batch_start);
637 for segment_index in batch_start..batch_end {
638 let seq = segment_index as u8;
639 let more_follows = segment_index + 1 < segment_count;
640 let start = segment_index * segment_data_len;
641 let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
642 let segment = &service_payload[start..end];
643
644 let seg_header = ConfirmedRequestHeader {
645 segmented: true,
646 more_follows,
647 segmented_response_accepted: header.segmented_response_accepted,
648 max_segments: header.max_segments,
649 max_apdu: header.max_apdu,
650 invoke_id: header.invoke_id,
651 sequence_number: Some(seq),
652 proposed_window_size: Some(window_size),
653 service_choice: header.service_choice,
654 };
655
656 let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
657 let written_len = {
658 let mut w = Writer::new(&mut tx);
659 w.write_all(npdu_bytes)?;
660 seg_header.encode(&mut w)?;
661 w.write_all(segment)?;
662 w.as_written().len()
663 };
664 tx.truncate(written_len);
665 frames.push(tx);
666 }
667
668 let mut retries_remaining = self.segmented_request_retries;
669 loop {
670 for frame in &frames {
671 self.datalink.send(address, frame).await?;
672 }
673
674 if batch_end == segment_count {
675 break;
676 }
677
678 let remaining = deadline.saturating_duration_since(Instant::now());
679 if remaining.is_zero() {
680 return Err(ClientError::Timeout);
681 }
682 let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
683 match self
684 .await_segment_ack(
685 address,
686 header.invoke_id,
687 header.service_choice,
688 expected_sequence,
689 ack_wait_deadline,
690 )
691 .await
692 {
693 Ok(ack) => {
694 peer_window_ceiling =
695 peer_window_ceiling.min(ack.actual_window_size.max(1));
696 window_size = window_size
697 .saturating_add(1)
698 .min(configured_window_size)
699 .min(peer_window_ceiling)
700 .max(1);
701 break;
702 }
703 Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
704 if retries_remaining > 0 =>
705 {
706 retries_remaining -= 1;
707 window_size = window_size.saturating_div(2).max(1);
708 continue;
709 }
710 Err(e) => return Err(e),
711 }
712 }
713
714 batch_start = batch_end;
715 }
716
717 Ok(())
718 }
719
720 async fn collect_complex_ack_payload(
721 &self,
722 address: DataLinkAddress,
723 invoke_id: u8,
724 service_choice: u8,
725 first_header: ComplexAckHeader,
726 first_payload: &[u8],
727 deadline: Instant,
728 ) -> Result<Vec<u8>, ClientError> {
729 let mut payload = first_payload.to_vec();
730 if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
731 return Err(ClientError::ResponseTooLarge {
732 limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
733 });
734 }
735 if !first_header.segmented {
736 return Ok(payload);
737 }
738
739 let mut last_seq = first_header
740 .sequence_number
741 .ok_or(ClientError::UnsupportedResponse)?;
742 let mut window_size = first_header.proposed_window_size.unwrap_or(1);
743 self.send_segment_ack(address, invoke_id, last_seq, window_size)
744 .await?;
745 let mut more_follows = first_header.more_follows;
746
747 while more_follows {
748 let mut rx = [0u8; 1500];
749 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
750 if src != address {
751 if let Some(ref handler) = self.server_handler {
753 let _ = dispatch_incoming_request(
754 &self.datalink,
755 handler.as_ref(),
756 self.server_device_id,
757 self.server_vendor_id,
758 &rx[..n],
759 src,
760 )
761 .await;
762 }
763 continue;
764 }
765
766 let Ok(apdu) = extract_apdu(&rx[..n]) else {
767 continue;
768 };
769 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
770 match ApduType::from_u8(first >> 4) {
771 Some(ApduType::ComplexAck) => {
772 let mut r = Reader::new(apdu);
773 let seg = ComplexAckHeader::decode(&mut r)?;
774 if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
775 continue;
776 }
777 if !seg.segmented {
778 return Err(ClientError::UnsupportedResponse);
779 }
780 let seq = seg
781 .sequence_number
782 .ok_or(ClientError::UnsupportedResponse)?;
783 if seq == last_seq {
784 self.send_segment_ack(address, invoke_id, last_seq, window_size)
786 .await?;
787 continue;
788 }
789 if seq != last_seq.wrapping_add(1) {
790 continue;
791 }
792
793 let seg_payload = r.read_exact(r.remaining())?;
794 if payload.len().saturating_add(seg_payload.len())
795 > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
796 {
797 return Err(ClientError::ResponseTooLarge {
798 limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
799 });
800 }
801 payload.extend_from_slice(seg_payload);
802
803 last_seq = seq;
804 more_follows = seg.more_follows;
805 window_size = seg.proposed_window_size.unwrap_or(window_size);
806 self.send_segment_ack(address, invoke_id, last_seq, window_size)
807 .await?;
808 }
809 Some(ApduType::Error) => {
810 let mut r = Reader::new(apdu);
811 let err = BacnetError::decode(&mut r)?;
812 if err.invoke_id == invoke_id && err.service_choice == service_choice {
813 return Err(remote_service_error(err));
814 }
815 }
816 Some(ApduType::Reject) => {
817 let mut r = Reader::new(apdu);
818 let rej = RejectPdu::decode(&mut r)?;
819 if rej.invoke_id == invoke_id {
820 return Err(ClientError::RemoteReject { reason: rej.reason });
821 }
822 }
823 Some(ApduType::Abort) => {
824 let mut r = Reader::new(apdu);
825 let abort = AbortPdu::decode(&mut r)?;
826 if abort.invoke_id == invoke_id {
827 return Err(ClientError::RemoteAbort {
828 reason: abort.reason,
829 server: abort.server,
830 });
831 }
832 }
833 _ => {
834 if let Some(ref handler) = self.server_handler {
836 let _ = dispatch_incoming_request(
837 &self.datalink,
838 handler.as_ref(),
839 self.server_device_id,
840 self.server_vendor_id,
841 &rx[..n],
842 src,
843 )
844 .await;
845 }
846 continue;
847 }
848 }
849 }
850
851 Ok(payload)
852 }
853
854 pub async fn who_is(
860 &self,
861 range: Option<(u32, u32)>,
862 wait: Duration,
863 ) -> Result<Vec<DiscoveredDevice>, ClientError> {
864 let req = match range {
867 Some((low, high)) => WhoIsRequest {
868 low_limit: Some(low),
869 high_limit: Some(high),
870 },
871 None => WhoIsRequest::global(),
872 };
873
874 let mut tx = [0u8; 128];
875 let mut w = Writer::new(&mut tx);
876 Npdu::new(0).encode(&mut w)?;
877 req.encode(&mut w)?;
878
879 self.datalink
880 .send(
881 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
882 w.as_written(),
883 )
884 .await?;
885
886 let mut devices = Vec::new();
887 let mut seen = HashSet::new();
888 let deadline = tokio::time::Instant::now() + wait;
889
890 while tokio::time::Instant::now() < deadline {
891 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
892 let mut rx = [0u8; 1500];
893 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
894 match recv {
895 Ok(Ok((n, src))) => {
896 let Ok(apdu) = extract_apdu(&rx[..n]) else {
897 continue;
898 };
899 let mut r = Reader::new(apdu);
900 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
901 continue;
902 };
903 if unconfirmed.service_choice != SERVICE_I_AM {
904 continue;
905 }
906 let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
907 continue;
908 };
909 if seen.insert(i_am.device_id) {
910 devices.push(DiscoveredDevice {
911 address: src,
912 device_id: Some(i_am.device_id),
913 });
914 if let Ok(mut cache) = self.capability_cache.write() {
917 cache.insert(src, i_am.max_apdu as usize);
918 }
919 }
920 }
921 Ok(Err(DataLinkError::InvalidFrame)) => continue,
922 Ok(Err(e)) => return Err(e.into()),
923 Err(_) => break,
924 }
925 }
926
927 Ok(devices)
928 }
929
930 pub async fn who_has_object_id(
935 &self,
936 range: Option<(u32, u32)>,
937 object_id: ObjectId,
938 wait: Duration,
939 ) -> Result<Vec<DiscoveredObject>, ClientError> {
940 let req = WhoHasRequest {
941 low_limit: range.map(|(low, _)| low),
942 high_limit: range.map(|(_, high)| high),
943 object: WhoHasObject::ObjectId(object_id),
944 };
945 self.who_has(req, wait).await
946 }
947
948 pub async fn who_has_object_name(
952 &self,
953 range: Option<(u32, u32)>,
954 object_name: &str,
955 wait: Duration,
956 ) -> Result<Vec<DiscoveredObject>, ClientError> {
957 let req = WhoHasRequest {
958 low_limit: range.map(|(low, _)| low),
959 high_limit: range.map(|(_, high)| high),
960 object: WhoHasObject::ObjectName(object_name),
961 };
962 self.who_has(req, wait).await
963 }
964
965 async fn who_has(
966 &self,
967 request: WhoHasRequest<'_>,
968 wait: Duration,
969 ) -> Result<Vec<DiscoveredObject>, ClientError> {
970 let tx = self.encode_with_growth(|w| {
972 Npdu::new(0).encode(w)?;
973 request.encode(w)
974 })?;
975 self.datalink
976 .send(
977 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
978 &tx,
979 )
980 .await?;
981
982 let mut objects = Vec::new();
983 let mut seen = HashSet::new();
984 let deadline = tokio::time::Instant::now() + wait;
985
986 while tokio::time::Instant::now() < deadline {
987 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
988 let mut rx = [0u8; 1500];
989 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
990 match recv {
991 Ok(Ok((n, src))) => {
992 let Ok(apdu) = extract_apdu(&rx[..n]) else {
993 continue;
994 };
995 let mut r = Reader::new(apdu);
996 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
997 continue;
998 };
999 if unconfirmed.service_choice != SERVICE_I_HAVE {
1000 continue;
1001 }
1002 let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
1003 continue;
1004 };
1005 if !seen.insert((src, i_have.object_id.raw())) {
1006 continue;
1007 }
1008 objects.push(DiscoveredObject {
1009 address: src,
1010 device_id: i_have.device_id,
1011 object_id: i_have.object_id,
1012 object_name: i_have.object_name.to_string(),
1013 });
1014 }
1015 Ok(Err(DataLinkError::InvalidFrame)) => continue,
1016 Ok(Err(e)) => return Err(e.into()),
1017 Err(_) => break,
1018 }
1019 }
1020
1021 Ok(objects)
1022 }
1023
1024 pub async fn device_communication_control(
1030 &self,
1031 address: DataLinkAddress,
1032 time_duration_seconds: Option<u16>,
1033 enable_disable: DeviceCommunicationState,
1034 password: Option<&str>,
1035 ) -> Result<(), ClientError> {
1036 let invoke_id = self.next_invoke_id().await;
1037 let request = DeviceCommunicationControlRequest {
1038 time_duration_seconds,
1039 enable_disable,
1040 password,
1041 invoke_id,
1042 };
1043 let tx = self.encode_with_growth(|w| {
1044 Npdu::new(0).encode(w)?;
1045 request.encode(w)
1046 })?;
1047 self.await_simple_ack_or_error(
1048 address,
1049 &tx,
1050 invoke_id,
1051 SERVICE_DEVICE_COMMUNICATION_CONTROL,
1052 self.response_timeout,
1053 )
1054 .await
1055 }
1056
1057 pub async fn reinitialize_device(
1061 &self,
1062 address: DataLinkAddress,
1063 state: ReinitializeState,
1064 password: Option<&str>,
1065 ) -> Result<(), ClientError> {
1066 let invoke_id = self.next_invoke_id().await;
1067 let request = ReinitializeDeviceRequest {
1068 state,
1069 password,
1070 invoke_id,
1071 };
1072 let tx = self.encode_with_growth(|w| {
1073 Npdu::new(0).encode(w)?;
1074 request.encode(w)
1075 })?;
1076 self.await_simple_ack_or_error(
1077 address,
1078 &tx,
1079 invoke_id,
1080 SERVICE_REINITIALIZE_DEVICE,
1081 self.response_timeout,
1082 )
1083 .await
1084 }
1085
1086 pub async fn time_synchronize(
1091 &self,
1092 address: DataLinkAddress,
1093 date: Date,
1094 time: Time,
1095 utc: bool,
1096 ) -> Result<(), ClientError> {
1097 let request = if utc {
1098 TimeSynchronizationRequest::utc(date, time)
1099 } else {
1100 TimeSynchronizationRequest::local(date, time)
1101 };
1102 let tx = self.encode_with_growth(|w| {
1103 Npdu::new(0).encode(w)?;
1104 request.encode(w)
1105 })?;
1106 self.datalink.send(address, &tx).await?;
1107 Ok(())
1108 }
1109
1110 pub async fn create_object_by_type(
1113 &self,
1114 address: DataLinkAddress,
1115 object_type: rustbac_core::types::ObjectType,
1116 ) -> Result<ObjectId, ClientError> {
1117 self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
1118 .await
1119 }
1120
1121 pub async fn create_object(
1125 &self,
1126 address: DataLinkAddress,
1127 mut request: CreateObjectRequest,
1128 ) -> Result<ObjectId, ClientError> {
1129 request.invoke_id = self.next_invoke_id().await;
1130 let invoke_id = request.invoke_id;
1131 let tx = self.encode_with_growth(|w| {
1132 Npdu::new(0).encode(w)?;
1133 request.encode(w)
1134 })?;
1135 let payload = self
1136 .await_complex_ack_payload_or_error(
1137 address,
1138 &tx,
1139 invoke_id,
1140 SERVICE_CREATE_OBJECT,
1141 self.response_timeout,
1142 )
1143 .await?;
1144 let mut pr = Reader::new(&payload);
1145 let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
1146 Ok(parsed.object_id)
1147 }
1148
1149 pub async fn delete_object(
1151 &self,
1152 address: DataLinkAddress,
1153 object_id: ObjectId,
1154 ) -> Result<(), ClientError> {
1155 let invoke_id = self.next_invoke_id().await;
1156 let request = DeleteObjectRequest {
1157 object_id,
1158 invoke_id,
1159 };
1160 let tx = self.encode_with_growth(|w| {
1161 Npdu::new(0).encode(w)?;
1162 request.encode(w)
1163 })?;
1164 self.await_simple_ack_or_error(
1165 address,
1166 &tx,
1167 invoke_id,
1168 SERVICE_DELETE_OBJECT,
1169 self.response_timeout,
1170 )
1171 .await
1172 }
1173
1174 pub async fn add_list_element(
1176 &self,
1177 address: DataLinkAddress,
1178 mut request: AddListElementRequest<'_>,
1179 ) -> Result<(), ClientError> {
1180 request.invoke_id = self.next_invoke_id().await;
1181 let invoke_id = request.invoke_id;
1182 let tx = self.encode_with_growth(|w| {
1183 Npdu::new(0).encode(w)?;
1184 request.encode(w)
1185 })?;
1186 self.await_simple_ack_or_error(
1187 address,
1188 &tx,
1189 invoke_id,
1190 SERVICE_ADD_LIST_ELEMENT,
1191 self.response_timeout,
1192 )
1193 .await
1194 }
1195
1196 pub async fn remove_list_element(
1198 &self,
1199 address: DataLinkAddress,
1200 mut request: RemoveListElementRequest<'_>,
1201 ) -> Result<(), ClientError> {
1202 request.invoke_id = self.next_invoke_id().await;
1203 let invoke_id = request.invoke_id;
1204 let tx = self.encode_with_growth(|w| {
1205 Npdu::new(0).encode(w)?;
1206 request.encode(w)
1207 })?;
1208 self.await_simple_ack_or_error(
1209 address,
1210 &tx,
1211 invoke_id,
1212 SERVICE_REMOVE_LIST_ELEMENT,
1213 self.response_timeout,
1214 )
1215 .await
1216 }
1217
1218 async fn await_simple_ack_or_error(
1219 &self,
1220 address: DataLinkAddress,
1221 tx: &[u8],
1222 invoke_id: u8,
1223 service_choice: u8,
1224 timeout_window: Duration,
1225 ) -> Result<(), ClientError> {
1226 #[cfg(feature = "tracing")]
1227 tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1228 let _io_lock = self.request_io_lock.lock().await;
1229 let deadline = tokio::time::Instant::now() + timeout_window;
1230 self.send_confirmed_request(address, tx, deadline).await?;
1231 while tokio::time::Instant::now() < deadline {
1232 let mut rx = [0u8; 1500];
1233 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1234 if src != address {
1235 if let Some(ref handler) = self.server_handler {
1237 let _ = dispatch_incoming_request(
1238 &self.datalink,
1239 handler.as_ref(),
1240 self.server_device_id,
1241 self.server_vendor_id,
1242 &rx[..n],
1243 src,
1244 )
1245 .await;
1246 }
1247 continue;
1248 }
1249 let apdu = extract_apdu(&rx[..n])?;
1250 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1251 match ApduType::from_u8(first >> 4) {
1252 Some(ApduType::SimpleAck) => {
1253 let mut r = Reader::new(apdu);
1254 let ack = SimpleAck::decode(&mut r)?;
1255 if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1256 return Ok(());
1257 }
1258 }
1259 Some(ApduType::Error) => {
1260 let mut r = Reader::new(apdu);
1261 let err = BacnetError::decode(&mut r)?;
1262 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1263 return Err(remote_service_error(err));
1264 }
1265 }
1266 Some(ApduType::Reject) => {
1267 let mut r = Reader::new(apdu);
1268 let rej = RejectPdu::decode(&mut r)?;
1269 if rej.invoke_id == invoke_id {
1270 return Err(ClientError::RemoteReject { reason: rej.reason });
1271 }
1272 }
1273 Some(ApduType::Abort) => {
1274 let mut r = Reader::new(apdu);
1275 let abort = AbortPdu::decode(&mut r)?;
1276 if abort.invoke_id == invoke_id {
1277 return Err(ClientError::RemoteAbort {
1278 reason: abort.reason,
1279 server: abort.server,
1280 });
1281 }
1282 }
1283 _ => {
1284 if let Some(ref handler) = self.server_handler {
1286 let _ = dispatch_incoming_request(
1287 &self.datalink,
1288 handler.as_ref(),
1289 self.server_device_id,
1290 self.server_vendor_id,
1291 &rx[..n],
1292 src,
1293 )
1294 .await;
1295 }
1296 continue;
1297 }
1298 }
1299 }
1300 #[cfg(feature = "tracing")]
1301 tracing::warn!(invoke_id = invoke_id, "request timed out");
1302 Err(ClientError::Timeout)
1303 }
1304
1305 async fn await_complex_ack_payload_or_error(
1306 &self,
1307 address: DataLinkAddress,
1308 tx: &[u8],
1309 invoke_id: u8,
1310 service_choice: u8,
1311 timeout_window: Duration,
1312 ) -> Result<Vec<u8>, ClientError> {
1313 #[cfg(feature = "tracing")]
1314 tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1315 let _io_lock = self.request_io_lock.lock().await;
1316 let deadline = tokio::time::Instant::now() + timeout_window;
1317 self.send_confirmed_request(address, tx, deadline).await?;
1318 while tokio::time::Instant::now() < deadline {
1319 let mut rx = [0u8; 1500];
1320 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1321 if src != address {
1322 if let Some(ref handler) = self.server_handler {
1324 let _ = dispatch_incoming_request(
1325 &self.datalink,
1326 handler.as_ref(),
1327 self.server_device_id,
1328 self.server_vendor_id,
1329 &rx[..n],
1330 src,
1331 )
1332 .await;
1333 }
1334 continue;
1335 }
1336
1337 let apdu = extract_apdu(&rx[..n])?;
1338 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1339 match ApduType::from_u8(first >> 4) {
1340 Some(ApduType::ComplexAck) => {
1341 let mut r = Reader::new(apdu);
1342 let ack = ComplexAckHeader::decode(&mut r)?;
1343 if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1344 continue;
1345 }
1346 return self
1347 .collect_complex_ack_payload(
1348 address,
1349 invoke_id,
1350 service_choice,
1351 ack,
1352 r.read_exact(r.remaining())?,
1353 deadline,
1354 )
1355 .await;
1356 }
1357 Some(ApduType::Error) => {
1358 let mut r = Reader::new(apdu);
1359 let err = BacnetError::decode(&mut r)?;
1360 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1361 return Err(remote_service_error(err));
1362 }
1363 }
1364 Some(ApduType::Reject) => {
1365 let mut r = Reader::new(apdu);
1366 let rej = RejectPdu::decode(&mut r)?;
1367 if rej.invoke_id == invoke_id {
1368 return Err(ClientError::RemoteReject { reason: rej.reason });
1369 }
1370 }
1371 Some(ApduType::Abort) => {
1372 let mut r = Reader::new(apdu);
1373 let abort = AbortPdu::decode(&mut r)?;
1374 if abort.invoke_id == invoke_id {
1375 return Err(ClientError::RemoteAbort {
1376 reason: abort.reason,
1377 server: abort.server,
1378 });
1379 }
1380 }
1381 _ => {
1382 if let Some(ref handler) = self.server_handler {
1384 let _ = dispatch_incoming_request(
1385 &self.datalink,
1386 handler.as_ref(),
1387 self.server_device_id,
1388 self.server_vendor_id,
1389 &rx[..n],
1390 src,
1391 )
1392 .await;
1393 }
1394 continue;
1395 }
1396 }
1397 }
1398 #[cfg(feature = "tracing")]
1399 tracing::warn!(invoke_id = invoke_id, "request timed out");
1400 Err(ClientError::Timeout)
1401 }
1402
1403 pub async fn get_alarm_summary(
1405 &self,
1406 address: DataLinkAddress,
1407 ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1408 let invoke_id = self.next_invoke_id().await;
1409 let request = GetAlarmSummaryRequest { invoke_id };
1410 let tx = self.encode_with_growth(|w| {
1411 Npdu::new(0).encode(w)?;
1412 request.encode(w)
1413 })?;
1414 let payload = self
1415 .await_complex_ack_payload_or_error(
1416 address,
1417 &tx,
1418 invoke_id,
1419 SERVICE_GET_ALARM_SUMMARY,
1420 self.response_timeout,
1421 )
1422 .await?;
1423 let mut pr = Reader::new(&payload);
1424 let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1425 Ok(into_client_alarm_summary(parsed.summaries))
1426 }
1427
1428 pub async fn get_enrollment_summary(
1430 &self,
1431 address: DataLinkAddress,
1432 ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1433 let invoke_id = self.next_invoke_id().await;
1434 let request = GetEnrollmentSummaryRequest { invoke_id };
1435 let tx = self.encode_with_growth(|w| {
1436 Npdu::new(0).encode(w)?;
1437 request.encode(w)
1438 })?;
1439 let payload = self
1440 .await_complex_ack_payload_or_error(
1441 address,
1442 &tx,
1443 invoke_id,
1444 SERVICE_GET_ENROLLMENT_SUMMARY,
1445 self.response_timeout,
1446 )
1447 .await?;
1448 let mut pr = Reader::new(&payload);
1449 let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1450 Ok(into_client_enrollment_summary(parsed.summaries))
1451 }
1452
1453 pub async fn get_event_information(
1458 &self,
1459 address: DataLinkAddress,
1460 last_received_object_id: Option<ObjectId>,
1461 ) -> Result<EventInformationResult, ClientError> {
1462 let invoke_id = self.next_invoke_id().await;
1463 let request = GetEventInformationRequest {
1464 last_received_object_id,
1465 invoke_id,
1466 };
1467 let tx = self.encode_with_growth(|w| {
1468 Npdu::new(0).encode(w)?;
1469 request.encode(w)
1470 })?;
1471 let payload = self
1472 .await_complex_ack_payload_or_error(
1473 address,
1474 &tx,
1475 invoke_id,
1476 SERVICE_GET_EVENT_INFORMATION,
1477 self.response_timeout,
1478 )
1479 .await?;
1480 let mut pr = Reader::new(&payload);
1481 let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1482 Ok(EventInformationResult {
1483 summaries: into_client_event_information(parsed.summaries),
1484 more_events: parsed.more_events,
1485 })
1486 }
1487
1488 pub async fn acknowledge_alarm(
1490 &self,
1491 address: DataLinkAddress,
1492 mut request: AcknowledgeAlarmRequest<'_>,
1493 ) -> Result<(), ClientError> {
1494 request.invoke_id = self.next_invoke_id().await;
1495 let invoke_id = request.invoke_id;
1496 let tx = self.encode_with_growth(|w| {
1497 Npdu::new(0).encode(w)?;
1498 request.encode(w)
1499 })?;
1500 self.await_simple_ack_or_error(
1501 address,
1502 &tx,
1503 invoke_id,
1504 SERVICE_ACKNOWLEDGE_ALARM,
1505 self.response_timeout,
1506 )
1507 .await
1508 }
1509
1510 pub async fn atomic_read_file_stream(
1515 &self,
1516 address: DataLinkAddress,
1517 file_object_id: ObjectId,
1518 file_start_position: i32,
1519 requested_octet_count: u32,
1520 ) -> Result<AtomicReadFileResult, ClientError> {
1521 let invoke_id = self.next_invoke_id().await;
1522 let request = AtomicReadFileRequest::stream(
1523 file_object_id,
1524 file_start_position,
1525 requested_octet_count,
1526 invoke_id,
1527 );
1528 self.atomic_read_file(address, request).await
1529 }
1530
1531 pub async fn atomic_read_file_record(
1536 &self,
1537 address: DataLinkAddress,
1538 file_object_id: ObjectId,
1539 file_start_record: i32,
1540 requested_record_count: u32,
1541 ) -> Result<AtomicReadFileResult, ClientError> {
1542 let invoke_id = self.next_invoke_id().await;
1543 let request = AtomicReadFileRequest::record(
1544 file_object_id,
1545 file_start_record,
1546 requested_record_count,
1547 invoke_id,
1548 );
1549 self.atomic_read_file(address, request).await
1550 }
1551
1552 async fn atomic_read_file(
1553 &self,
1554 address: DataLinkAddress,
1555 request: AtomicReadFileRequest,
1556 ) -> Result<AtomicReadFileResult, ClientError> {
1557 let invoke_id = request.invoke_id;
1558 let tx = self.encode_with_growth(|w| {
1559 Npdu::new(0).encode(w)?;
1560 request.encode(w)
1561 })?;
1562 let payload = self
1563 .await_complex_ack_payload_or_error(
1564 address,
1565 &tx,
1566 invoke_id,
1567 SERVICE_ATOMIC_READ_FILE,
1568 self.response_timeout,
1569 )
1570 .await?;
1571 let mut pr = Reader::new(&payload);
1572 let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1573 Ok(into_client_atomic_read_result(parsed))
1574 }
1575
1576 pub async fn atomic_write_file_stream(
1579 &self,
1580 address: DataLinkAddress,
1581 file_object_id: ObjectId,
1582 file_start_position: i32,
1583 file_data: &[u8],
1584 ) -> Result<AtomicWriteFileResult, ClientError> {
1585 let invoke_id = self.next_invoke_id().await;
1586 let request = AtomicWriteFileRequest::stream(
1587 file_object_id,
1588 file_start_position,
1589 file_data,
1590 invoke_id,
1591 );
1592 self.atomic_write_file(address, request).await
1593 }
1594
1595 pub async fn atomic_write_file_record(
1599 &self,
1600 address: DataLinkAddress,
1601 file_object_id: ObjectId,
1602 file_start_record: i32,
1603 file_record_data: &[&[u8]],
1604 ) -> Result<AtomicWriteFileResult, ClientError> {
1605 let invoke_id = self.next_invoke_id().await;
1606 let request = AtomicWriteFileRequest::record(
1607 file_object_id,
1608 file_start_record,
1609 file_record_data,
1610 invoke_id,
1611 );
1612 self.atomic_write_file(address, request).await
1613 }
1614
1615 async fn atomic_write_file(
1616 &self,
1617 address: DataLinkAddress,
1618 request: AtomicWriteFileRequest<'_>,
1619 ) -> Result<AtomicWriteFileResult, ClientError> {
1620 let invoke_id = request.invoke_id;
1621 let tx = self.encode_with_growth(|w| {
1622 Npdu::new(0).encode(w)?;
1623 request.encode(w)
1624 })?;
1625 let payload = self
1626 .await_complex_ack_payload_or_error(
1627 address,
1628 &tx,
1629 invoke_id,
1630 SERVICE_ATOMIC_WRITE_FILE,
1631 self.response_timeout,
1632 )
1633 .await?;
1634 let mut pr = Reader::new(&payload);
1635 let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1636 Ok(into_client_atomic_write_result(parsed))
1637 }
1638
1639 pub async fn subscribe_cov(
1643 &self,
1644 address: DataLinkAddress,
1645 mut request: SubscribeCovRequest,
1646 ) -> Result<(), ClientError> {
1647 request.invoke_id = self.next_invoke_id().await;
1648 let invoke_id = request.invoke_id;
1649 let tx = self.encode_with_growth(|w| {
1650 Npdu::new(0).encode(w)?;
1651 request.encode(w)
1652 })?;
1653 self.await_simple_ack_or_error(
1654 address,
1655 &tx,
1656 invoke_id,
1657 SERVICE_SUBSCRIBE_COV,
1658 self.response_timeout,
1659 )
1660 .await
1661 }
1662
1663 pub async fn cancel_cov_subscription(
1666 &self,
1667 address: DataLinkAddress,
1668 subscriber_process_id: u32,
1669 monitored_object_id: ObjectId,
1670 ) -> Result<(), ClientError> {
1671 self.subscribe_cov(
1672 address,
1673 SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1674 )
1675 .await
1676 }
1677
1678 pub async fn subscribe_cov_property(
1683 &self,
1684 address: DataLinkAddress,
1685 mut request: SubscribeCovPropertyRequest,
1686 ) -> Result<(), ClientError> {
1687 request.invoke_id = self.next_invoke_id().await;
1688 let invoke_id = request.invoke_id;
1689 let tx = self.encode_with_growth(|w| {
1690 Npdu::new(0).encode(w)?;
1691 request.encode(w)
1692 })?;
1693 self.await_simple_ack_or_error(
1694 address,
1695 &tx,
1696 invoke_id,
1697 SERVICE_SUBSCRIBE_COV_PROPERTY,
1698 self.response_timeout,
1699 )
1700 .await
1701 }
1702
1703 pub async fn cancel_cov_property_subscription(
1709 &self,
1710 address: DataLinkAddress,
1711 subscriber_process_id: u32,
1712 monitored_object_id: ObjectId,
1713 monitored_property_id: PropertyId,
1714 monitored_property_array_index: Option<u32>,
1715 ) -> Result<(), ClientError> {
1716 self.subscribe_cov_property(
1717 address,
1718 SubscribeCovPropertyRequest::cancel(
1719 subscriber_process_id,
1720 monitored_object_id,
1721 monitored_property_id,
1722 monitored_property_array_index,
1723 0,
1724 ),
1725 )
1726 .await
1727 }
1728
1729 pub async fn read_range_by_position(
1734 &self,
1735 address: DataLinkAddress,
1736 object_id: ObjectId,
1737 property_id: PropertyId,
1738 array_index: Option<u32>,
1739 reference_index: i32,
1740 count: i16,
1741 ) -> Result<ReadRangeResult, ClientError> {
1742 let invoke_id = self.next_invoke_id().await;
1743 let req = ReadRangeRequest::by_position(
1744 object_id,
1745 property_id,
1746 array_index,
1747 reference_index,
1748 count,
1749 invoke_id,
1750 );
1751 self.read_range_with_request(address, req).await
1752 }
1753
1754 pub async fn read_range_by_sequence_number(
1759 &self,
1760 address: DataLinkAddress,
1761 object_id: ObjectId,
1762 property_id: PropertyId,
1763 array_index: Option<u32>,
1764 reference_sequence: u32,
1765 count: i16,
1766 ) -> Result<ReadRangeResult, ClientError> {
1767 let invoke_id = self.next_invoke_id().await;
1768 let req = ReadRangeRequest::by_sequence_number(
1769 object_id,
1770 property_id,
1771 array_index,
1772 reference_sequence,
1773 count,
1774 invoke_id,
1775 );
1776 self.read_range_with_request(address, req).await
1777 }
1778
1779 pub async fn read_range_by_time(
1784 &self,
1785 address: DataLinkAddress,
1786 object_id: ObjectId,
1787 property_id: PropertyId,
1788 array_index: Option<u32>,
1789 at: (Date, Time),
1790 count: i16,
1791 ) -> Result<ReadRangeResult, ClientError> {
1792 let (date, time) = at;
1793 let invoke_id = self.next_invoke_id().await;
1794 let req = ReadRangeRequest::by_time(
1795 object_id,
1796 property_id,
1797 array_index,
1798 date,
1799 time,
1800 count,
1801 invoke_id,
1802 );
1803 self.read_range_with_request(address, req).await
1804 }
1805
1806 async fn read_range_with_request(
1807 &self,
1808 address: DataLinkAddress,
1809 req: ReadRangeRequest,
1810 ) -> Result<ReadRangeResult, ClientError> {
1811 let invoke_id = req.invoke_id;
1812 let tx = self.encode_with_growth(|w| {
1813 Npdu::new(0).encode(w)?;
1814 req.encode(w)
1815 })?;
1816 let payload = self
1817 .await_complex_ack_payload_or_error(
1818 address,
1819 &tx,
1820 invoke_id,
1821 SERVICE_READ_RANGE,
1822 self.response_timeout,
1823 )
1824 .await?;
1825 let mut pr = Reader::new(&payload);
1826 let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1827 into_client_read_range(parsed)
1828 }
1829
1830 pub async fn recv_cov_notification(
1836 &self,
1837 wait: Duration,
1838 ) -> Result<Option<CovNotification>, ClientError> {
1839 let _io_lock = self.request_io_lock.lock().await;
1840 let deadline = tokio::time::Instant::now() + wait;
1841
1842 while tokio::time::Instant::now() < deadline {
1843 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1844 let mut rx = [0u8; 1500];
1845 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1846 let (n, source) = match recv {
1847 Ok(Ok(v)) => v,
1848 Ok(Err(e)) => return Err(e.into()),
1849 Err(_) => break,
1850 };
1851
1852 let apdu = extract_apdu(&rx[..n])?;
1853 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1854 match ApduType::from_u8(first >> 4) {
1855 Some(ApduType::UnconfirmedRequest) => {
1856 let mut r = Reader::new(apdu);
1857 let header = UnconfirmedRequestHeader::decode(&mut r)?;
1858 if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1859 continue;
1860 }
1861 let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1862 return Ok(Some(into_client_cov_notification(source, false, cov)?));
1863 }
1864 Some(ApduType::ConfirmedRequest) => {
1865 let mut r = Reader::new(apdu);
1866 let header = ConfirmedRequestHeader::decode(&mut r)?;
1867 if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1868 continue;
1869 }
1870 if header.segmented {
1871 return Err(ClientError::UnsupportedResponse);
1872 }
1873
1874 let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1875 self.send_simple_ack(
1876 source,
1877 header.invoke_id,
1878 SERVICE_CONFIRMED_COV_NOTIFICATION,
1879 )
1880 .await?;
1881 return Ok(Some(into_client_cov_notification(source, true, cov)?));
1882 }
1883 _ => continue,
1884 }
1885 }
1886
1887 Ok(None)
1888 }
1889
1890 pub async fn recv_event_notification(
1896 &self,
1897 wait: Duration,
1898 ) -> Result<Option<EventNotification>, ClientError> {
1899 let _io_lock = self.request_io_lock.lock().await;
1900 let deadline = tokio::time::Instant::now() + wait;
1901
1902 while tokio::time::Instant::now() < deadline {
1903 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1904 let mut rx = [0u8; 1500];
1905 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1906 let (n, source) = match recv {
1907 Ok(Ok(v)) => v,
1908 Ok(Err(e)) => return Err(e.into()),
1909 Err(_) => break,
1910 };
1911
1912 let apdu = extract_apdu(&rx[..n])?;
1913 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1914 match ApduType::from_u8(first >> 4) {
1915 Some(ApduType::UnconfirmedRequest) => {
1916 let mut r = Reader::new(apdu);
1917 let header = UnconfirmedRequestHeader::decode(&mut r)?;
1918 if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1919 continue;
1920 }
1921 let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1922 return Ok(Some(into_client_event_notification(
1923 source,
1924 false,
1925 notification,
1926 )));
1927 }
1928 Some(ApduType::ConfirmedRequest) => {
1929 let mut r = Reader::new(apdu);
1930 let header = ConfirmedRequestHeader::decode(&mut r)?;
1931 if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1932 continue;
1933 }
1934 if header.segmented {
1935 return Err(ClientError::UnsupportedResponse);
1936 }
1937 let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1938 self.send_simple_ack(
1939 source,
1940 header.invoke_id,
1941 SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1942 )
1943 .await?;
1944 return Ok(Some(into_client_event_notification(
1945 source,
1946 true,
1947 notification,
1948 )));
1949 }
1950 _ => continue,
1951 }
1952 }
1953
1954 Ok(None)
1955 }
1956
1957 pub async fn read_property(
1962 &self,
1963 address: DataLinkAddress,
1964 object_id: ObjectId,
1965 property_id: PropertyId,
1966 ) -> Result<ClientDataValue, ClientError> {
1967 let invoke_id = self.next_invoke_id().await;
1968 let req = ReadPropertyRequest {
1969 object_id,
1970 property_id,
1971 array_index: None,
1972 invoke_id,
1973 };
1974 let tx = self.encode_with_growth(|w| {
1975 Npdu::new(0).encode(w)?;
1976 req.encode(w)
1977 })?;
1978 let payload = self
1979 .await_complex_ack_payload_or_error(
1980 address,
1981 &tx,
1982 invoke_id,
1983 SERVICE_READ_PROPERTY,
1984 self.response_timeout,
1985 )
1986 .await?;
1987 let mut pr = Reader::new(&payload);
1988 let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
1989 into_client_value(parsed.value)
1990 }
1991
1992 pub async fn write_property(
1994 &self,
1995 address: DataLinkAddress,
1996 mut request: WritePropertyRequest<'_>,
1997 ) -> Result<(), ClientError> {
1998 request.invoke_id = self.next_invoke_id().await;
1999 let invoke_id = request.invoke_id;
2000 let tx = self.encode_with_growth(|w| {
2001 Npdu::new(0).encode(w)?;
2002 request.encode(w)
2003 })?;
2004 self.await_simple_ack_or_error(
2005 address,
2006 &tx,
2007 invoke_id,
2008 SERVICE_WRITE_PROPERTY,
2009 self.response_timeout,
2010 )
2011 .await
2012 }
2013
2014 pub async fn read_property_multiple(
2019 &self,
2020 address: DataLinkAddress,
2021 object_id: ObjectId,
2022 property_ids: &[PropertyId],
2023 ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
2024 let refs: Vec<PropertyReference> = property_ids
2025 .iter()
2026 .copied()
2027 .map(|property_id| PropertyReference {
2028 property_id,
2029 array_index: None,
2030 })
2031 .collect();
2032 let specs = [ReadAccessSpecification {
2033 object_id,
2034 properties: &refs,
2035 }];
2036
2037 let invoke_id = self.next_invoke_id().await;
2038 let req = ReadPropertyMultipleRequest {
2039 specs: &specs,
2040 invoke_id,
2041 };
2042
2043 let tx = self.encode_with_growth(|w| {
2044 Npdu::new(0).encode(w)?;
2045 req.encode(w)
2046 })?;
2047 let payload = self
2048 .await_complex_ack_payload_or_error(
2049 address,
2050 &tx,
2051 invoke_id,
2052 SERVICE_READ_PROPERTY_MULTIPLE,
2053 self.response_timeout,
2054 )
2055 .await?;
2056 let mut pr = Reader::new(&payload);
2057 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2058 let mut out = Vec::new();
2059 for access in parsed.results {
2060 if access.object_id != object_id {
2061 continue;
2062 }
2063 for item in access.results {
2064 out.push((item.property_id, into_client_value(item.value)?));
2065 }
2066 }
2067 Ok(out)
2068 }
2069
2070 pub async fn write_property_multiple(
2073 &self,
2074 address: DataLinkAddress,
2075 object_id: ObjectId,
2076 properties: &[PropertyWriteSpec<'_>],
2077 ) -> Result<(), ClientError> {
2078 let invoke_id = self.next_invoke_id().await;
2079 let specs = [WriteAccessSpecification {
2080 object_id,
2081 properties,
2082 }];
2083 let req = WritePropertyMultipleRequest {
2084 specs: &specs,
2085 invoke_id,
2086 };
2087
2088 let tx = self.encode_with_growth(|w| {
2089 Npdu::new(0).encode(w)?;
2090 req.encode(w)
2091 })?;
2092 self.await_simple_ack_or_error(
2093 address,
2094 &tx,
2095 invoke_id,
2096 SERVICE_WRITE_PROPERTY_MULTIPLE,
2097 self.response_timeout,
2098 )
2099 .await
2100 }
2101
2102 pub async fn private_transfer(
2104 &self,
2105 address: DataLinkAddress,
2106 vendor_id: u32,
2107 service_number: u32,
2108 service_parameters: Option<&[u8]>,
2109 ) -> Result<PrivateTransferAck, ClientError> {
2110 let invoke_id = self.next_invoke_id().await;
2111 let req = ConfirmedPrivateTransferRequest {
2112 vendor_id,
2113 service_number,
2114 service_parameters,
2115 invoke_id,
2116 };
2117
2118 let tx = self.encode_with_growth(|w| {
2119 Npdu::new(0).encode(w)?;
2120 req.encode(w)
2121 })?;
2122 let payload = self
2123 .await_complex_ack_payload_or_error(
2124 address,
2125 &tx,
2126 invoke_id,
2127 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
2128 self.response_timeout,
2129 )
2130 .await?;
2131 let mut r = Reader::new(&payload);
2132 PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
2133 }
2134
2135 pub async fn read_many(
2141 &self,
2142 address: DataLinkAddress,
2143 requests: &[(ObjectId, PropertyId)],
2144 ) -> Result<HashMap<(ObjectId, PropertyId), ClientDataValue>, ClientError> {
2145 let mut grouped: Vec<(ObjectId, Vec<PropertyReference>)> = Vec::new();
2147 for &(oid, pid) in requests {
2148 if let Some(entry) = grouped.iter_mut().find(|(o, _)| *o == oid) {
2149 entry.1.push(PropertyReference {
2150 property_id: pid,
2151 array_index: None,
2152 });
2153 } else {
2154 grouped.push((
2155 oid,
2156 vec![PropertyReference {
2157 property_id: pid,
2158 array_index: None,
2159 }],
2160 ));
2161 }
2162 }
2163
2164 let specs: Vec<ReadAccessSpecification<'_>> = grouped
2165 .iter()
2166 .map(|(oid, props)| ReadAccessSpecification {
2167 object_id: *oid,
2168 properties: props,
2169 })
2170 .collect();
2171
2172 let invoke_id = self.next_invoke_id().await;
2173 let req = ReadPropertyMultipleRequest {
2174 specs: &specs,
2175 invoke_id,
2176 };
2177 let tx = self.encode_with_growth(|w| {
2178 Npdu::new(0).encode(w)?;
2179 req.encode(w)
2180 })?;
2181 let payload = self
2182 .await_complex_ack_payload_or_error(
2183 address,
2184 &tx,
2185 invoke_id,
2186 SERVICE_READ_PROPERTY_MULTIPLE,
2187 self.response_timeout,
2188 )
2189 .await?;
2190
2191 let mut pr = Reader::new(&payload);
2192 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2193 let mut out = HashMap::new();
2194 for access in parsed.results {
2195 for item in access.results {
2196 if let Ok(v) = into_client_value(item.value) {
2197 out.insert((access.object_id, item.property_id), v);
2198 }
2199 }
2200 }
2201 Ok(out)
2202 }
2203
2204 pub async fn write_many(
2211 &self,
2212 address: DataLinkAddress,
2213 writes: &[(ObjectId, PropertyId, ClientDataValue, Option<u8>)],
2214 ) -> Result<(), ClientError> {
2215 use rustbac_core::types::{BitString, DataValue as DV};
2216
2217 fn cv_to_dv(v: &ClientDataValue) -> DV<'_> {
2218 match v {
2219 ClientDataValue::Null => DV::Null,
2220 ClientDataValue::Boolean(b) => DV::Boolean(*b),
2221 ClientDataValue::Unsigned(n) => DV::Unsigned(*n),
2222 ClientDataValue::Signed(n) => DV::Signed(*n),
2223 ClientDataValue::Real(f) => DV::Real(*f),
2224 ClientDataValue::Double(f) => DV::Double(*f),
2225 ClientDataValue::OctetString(b) => DV::OctetString(b),
2226 ClientDataValue::CharacterString(s) => DV::CharacterString(s),
2227 ClientDataValue::BitString { unused_bits, data } => DV::BitString(BitString {
2228 unused_bits: *unused_bits,
2229 data,
2230 }),
2231 ClientDataValue::Enumerated(n) => DV::Enumerated(*n),
2232 ClientDataValue::Date(d) => DV::Date(*d),
2233 ClientDataValue::Time(t) => DV::Time(*t),
2234 ClientDataValue::ObjectId(o) => DV::ObjectId(*o),
2235 ClientDataValue::Constructed { tag_num, values } => DV::Constructed {
2236 tag_num: *tag_num,
2237 values: values.iter().map(cv_to_dv).collect(),
2238 },
2239 }
2240 }
2241
2242 let converted: Vec<(ObjectId, PropertyId, DV<'_>, Option<u8>)> = writes
2244 .iter()
2245 .map(|(oid, pid, val, prio)| (*oid, *pid, cv_to_dv(val), *prio))
2246 .collect();
2247
2248 let mut grouped: Vec<(ObjectId, Vec<PropertyWriteSpec<'_>>)> = Vec::new();
2250 for (oid, pid, val, prio) in &converted {
2251 let spec = PropertyWriteSpec {
2252 property_id: *pid,
2253 array_index: None,
2254 value: val.clone(),
2255 priority: *prio,
2256 };
2257 if let Some(entry) = grouped.iter_mut().find(|(o, _)| o == oid) {
2258 entry.1.push(spec);
2259 } else {
2260 grouped.push((*oid, vec![spec]));
2261 }
2262 }
2263
2264 let specs: Vec<WriteAccessSpecification<'_>> = grouped
2265 .iter()
2266 .map(|(oid, props)| WriteAccessSpecification {
2267 object_id: *oid,
2268 properties: props,
2269 })
2270 .collect();
2271
2272 let invoke_id = self.next_invoke_id().await;
2273 let req = WritePropertyMultipleRequest {
2274 specs: &specs,
2275 invoke_id,
2276 };
2277 let tx = self.encode_with_growth(|w| {
2278 Npdu::new(0).encode(w)?;
2279 req.encode(w)
2280 })?;
2281 self.await_simple_ack_or_error(
2282 address,
2283 &tx,
2284 invoke_id,
2285 SERVICE_WRITE_PROPERTY_MULTIPLE,
2286 self.response_timeout,
2287 )
2288 .await
2289 }
2290}
2291
2292fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
2293 let mut r = Reader::new(payload);
2294 let _npdu = Npdu::decode(&mut r)?;
2295 r.read_exact(r.remaining()).map_err(ClientError::from)
2296}
2297
2298async fn dispatch_incoming_request<D: DataLink>(
2307 datalink: &D,
2308 handler: &dyn crate::server::ServiceHandler,
2309 device_id: u32,
2310 vendor_id: u16,
2311 frame: &[u8],
2312 source: DataLinkAddress,
2313) -> Result<(), ClientError> {
2314 let mut r = Reader::new(frame);
2315 let _npdu = match Npdu::decode(&mut r) {
2316 Ok(n) => n,
2317 Err(_) => return Ok(()),
2318 };
2319
2320 if r.is_empty() {
2321 return Ok(());
2322 }
2323
2324 let first = match r.peek_u8() {
2325 Ok(b) => b,
2326 Err(_) => return Ok(()),
2327 };
2328 let apdu_type = ApduType::from_u8(first >> 4);
2329
2330 match apdu_type {
2331 Some(ApduType::UnconfirmedRequest) => {
2332 let header = match UnconfirmedRequestHeader::decode(&mut r) {
2333 Ok(h) => h,
2334 Err(_) => return Ok(()),
2335 };
2336 if header.service_choice == 0x08 {
2337 let limits = dispatch_decode_who_is_limits(&mut r);
2339 if dispatch_matches_who_is(device_id, limits) {
2340 dispatch_send_i_am(datalink, device_id, vendor_id, source).await;
2341 }
2342 }
2343 }
2345 Some(ApduType::ConfirmedRequest) => {
2346 let header = match ConfirmedRequestHeader::decode(&mut r) {
2347 Ok(h) => h,
2348 Err(_) => return Ok(()),
2349 };
2350 let invoke_id = header.invoke_id;
2351 match header.service_choice {
2352 SERVICE_READ_PROPERTY => {
2353 dispatch_handle_read_property(datalink, handler, &mut r, invoke_id, source)
2354 .await;
2355 }
2356 SERVICE_WRITE_PROPERTY => {
2357 dispatch_handle_write_property(datalink, handler, &mut r, invoke_id, source)
2358 .await;
2359 }
2360 SERVICE_READ_PROPERTY_MULTIPLE => {
2361 dispatch_handle_read_property_multiple(
2362 datalink, handler, &mut r, invoke_id, source,
2363 )
2364 .await;
2365 }
2366 SERVICE_WRITE_PROPERTY_MULTIPLE => {
2367 dispatch_handle_write_property_multiple(
2368 datalink, handler, &mut r, invoke_id, source,
2369 )
2370 .await;
2371 }
2372 SERVICE_SUBSCRIBE_COV => {
2373 dispatch_handle_subscribe_cov(datalink, handler, &mut r, invoke_id, source)
2374 .await;
2375 }
2376 SERVICE_CREATE_OBJECT => {
2377 dispatch_handle_create_object(datalink, handler, &mut r, invoke_id, source)
2378 .await;
2379 }
2380 SERVICE_DELETE_OBJECT => {
2381 dispatch_handle_delete_object(datalink, handler, &mut r, invoke_id, source)
2382 .await;
2383 }
2384 _ => {
2385 dispatch_send_reject(datalink, invoke_id, 0x08, source).await;
2387 }
2388 }
2389 }
2390 _ => {
2391 }
2393 }
2394
2395 Ok(())
2396}
2397
2398async fn dispatch_send_i_am<D: DataLink>(
2401 datalink: &D,
2402 device_id: u32,
2403 vendor_id: u16,
2404 target: DataLinkAddress,
2405) {
2406 let device_oid = ObjectId::new(ObjectType::Device, device_id);
2407 let req = IAmRequest {
2408 device_id: device_oid,
2409 max_apdu: 1476,
2410 segmentation: 3, vendor_id: vendor_id as u32,
2412 };
2413 let mut buf = [0u8; 128];
2414 let mut w = Writer::new(&mut buf);
2415 if Npdu::new(0).encode(&mut w).is_err() {
2416 return;
2417 }
2418 if req.encode(&mut w).is_err() {
2419 return;
2420 }
2421 let _ = datalink.send(target, w.as_written()).await;
2422}
2423
2424async fn dispatch_handle_read_property<D: DataLink>(
2425 datalink: &D,
2426 handler: &dyn crate::server::ServiceHandler,
2427 r: &mut Reader<'_>,
2428 invoke_id: u8,
2429 source: DataLinkAddress,
2430) {
2431 let object_id = match crate::decode_ctx_object_id(r) {
2432 Ok(v) => v,
2433 Err(_) => return,
2434 };
2435 let property_id_raw = match crate::decode_ctx_unsigned(r) {
2436 Ok(v) => v,
2437 Err(_) => return,
2438 };
2439 let property_id = PropertyId::from_u32(property_id_raw);
2440
2441 let array_index = dispatch_decode_optional_array_index(r);
2443
2444 match handler.read_property(object_id, property_id, array_index) {
2445 Ok(value) => {
2446 let borrowed = dispatch_client_value_to_borrowed(&value);
2447 let mut buf = [0u8; 1400];
2448 let mut w = Writer::new(&mut buf);
2449 if Npdu::new(0).encode(&mut w).is_err() {
2450 return;
2451 }
2452 if (ComplexAckHeader {
2453 segmented: false,
2454 more_follows: false,
2455 invoke_id,
2456 sequence_number: None,
2457 proposed_window_size: None,
2458 service_choice: SERVICE_READ_PROPERTY,
2459 })
2460 .encode(&mut w)
2461 .is_err()
2462 {
2463 return;
2464 }
2465 if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
2466 return;
2467 }
2468 if encode_ctx_unsigned(&mut w, 1, property_id.to_u32()).is_err() {
2469 return;
2470 }
2471 if (Tag::Opening { tag_num: 3 }).encode(&mut w).is_err() {
2472 return;
2473 }
2474 if encode_application_data_value(&mut w, &borrowed).is_err() {
2475 return;
2476 }
2477 if (Tag::Closing { tag_num: 3 }).encode(&mut w).is_err() {
2478 return;
2479 }
2480 let _ = datalink.send(source, w.as_written()).await;
2481 }
2482 Err(err) => {
2483 dispatch_send_error(datalink, invoke_id, SERVICE_READ_PROPERTY, err, source).await;
2484 }
2485 }
2486}
2487
2488async fn dispatch_handle_write_property<D: DataLink>(
2489 datalink: &D,
2490 handler: &dyn crate::server::ServiceHandler,
2491 r: &mut Reader<'_>,
2492 invoke_id: u8,
2493 source: DataLinkAddress,
2494) {
2495 let object_id = match crate::decode_ctx_object_id(r) {
2496 Ok(v) => v,
2497 Err(_) => return,
2498 };
2499 let property_id_raw = match crate::decode_ctx_unsigned(r) {
2500 Ok(v) => v,
2501 Err(_) => return,
2502 };
2503 let property_id = PropertyId::from_u32(property_id_raw);
2504
2505 let next_tag = match Tag::decode(r) {
2507 Ok(t) => t,
2508 Err(_) => return,
2509 };
2510 let (array_index, value_start_tag) = match next_tag {
2511 Tag::Context { tag_num: 2, len } => {
2512 let idx = match decode_unsigned(r, len as usize) {
2513 Ok(v) => v,
2514 Err(_) => return,
2515 };
2516 let vt = match Tag::decode(r) {
2517 Ok(t) => t,
2518 Err(_) => return,
2519 };
2520 (Some(idx), vt)
2521 }
2522 other => (None, other),
2523 };
2524
2525 if value_start_tag != (Tag::Opening { tag_num: 3 }) {
2526 return;
2527 }
2528
2529 let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
2530 Ok(v) => v,
2531 Err(_) => return,
2532 };
2533
2534 match Tag::decode(r) {
2535 Ok(Tag::Closing { tag_num: 3 }) => {}
2536 _ => return,
2537 }
2538
2539 let priority = if !r.is_empty() {
2541 match Tag::decode(r) {
2542 Ok(Tag::Context { tag_num: 4, len }) => match decode_unsigned(r, len as usize) {
2543 Ok(p) => Some(p as u8),
2544 Err(_) => return,
2545 },
2546 _ => None,
2547 }
2548 } else {
2549 None
2550 };
2551
2552 let client_val = crate::data_value_to_client(val);
2553
2554 match handler.write_property(object_id, property_id, array_index, client_val, priority) {
2555 Ok(()) => {
2556 let mut buf = [0u8; 32];
2557 let mut w = Writer::new(&mut buf);
2558 if Npdu::new(0).encode(&mut w).is_err() {
2559 return;
2560 }
2561 if (SimpleAck {
2562 invoke_id,
2563 service_choice: SERVICE_WRITE_PROPERTY,
2564 })
2565 .encode(&mut w)
2566 .is_err()
2567 {
2568 return;
2569 }
2570 let _ = datalink.send(source, w.as_written()).await;
2571 }
2572 Err(err) => {
2573 dispatch_send_error(datalink, invoke_id, SERVICE_WRITE_PROPERTY, err, source).await;
2574 }
2575 }
2576}
2577
2578async fn dispatch_handle_read_property_multiple<D: DataLink>(
2579 datalink: &D,
2580 handler: &dyn crate::server::ServiceHandler,
2581 r: &mut Reader<'_>,
2582 invoke_id: u8,
2583 source: DataLinkAddress,
2584) {
2585 type PropRefs = Vec<(PropertyId, Option<u32>)>;
2586 let mut specs: Vec<(ObjectId, PropRefs)> = Vec::new();
2587
2588 while !r.is_empty() {
2589 let object_id = match crate::decode_ctx_object_id(r) {
2590 Ok(v) => v,
2591 Err(_) => return,
2592 };
2593 match Tag::decode(r) {
2594 Ok(Tag::Opening { tag_num: 1 }) => {}
2595 _ => return,
2596 }
2597 let mut props: Vec<(PropertyId, Option<u32>)> = Vec::new();
2598 loop {
2599 let tag = match Tag::decode(r) {
2600 Ok(t) => t,
2601 Err(_) => return,
2602 };
2603 if tag == (Tag::Closing { tag_num: 1 }) {
2604 break;
2605 }
2606 let property_id = match tag {
2607 Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
2608 Ok(v) => PropertyId::from_u32(v),
2609 Err(_) => return,
2610 },
2611 _ => return,
2612 };
2613 let array_index = if !r.is_empty() {
2614 match dispatch_peek_context_tag(r, 1) {
2615 Some(len) => {
2616 match Tag::decode(r) {
2617 Ok(_) => {}
2618 Err(_) => return,
2619 }
2620 match decode_unsigned(r, len as usize) {
2621 Ok(idx) => Some(idx),
2622 Err(_) => return,
2623 }
2624 }
2625 None => None,
2626 }
2627 } else {
2628 None
2629 };
2630 props.push((property_id, array_index));
2631 }
2632 specs.push((object_id, props));
2633 }
2634
2635 let mut buf = [0u8; 1400];
2636 let mut w = Writer::new(&mut buf);
2637 if Npdu::new(0).encode(&mut w).is_err() {
2638 return;
2639 }
2640 if (ComplexAckHeader {
2641 segmented: false,
2642 more_follows: false,
2643 invoke_id,
2644 sequence_number: None,
2645 proposed_window_size: None,
2646 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
2647 })
2648 .encode(&mut w)
2649 .is_err()
2650 {
2651 return;
2652 }
2653
2654 for (object_id, props) in &specs {
2655 if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
2656 return;
2657 }
2658 if (Tag::Opening { tag_num: 1 }).encode(&mut w).is_err() {
2659 return;
2660 }
2661
2662 for (property_id, array_index) in props {
2663 if encode_ctx_unsigned(&mut w, 2, property_id.to_u32()).is_err() {
2664 return;
2665 }
2666 if let Some(idx) = array_index {
2667 if encode_ctx_unsigned(&mut w, 3, *idx).is_err() {
2668 return;
2669 }
2670 }
2671 if (Tag::Opening { tag_num: 4 }).encode(&mut w).is_err() {
2672 return;
2673 }
2674
2675 match handler.read_property(*object_id, *property_id, *array_index) {
2676 Ok(value) => {
2677 let borrowed = dispatch_client_value_to_borrowed(&value);
2678 if encode_application_data_value(&mut w, &borrowed).is_err() {
2679 return;
2680 }
2681 }
2682 Err(err) => {
2683 let (class, code) = dispatch_error_class_code(err);
2684 if (Tag::Opening { tag_num: 5 }).encode(&mut w).is_err() {
2685 return;
2686 }
2687 if encode_ctx_unsigned(&mut w, 0, class as u32).is_err() {
2688 return;
2689 }
2690 if encode_ctx_unsigned(&mut w, 1, code as u32).is_err() {
2691 return;
2692 }
2693 if (Tag::Closing { tag_num: 5 }).encode(&mut w).is_err() {
2694 return;
2695 }
2696 }
2697 }
2698
2699 if (Tag::Closing { tag_num: 4 }).encode(&mut w).is_err() {
2700 return;
2701 }
2702 }
2703
2704 if (Tag::Closing { tag_num: 1 }).encode(&mut w).is_err() {
2705 return;
2706 }
2707 }
2708
2709 let _ = datalink.send(source, w.as_written()).await;
2710}
2711
2712async fn dispatch_handle_write_property_multiple<D: DataLink>(
2713 datalink: &D,
2714 handler: &dyn crate::server::ServiceHandler,
2715 r: &mut Reader<'_>,
2716 invoke_id: u8,
2717 source: DataLinkAddress,
2718) {
2719 while !r.is_empty() {
2720 let object_id = match crate::decode_ctx_object_id(r) {
2721 Ok(v) => v,
2722 Err(_) => return,
2723 };
2724 match Tag::decode(r) {
2725 Ok(Tag::Opening { tag_num: 1 }) => {}
2726 _ => return,
2727 }
2728 loop {
2729 let tag = match Tag::decode(r) {
2730 Ok(t) => t,
2731 Err(_) => return,
2732 };
2733 if tag == (Tag::Closing { tag_num: 1 }) {
2734 break;
2735 }
2736 let property_id = match tag {
2737 Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
2738 Ok(v) => PropertyId::from_u32(v),
2739 Err(_) => return,
2740 },
2741 _ => return,
2742 };
2743 let array_index = if !r.is_empty() {
2744 match dispatch_peek_context_tag(r, 1) {
2745 Some(len) => {
2746 let _ = Tag::decode(r);
2747 decode_unsigned(r, len as usize).ok()
2748 }
2749 None => None,
2750 }
2751 } else {
2752 None
2753 };
2754 match Tag::decode(r) {
2755 Ok(Tag::Opening { tag_num: 2 }) => {}
2756 _ => return,
2757 }
2758 let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
2759 Ok(v) => v,
2760 Err(_) => return,
2761 };
2762 match Tag::decode(r) {
2763 Ok(Tag::Closing { tag_num: 2 }) => {}
2764 _ => return,
2765 }
2766 let priority = if !r.is_empty() {
2767 match dispatch_peek_context_tag(r, 3) {
2768 Some(len) => {
2769 let _ = Tag::decode(r);
2770 decode_unsigned(r, len as usize).ok().map(|p| p as u8)
2771 }
2772 None => None,
2773 }
2774 } else {
2775 None
2776 };
2777 let client_val = crate::data_value_to_client(val);
2778 if let Err(err) =
2779 handler.write_property(object_id, property_id, array_index, client_val, priority)
2780 {
2781 dispatch_send_error(
2782 datalink,
2783 invoke_id,
2784 SERVICE_WRITE_PROPERTY_MULTIPLE,
2785 err,
2786 source,
2787 )
2788 .await;
2789 return;
2790 }
2791 }
2792 }
2793 let mut buf = [0u8; 32];
2795 let mut w = Writer::new(&mut buf);
2796 if Npdu::new(0).encode(&mut w).is_err() {
2797 return;
2798 }
2799 if (SimpleAck {
2800 invoke_id,
2801 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
2802 })
2803 .encode(&mut w)
2804 .is_err()
2805 {
2806 return;
2807 }
2808 let _ = datalink.send(source, w.as_written()).await;
2809}
2810
2811async fn dispatch_handle_subscribe_cov<D: DataLink>(
2812 datalink: &D,
2813 handler: &dyn crate::server::ServiceHandler,
2814 r: &mut Reader<'_>,
2815 invoke_id: u8,
2816 source: DataLinkAddress,
2817) {
2818 let subscriber_process_id = match Tag::decode(r) {
2820 Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
2821 Ok(v) => v,
2822 Err(_) => return,
2823 },
2824 _ => return,
2825 };
2826 let monitored_object_id = match crate::decode_ctx_object_id(r) {
2828 Ok(v) => v,
2829 Err(_) => return,
2830 };
2831 let issue_confirmed = match Tag::decode(r) {
2833 Ok(Tag::Context { tag_num: 2, len }) => match decode_unsigned(r, len as usize) {
2834 Ok(v) => v != 0,
2835 Err(_) => return,
2836 },
2837 _ => return,
2838 };
2839 let lifetime = if !r.is_empty() {
2841 match dispatch_peek_context_tag(r, 3) {
2842 Some(len) => {
2843 let _ = Tag::decode(r);
2844 decode_unsigned(r, len as usize).ok()
2845 }
2846 None => None,
2847 }
2848 } else {
2849 None
2850 };
2851
2852 match handler.subscribe_cov(
2853 subscriber_process_id,
2854 monitored_object_id,
2855 issue_confirmed,
2856 lifetime,
2857 ) {
2858 Ok(()) => {
2859 let mut buf = [0u8; 32];
2860 let mut w = Writer::new(&mut buf);
2861 if Npdu::new(0).encode(&mut w).is_err() {
2862 return;
2863 }
2864 if (SimpleAck {
2865 invoke_id,
2866 service_choice: SERVICE_SUBSCRIBE_COV,
2867 })
2868 .encode(&mut w)
2869 .is_err()
2870 {
2871 return;
2872 }
2873 let _ = datalink.send(source, w.as_written()).await;
2874 }
2875 Err(err) => {
2876 dispatch_send_error(datalink, invoke_id, SERVICE_SUBSCRIBE_COV, err, source).await;
2877 }
2878 }
2879}
2880
2881async fn dispatch_handle_create_object<D: DataLink>(
2882 datalink: &D,
2883 handler: &dyn crate::server::ServiceHandler,
2884 r: &mut Reader<'_>,
2885 invoke_id: u8,
2886 source: DataLinkAddress,
2887) {
2888 match Tag::decode(r) {
2890 Ok(Tag::Opening { tag_num: 0 }) => {}
2891 _ => return,
2892 }
2893 let object_type_raw = match Tag::decode(r) {
2895 Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
2896 Ok(v) => v,
2897 Err(_) => return,
2898 },
2899 _ => return,
2900 };
2901 let object_type = ObjectType::from_u16(object_type_raw as u16);
2902 match Tag::decode(r) {
2904 Ok(Tag::Closing { tag_num: 0 }) => {}
2905 _ => return,
2906 }
2907
2908 match handler.create_object(object_type) {
2909 Ok(created_id) => {
2910 let mut buf = [0u8; 64];
2911 let mut w = Writer::new(&mut buf);
2912 if Npdu::new(0).encode(&mut w).is_err() {
2913 return;
2914 }
2915 if (ComplexAckHeader {
2916 segmented: false,
2917 more_follows: false,
2918 invoke_id,
2919 sequence_number: None,
2920 proposed_window_size: None,
2921 service_choice: SERVICE_CREATE_OBJECT,
2922 })
2923 .encode(&mut w)
2924 .is_err()
2925 {
2926 return;
2927 }
2928 if encode_ctx_unsigned(&mut w, 0, created_id.raw()).is_err() {
2929 return;
2930 }
2931 let _ = datalink.send(source, w.as_written()).await;
2932 }
2933 Err(err) => {
2934 dispatch_send_error(datalink, invoke_id, SERVICE_CREATE_OBJECT, err, source).await;
2935 }
2936 }
2937}
2938
2939async fn dispatch_handle_delete_object<D: DataLink>(
2940 datalink: &D,
2941 handler: &dyn crate::server::ServiceHandler,
2942 r: &mut Reader<'_>,
2943 invoke_id: u8,
2944 source: DataLinkAddress,
2945) {
2946 let object_id = match crate::decode_ctx_object_id(r) {
2947 Ok(v) => v,
2948 Err(_) => return,
2949 };
2950
2951 match handler.delete_object(object_id) {
2952 Ok(()) => {
2953 let mut buf = [0u8; 32];
2954 let mut w = Writer::new(&mut buf);
2955 if Npdu::new(0).encode(&mut w).is_err() {
2956 return;
2957 }
2958 if (SimpleAck {
2959 invoke_id,
2960 service_choice: SERVICE_DELETE_OBJECT,
2961 })
2962 .encode(&mut w)
2963 .is_err()
2964 {
2965 return;
2966 }
2967 let _ = datalink.send(source, w.as_written()).await;
2968 }
2969 Err(err) => {
2970 dispatch_send_error(datalink, invoke_id, SERVICE_DELETE_OBJECT, err, source).await;
2971 }
2972 }
2973}
2974
2975async fn dispatch_send_error<D: DataLink>(
2976 datalink: &D,
2977 invoke_id: u8,
2978 service_choice: u8,
2979 err: crate::server::BacnetServiceError,
2980 target: DataLinkAddress,
2981) {
2982 let (class, code) = dispatch_error_class_code(err);
2983 let mut buf = [0u8; 64];
2984 let mut w = Writer::new(&mut buf);
2985 if Npdu::new(0).encode(&mut w).is_err() {
2986 return;
2987 }
2988 if w.write_u8(0x50).is_err() {
2990 return;
2991 }
2992 if w.write_u8(invoke_id).is_err() {
2993 return;
2994 }
2995 if w.write_u8(service_choice).is_err() {
2996 return;
2997 }
2998 if (Tag::Application {
3000 tag: rustbac_core::encoding::tag::AppTag::Enumerated,
3001 len: 1,
3002 })
3003 .encode(&mut w)
3004 .is_err()
3005 {
3006 return;
3007 }
3008 if w.write_u8(class).is_err() {
3009 return;
3010 }
3011 if (Tag::Application {
3013 tag: rustbac_core::encoding::tag::AppTag::Enumerated,
3014 len: 1,
3015 })
3016 .encode(&mut w)
3017 .is_err()
3018 {
3019 return;
3020 }
3021 if w.write_u8(code).is_err() {
3022 return;
3023 }
3024 let _ = datalink.send(target, w.as_written()).await;
3025}
3026
3027async fn dispatch_send_reject<D: DataLink>(
3028 datalink: &D,
3029 invoke_id: u8,
3030 reason: u8,
3031 target: DataLinkAddress,
3032) {
3033 let mut buf = [0u8; 16];
3034 let mut w = Writer::new(&mut buf);
3035 if Npdu::new(0).encode(&mut w).is_err() {
3036 return;
3037 }
3038 if w.write_u8(0x60).is_err() {
3040 return;
3041 }
3042 if w.write_u8(invoke_id).is_err() {
3043 return;
3044 }
3045 if w.write_u8(reason).is_err() {
3046 return;
3047 }
3048 let _ = datalink.send(target, w.as_written()).await;
3049}
3050
3051fn dispatch_decode_who_is_limits(r: &mut Reader<'_>) -> Option<(u32, u32)> {
3053 if r.is_empty() {
3054 return None;
3055 }
3056 let tag0 = Tag::decode(r).ok()?;
3057 let low = match tag0 {
3058 Tag::Context { tag_num: 0, len } => decode_unsigned(r, len as usize).ok()?,
3059 _ => return None,
3060 };
3061 let tag1 = Tag::decode(r).ok()?;
3062 let high = match tag1 {
3063 Tag::Context { tag_num: 1, len } => decode_unsigned(r, len as usize).ok()?,
3064 _ => return None,
3065 };
3066 Some((low, high))
3067}
3068
3069fn dispatch_error_class_code(err: crate::server::BacnetServiceError) -> (u8, u8) {
3070 use crate::server::BacnetServiceError;
3071 match err {
3072 BacnetServiceError::UnknownObject => (1, 31),
3073 BacnetServiceError::UnknownProperty => (2, 32),
3074 BacnetServiceError::WriteAccessDenied => (2, 40),
3075 BacnetServiceError::InvalidDataType => (2, 9),
3076 BacnetServiceError::ServiceNotSupported => (5, 53),
3077 }
3078}
3079
/// Reports whether `device_id` is addressed by a Who-Is with the given limits.
///
/// Without limits every device matches; with limits the device matches when
/// its id lies in the inclusive range `low..=high`.
fn dispatch_matches_who_is(device_id: u32, limits: Option<(u32, u32)>) -> bool {
    limits.map_or(true, |(low, high)| (low..=high).contains(&device_id))
}
3086
3087fn dispatch_decode_optional_array_index(r: &mut Reader<'_>) -> Option<u32> {
3088 if r.is_empty() {
3089 return None;
3090 }
3091 let tag = Tag::decode(r).ok()?;
3092 match tag {
3093 Tag::Context { tag_num: 2, len } => decode_unsigned(r, len as usize).ok(),
3094 _ => None,
3095 }
3096}
3097
3098fn dispatch_peek_context_tag(r: &mut Reader<'_>, tag_num: u8) -> Option<u32> {
3099 let first = r.peek_u8().ok()?;
3100 let is_context = (first & 0x08) != 0 && (first & 0x07) < 6;
3101 if !is_context {
3102 return None;
3103 }
3104 let this_tag_num = first >> 4;
3105 if this_tag_num != tag_num {
3106 return None;
3107 }
3108 let short_len = first & 0x07;
3109 if short_len < 5 {
3110 Some(short_len as u32)
3111 } else {
3112 None
3113 }
3114}
3115
3116fn dispatch_client_value_to_borrowed(val: &ClientDataValue) -> DataValue<'_> {
3117 match val {
3118 ClientDataValue::Null => DataValue::Null,
3119 ClientDataValue::Boolean(v) => DataValue::Boolean(*v),
3120 ClientDataValue::Unsigned(v) => DataValue::Unsigned(*v),
3121 ClientDataValue::Signed(v) => DataValue::Signed(*v),
3122 ClientDataValue::Real(v) => DataValue::Real(*v),
3123 ClientDataValue::Double(v) => DataValue::Double(*v),
3124 ClientDataValue::OctetString(v) => DataValue::OctetString(v),
3125 ClientDataValue::CharacterString(v) => DataValue::CharacterString(v),
3126 ClientDataValue::BitString { unused_bits, data } => {
3127 DataValue::BitString(rustbac_core::types::BitString {
3128 unused_bits: *unused_bits,
3129 data,
3130 })
3131 }
3132 ClientDataValue::Enumerated(v) => DataValue::Enumerated(*v),
3133 ClientDataValue::Date(v) => DataValue::Date(*v),
3134 ClientDataValue::Time(v) => DataValue::Time(*v),
3135 ClientDataValue::ObjectId(v) => DataValue::ObjectId(*v),
3136 ClientDataValue::Constructed { tag_num, values } => DataValue::Constructed {
3137 tag_num: *tag_num,
3138 values: values
3139 .iter()
3140 .map(dispatch_client_value_to_borrowed)
3141 .collect(),
3142 },
3143 }
3144}
3145
3146fn remote_service_error(err: BacnetError) -> ClientError {
3147 ClientError::RemoteServiceError {
3148 service_choice: err.service_choice,
3149 error_class_raw: err.error_class,
3150 error_code_raw: err.error_code,
3151 error_class: err.error_class.and_then(ErrorClass::from_u32),
3152 error_code: err.error_code.and_then(ErrorCode::from_u32),
3153 }
3154}
3155
3156fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
3157 Ok(match value {
3158 DataValue::Null => ClientDataValue::Null,
3159 DataValue::Boolean(v) => ClientDataValue::Boolean(v),
3160 DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
3161 DataValue::Signed(v) => ClientDataValue::Signed(v),
3162 DataValue::Real(v) => ClientDataValue::Real(v),
3163 DataValue::Double(v) => ClientDataValue::Double(v),
3164 DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
3165 DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
3166 DataValue::BitString(v) => ClientDataValue::BitString {
3167 unused_bits: v.unused_bits,
3168 data: v.data.to_vec(),
3169 },
3170 DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
3171 DataValue::Date(v) => ClientDataValue::Date(v),
3172 DataValue::Time(v) => ClientDataValue::Time(v),
3173 DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
3174 DataValue::Constructed { tag_num, values } => {
3175 let mut children = Vec::with_capacity(values.len());
3176 for child in values {
3177 children.push(into_client_value(child)?);
3178 }
3179 ClientDataValue::Constructed {
3180 tag_num,
3181 values: children,
3182 }
3183 }
3184 })
3185}
3186
3187fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
3188 value
3189 .into_iter()
3190 .map(|item| AlarmSummaryItem {
3191 object_id: item.object_id,
3192 alarm_state_raw: item.alarm_state,
3193 alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3194 item.alarm_state,
3195 ),
3196 acknowledged_transitions: ClientBitString {
3197 unused_bits: item.acknowledged_transitions.unused_bits,
3198 data: item.acknowledged_transitions.data.to_vec(),
3199 },
3200 })
3201 .collect()
3202}
3203
3204fn into_client_enrollment_summary(
3205 value: Vec<CoreEnrollmentSummaryItem>,
3206) -> Vec<EnrollmentSummaryItem> {
3207 value
3208 .into_iter()
3209 .map(|item| EnrollmentSummaryItem {
3210 object_id: item.object_id,
3211 event_type: item.event_type,
3212 event_state_raw: item.event_state,
3213 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3214 item.event_state,
3215 ),
3216 priority: item.priority,
3217 notification_class: item.notification_class,
3218 })
3219 .collect()
3220}
3221
3222fn into_client_event_information(
3223 value: Vec<CoreEventSummaryItem<'_>>,
3224) -> Vec<EventInformationItem> {
3225 value
3226 .into_iter()
3227 .map(|item| EventInformationItem {
3228 object_id: item.object_id,
3229 event_state_raw: item.event_state,
3230 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3231 item.event_state,
3232 ),
3233 acknowledged_transitions: ClientBitString {
3234 unused_bits: item.acknowledged_transitions.unused_bits,
3235 data: item.acknowledged_transitions.data.to_vec(),
3236 },
3237 notify_type: item.notify_type,
3238 event_enable: ClientBitString {
3239 unused_bits: item.event_enable.unused_bits,
3240 data: item.event_enable.data.to_vec(),
3241 },
3242 event_priorities: item.event_priorities,
3243 })
3244 .collect()
3245}
3246
3247fn into_client_cov_notification(
3248 source: DataLinkAddress,
3249 confirmed: bool,
3250 value: CovNotificationRequest<'_>,
3251) -> Result<CovNotification, ClientError> {
3252 let mut values = Vec::with_capacity(value.values.len());
3253 for property in value.values {
3254 values.push(CovPropertyValue {
3255 property_id: property.property_id,
3256 array_index: property.array_index,
3257 value: into_client_value(property.value)?,
3258 priority: property.priority,
3259 });
3260 }
3261
3262 Ok(CovNotification {
3263 source,
3264 confirmed,
3265 subscriber_process_id: value.subscriber_process_id,
3266 initiating_device_id: value.initiating_device_id,
3267 monitored_object_id: value.monitored_object_id,
3268 time_remaining_seconds: value.time_remaining_seconds,
3269 values,
3270 })
3271}
3272
3273fn into_client_event_notification(
3274 source: DataLinkAddress,
3275 confirmed: bool,
3276 value: EventNotificationRequest<'_>,
3277) -> EventNotification {
3278 EventNotification {
3279 source,
3280 confirmed,
3281 process_id: value.process_id,
3282 initiating_device_id: value.initiating_device_id,
3283 event_object_id: value.event_object_id,
3284 timestamp: value.timestamp,
3285 notification_class: value.notification_class,
3286 priority: value.priority,
3287 event_type: value.event_type,
3288 message_text: value.message_text.map(str::to_string),
3289 notify_type: value.notify_type,
3290 ack_required: value.ack_required,
3291 from_state_raw: value.from_state,
3292 from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3293 value.from_state,
3294 ),
3295 to_state_raw: value.to_state,
3296 to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
3297 }
3298}
3299
3300fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
3301 let mut items = Vec::with_capacity(value.items.len());
3302 for item in value.items {
3303 items.push(into_client_value(item)?);
3304 }
3305 Ok(ReadRangeResult {
3306 object_id: value.object_id,
3307 property_id: value.property_id,
3308 array_index: value.array_index,
3309 result_flags: ClientBitString {
3310 unused_bits: value.result_flags.unused_bits,
3311 data: value.result_flags.data.to_vec(),
3312 },
3313 item_count: value.item_count,
3314 items,
3315 })
3316}
3317
3318fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
3319 match value.access_method {
3320 AtomicReadFileAckAccess::Stream {
3321 file_start_position,
3322 file_data,
3323 } => AtomicReadFileResult::Stream {
3324 end_of_file: value.end_of_file,
3325 file_start_position,
3326 file_data: file_data.to_vec(),
3327 },
3328 AtomicReadFileAckAccess::Record {
3329 file_start_record,
3330 returned_record_count,
3331 file_record_data,
3332 } => AtomicReadFileResult::Record {
3333 end_of_file: value.end_of_file,
3334 file_start_record,
3335 returned_record_count,
3336 file_record_data: file_record_data
3337 .into_iter()
3338 .map(|record| record.to_vec())
3339 .collect(),
3340 },
3341 }
3342}
3343
3344fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
3345 match value {
3346 AtomicWriteFileAck::Stream {
3347 file_start_position,
3348 } => AtomicWriteFileResult::Stream {
3349 file_start_position,
3350 },
3351 AtomicWriteFileAck::Record { file_start_record } => {
3352 AtomicWriteFileResult::Record { file_start_record }
3353 }
3354 }
3355}
3356
3357#[cfg(test)]
3358mod tests {
3359 use super::BacnetClient;
3360 use crate::{
3361 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
3362 EnrollmentSummaryItem, EventInformationItem, EventNotification,
3363 };
3364 use rustbac_core::apdu::{
3365 ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
3366 UnconfirmedRequestHeader,
3367 };
3368 use rustbac_core::encoding::{
3369 primitives::{
3370 decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
3371 encode_ctx_object_id, encode_ctx_unsigned,
3372 },
3373 reader::Reader,
3374 tag::{AppTag, Tag},
3375 writer::Writer,
3376 };
3377 use rustbac_core::npdu::Npdu;
3378 use rustbac_core::services::acknowledge_alarm::{
3379 AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
3380 };
3381 use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
3382 use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
3383 use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
3384 use rustbac_core::services::cov_notification::{
3385 SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3386 };
3387 use rustbac_core::services::device_management::{
3388 DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
3389 SERVICE_REINITIALIZE_DEVICE,
3390 };
3391 use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
3392 use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
3393 use rustbac_core::services::event_notification::{
3394 SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3395 };
3396 use rustbac_core::services::list_element::{
3397 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
3398 SERVICE_REMOVE_LIST_ELEMENT,
3399 };
3400 use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
3401 use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
3402 use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
3403 use rustbac_core::services::read_range::SERVICE_READ_RANGE;
3404 use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
3405 use rustbac_core::services::subscribe_cov_property::{
3406 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
3407 };
3408 use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
3409 use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
3410 use rustbac_core::services::write_property_multiple::{
3411 PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
3412 };
3413 use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
3414 use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
3415 use std::collections::VecDeque;
3416 use std::sync::Arc;
3417 use std::time::Duration;
3418 use tokio::sync::Mutex;
3419
3420 #[derive(Debug, Default)]
3421 struct MockState {
3422 sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
3423 recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
3424 }
3425
3426 #[derive(Debug, Clone)]
3427 struct MockDataLink {
3428 state: Arc<MockState>,
3429 }
3430
3431 impl MockDataLink {
3432 fn new() -> (Self, Arc<MockState>) {
3433 let state = Arc::new(MockState::default());
3434 (
3435 Self {
3436 state: state.clone(),
3437 },
3438 state,
3439 )
3440 }
3441 }
3442
3443 impl DataLink for MockDataLink {
3444 async fn send(
3445 &self,
3446 address: DataLinkAddress,
3447 payload: &[u8],
3448 ) -> Result<(), DataLinkError> {
3449 self.state
3450 .sent
3451 .lock()
3452 .await
3453 .push((address, payload.to_vec()));
3454 Ok(())
3455 }
3456
3457 async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
3458 let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
3459 return Err(DataLinkError::InvalidFrame);
3460 };
3461 if payload.len() > buf.len() {
3462 return Err(DataLinkError::FrameTooLarge);
3463 }
3464 buf[..payload.len()].copy_from_slice(&payload);
3465 Ok((payload.len(), addr))
3466 }
3467 }
3468
3469 fn with_npdu(apdu: &[u8]) -> Vec<u8> {
3470 let mut out = [0u8; 512];
3471 let mut w = Writer::new(&mut out);
3472 Npdu::new(0).encode(&mut w).unwrap();
3473 w.write_all(apdu).unwrap();
3474 w.as_written().to_vec()
3475 }
3476
3477 fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
3478 let mut apdu_buf = [0u8; 256];
3479 let mut w = Writer::new(&mut apdu_buf);
3480 ComplexAckHeader {
3481 segmented: false,
3482 more_follows: false,
3483 invoke_id,
3484 sequence_number: None,
3485 proposed_window_size: None,
3486 service_choice: SERVICE_READ_RANGE,
3487 }
3488 .encode(&mut w)
3489 .unwrap();
3490 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3491 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3492 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3493 w.write_u8(5).unwrap();
3494 w.write_u8(0b1110_0000).unwrap();
3495 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3496 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3497 encode_app_real(&mut w, 42.0).unwrap();
3498 encode_app_real(&mut w, 43.0).unwrap();
3499 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3500 w.as_written().to_vec()
3501 }
3502
3503 fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
3504 let mut apdu_buf = [0u8; 256];
3505 let mut w = Writer::new(&mut apdu_buf);
3506 ComplexAckHeader {
3507 segmented: false,
3508 more_follows: false,
3509 invoke_id,
3510 sequence_number: None,
3511 proposed_window_size: None,
3512 service_choice: SERVICE_ATOMIC_READ_FILE,
3513 }
3514 .encode(&mut w)
3515 .unwrap();
3516 Tag::Application {
3517 tag: AppTag::Boolean,
3518 len: if eof { 1 } else { 0 },
3519 }
3520 .encode(&mut w)
3521 .unwrap();
3522 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3523 Tag::Application {
3524 tag: AppTag::SignedInt,
3525 len: 1,
3526 }
3527 .encode(&mut w)
3528 .unwrap();
3529 w.write_u8(0).unwrap();
3530 Tag::Application {
3531 tag: AppTag::OctetString,
3532 len: data.len() as u32,
3533 }
3534 .encode(&mut w)
3535 .unwrap();
3536 w.write_all(data).unwrap();
3537 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3538 w.as_written().to_vec()
3539 }
3540
3541 fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
3542 let mut apdu_buf = [0u8; 256];
3543 let mut w = Writer::new(&mut apdu_buf);
3544 ComplexAckHeader {
3545 segmented: false,
3546 more_follows: false,
3547 invoke_id,
3548 sequence_number: None,
3549 proposed_window_size: None,
3550 service_choice: SERVICE_ATOMIC_READ_FILE,
3551 }
3552 .encode(&mut w)
3553 .unwrap();
3554 Tag::Application {
3555 tag: AppTag::Boolean,
3556 len: 0,
3557 }
3558 .encode(&mut w)
3559 .unwrap();
3560 Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
3561 Tag::Application {
3562 tag: AppTag::SignedInt,
3563 len: 1,
3564 }
3565 .encode(&mut w)
3566 .unwrap();
3567 w.write_u8(7).unwrap();
3568 Tag::Application {
3569 tag: AppTag::UnsignedInt,
3570 len: 1,
3571 }
3572 .encode(&mut w)
3573 .unwrap();
3574 w.write_u8(2).unwrap();
3575 Tag::Application {
3576 tag: AppTag::OctetString,
3577 len: 2,
3578 }
3579 .encode(&mut w)
3580 .unwrap();
3581 w.write_all(&[0x01, 0x02]).unwrap();
3582 Tag::Application {
3583 tag: AppTag::OctetString,
3584 len: 3,
3585 }
3586 .encode(&mut w)
3587 .unwrap();
3588 w.write_all(&[0x03, 0x04, 0x05]).unwrap();
3589 Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
3590 w.as_written().to_vec()
3591 }
3592
3593 fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
3594 let mut apdu_buf = [0u8; 64];
3595 let mut w = Writer::new(&mut apdu_buf);
3596 ComplexAckHeader {
3597 segmented: false,
3598 more_follows: false,
3599 invoke_id,
3600 sequence_number: None,
3601 proposed_window_size: None,
3602 service_choice: SERVICE_ATOMIC_WRITE_FILE,
3603 }
3604 .encode(&mut w)
3605 .unwrap();
3606 Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
3607 w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
3608 w.as_written().to_vec()
3609 }
3610
3611 fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
3612 let mut apdu_buf = [0u8; 64];
3613 let mut w = Writer::new(&mut apdu_buf);
3614 ComplexAckHeader {
3615 segmented: false,
3616 more_follows: false,
3617 invoke_id,
3618 sequence_number: None,
3619 proposed_window_size: None,
3620 service_choice: SERVICE_ATOMIC_WRITE_FILE,
3621 }
3622 .encode(&mut w)
3623 .unwrap();
3624 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
3625 w.write_u8(start_record as u8).unwrap();
3626 w.as_written().to_vec()
3627 }
3628
3629 fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
3630 let mut apdu_buf = [0u8; 64];
3631 let mut w = Writer::new(&mut apdu_buf);
3632 ComplexAckHeader {
3633 segmented: false,
3634 more_follows: false,
3635 invoke_id,
3636 sequence_number: None,
3637 proposed_window_size: None,
3638 service_choice: SERVICE_CREATE_OBJECT,
3639 }
3640 .encode(&mut w)
3641 .unwrap();
3642 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3643 w.as_written().to_vec()
3644 }
3645
3646 fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
3647 let mut apdu_buf = [0u8; 128];
3648 let mut w = Writer::new(&mut apdu_buf);
3649 ComplexAckHeader {
3650 segmented: false,
3651 more_follows: false,
3652 invoke_id,
3653 sequence_number: None,
3654 proposed_window_size: None,
3655 service_choice: SERVICE_GET_ALARM_SUMMARY,
3656 }
3657 .encode(&mut w)
3658 .unwrap();
3659 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3660 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3661 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3662 w.write_u8(5).unwrap();
3663 w.write_u8(0b1110_0000).unwrap();
3664
3665 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
3666 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
3667 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3668 w.write_u8(5).unwrap();
3669 w.write_u8(0b1100_0000).unwrap();
3670 w.as_written().to_vec()
3671 }
3672
3673 fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
3674 let mut apdu_buf = [0u8; 160];
3675 let mut w = Writer::new(&mut apdu_buf);
3676 ComplexAckHeader {
3677 segmented: false,
3678 more_follows: false,
3679 invoke_id,
3680 sequence_number: None,
3681 proposed_window_size: None,
3682 service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
3683 }
3684 .encode(&mut w)
3685 .unwrap();
3686 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3687 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3688 encode_ctx_unsigned(&mut w, 2, 2).unwrap();
3689 encode_ctx_unsigned(&mut w, 3, 200).unwrap();
3690 encode_ctx_unsigned(&mut w, 4, 10).unwrap();
3691
3692 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
3693 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
3694 encode_ctx_unsigned(&mut w, 2, 0).unwrap();
3695 encode_ctx_unsigned(&mut w, 3, 20).unwrap();
3696 encode_ctx_unsigned(&mut w, 4, 11).unwrap();
3697 w.as_written().to_vec()
3698 }
3699
3700 fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
3701 let mut apdu_buf = [0u8; 256];
3702 let mut w = Writer::new(&mut apdu_buf);
3703 ComplexAckHeader {
3704 segmented: false,
3705 more_follows: false,
3706 invoke_id,
3707 sequence_number: None,
3708 proposed_window_size: None,
3709 service_choice: SERVICE_GET_EVENT_INFORMATION,
3710 }
3711 .encode(&mut w)
3712 .unwrap();
3713 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3714 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3715 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
3716 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3717 w.write_u8(5).unwrap();
3718 w.write_u8(0b1110_0000).unwrap();
3719 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3720 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3721 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3722 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3723 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3724 encode_ctx_unsigned(&mut w, 4, 0).unwrap();
3725 Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
3726 w.write_u8(5).unwrap();
3727 w.write_u8(0b1100_0000).unwrap();
3728 Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
3729 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3730 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
3731 encode_ctx_unsigned(&mut w, 2, 3).unwrap();
3732 Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
3733 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3734 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
3735 w.write_u8(0).unwrap();
3736 w.as_written().to_vec()
3737 }
3738
3739 #[tokio::test]
3740 async fn who_has_object_name_collects_i_have() {
3741 let (dl, state) = MockDataLink::new();
3742 let client = BacnetClient::with_datalink(dl);
3743 let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
3744
3745 let mut apdu = [0u8; 128];
3746 let mut w = Writer::new(&mut apdu);
3747 UnconfirmedRequestHeader {
3748 service_choice: SERVICE_I_HAVE,
3749 }
3750 .encode(&mut w)
3751 .unwrap();
3752 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
3753 encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3754 encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
3755
3756 state
3757 .recv
3758 .lock()
3759 .await
3760 .push_back((with_npdu(w.as_written()), addr));
3761
3762 let results = client
3763 .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
3764 .await
3765 .unwrap();
3766 assert_eq!(results.len(), 1);
3767 assert_eq!(results[0].address, addr);
3768 assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
3769 assert_eq!(
3770 results[0].object_id,
3771 ObjectId::new(ObjectType::AnalogInput, 7)
3772 );
3773 assert_eq!(results[0].object_name, "Zone Temp");
3774
3775 let sent = state.sent.lock().await;
3776 assert_eq!(sent.len(), 1);
3777 let mut r = Reader::new(&sent[0].1);
3778 let _npdu = Npdu::decode(&mut r).unwrap();
3779 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
3780 assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
3781 }
3782
3783 #[tokio::test]
3784 async fn device_communication_control_handles_simple_ack() {
3785 let (dl, state) = MockDataLink::new();
3786 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3787 let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
3788
3789 let mut apdu = [0u8; 32];
3790 let mut w = Writer::new(&mut apdu);
3791 SimpleAck {
3792 invoke_id: 1,
3793 service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
3794 }
3795 .encode(&mut w)
3796 .unwrap();
3797 state
3798 .recv
3799 .lock()
3800 .await
3801 .push_back((with_npdu(w.as_written()), addr));
3802
3803 client
3804 .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
3805 .await
3806 .unwrap();
3807
3808 let sent = state.sent.lock().await;
3809 assert_eq!(sent.len(), 1);
3810 let mut r = Reader::new(&sent[0].1);
3811 let _npdu = Npdu::decode(&mut r).unwrap();
3812 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3813 assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
3814 }
3815
3816 #[tokio::test]
3817 async fn reinitialize_device_handles_simple_ack() {
3818 let (dl, state) = MockDataLink::new();
3819 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3820 let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
3821
3822 let mut apdu = [0u8; 32];
3823 let mut w = Writer::new(&mut apdu);
3824 SimpleAck {
3825 invoke_id: 1,
3826 service_choice: SERVICE_REINITIALIZE_DEVICE,
3827 }
3828 .encode(&mut w)
3829 .unwrap();
3830 state
3831 .recv
3832 .lock()
3833 .await
3834 .push_back((with_npdu(w.as_written()), addr));
3835
3836 client
3837 .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
3838 .await
3839 .unwrap();
3840
3841 let sent = state.sent.lock().await;
3842 assert_eq!(sent.len(), 1);
3843 let mut r = Reader::new(&sent[0].1);
3844 let _npdu = Npdu::decode(&mut r).unwrap();
3845 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3846 assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
3847 }
3848
3849 #[tokio::test]
3850 async fn time_synchronize_sends_unconfirmed_request() {
3851 let (dl, state) = MockDataLink::new();
3852 let client = BacnetClient::with_datalink(dl);
3853 let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
3854
3855 client
3856 .time_synchronize(
3857 addr,
3858 Date {
3859 year_since_1900: 126,
3860 month: 2,
3861 day: 7,
3862 weekday: 6,
3863 },
3864 Time {
3865 hour: 10,
3866 minute: 11,
3867 second: 12,
3868 hundredths: 13,
3869 },
3870 false,
3871 )
3872 .await
3873 .unwrap();
3874
3875 let sent = state.sent.lock().await;
3876 assert_eq!(sent.len(), 1);
3877 let mut r = Reader::new(&sent[0].1);
3878 let _npdu = Npdu::decode(&mut r).unwrap();
3879 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
3880 assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
3881 }
3882
3883 #[tokio::test]
3884 async fn get_alarm_summary_decodes_complex_ack() {
3885 let (dl, state) = MockDataLink::new();
3886 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3887 let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
3888
3889 state
3890 .recv
3891 .lock()
3892 .await
3893 .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
3894
3895 let summaries = client.get_alarm_summary(addr).await.unwrap();
3896 assert_eq!(summaries.len(), 2);
3897 assert_eq!(
3898 summaries[0],
3899 AlarmSummaryItem {
3900 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
3901 alarm_state_raw: 1,
3902 alarm_state: Some(EventState::Fault),
3903 acknowledged_transitions: crate::ClientBitString {
3904 unused_bits: 5,
3905 data: vec![0b1110_0000],
3906 },
3907 }
3908 );
3909 assert_eq!(
3910 summaries[1],
3911 AlarmSummaryItem {
3912 object_id: ObjectId::new(ObjectType::BinaryInput, 2),
3913 alarm_state_raw: 0,
3914 alarm_state: Some(EventState::Normal),
3915 acknowledged_transitions: crate::ClientBitString {
3916 unused_bits: 5,
3917 data: vec![0b1100_0000],
3918 },
3919 }
3920 );
3921
3922 let sent = state.sent.lock().await;
3923 assert_eq!(sent.len(), 1);
3924 let mut r = Reader::new(&sent[0].1);
3925 let _npdu = Npdu::decode(&mut r).unwrap();
3926 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3927 assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
3928 }
3929
3930 #[tokio::test]
3931 async fn get_enrollment_summary_decodes_complex_ack() {
3932 let (dl, state) = MockDataLink::new();
3933 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3934 let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
3935
3936 state
3937 .recv
3938 .lock()
3939 .await
3940 .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
3941
3942 let summaries = client.get_enrollment_summary(addr).await.unwrap();
3943 assert_eq!(summaries.len(), 2);
3944 assert_eq!(
3945 summaries[0],
3946 EnrollmentSummaryItem {
3947 object_id: ObjectId::new(ObjectType::AnalogInput, 7),
3948 event_type: 1,
3949 event_state_raw: 2,
3950 event_state: Some(EventState::Offnormal),
3951 priority: 200,
3952 notification_class: 10,
3953 }
3954 );
3955 assert_eq!(
3956 summaries[1],
3957 EnrollmentSummaryItem {
3958 object_id: ObjectId::new(ObjectType::BinaryInput, 8),
3959 event_type: 0,
3960 event_state_raw: 0,
3961 event_state: Some(EventState::Normal),
3962 priority: 20,
3963 notification_class: 11,
3964 }
3965 );
3966
3967 let sent = state.sent.lock().await;
3968 assert_eq!(sent.len(), 1);
3969 let mut r = Reader::new(&sent[0].1);
3970 let _npdu = Npdu::decode(&mut r).unwrap();
3971 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3972 assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
3973 }
3974
3975 #[tokio::test]
3976 async fn get_event_information_decodes_complex_ack() {
3977 let (dl, state) = MockDataLink::new();
3978 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3979 let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
3980
3981 state
3982 .recv
3983 .lock()
3984 .await
3985 .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
3986
3987 let result = client.get_event_information(addr, None).await.unwrap();
3988 assert!(!result.more_events);
3989 assert_eq!(result.summaries.len(), 1);
3990 assert_eq!(
3991 result.summaries[0],
3992 EventInformationItem {
3993 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
3994 event_state_raw: 2,
3995 event_state: Some(EventState::Offnormal),
3996 acknowledged_transitions: crate::ClientBitString {
3997 unused_bits: 5,
3998 data: vec![0b1110_0000],
3999 },
4000 notify_type: 0,
4001 event_enable: crate::ClientBitString {
4002 unused_bits: 5,
4003 data: vec![0b1100_0000],
4004 },
4005 event_priorities: [1, 2, 3],
4006 }
4007 );
4008 }
4009
4010 #[tokio::test]
4011 async fn acknowledge_alarm_handles_simple_ack() {
4012 let (dl, state) = MockDataLink::new();
4013 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4014 let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
4015
4016 let mut apdu = [0u8; 32];
4017 let mut w = Writer::new(&mut apdu);
4018 SimpleAck {
4019 invoke_id: 1,
4020 service_choice: SERVICE_ACKNOWLEDGE_ALARM,
4021 }
4022 .encode(&mut w)
4023 .unwrap();
4024 state
4025 .recv
4026 .lock()
4027 .await
4028 .push_back((with_npdu(w.as_written()), addr));
4029
4030 client
4031 .acknowledge_alarm(
4032 addr,
4033 AcknowledgeAlarmRequest {
4034 acknowledging_process_id: 10,
4035 event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
4036 event_state_acknowledged: EventState::Offnormal,
4037 event_time_stamp: TimeStamp::SequenceNumber(42),
4038 acknowledgment_source: "operator",
4039 time_of_acknowledgment: TimeStamp::DateTime {
4040 date: Date {
4041 year_since_1900: 126,
4042 month: 2,
4043 day: 7,
4044 weekday: 6,
4045 },
4046 time: Time {
4047 hour: 10,
4048 minute: 11,
4049 second: 12,
4050 hundredths: 13,
4051 },
4052 },
4053 invoke_id: 0,
4054 },
4055 )
4056 .await
4057 .unwrap();
4058
4059 let sent = state.sent.lock().await;
4060 assert_eq!(sent.len(), 1);
4061 let mut r = Reader::new(&sent[0].1);
4062 let _npdu = Npdu::decode(&mut r).unwrap();
4063 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4064 assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
4065 }
4066
4067 #[tokio::test]
4068 async fn create_object_by_type_decodes_complex_ack() {
4069 let (dl, state) = MockDataLink::new();
4070 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4071 let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
4072 let created = ObjectId::new(ObjectType::AnalogValue, 42);
4073
4074 state
4075 .recv
4076 .lock()
4077 .await
4078 .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
4079
4080 let result = client
4081 .create_object_by_type(addr, ObjectType::AnalogValue)
4082 .await
4083 .unwrap();
4084 assert_eq!(result, created);
4085
4086 let sent = state.sent.lock().await;
4087 let mut r = Reader::new(&sent[0].1);
4088 let _npdu = Npdu::decode(&mut r).unwrap();
4089 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4090 assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
4091 }
4092
4093 #[tokio::test]
4094 async fn delete_object_handles_simple_ack() {
4095 let (dl, state) = MockDataLink::new();
4096 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4097 let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
4098
4099 let mut apdu = [0u8; 32];
4100 let mut w = Writer::new(&mut apdu);
4101 SimpleAck {
4102 invoke_id: 1,
4103 service_choice: SERVICE_DELETE_OBJECT,
4104 }
4105 .encode(&mut w)
4106 .unwrap();
4107 state
4108 .recv
4109 .lock()
4110 .await
4111 .push_back((with_npdu(w.as_written()), addr));
4112
4113 client
4114 .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
4115 .await
4116 .unwrap();
4117 }
4118
4119 #[tokio::test]
4120 async fn add_list_element_handles_simple_ack() {
4121 let (dl, state) = MockDataLink::new();
4122 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4123 let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
4124
4125 let mut apdu = [0u8; 32];
4126 let mut w = Writer::new(&mut apdu);
4127 SimpleAck {
4128 invoke_id: 1,
4129 service_choice: SERVICE_ADD_LIST_ELEMENT,
4130 }
4131 .encode(&mut w)
4132 .unwrap();
4133 state
4134 .recv
4135 .lock()
4136 .await
4137 .push_back((with_npdu(w.as_written()), addr));
4138
4139 let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
4140 client
4141 .add_list_element(
4142 addr,
4143 AddListElementRequest {
4144 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
4145 property_id: PropertyId::Proprietary(512),
4146 array_index: None,
4147 elements: &values,
4148 invoke_id: 0,
4149 },
4150 )
4151 .await
4152 .unwrap();
4153 }
4154
4155 #[tokio::test]
4156 async fn remove_list_element_handles_simple_ack() {
4157 let (dl, state) = MockDataLink::new();
4158 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4159 let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
4160
4161 let mut apdu = [0u8; 32];
4162 let mut w = Writer::new(&mut apdu);
4163 SimpleAck {
4164 invoke_id: 1,
4165 service_choice: SERVICE_REMOVE_LIST_ELEMENT,
4166 }
4167 .encode(&mut w)
4168 .unwrap();
4169 state
4170 .recv
4171 .lock()
4172 .await
4173 .push_back((with_npdu(w.as_written()), addr));
4174
4175 let values = [DataValue::Unsigned(1)];
4176 client
4177 .remove_list_element(
4178 addr,
4179 RemoveListElementRequest {
4180 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
4181 property_id: PropertyId::Proprietary(513),
4182 array_index: None,
4183 elements: &values,
4184 invoke_id: 0,
4185 },
4186 )
4187 .await
4188 .unwrap();
4189 }
4190
4191 #[tokio::test]
4192 async fn atomic_read_file_stream_decodes_complex_ack() {
4193 let (dl, state) = MockDataLink::new();
4194 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4195 let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
4196 let file_object = ObjectId::new(ObjectType::File, 2);
4197
4198 state.recv.lock().await.push_back((
4199 with_npdu(&atomic_read_file_stream_ack_apdu(
4200 1,
4201 true,
4202 &[0xAA, 0xBB, 0xCC],
4203 )),
4204 addr,
4205 ));
4206
4207 let result = client
4208 .atomic_read_file_stream(addr, file_object, 0, 3)
4209 .await
4210 .unwrap();
4211
4212 assert_eq!(
4213 result,
4214 AtomicReadFileResult::Stream {
4215 end_of_file: true,
4216 file_start_position: 0,
4217 file_data: vec![0xAA, 0xBB, 0xCC],
4218 }
4219 );
4220
4221 let sent = state.sent.lock().await;
4222 assert_eq!(sent.len(), 1);
4223 let mut r = Reader::new(&sent[0].1);
4224 let _npdu = Npdu::decode(&mut r).unwrap();
4225 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4226 assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
4227 }
4228
4229 #[tokio::test]
4230 async fn atomic_read_file_record_decodes_complex_ack() {
4231 let (dl, state) = MockDataLink::new();
4232 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4233 let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
4234 let file_object = ObjectId::new(ObjectType::File, 5);
4235
4236 state
4237 .recv
4238 .lock()
4239 .await
4240 .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
4241
4242 let result = client
4243 .atomic_read_file_record(addr, file_object, 7, 2)
4244 .await
4245 .unwrap();
4246
4247 assert_eq!(
4248 result,
4249 AtomicReadFileResult::Record {
4250 end_of_file: false,
4251 file_start_record: 7,
4252 returned_record_count: 2,
4253 file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
4254 }
4255 );
4256 }
4257
4258 #[tokio::test]
4259 async fn atomic_write_file_stream_decodes_complex_ack() {
4260 let (dl, state) = MockDataLink::new();
4261 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4262 let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
4263 let file_object = ObjectId::new(ObjectType::File, 3);
4264
4265 state
4266 .recv
4267 .lock()
4268 .await
4269 .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
4270
4271 let result = client
4272 .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
4273 .await
4274 .unwrap();
4275
4276 assert_eq!(
4277 result,
4278 AtomicWriteFileResult::Stream {
4279 file_start_position: 128
4280 }
4281 );
4282 }
4283
4284 #[tokio::test]
4285 async fn atomic_write_file_record_decodes_complex_ack() {
4286 let (dl, state) = MockDataLink::new();
4287 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4288 let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
4289 let file_object = ObjectId::new(ObjectType::File, 9);
4290 let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
4291
4292 state
4293 .recv
4294 .lock()
4295 .await
4296 .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
4297
4298 let result = client
4299 .atomic_write_file_record(addr, file_object, 7, &records)
4300 .await
4301 .unwrap();
4302
4303 assert_eq!(
4304 result,
4305 AtomicWriteFileResult::Record {
4306 file_start_record: 7
4307 }
4308 );
4309 }
4310
4311 #[tokio::test]
4312 async fn read_properties_decodes_complex_ack() {
4313 let (dl, state) = MockDataLink::new();
4314 let client = BacnetClient::with_datalink(dl);
4315 let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
4316 let object_id = ObjectId::new(ObjectType::Device, 1);
4317
4318 let mut apdu_buf = [0u8; 256];
4319 let mut w = Writer::new(&mut apdu_buf);
4320 ComplexAckHeader {
4321 segmented: false,
4322 more_follows: false,
4323 invoke_id: 1,
4324 sequence_number: None,
4325 proposed_window_size: None,
4326 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4327 }
4328 .encode(&mut w)
4329 .unwrap();
4330 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4331 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4332 .encode(&mut w)
4333 .unwrap();
4334 encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
4335 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4336 .encode(&mut w)
4337 .unwrap();
4338 encode_app_real(&mut w, 55.5).unwrap();
4339 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4340 .encode(&mut w)
4341 .unwrap();
4342 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4343 .encode(&mut w)
4344 .unwrap();
4345
4346 state
4347 .recv
4348 .lock()
4349 .await
4350 .push_back((with_npdu(w.as_written()), addr));
4351
4352 let values = client
4353 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4354 .await
4355 .unwrap();
4356 assert_eq!(values.len(), 1);
4357 assert_eq!(values[0].0, PropertyId::PresentValue);
4358 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
4359
4360 let sent = state.sent.lock().await;
4361 assert_eq!(sent.len(), 1);
4362 let mut r = Reader::new(&sent[0].1);
4363 let _npdu = Npdu::decode(&mut r).unwrap();
4364 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4365 assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
4366 }
4367
4368 #[tokio::test]
4369 async fn read_property_multiple_reassembles_segmented_complex_ack() {
4370 let (dl, state) = MockDataLink::new();
4371 let client = BacnetClient::with_datalink(dl);
4372 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
4373 let object_id = ObjectId::new(ObjectType::Device, 1);
4374
4375 let mut payload_buf = [0u8; 256];
4376 let mut pw = Writer::new(&mut payload_buf);
4377 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
4378 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4379 .encode(&mut pw)
4380 .unwrap();
4381 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
4382 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4383 .encode(&mut pw)
4384 .unwrap();
4385 encode_app_real(&mut pw, 66.0).unwrap();
4386 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4387 .encode(&mut pw)
4388 .unwrap();
4389 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4390 .encode(&mut pw)
4391 .unwrap();
4392 let payload = pw.as_written();
4393 let split = payload.len() / 2;
4394
4395 let mut apdu1 = [0u8; 256];
4396 let mut w1 = Writer::new(&mut apdu1);
4397 ComplexAckHeader {
4398 segmented: true,
4399 more_follows: true,
4400 invoke_id: 1,
4401 sequence_number: Some(0),
4402 proposed_window_size: Some(1),
4403 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4404 }
4405 .encode(&mut w1)
4406 .unwrap();
4407 w1.write_all(&payload[..split]).unwrap();
4408
4409 let mut apdu2 = [0u8; 256];
4410 let mut w2 = Writer::new(&mut apdu2);
4411 ComplexAckHeader {
4412 segmented: true,
4413 more_follows: false,
4414 invoke_id: 1,
4415 sequence_number: Some(1),
4416 proposed_window_size: Some(1),
4417 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4418 }
4419 .encode(&mut w2)
4420 .unwrap();
4421 w2.write_all(&payload[split..]).unwrap();
4422
4423 state
4424 .recv
4425 .lock()
4426 .await
4427 .push_back((with_npdu(w1.as_written()), addr));
4428 state
4429 .recv
4430 .lock()
4431 .await
4432 .push_back((with_npdu(w2.as_written()), addr));
4433
4434 let values = client
4435 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4436 .await
4437 .unwrap();
4438 assert_eq!(values.len(), 1);
4439 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
4440
4441 let sent = state.sent.lock().await;
4442 assert!(sent.len() >= 3);
4443
4444 let mut saw_segment_ack = 0usize;
4445 for (_, frame) in sent.iter().skip(1) {
4446 let mut r = Reader::new(frame);
4447 let _npdu = Npdu::decode(&mut r).unwrap();
4448 let apdu = r.read_exact(r.remaining()).unwrap();
4449 if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
4450 let mut sr = Reader::new(apdu);
4451 let sack = SegmentAck::decode(&mut sr).unwrap();
4452 assert_eq!(sack.invoke_id, 1);
4453 saw_segment_ack += 1;
4454 }
4455 }
4456 assert!(saw_segment_ack >= 1);
4457 }
4458
4459 #[tokio::test]
4460 async fn read_property_multiple_tolerates_duplicate_segment() {
4461 let (dl, state) = MockDataLink::new();
4462 let client = BacnetClient::with_datalink(dl);
4463 let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
4464 let object_id = ObjectId::new(ObjectType::Device, 1);
4465
4466 let mut payload_buf = [0u8; 256];
4467 let mut pw = Writer::new(&mut payload_buf);
4468 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
4469 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4470 .encode(&mut pw)
4471 .unwrap();
4472 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
4473 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4474 .encode(&mut pw)
4475 .unwrap();
4476 encode_app_real(&mut pw, 66.0).unwrap();
4477 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4478 .encode(&mut pw)
4479 .unwrap();
4480 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4481 .encode(&mut pw)
4482 .unwrap();
4483 let payload = pw.as_written();
4484 let split = payload.len() / 2;
4485
4486 let mut apdu1 = [0u8; 256];
4487 let mut w1 = Writer::new(&mut apdu1);
4488 ComplexAckHeader {
4489 segmented: true,
4490 more_follows: true,
4491 invoke_id: 1,
4492 sequence_number: Some(0),
4493 proposed_window_size: Some(1),
4494 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4495 }
4496 .encode(&mut w1)
4497 .unwrap();
4498 w1.write_all(&payload[..split]).unwrap();
4499
4500 let mut dup = [0u8; 256];
4501 let mut wd = Writer::new(&mut dup);
4502 ComplexAckHeader {
4503 segmented: true,
4504 more_follows: true,
4505 invoke_id: 1,
4506 sequence_number: Some(0),
4507 proposed_window_size: Some(1),
4508 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4509 }
4510 .encode(&mut wd)
4511 .unwrap();
4512 wd.write_all(&payload[..split]).unwrap();
4513
4514 let mut apdu2 = [0u8; 256];
4515 let mut w2 = Writer::new(&mut apdu2);
4516 ComplexAckHeader {
4517 segmented: true,
4518 more_follows: false,
4519 invoke_id: 1,
4520 sequence_number: Some(1),
4521 proposed_window_size: Some(1),
4522 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4523 }
4524 .encode(&mut w2)
4525 .unwrap();
4526 w2.write_all(&payload[split..]).unwrap();
4527
4528 {
4529 let mut recv = state.recv.lock().await;
4530 recv.push_back((with_npdu(w1.as_written()), addr));
4531 recv.push_back((with_npdu(wd.as_written()), addr));
4532 recv.push_back((with_npdu(w2.as_written()), addr));
4533 }
4534
4535 let values = client
4536 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4537 .await
4538 .unwrap();
4539 assert_eq!(values.len(), 1);
4540 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
4541 }
4542
4543 #[tokio::test]
4544 async fn write_properties_handles_simple_ack() {
4545 let (dl, state) = MockDataLink::new();
4546 let client = BacnetClient::with_datalink(dl);
4547 let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
4548 let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
4549
4550 let mut apdu_buf = [0u8; 32];
4551 let mut w = Writer::new(&mut apdu_buf);
4552 SimpleAck {
4553 invoke_id: 1,
4554 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4555 }
4556 .encode(&mut w)
4557 .unwrap();
4558 state
4559 .recv
4560 .lock()
4561 .await
4562 .push_back((with_npdu(w.as_written()), addr));
4563
4564 let writes = [PropertyWriteSpec {
4565 property_id: PropertyId::PresentValue,
4566 array_index: None,
4567 value: DataValue::Real(12.5),
4568 priority: Some(8),
4569 }];
4570 client
4571 .write_property_multiple(addr, object_id, &writes)
4572 .await
4573 .unwrap();
4574
4575 let sent = state.sent.lock().await;
4576 assert_eq!(sent.len(), 1);
4577 let mut r = Reader::new(&sent[0].1);
4578 let _npdu = Npdu::decode(&mut r).unwrap();
4579 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4580 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4581 }
4582
4583 #[tokio::test]
4584 async fn subscribe_cov_handles_simple_ack() {
4585 let (dl, state) = MockDataLink::new();
4586 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4587 let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
4588
4589 let mut apdu_buf = [0u8; 32];
4590 let mut w = Writer::new(&mut apdu_buf);
4591 SimpleAck {
4592 invoke_id: 1,
4593 service_choice: SERVICE_SUBSCRIBE_COV,
4594 }
4595 .encode(&mut w)
4596 .unwrap();
4597 state
4598 .recv
4599 .lock()
4600 .await
4601 .push_back((with_npdu(w.as_written()), addr));
4602
4603 client
4604 .subscribe_cov(
4605 addr,
4606 SubscribeCovRequest {
4607 subscriber_process_id: 10,
4608 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
4609 issue_confirmed_notifications: Some(false),
4610 lifetime_seconds: Some(300),
4611 invoke_id: 0,
4612 },
4613 )
4614 .await
4615 .unwrap();
4616
4617 let sent = state.sent.lock().await;
4618 assert_eq!(sent.len(), 1);
4619 let mut r = Reader::new(&sent[0].1);
4620 let _npdu = Npdu::decode(&mut r).unwrap();
4621 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4622 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
4623 }
4624
4625 #[tokio::test]
4626 async fn subscribe_cov_property_handles_simple_ack() {
4627 let (dl, state) = MockDataLink::new();
4628 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4629 let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
4630
4631 let mut apdu_buf = [0u8; 32];
4632 let mut w = Writer::new(&mut apdu_buf);
4633 SimpleAck {
4634 invoke_id: 1,
4635 service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
4636 }
4637 .encode(&mut w)
4638 .unwrap();
4639 state
4640 .recv
4641 .lock()
4642 .await
4643 .push_back((with_npdu(w.as_written()), addr));
4644
4645 client
4646 .subscribe_cov_property(
4647 addr,
4648 SubscribeCovPropertyRequest {
4649 subscriber_process_id: 22,
4650 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
4651 issue_confirmed_notifications: Some(true),
4652 lifetime_seconds: Some(120),
4653 monitored_property_id: PropertyId::PresentValue,
4654 monitored_property_array_index: None,
4655 cov_increment: Some(0.1),
4656 invoke_id: 0,
4657 },
4658 )
4659 .await
4660 .unwrap();
4661
4662 let sent = state.sent.lock().await;
4663 assert_eq!(sent.len(), 1);
4664 let mut r = Reader::new(&sent[0].1);
4665 let _npdu = Npdu::decode(&mut r).unwrap();
4666 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4667 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
4668 }
4669
4670 #[tokio::test]
4671 async fn read_range_by_position_decodes_complex_ack() {
4672 let (dl, state) = MockDataLink::new();
4673 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4674 let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
4675 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4676
4677 let mut apdu_buf = [0u8; 256];
4678 let mut w = Writer::new(&mut apdu_buf);
4679 ComplexAckHeader {
4680 segmented: false,
4681 more_follows: false,
4682 invoke_id: 1,
4683 sequence_number: None,
4684 proposed_window_size: None,
4685 service_choice: SERVICE_READ_RANGE,
4686 }
4687 .encode(&mut w)
4688 .unwrap();
4689 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
4690 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4691 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
4692 w.write_u8(5).unwrap();
4693 w.write_u8(0b1110_0000).unwrap();
4694 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
4695 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
4696 encode_app_real(&mut w, 42.0).unwrap();
4697 encode_app_real(&mut w, 43.0).unwrap();
4698 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
4699
4700 state
4701 .recv
4702 .lock()
4703 .await
4704 .push_back((with_npdu(w.as_written()), addr));
4705
4706 let result = client
4707 .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
4708 .await
4709 .unwrap();
4710 assert_eq!(result.object_id, object_id);
4711 assert_eq!(result.item_count, 2);
4712 assert_eq!(result.items.len(), 2);
4713 assert!(matches!(
4714 result.items[0],
4715 ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
4716 ));
4717 }
4718
4719 #[tokio::test]
4720 async fn read_range_by_sequence_number_encodes_range_selector() {
4721 let (dl, state) = MockDataLink::new();
4722 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4723 let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
4724 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4725
4726 state
4727 .recv
4728 .lock()
4729 .await
4730 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
4731
4732 let _ = client
4733 .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
4734 .await
4735 .unwrap();
4736
4737 let sent = state.sent.lock().await;
4738 let mut r = Reader::new(&sent[0].1);
4739 let _npdu = Npdu::decode(&mut r).unwrap();
4740 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4741 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
4742 match Tag::decode(&mut r).unwrap() {
4743 Tag::Context { tag_num: 0, len: 4 } => {
4744 let _ = r.read_exact(4).unwrap();
4745 }
4746 other => panic!("unexpected object id tag: {other:?}"),
4747 }
4748 match Tag::decode(&mut r).unwrap() {
4749 Tag::Context { tag_num: 1, len } => {
4750 let _ = decode_unsigned(&mut r, len as usize).unwrap();
4751 }
4752 other => panic!("unexpected property tag: {other:?}"),
4753 }
4754 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
4755 match Tag::decode(&mut r).unwrap() {
4756 Tag::Application {
4757 tag: AppTag::UnsignedInt,
4758 len,
4759 } => {
4760 assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
4761 }
4762 other => panic!("unexpected ref seq tag: {other:?}"),
4763 }
4764 match Tag::decode(&mut r).unwrap() {
4765 Tag::Application {
4766 tag: AppTag::SignedInt,
4767 len,
4768 } => {
4769 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
4770 }
4771 other => panic!("unexpected count tag: {other:?}"),
4772 }
4773 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
4774 }
4775
4776 #[tokio::test]
4777 async fn read_range_by_time_encodes_range_selector() {
4778 let (dl, state) = MockDataLink::new();
4779 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4780 let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
4781 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4782 let date = Date {
4783 year_since_1900: 126,
4784 month: 2,
4785 day: 7,
4786 weekday: 6,
4787 };
4788 let time = Time {
4789 hour: 10,
4790 minute: 11,
4791 second: 12,
4792 hundredths: 13,
4793 };
4794
4795 state
4796 .recv
4797 .lock()
4798 .await
4799 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
4800
4801 let _ = client
4802 .read_range_by_time(
4803 addr,
4804 object_id,
4805 PropertyId::PresentValue,
4806 None,
4807 (date, time),
4808 2,
4809 )
4810 .await
4811 .unwrap();
4812
4813 let sent = state.sent.lock().await;
4814 let mut r = Reader::new(&sent[0].1);
4815 let _npdu = Npdu::decode(&mut r).unwrap();
4816 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4817 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
4818 match Tag::decode(&mut r).unwrap() {
4819 Tag::Context { tag_num: 0, len: 4 } => {
4820 let _ = r.read_exact(4).unwrap();
4821 }
4822 other => panic!("unexpected object id tag: {other:?}"),
4823 }
4824 match Tag::decode(&mut r).unwrap() {
4825 Tag::Context { tag_num: 1, len } => {
4826 let _ = decode_unsigned(&mut r, len as usize).unwrap();
4827 }
4828 other => panic!("unexpected property tag: {other:?}"),
4829 }
4830 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
4831 match Tag::decode(&mut r).unwrap() {
4832 Tag::Application {
4833 tag: AppTag::Date,
4834 len: 4,
4835 } => {
4836 let raw = r.read_exact(4).unwrap();
4837 assert_eq!(
4838 raw,
4839 &[date.year_since_1900, date.month, date.day, date.weekday]
4840 );
4841 }
4842 other => panic!("unexpected date tag: {other:?}"),
4843 }
4844 match Tag::decode(&mut r).unwrap() {
4845 Tag::Application {
4846 tag: AppTag::Time,
4847 len: 4,
4848 } => {
4849 let raw = r.read_exact(4).unwrap();
4850 assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
4851 }
4852 other => panic!("unexpected time tag: {other:?}"),
4853 }
4854 match Tag::decode(&mut r).unwrap() {
4855 Tag::Application {
4856 tag: AppTag::SignedInt,
4857 len,
4858 } => {
4859 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
4860 }
4861 other => panic!("unexpected count tag: {other:?}"),
4862 }
4863 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
4864 }
4865
4866 #[tokio::test]
4867 async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
4868 let (dl, state) = MockDataLink::new();
4869 let client = BacnetClient::with_datalink(dl);
4870 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
4871
4872 let mut apdu = [0u8; 256];
4873 let mut w = Writer::new(&mut apdu);
4874 UnconfirmedRequestHeader {
4875 service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
4876 }
4877 .encode(&mut w)
4878 .unwrap();
4879 encode_ctx_unsigned(&mut w, 0, 17).unwrap();
4880 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4881 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
4882 encode_ctx_unsigned(&mut w, 3, 60).unwrap();
4883 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
4884 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
4885 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
4886 encode_app_real(&mut w, 73.25).unwrap();
4887 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
4888 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
4889
4890 state
4891 .recv
4892 .lock()
4893 .await
4894 .push_back((with_npdu(w.as_written()), addr));
4895
4896 let notification = client
4897 .recv_cov_notification(Duration::from_secs(1))
4898 .await
4899 .unwrap()
4900 .unwrap();
4901 assert!(!notification.confirmed);
4902 assert_eq!(notification.subscriber_process_id, 17);
4903 assert_eq!(notification.values.len(), 1);
4904 assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
4905 assert!(matches!(
4906 notification.values[0].value,
4907 ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
4908 ));
4909
4910 let sent = state.sent.lock().await;
4911 assert!(sent.is_empty());
4912 }
4913
4914 #[tokio::test]
4915 async fn recv_confirmed_cov_notification_sends_simple_ack() {
4916 let (dl, state) = MockDataLink::new();
4917 let client = BacnetClient::with_datalink(dl);
4918 let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
4919
4920 let mut apdu = [0u8; 256];
4921 let mut w = Writer::new(&mut apdu);
4922 ConfirmedRequestHeader {
4923 segmented: false,
4924 more_follows: false,
4925 segmented_response_accepted: false,
4926 max_segments: 0,
4927 max_apdu: 5,
4928 invoke_id: 9,
4929 sequence_number: None,
4930 proposed_window_size: None,
4931 service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
4932 }
4933 .encode(&mut w)
4934 .unwrap();
4935 encode_ctx_unsigned(&mut w, 0, 18).unwrap();
4936 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4937 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
4938 encode_ctx_unsigned(&mut w, 3, 120).unwrap();
4939 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
4940 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
4941 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
4942 encode_app_real(&mut w, 55.0).unwrap();
4943 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
4944 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
4945
4946 state
4947 .recv
4948 .lock()
4949 .await
4950 .push_back((with_npdu(w.as_written()), addr));
4951
4952 let notification = client
4953 .recv_cov_notification(Duration::from_secs(1))
4954 .await
4955 .unwrap()
4956 .unwrap();
4957 assert!(notification.confirmed);
4958 assert_eq!(notification.values.len(), 1);
4959
4960 let sent = state.sent.lock().await;
4961 assert_eq!(sent.len(), 1);
4962 let mut r = Reader::new(&sent[0].1);
4963 let _npdu = Npdu::decode(&mut r).unwrap();
4964 let ack = SimpleAck::decode(&mut r).unwrap();
4965 assert_eq!(ack.invoke_id, 9);
4966 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
4967 }
4968
4969 #[tokio::test]
4970 async fn recv_unconfirmed_event_notification_returns_decoded_value() {
4971 let (dl, state) = MockDataLink::new();
4972 let client = BacnetClient::with_datalink(dl);
4973 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4974
4975 let mut apdu = [0u8; 256];
4976 let mut w = Writer::new(&mut apdu);
4977 UnconfirmedRequestHeader {
4978 service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
4979 }
4980 .encode(&mut w)
4981 .unwrap();
4982 encode_ctx_unsigned(&mut w, 0, 99).unwrap();
4983 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4984 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
4985 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4986 encode_ctx_unsigned(&mut w, 1, 55).unwrap();
4987 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4988 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
4989 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
4990 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
4991 encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
4992 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
4993 Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
4994 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
4995 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
4996 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
4997 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
4998 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
4999 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
5000 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
5001
5002 state
5003 .recv
5004 .lock()
5005 .await
5006 .push_back((with_npdu(w.as_written()), addr));
5007
5008 let notification: EventNotification = client
5009 .recv_event_notification(Duration::from_secs(1))
5010 .await
5011 .unwrap()
5012 .unwrap();
5013 assert!(!notification.confirmed);
5014 assert_eq!(notification.process_id, 99);
5015 assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
5016 assert_eq!(notification.ack_required, Some(true));
5017 assert_eq!(notification.from_state, Some(EventState::Offnormal));
5018 assert_eq!(notification.to_state, Some(EventState::Normal));
5019 assert_eq!(notification.notify_type, 0);
5020
5021 let sent = state.sent.lock().await;
5022 assert!(sent.is_empty());
5023 }
5024
5025 #[tokio::test]
5026 async fn recv_confirmed_event_notification_sends_simple_ack() {
5027 let (dl, state) = MockDataLink::new();
5028 let client = BacnetClient::with_datalink(dl);
5029 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
5030
5031 let mut apdu = [0u8; 256];
5032 let mut w = Writer::new(&mut apdu);
5033 ConfirmedRequestHeader {
5034 segmented: false,
5035 more_follows: false,
5036 segmented_response_accepted: false,
5037 max_segments: 0,
5038 max_apdu: 5,
5039 invoke_id: 11,
5040 sequence_number: None,
5041 proposed_window_size: None,
5042 service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
5043 }
5044 .encode(&mut w)
5045 .unwrap();
5046 encode_ctx_unsigned(&mut w, 0, 100).unwrap();
5047 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5048 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
5049 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5050 encode_ctx_unsigned(&mut w, 1, 56).unwrap();
5051 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5052 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
5053 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
5054 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
5055 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
5056 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
5057 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
5058 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
5059 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
5060 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
5061 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
5062 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
5063
5064 state
5065 .recv
5066 .lock()
5067 .await
5068 .push_back((with_npdu(w.as_written()), addr));
5069
5070 let notification = client
5071 .recv_event_notification(Duration::from_secs(1))
5072 .await
5073 .unwrap()
5074 .unwrap();
5075 assert!(notification.confirmed);
5076
5077 let sent = state.sent.lock().await;
5078 assert_eq!(sent.len(), 1);
5079 let mut r = Reader::new(&sent[0].1);
5080 let _npdu = Npdu::decode(&mut r).unwrap();
5081 let ack = SimpleAck::decode(&mut r).unwrap();
5082 assert_eq!(ack.invoke_id, 11);
5083 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
5084 }
5085
5086 #[tokio::test]
5087 async fn write_property_multiple_segments_large_request() {
5088 let (dl, state) = MockDataLink::new();
5089 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
5090 let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
5091 let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
5092
5093 {
5094 let mut recv = state.recv.lock().await;
5095 for seq in 0u8..=254 {
5096 let mut apdu = [0u8; 16];
5097 let mut w = Writer::new(&mut apdu);
5098 SegmentAck {
5099 negative_ack: false,
5100 sent_by_server: true,
5101 invoke_id: 1,
5102 sequence_number: seq,
5103 actual_window_size: 1,
5104 }
5105 .encode(&mut w)
5106 .unwrap();
5107 recv.push_back((with_npdu(w.as_written()), addr));
5108 }
5109
5110 let mut apdu = [0u8; 16];
5111 let mut w = Writer::new(&mut apdu);
5112 SimpleAck {
5113 invoke_id: 1,
5114 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5115 }
5116 .encode(&mut w)
5117 .unwrap();
5118 recv.push_back((with_npdu(w.as_written()), addr));
5119 }
5120
5121 let writes: Vec<PropertyWriteSpec> = (0..180)
5122 .map(|_| PropertyWriteSpec {
5123 property_id: PropertyId::Description,
5124 array_index: None,
5125 value: DataValue::CharacterString(
5126 "rustbac segmented write test payload................................................................",
5127 ),
5128 priority: None,
5129 })
5130 .collect();
5131
5132 client
5133 .write_property_multiple(addr, object_id, &writes)
5134 .await
5135 .unwrap();
5136
5137 let sent = state.sent.lock().await;
5138 assert!(sent.len() > 1);
5139
5140 let mut seqs = Vec::new();
5141 let mut saw_more_follows = false;
5142 let mut saw_last = false;
5143 for (_, frame) in sent.iter() {
5144 let mut r = Reader::new(frame);
5145 let _npdu = Npdu::decode(&mut r).unwrap();
5146 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5147 assert!(hdr.segmented);
5148 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
5149 if hdr.more_follows {
5150 saw_more_follows = true;
5151 } else {
5152 saw_last = true;
5153 }
5154 seqs.push(hdr.sequence_number.unwrap());
5155 }
5156
5157 assert!(saw_more_follows);
5158 assert!(saw_last);
5159 for (idx, seq) in seqs.iter().enumerate() {
5160 assert_eq!(*seq as usize, idx);
5161 }
5162 }
5163
5164 #[tokio::test]
5165 async fn write_property_multiple_uses_configured_segment_window() {
5166 let (dl, state) = MockDataLink::new();
5167 let client = BacnetClient::with_datalink(dl)
5168 .with_response_timeout(Duration::from_secs(1))
5169 .with_segmented_request_window_size(4);
5170 let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
5171 let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
5172
5173 {
5174 let mut recv = state.recv.lock().await;
5175 for seq in 0u8..=254 {
5176 let mut apdu = [0u8; 16];
5177 let mut w = Writer::new(&mut apdu);
5178 SegmentAck {
5179 negative_ack: false,
5180 sent_by_server: true,
5181 invoke_id: 1,
5182 sequence_number: seq,
5183 actual_window_size: 4,
5184 }
5185 .encode(&mut w)
5186 .unwrap();
5187 recv.push_back((with_npdu(w.as_written()), addr));
5188 }
5189
5190 let mut apdu = [0u8; 16];
5191 let mut w = Writer::new(&mut apdu);
5192 SimpleAck {
5193 invoke_id: 1,
5194 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5195 }
5196 .encode(&mut w)
5197 .unwrap();
5198 recv.push_back((with_npdu(w.as_written()), addr));
5199 }
5200
5201 let writes: Vec<PropertyWriteSpec> = (0..180)
5202 .map(|_| PropertyWriteSpec {
5203 property_id: PropertyId::Description,
5204 array_index: None,
5205 value: DataValue::CharacterString(
5206 "rustbac segmented write test payload................................................................",
5207 ),
5208 priority: None,
5209 })
5210 .collect();
5211
5212 client
5213 .write_property_multiple(addr, object_id, &writes)
5214 .await
5215 .unwrap();
5216
5217 let sent = state.sent.lock().await;
5218 assert!(sent.len() > 4);
5219 for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
5220 let mut r = Reader::new(frame);
5221 let _npdu = Npdu::decode(&mut r).unwrap();
5222 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5223 assert!(hdr.segmented);
5224 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
5225 assert_eq!(hdr.sequence_number, Some(idx as u8));
5226 assert_eq!(hdr.proposed_window_size, Some(4));
5227 }
5228 }
5229
5230 #[tokio::test]
5231 async fn write_property_multiple_adapts_window_to_peer_ack_window() {
5232 let (dl, state) = MockDataLink::new();
5233 let client = BacnetClient::with_datalink(dl)
5234 .with_response_timeout(Duration::from_secs(1))
5235 .with_segmented_request_window_size(4);
5236 let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
5237 let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
5238
5239 {
5240 let mut recv = state.recv.lock().await;
5241 for seq in 0u8..=254 {
5242 let mut apdu = [0u8; 16];
5243 let mut w = Writer::new(&mut apdu);
5244 SegmentAck {
5245 negative_ack: false,
5246 sent_by_server: true,
5247 invoke_id: 1,
5248 sequence_number: seq,
5249 actual_window_size: 2,
5250 }
5251 .encode(&mut w)
5252 .unwrap();
5253 recv.push_back((with_npdu(w.as_written()), addr));
5254 }
5255
5256 let mut apdu = [0u8; 16];
5257 let mut w = Writer::new(&mut apdu);
5258 SimpleAck {
5259 invoke_id: 1,
5260 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5261 }
5262 .encode(&mut w)
5263 .unwrap();
5264 recv.push_back((with_npdu(w.as_written()), addr));
5265 }
5266
5267 let writes: Vec<PropertyWriteSpec> = (0..180)
5268 .map(|_| PropertyWriteSpec {
5269 property_id: PropertyId::Description,
5270 array_index: None,
5271 value: DataValue::CharacterString(
5272 "rustbac segmented write test payload................................................................",
5273 ),
5274 priority: None,
5275 })
5276 .collect();
5277
5278 client
5279 .write_property_multiple(addr, object_id, &writes)
5280 .await
5281 .unwrap();
5282
5283 let sent = state.sent.lock().await;
5284 let mut saw_adapted_window = false;
5285 for (_, frame) in sent.iter() {
5286 let mut r = Reader::new(frame);
5287 let _npdu = Npdu::decode(&mut r).unwrap();
5288 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5289 if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
5290 saw_adapted_window = true;
5291 break;
5292 }
5293 }
5294 assert!(saw_adapted_window);
5295 }
5296
5297 #[tokio::test]
5298 async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
5299 let (dl, state) = MockDataLink::new();
5300 let client = BacnetClient::with_datalink(dl)
5301 .with_response_timeout(Duration::from_secs(1))
5302 .with_segmented_request_window_size(1)
5303 .with_segmented_request_retries(1);
5304 let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
5305 let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
5306
5307 {
5308 let mut recv = state.recv.lock().await;
5309
5310 let mut nack_apdu = [0u8; 16];
5311 let mut nack_w = Writer::new(&mut nack_apdu);
5312 SegmentAck {
5313 negative_ack: true,
5314 sent_by_server: true,
5315 invoke_id: 1,
5316 sequence_number: 0,
5317 actual_window_size: 1,
5318 }
5319 .encode(&mut nack_w)
5320 .unwrap();
5321 recv.push_back((with_npdu(nack_w.as_written()), addr));
5322
5323 for seq in 0u8..=254 {
5324 let mut apdu = [0u8; 16];
5325 let mut w = Writer::new(&mut apdu);
5326 SegmentAck {
5327 negative_ack: false,
5328 sent_by_server: true,
5329 invoke_id: 1,
5330 sequence_number: seq,
5331 actual_window_size: 1,
5332 }
5333 .encode(&mut w)
5334 .unwrap();
5335 recv.push_back((with_npdu(w.as_written()), addr));
5336 }
5337
5338 let mut apdu = [0u8; 16];
5339 let mut w = Writer::new(&mut apdu);
5340 SimpleAck {
5341 invoke_id: 1,
5342 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5343 }
5344 .encode(&mut w)
5345 .unwrap();
5346 recv.push_back((with_npdu(w.as_written()), addr));
5347 }
5348
5349 let writes: Vec<PropertyWriteSpec> = (0..180)
5350 .map(|_| PropertyWriteSpec {
5351 property_id: PropertyId::Description,
5352 array_index: None,
5353 value: DataValue::CharacterString(
5354 "rustbac segmented write test payload................................................................",
5355 ),
5356 priority: None,
5357 })
5358 .collect();
5359
5360 client
5361 .write_property_multiple(addr, object_id, &writes)
5362 .await
5363 .unwrap();
5364
5365 let sent = state.sent.lock().await;
5366 let mut seq0_frames = 0usize;
5367 for (_, frame) in sent.iter() {
5368 let mut r = Reader::new(frame);
5369 let _npdu = Npdu::decode(&mut r).unwrap();
5370 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5371 if hdr.sequence_number == Some(0) {
5372 seq0_frames += 1;
5373 }
5374 }
5375 assert!(seq0_frames >= 2);
5376 }
5377
5378 #[tokio::test]
5379 async fn read_property_ignores_invalid_frames_until_valid_response() {
5380 let (dl, state) = MockDataLink::new();
5381 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
5382 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
5383 let state_for_task = state.clone();
5384
5385 tokio::spawn(async move {
5386 tokio::time::sleep(Duration::from_millis(20)).await;
5387 let mut apdu = [0u8; 128];
5388 let mut w = Writer::new(&mut apdu);
5389 ComplexAckHeader {
5390 segmented: false,
5391 more_follows: false,
5392 invoke_id: 1,
5393 sequence_number: None,
5394 proposed_window_size: None,
5395 service_choice: SERVICE_READ_PROPERTY,
5396 }
5397 .encode(&mut w)
5398 .unwrap();
5399 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5400 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
5401 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5402 encode_app_real(&mut w, 77.0).unwrap();
5403 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5404 state_for_task
5405 .recv
5406 .lock()
5407 .await
5408 .push_back((with_npdu(w.as_written()), addr));
5409 });
5410
5411 let value = client
5412 .read_property(
5413 addr,
5414 ObjectId::new(ObjectType::Device, 1),
5415 PropertyId::PresentValue,
5416 )
5417 .await
5418 .unwrap();
5419 assert!(matches!(
5420 value,
5421 ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
5422 ));
5423 }
5424
5425 #[tokio::test]
5426 async fn read_property_maps_reject() {
5427 let (dl, state) = MockDataLink::new();
5428 let client = BacnetClient::with_datalink(dl);
5429 let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
5430
5431 let mut apdu = [0u8; 8];
5432 let mut w = Writer::new(&mut apdu);
5433 w.write_u8((ApduType::Reject as u8) << 4).unwrap();
5434 w.write_u8(1).unwrap(); w.write_u8(2).unwrap(); state
5437 .recv
5438 .lock()
5439 .await
5440 .push_back((with_npdu(w.as_written()), addr));
5441
5442 let err = client
5443 .read_property(
5444 addr,
5445 ObjectId::new(ObjectType::Device, 1),
5446 PropertyId::ObjectName,
5447 )
5448 .await
5449 .unwrap_err();
5450 assert!(matches!(
5451 err,
5452 crate::ClientError::RemoteReject { reason: 2 }
5453 ));
5454 }
5455
5456 #[tokio::test]
5457 async fn read_property_maps_remote_error_details() {
5458 let (dl, state) = MockDataLink::new();
5459 let client = BacnetClient::with_datalink(dl);
5460 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
5461
5462 let mut apdu = [0u8; 16];
5463 let mut w = Writer::new(&mut apdu);
5464 w.write_u8((ApduType::Error as u8) << 4).unwrap();
5465 w.write_u8(1).unwrap(); w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
5467 .unwrap();
5468 Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
5469 w.write_u8(2).unwrap(); Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
5471 w.write_u8(32).unwrap(); state
5474 .recv
5475 .lock()
5476 .await
5477 .push_back((with_npdu(w.as_written()), addr));
5478
5479 let err = client
5480 .read_property(
5481 addr,
5482 ObjectId::new(ObjectType::Device, 1),
5483 PropertyId::ObjectName,
5484 )
5485 .await
5486 .unwrap_err();
5487 assert!(matches!(
5488 err,
5489 crate::ClientError::RemoteServiceError {
5490 service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
5491 error_class_raw: Some(2),
5492 error_code_raw: Some(32),
5493 error_class: Some(rustbac_core::types::ErrorClass::Property),
5494 error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
5495 }
5496 ));
5497 }
5498
5499 #[tokio::test]
5500 async fn write_property_maps_abort() {
5501 let (dl, state) = MockDataLink::new();
5502 let client = BacnetClient::with_datalink(dl);
5503 let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
5504
5505 let mut apdu = [0u8; 8];
5506 let mut w = Writer::new(&mut apdu);
5507 w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); w.write_u8(1).unwrap(); w.write_u8(9).unwrap(); state
5511 .recv
5512 .lock()
5513 .await
5514 .push_back((with_npdu(w.as_written()), addr));
5515
5516 let req = rustbac_core::services::write_property::WritePropertyRequest {
5517 object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
5518 property_id: PropertyId::PresentValue,
5519 value: DataValue::Real(10.0),
5520 priority: Some(8),
5521 ..Default::default()
5522 };
5523 let err = client.write_property(addr, req).await.unwrap_err();
5524 assert!(matches!(
5525 err,
5526 crate::ClientError::RemoteAbort {
5527 reason: 9,
5528 server: true
5529 }
5530 ));
5531 }
5532
5533 #[tokio::test]
5534 async fn read_property_multiple_returns_owned_string() {
5535 let (dl, state) = MockDataLink::new();
5536 let client = BacnetClient::with_datalink(dl);
5537 let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
5538 let object_id = ObjectId::new(ObjectType::Device, 1);
5539
5540 let mut apdu_buf = [0u8; 256];
5541 let mut w = Writer::new(&mut apdu_buf);
5542 ComplexAckHeader {
5543 segmented: false,
5544 more_follows: false,
5545 invoke_id: 1,
5546 sequence_number: None,
5547 proposed_window_size: None,
5548 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
5549 }
5550 .encode(&mut w)
5551 .unwrap();
5552 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
5553 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
5554 .encode(&mut w)
5555 .unwrap();
5556 encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
5557 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
5558 .encode(&mut w)
5559 .unwrap();
5560 rustbac_core::services::value_codec::encode_application_data_value(
5561 &mut w,
5562 &DataValue::CharacterString("AHU-1"),
5563 )
5564 .unwrap();
5565 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
5566 .encode(&mut w)
5567 .unwrap();
5568 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
5569 .encode(&mut w)
5570 .unwrap();
5571
5572 state
5573 .recv
5574 .lock()
5575 .await
5576 .push_back((with_npdu(w.as_written()), addr));
5577
5578 let values = client
5579 .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
5580 .await
5581 .unwrap();
5582 assert_eq!(values.len(), 1);
5583 assert_eq!(values[0].0, PropertyId::ObjectName);
5584 assert!(matches!(
5585 &values[0].1,
5586 ClientDataValue::CharacterString(s) if s == "AHU-1"
5587 ));
5588 }
5589
5590 #[tokio::test]
5591 async fn new_sc_rejects_invalid_endpoint() {
5592 let err = BacnetClient::new_sc("not a url").await.unwrap_err();
5593 assert!(matches!(err, crate::ClientError::DataLink(_)));
5594 }
5595}