1use crate::{
2 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3 ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4 DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5 EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9 AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10 SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{reader::Reader, writer::Writer};
13use rustbac_core::npdu::Npdu;
14use rustbac_core::services::acknowledge_alarm::{
15 AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
16};
17use rustbac_core::services::alarm_summary::{
18 AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
19 SERVICE_GET_ALARM_SUMMARY,
20};
21use rustbac_core::services::atomic_read_file::{
22 AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
23};
24use rustbac_core::services::atomic_write_file::{
25 AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
26};
27use rustbac_core::services::cov_notification::{
28 CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
29 SERVICE_UNCONFIRMED_COV_NOTIFICATION,
30};
31use rustbac_core::services::device_management::{
32 DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
33 ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
34};
35use rustbac_core::services::enrollment_summary::{
36 EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
37 GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
38};
39use rustbac_core::services::event_information::{
40 EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
41 SERVICE_GET_EVENT_INFORMATION,
42};
43use rustbac_core::services::event_notification::{
44 EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
45 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
46};
47use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
48use rustbac_core::services::list_element::{
49 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
50 SERVICE_REMOVE_LIST_ELEMENT,
51};
52use rustbac_core::services::object_management::{
53 CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
54 SERVICE_DELETE_OBJECT,
55};
56use rustbac_core::services::private_transfer::{
57 ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
58 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
59};
60use rustbac_core::services::read_property::{
61 ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
62};
63use rustbac_core::services::read_property_multiple::{
64 PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
65 ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
66};
67use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
68use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
69use rustbac_core::services::subscribe_cov_property::{
70 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
71};
72use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
73use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
74use rustbac_core::services::who_is::WhoIsRequest;
75use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
76use rustbac_core::services::write_property_multiple::{
77 PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
78 SERVICE_WRITE_PROPERTY_MULTIPLE,
79};
80use rustbac_core::types::{DataValue, Date, ErrorClass, ErrorCode, ObjectId, PropertyId, Time};
81use rustbac_core::EncodeError;
82use rustbac_datalink::bip::transport::{
83 BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
84};
85use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
86use std::collections::{HashMap, HashSet};
87use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
88use std::sync::RwLock;
89use std::time::Duration;
90use tokio::sync::Mutex;
91use tokio::task::JoinHandle;
92use tokio::time::{timeout, Instant};
93
94const MIN_SEGMENT_DATA_LEN: usize = 32;
95const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1024 * 1024;
96
97#[derive(Debug)]
113pub struct BacnetClient<D: DataLink> {
114 datalink: D,
115 invoke_id: Mutex<u8>,
116 request_io_lock: Mutex<()>,
117 response_timeout: Duration,
118 segmented_request_window_size: u8,
119 segmented_request_retries: u8,
120 segment_ack_timeout: Duration,
121 capability_cache: std::sync::Arc<RwLock<HashMap<DataLinkAddress, usize>>>,
123}
124
125#[derive(Debug)]
131pub struct ForeignDeviceRenewal {
132 task: JoinHandle<()>,
133}
134
135impl ForeignDeviceRenewal {
136 pub fn stop(self) {
138 self.task.abort();
139 }
140}
141
142impl Drop for ForeignDeviceRenewal {
143 fn drop(&mut self) {
144 self.task.abort();
145 }
146}
147
148impl BacnetClient<BacnetIpTransport> {
149 pub async fn new() -> Result<Self, ClientError> {
153 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
154 let datalink = BacnetIpTransport::bind(bind_addr).await?;
155 Ok(Self {
156 datalink,
157 invoke_id: Mutex::new(1),
158 request_io_lock: Mutex::new(()),
159 response_timeout: Duration::from_secs(3),
160 segmented_request_window_size: 16,
161 segmented_request_retries: 2,
162 segment_ack_timeout: Duration::from_millis(500),
163 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
164 })
165 }
166
167 pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
174 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
175 let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
176 datalink.register_foreign_device(ttl_seconds).await?;
177 Ok(Self {
178 datalink,
179 invoke_id: Mutex::new(1),
180 request_io_lock: Mutex::new(()),
181 response_timeout: Duration::from_secs(3),
182 segmented_request_window_size: 16,
183 segmented_request_retries: 2,
184 segment_ack_timeout: Duration::from_millis(500),
185 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
186 })
187 }
188
189 pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
192 let _io = self.request_io_lock.lock().await;
193 self.datalink.register_foreign_device(ttl_seconds).await?;
194 Ok(())
195 }
196
197 pub async fn read_broadcast_distribution_table(
199 &self,
200 ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
201 let _io = self.request_io_lock.lock().await;
202 self.datalink
203 .read_broadcast_distribution_table()
204 .await
205 .map_err(ClientError::from)
206 }
207
208 pub async fn write_broadcast_distribution_table(
210 &self,
211 entries: &[BroadcastDistributionEntry],
212 ) -> Result<(), ClientError> {
213 let _io = self.request_io_lock.lock().await;
214 self.datalink
215 .write_broadcast_distribution_table(entries)
216 .await?;
217 Ok(())
218 }
219
220 pub async fn read_foreign_device_table(
222 &self,
223 ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
224 let _io = self.request_io_lock.lock().await;
225 self.datalink
226 .read_foreign_device_table()
227 .await
228 .map_err(ClientError::from)
229 }
230
231 pub async fn delete_foreign_device_table_entry(
233 &self,
234 address: SocketAddrV4,
235 ) -> Result<(), ClientError> {
236 let _io = self.request_io_lock.lock().await;
237 self.datalink
238 .delete_foreign_device_table_entry(address)
239 .await?;
240 Ok(())
241 }
242
243 pub fn start_foreign_device_renewal(
249 &self,
250 ttl_seconds: u16,
251 ) -> Result<ForeignDeviceRenewal, ClientError> {
252 if ttl_seconds == 0 {
253 return Err(EncodeError::InvalidLength.into());
254 }
255
256 let datalink = self.datalink.clone();
257 let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
258 let interval = Duration::from_secs(refresh_seconds.max(1));
259 let task = tokio::spawn(async move {
260 loop {
261 tokio::time::sleep(interval).await;
262 if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
263 log::warn!("foreign device renewal send failed: {err}");
264 }
265 }
266 });
267 Ok(ForeignDeviceRenewal { task })
268 }
269}
270
271impl BacnetClient<BacnetScTransport> {
272 pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
275 let datalink = BacnetScTransport::connect(endpoint).await?;
276 Ok(Self::with_datalink(datalink))
277 }
278}
279
280impl<D: DataLink> BacnetClient<D> {
281 pub fn with_datalink(datalink: D) -> Self {
285 Self {
286 datalink,
287 invoke_id: Mutex::new(1),
288 request_io_lock: Mutex::new(()),
289 response_timeout: Duration::from_secs(3),
290 segmented_request_window_size: 16,
291 segmented_request_retries: 2,
292 segment_ack_timeout: Duration::from_millis(500),
293 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
294 }
295 }
296
297 pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
299 self.response_timeout = timeout;
300 self
301 }
302
303 pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
306 self.segmented_request_window_size = window_size.max(1);
307 self
308 }
309
310 pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
313 self.segmented_request_retries = retries;
314 self
315 }
316
317 pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
320 self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
321 self
322 }
323
324 async fn next_invoke_id(&self) -> u8 {
325 let mut lock = self.invoke_id.lock().await;
326 let id = *lock;
327 *lock = lock.wrapping_add(1);
328 if *lock == 0 {
329 *lock = 1;
330 }
331 id
332 }
333
334 async fn send_segment_ack(
335 &self,
336 address: DataLinkAddress,
337 invoke_id: u8,
338 sequence_number: u8,
339 window_size: u8,
340 ) -> Result<(), ClientError> {
341 let mut tx = [0u8; 64];
342 let mut w = Writer::new(&mut tx);
343 Npdu::new(0).encode(&mut w)?;
344 SegmentAck {
345 negative_ack: false,
346 sent_by_server: false,
347 invoke_id,
348 sequence_number,
349 actual_window_size: window_size,
350 }
351 .encode(&mut w)?;
352 self.datalink.send(address, w.as_written()).await?;
353 Ok(())
354 }
355
356 async fn recv_ignoring_invalid_frame(
357 &self,
358 buf: &mut [u8],
359 deadline: Instant,
360 ) -> Result<(usize, DataLinkAddress), ClientError> {
361 loop {
362 let remaining = deadline.saturating_duration_since(Instant::now());
363 if remaining.is_zero() {
364 return Err(ClientError::Timeout);
365 }
366
367 match timeout(remaining, self.datalink.recv(buf)).await {
368 Err(_) => return Err(ClientError::Timeout),
369 Ok(Err(DataLinkError::InvalidFrame)) => continue,
370 Ok(Err(e)) => return Err(e.into()),
371 Ok(Ok(v)) => return Ok(v),
372 }
373 }
374 }
375
376 async fn send_simple_ack(
377 &self,
378 address: DataLinkAddress,
379 invoke_id: u8,
380 service_choice: u8,
381 ) -> Result<(), ClientError> {
382 let mut tx = [0u8; 64];
383 let mut w = Writer::new(&mut tx);
384 Npdu::new(0).encode(&mut w)?;
385 SimpleAck {
386 invoke_id,
387 service_choice,
388 }
389 .encode(&mut w)?;
390 self.datalink.send(address, w.as_written()).await?;
391 Ok(())
392 }
393
394 fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
395 where
396 F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
397 {
398 for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
399 let mut buf = vec![0u8; size];
400 let mut w = Writer::new(&mut buf);
401 match encode(&mut w) {
402 Ok(()) => {
403 let written_len = w.as_written().len();
404 buf.truncate(written_len);
405 return Ok(buf);
406 }
407 Err(EncodeError::BufferTooSmall) => continue,
408 Err(e) => return Err(e.into()),
409 }
410 }
411 Err(ClientError::SegmentedRequestTooLarge)
412 }
413
414 const fn max_apdu_octets(max_apdu_code: u8) -> usize {
415 match max_apdu_code & 0x0f {
416 0 => 50,
417 1 => 128,
418 2 => 206,
419 3 => 480,
420 4 => 1024,
421 5 => 1476,
422 _ => 480,
423 }
424 }
425
426 async fn await_segment_ack(
427 &self,
428 address: DataLinkAddress,
429 invoke_id: u8,
430 service_choice: u8,
431 expected_sequence: u8,
432 deadline: Instant,
433 ) -> Result<SegmentAck, ClientError> {
434 loop {
435 let remaining = deadline.saturating_duration_since(Instant::now());
436 if remaining.is_zero() {
437 return Err(ClientError::Timeout);
438 }
439
440 let mut rx = [0u8; 1500];
441 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
442 let (n, src) = match recv {
443 Err(_) => return Err(ClientError::Timeout),
444 Ok(Err(DataLinkError::InvalidFrame)) => continue,
445 Ok(Err(e)) => return Err(e.into()),
446 Ok(Ok(v)) => v,
447 };
448 if src != address {
449 continue;
450 }
451
452 let Ok(apdu) = extract_apdu(&rx[..n]) else {
453 continue;
454 };
455 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
456 match ApduType::from_u8(first >> 4) {
457 Some(ApduType::SegmentAck) => {
458 let mut r = Reader::new(apdu);
459 let ack = SegmentAck::decode(&mut r)?;
460 if ack.invoke_id != invoke_id || !ack.sent_by_server {
461 continue;
462 }
463 if ack.negative_ack {
464 return Err(ClientError::SegmentNegativeAck {
465 sequence_number: ack.sequence_number,
466 });
467 }
468 if ack.sequence_number == expected_sequence {
469 return Ok(ack);
470 }
471 }
472 Some(ApduType::Error) => {
473 let mut r = Reader::new(apdu);
474 let err = BacnetError::decode(&mut r)?;
475 if err.invoke_id == invoke_id && err.service_choice == service_choice {
476 return Err(remote_service_error(err));
477 }
478 }
479 Some(ApduType::Reject) => {
480 let mut r = Reader::new(apdu);
481 let rej = RejectPdu::decode(&mut r)?;
482 if rej.invoke_id == invoke_id {
483 return Err(ClientError::RemoteReject { reason: rej.reason });
484 }
485 }
486 Some(ApduType::Abort) => {
487 let mut r = Reader::new(apdu);
488 let abort = AbortPdu::decode(&mut r)?;
489 if abort.invoke_id == invoke_id {
490 return Err(ClientError::RemoteAbort {
491 reason: abort.reason,
492 server: abort.server,
493 });
494 }
495 }
496 _ => continue,
497 }
498 }
499 }
500
501 async fn send_confirmed_request(
502 &self,
503 address: DataLinkAddress,
504 frame: &[u8],
505 deadline: Instant,
506 ) -> Result<(), ClientError> {
507 let mut pr = Reader::new(frame);
508 let _npdu = Npdu::decode(&mut pr)?;
509 let npdu_len = frame.len() - pr.remaining();
510 let npdu_bytes = &frame[..npdu_len];
511 let apdu = &frame[npdu_len..];
512
513 let mut ar = Reader::new(apdu);
514 let header = ConfirmedRequestHeader::decode(&mut ar)?;
515 let service_payload = ar.read_exact(ar.remaining())?;
516
517 let peer_max_apdu = self
520 .capability_cache
521 .read()
522 .ok()
523 .and_then(|c| c.get(&address).copied())
524 .unwrap_or_else(|| Self::max_apdu_octets(header.max_apdu));
525 let segment_data_len = peer_max_apdu.saturating_sub(5).max(MIN_SEGMENT_DATA_LEN);
526 let segment_count = service_payload.len().div_ceil(segment_data_len);
527
528 if segment_count <= 1 {
529 self.datalink.send(address, frame).await?;
530 return Ok(());
531 }
532
533 if segment_count > usize::from(u8::MAX) + 1 {
534 return Err(ClientError::SegmentedRequestTooLarge);
535 }
536
537 let configured_window_size = self.segmented_request_window_size.max(1);
538 let mut window_size = configured_window_size;
539 let mut peer_window_ceiling = configured_window_size;
540 let mut batch_start = 0usize;
541 while batch_start < segment_count {
542 let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
543 let expected_sequence = (batch_end - 1) as u8;
544
545 let mut frames = Vec::with_capacity(batch_end - batch_start);
546 for segment_index in batch_start..batch_end {
547 let seq = segment_index as u8;
548 let more_follows = segment_index + 1 < segment_count;
549 let start = segment_index * segment_data_len;
550 let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
551 let segment = &service_payload[start..end];
552
553 let seg_header = ConfirmedRequestHeader {
554 segmented: true,
555 more_follows,
556 segmented_response_accepted: header.segmented_response_accepted,
557 max_segments: header.max_segments,
558 max_apdu: header.max_apdu,
559 invoke_id: header.invoke_id,
560 sequence_number: Some(seq),
561 proposed_window_size: Some(window_size),
562 service_choice: header.service_choice,
563 };
564
565 let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
566 let written_len = {
567 let mut w = Writer::new(&mut tx);
568 w.write_all(npdu_bytes)?;
569 seg_header.encode(&mut w)?;
570 w.write_all(segment)?;
571 w.as_written().len()
572 };
573 tx.truncate(written_len);
574 frames.push(tx);
575 }
576
577 let mut retries_remaining = self.segmented_request_retries;
578 loop {
579 for frame in &frames {
580 self.datalink.send(address, frame).await?;
581 }
582
583 if batch_end == segment_count {
584 break;
585 }
586
587 let remaining = deadline.saturating_duration_since(Instant::now());
588 if remaining.is_zero() {
589 return Err(ClientError::Timeout);
590 }
591 let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
592 match self
593 .await_segment_ack(
594 address,
595 header.invoke_id,
596 header.service_choice,
597 expected_sequence,
598 ack_wait_deadline,
599 )
600 .await
601 {
602 Ok(ack) => {
603 peer_window_ceiling =
604 peer_window_ceiling.min(ack.actual_window_size.max(1));
605 window_size = window_size
606 .saturating_add(1)
607 .min(configured_window_size)
608 .min(peer_window_ceiling)
609 .max(1);
610 break;
611 }
612 Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
613 if retries_remaining > 0 =>
614 {
615 retries_remaining -= 1;
616 window_size = window_size.saturating_div(2).max(1);
617 continue;
618 }
619 Err(e) => return Err(e),
620 }
621 }
622
623 batch_start = batch_end;
624 }
625
626 Ok(())
627 }
628
629 async fn collect_complex_ack_payload(
630 &self,
631 address: DataLinkAddress,
632 invoke_id: u8,
633 service_choice: u8,
634 first_header: ComplexAckHeader,
635 first_payload: &[u8],
636 deadline: Instant,
637 ) -> Result<Vec<u8>, ClientError> {
638 let mut payload = first_payload.to_vec();
639 if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
640 return Err(ClientError::ResponseTooLarge {
641 limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
642 });
643 }
644 if !first_header.segmented {
645 return Ok(payload);
646 }
647
648 let mut last_seq = first_header
649 .sequence_number
650 .ok_or(ClientError::UnsupportedResponse)?;
651 let mut window_size = first_header.proposed_window_size.unwrap_or(1);
652 self.send_segment_ack(address, invoke_id, last_seq, window_size)
653 .await?;
654 let mut more_follows = first_header.more_follows;
655
656 while more_follows {
657 let mut rx = [0u8; 1500];
658 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
659 if src != address {
660 continue;
661 }
662
663 let Ok(apdu) = extract_apdu(&rx[..n]) else {
664 continue;
665 };
666 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
667 match ApduType::from_u8(first >> 4) {
668 Some(ApduType::ComplexAck) => {
669 let mut r = Reader::new(apdu);
670 let seg = ComplexAckHeader::decode(&mut r)?;
671 if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
672 continue;
673 }
674 if !seg.segmented {
675 return Err(ClientError::UnsupportedResponse);
676 }
677 let seq = seg
678 .sequence_number
679 .ok_or(ClientError::UnsupportedResponse)?;
680 if seq == last_seq {
681 self.send_segment_ack(address, invoke_id, last_seq, window_size)
683 .await?;
684 continue;
685 }
686 if seq != last_seq.wrapping_add(1) {
687 continue;
688 }
689
690 let seg_payload = r.read_exact(r.remaining())?;
691 if payload.len().saturating_add(seg_payload.len())
692 > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
693 {
694 return Err(ClientError::ResponseTooLarge {
695 limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
696 });
697 }
698 payload.extend_from_slice(seg_payload);
699
700 last_seq = seq;
701 more_follows = seg.more_follows;
702 window_size = seg.proposed_window_size.unwrap_or(window_size);
703 self.send_segment_ack(address, invoke_id, last_seq, window_size)
704 .await?;
705 }
706 Some(ApduType::Error) => {
707 let mut r = Reader::new(apdu);
708 let err = BacnetError::decode(&mut r)?;
709 if err.invoke_id == invoke_id && err.service_choice == service_choice {
710 return Err(remote_service_error(err));
711 }
712 }
713 Some(ApduType::Reject) => {
714 let mut r = Reader::new(apdu);
715 let rej = RejectPdu::decode(&mut r)?;
716 if rej.invoke_id == invoke_id {
717 return Err(ClientError::RemoteReject { reason: rej.reason });
718 }
719 }
720 Some(ApduType::Abort) => {
721 let mut r = Reader::new(apdu);
722 let abort = AbortPdu::decode(&mut r)?;
723 if abort.invoke_id == invoke_id {
724 return Err(ClientError::RemoteAbort {
725 reason: abort.reason,
726 server: abort.server,
727 });
728 }
729 }
730 _ => continue,
731 }
732 }
733
734 Ok(payload)
735 }
736
737 pub async fn who_is(
743 &self,
744 range: Option<(u32, u32)>,
745 wait: Duration,
746 ) -> Result<Vec<DiscoveredDevice>, ClientError> {
747 let req = match range {
750 Some((low, high)) => WhoIsRequest {
751 low_limit: Some(low),
752 high_limit: Some(high),
753 },
754 None => WhoIsRequest::global(),
755 };
756
757 let mut tx = [0u8; 128];
758 let mut w = Writer::new(&mut tx);
759 Npdu::new(0).encode(&mut w)?;
760 req.encode(&mut w)?;
761
762 self.datalink
763 .send(
764 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
765 w.as_written(),
766 )
767 .await?;
768
769 let mut devices = Vec::new();
770 let mut seen = HashSet::new();
771 let deadline = tokio::time::Instant::now() + wait;
772
773 while tokio::time::Instant::now() < deadline {
774 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
775 let mut rx = [0u8; 1500];
776 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
777 match recv {
778 Ok(Ok((n, src))) => {
779 let Ok(apdu) = extract_apdu(&rx[..n]) else {
780 continue;
781 };
782 let mut r = Reader::new(apdu);
783 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
784 continue;
785 };
786 if unconfirmed.service_choice != SERVICE_I_AM {
787 continue;
788 }
789 let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
790 continue;
791 };
792 if seen.insert(i_am.device_id) {
793 devices.push(DiscoveredDevice {
794 address: src,
795 device_id: Some(i_am.device_id),
796 });
797 if let Ok(mut cache) = self.capability_cache.write() {
800 cache.insert(src, i_am.max_apdu as usize);
801 }
802 }
803 }
804 Ok(Err(DataLinkError::InvalidFrame)) => continue,
805 Ok(Err(e)) => return Err(e.into()),
806 Err(_) => break,
807 }
808 }
809
810 Ok(devices)
811 }
812
813 pub async fn who_has_object_id(
818 &self,
819 range: Option<(u32, u32)>,
820 object_id: ObjectId,
821 wait: Duration,
822 ) -> Result<Vec<DiscoveredObject>, ClientError> {
823 let req = WhoHasRequest {
824 low_limit: range.map(|(low, _)| low),
825 high_limit: range.map(|(_, high)| high),
826 object: WhoHasObject::ObjectId(object_id),
827 };
828 self.who_has(req, wait).await
829 }
830
831 pub async fn who_has_object_name(
835 &self,
836 range: Option<(u32, u32)>,
837 object_name: &str,
838 wait: Duration,
839 ) -> Result<Vec<DiscoveredObject>, ClientError> {
840 let req = WhoHasRequest {
841 low_limit: range.map(|(low, _)| low),
842 high_limit: range.map(|(_, high)| high),
843 object: WhoHasObject::ObjectName(object_name),
844 };
845 self.who_has(req, wait).await
846 }
847
848 async fn who_has(
849 &self,
850 request: WhoHasRequest<'_>,
851 wait: Duration,
852 ) -> Result<Vec<DiscoveredObject>, ClientError> {
853 let tx = self.encode_with_growth(|w| {
855 Npdu::new(0).encode(w)?;
856 request.encode(w)
857 })?;
858 self.datalink
859 .send(
860 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
861 &tx,
862 )
863 .await?;
864
865 let mut objects = Vec::new();
866 let mut seen = HashSet::new();
867 let deadline = tokio::time::Instant::now() + wait;
868
869 while tokio::time::Instant::now() < deadline {
870 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
871 let mut rx = [0u8; 1500];
872 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
873 match recv {
874 Ok(Ok((n, src))) => {
875 let Ok(apdu) = extract_apdu(&rx[..n]) else {
876 continue;
877 };
878 let mut r = Reader::new(apdu);
879 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
880 continue;
881 };
882 if unconfirmed.service_choice != SERVICE_I_HAVE {
883 continue;
884 }
885 let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
886 continue;
887 };
888 if !seen.insert((src, i_have.object_id.raw())) {
889 continue;
890 }
891 objects.push(DiscoveredObject {
892 address: src,
893 device_id: i_have.device_id,
894 object_id: i_have.object_id,
895 object_name: i_have.object_name.to_string(),
896 });
897 }
898 Ok(Err(DataLinkError::InvalidFrame)) => continue,
899 Ok(Err(e)) => return Err(e.into()),
900 Err(_) => break,
901 }
902 }
903
904 Ok(objects)
905 }
906
907 pub async fn device_communication_control(
913 &self,
914 address: DataLinkAddress,
915 time_duration_seconds: Option<u16>,
916 enable_disable: DeviceCommunicationState,
917 password: Option<&str>,
918 ) -> Result<(), ClientError> {
919 let invoke_id = self.next_invoke_id().await;
920 let request = DeviceCommunicationControlRequest {
921 time_duration_seconds,
922 enable_disable,
923 password,
924 invoke_id,
925 };
926 let tx = self.encode_with_growth(|w| {
927 Npdu::new(0).encode(w)?;
928 request.encode(w)
929 })?;
930 self.await_simple_ack_or_error(
931 address,
932 &tx,
933 invoke_id,
934 SERVICE_DEVICE_COMMUNICATION_CONTROL,
935 self.response_timeout,
936 )
937 .await
938 }
939
940 pub async fn reinitialize_device(
944 &self,
945 address: DataLinkAddress,
946 state: ReinitializeState,
947 password: Option<&str>,
948 ) -> Result<(), ClientError> {
949 let invoke_id = self.next_invoke_id().await;
950 let request = ReinitializeDeviceRequest {
951 state,
952 password,
953 invoke_id,
954 };
955 let tx = self.encode_with_growth(|w| {
956 Npdu::new(0).encode(w)?;
957 request.encode(w)
958 })?;
959 self.await_simple_ack_or_error(
960 address,
961 &tx,
962 invoke_id,
963 SERVICE_REINITIALIZE_DEVICE,
964 self.response_timeout,
965 )
966 .await
967 }
968
969 pub async fn time_synchronize(
974 &self,
975 address: DataLinkAddress,
976 date: Date,
977 time: Time,
978 utc: bool,
979 ) -> Result<(), ClientError> {
980 let request = if utc {
981 TimeSynchronizationRequest::utc(date, time)
982 } else {
983 TimeSynchronizationRequest::local(date, time)
984 };
985 let tx = self.encode_with_growth(|w| {
986 Npdu::new(0).encode(w)?;
987 request.encode(w)
988 })?;
989 self.datalink.send(address, &tx).await?;
990 Ok(())
991 }
992
993 pub async fn create_object_by_type(
996 &self,
997 address: DataLinkAddress,
998 object_type: rustbac_core::types::ObjectType,
999 ) -> Result<ObjectId, ClientError> {
1000 self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
1001 .await
1002 }
1003
1004 pub async fn create_object(
1008 &self,
1009 address: DataLinkAddress,
1010 mut request: CreateObjectRequest,
1011 ) -> Result<ObjectId, ClientError> {
1012 request.invoke_id = self.next_invoke_id().await;
1013 let invoke_id = request.invoke_id;
1014 let tx = self.encode_with_growth(|w| {
1015 Npdu::new(0).encode(w)?;
1016 request.encode(w)
1017 })?;
1018 let payload = self
1019 .await_complex_ack_payload_or_error(
1020 address,
1021 &tx,
1022 invoke_id,
1023 SERVICE_CREATE_OBJECT,
1024 self.response_timeout,
1025 )
1026 .await?;
1027 let mut pr = Reader::new(&payload);
1028 let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
1029 Ok(parsed.object_id)
1030 }
1031
1032 pub async fn delete_object(
1034 &self,
1035 address: DataLinkAddress,
1036 object_id: ObjectId,
1037 ) -> Result<(), ClientError> {
1038 let invoke_id = self.next_invoke_id().await;
1039 let request = DeleteObjectRequest {
1040 object_id,
1041 invoke_id,
1042 };
1043 let tx = self.encode_with_growth(|w| {
1044 Npdu::new(0).encode(w)?;
1045 request.encode(w)
1046 })?;
1047 self.await_simple_ack_or_error(
1048 address,
1049 &tx,
1050 invoke_id,
1051 SERVICE_DELETE_OBJECT,
1052 self.response_timeout,
1053 )
1054 .await
1055 }
1056
1057 pub async fn add_list_element(
1059 &self,
1060 address: DataLinkAddress,
1061 mut request: AddListElementRequest<'_>,
1062 ) -> Result<(), ClientError> {
1063 request.invoke_id = self.next_invoke_id().await;
1064 let invoke_id = request.invoke_id;
1065 let tx = self.encode_with_growth(|w| {
1066 Npdu::new(0).encode(w)?;
1067 request.encode(w)
1068 })?;
1069 self.await_simple_ack_or_error(
1070 address,
1071 &tx,
1072 invoke_id,
1073 SERVICE_ADD_LIST_ELEMENT,
1074 self.response_timeout,
1075 )
1076 .await
1077 }
1078
1079 pub async fn remove_list_element(
1081 &self,
1082 address: DataLinkAddress,
1083 mut request: RemoveListElementRequest<'_>,
1084 ) -> Result<(), ClientError> {
1085 request.invoke_id = self.next_invoke_id().await;
1086 let invoke_id = request.invoke_id;
1087 let tx = self.encode_with_growth(|w| {
1088 Npdu::new(0).encode(w)?;
1089 request.encode(w)
1090 })?;
1091 self.await_simple_ack_or_error(
1092 address,
1093 &tx,
1094 invoke_id,
1095 SERVICE_REMOVE_LIST_ELEMENT,
1096 self.response_timeout,
1097 )
1098 .await
1099 }
1100
1101 async fn await_simple_ack_or_error(
1102 &self,
1103 address: DataLinkAddress,
1104 tx: &[u8],
1105 invoke_id: u8,
1106 service_choice: u8,
1107 timeout_window: Duration,
1108 ) -> Result<(), ClientError> {
1109 #[cfg(feature = "tracing")]
1110 tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1111 let _io_lock = self.request_io_lock.lock().await;
1112 let deadline = tokio::time::Instant::now() + timeout_window;
1113 self.send_confirmed_request(address, tx, deadline).await?;
1114 while tokio::time::Instant::now() < deadline {
1115 let mut rx = [0u8; 1500];
1116 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1117 if src != address {
1118 continue;
1119 }
1120 let apdu = extract_apdu(&rx[..n])?;
1121 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1122 match ApduType::from_u8(first >> 4) {
1123 Some(ApduType::SimpleAck) => {
1124 let mut r = Reader::new(apdu);
1125 let ack = SimpleAck::decode(&mut r)?;
1126 if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1127 return Ok(());
1128 }
1129 }
1130 Some(ApduType::Error) => {
1131 let mut r = Reader::new(apdu);
1132 let err = BacnetError::decode(&mut r)?;
1133 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1134 return Err(remote_service_error(err));
1135 }
1136 }
1137 Some(ApduType::Reject) => {
1138 let mut r = Reader::new(apdu);
1139 let rej = RejectPdu::decode(&mut r)?;
1140 if rej.invoke_id == invoke_id {
1141 return Err(ClientError::RemoteReject { reason: rej.reason });
1142 }
1143 }
1144 Some(ApduType::Abort) => {
1145 let mut r = Reader::new(apdu);
1146 let abort = AbortPdu::decode(&mut r)?;
1147 if abort.invoke_id == invoke_id {
1148 return Err(ClientError::RemoteAbort {
1149 reason: abort.reason,
1150 server: abort.server,
1151 });
1152 }
1153 }
1154 _ => continue,
1155 }
1156 }
1157 #[cfg(feature = "tracing")]
1158 tracing::warn!(invoke_id = invoke_id, "request timed out");
1159 Err(ClientError::Timeout)
1160 }
1161
1162 async fn await_complex_ack_payload_or_error(
1163 &self,
1164 address: DataLinkAddress,
1165 tx: &[u8],
1166 invoke_id: u8,
1167 service_choice: u8,
1168 timeout_window: Duration,
1169 ) -> Result<Vec<u8>, ClientError> {
1170 #[cfg(feature = "tracing")]
1171 tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1172 let _io_lock = self.request_io_lock.lock().await;
1173 let deadline = tokio::time::Instant::now() + timeout_window;
1174 self.send_confirmed_request(address, tx, deadline).await?;
1175 while tokio::time::Instant::now() < deadline {
1176 let mut rx = [0u8; 1500];
1177 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1178 if src != address {
1179 continue;
1180 }
1181
1182 let apdu = extract_apdu(&rx[..n])?;
1183 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1184 match ApduType::from_u8(first >> 4) {
1185 Some(ApduType::ComplexAck) => {
1186 let mut r = Reader::new(apdu);
1187 let ack = ComplexAckHeader::decode(&mut r)?;
1188 if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1189 continue;
1190 }
1191 return self
1192 .collect_complex_ack_payload(
1193 address,
1194 invoke_id,
1195 service_choice,
1196 ack,
1197 r.read_exact(r.remaining())?,
1198 deadline,
1199 )
1200 .await;
1201 }
1202 Some(ApduType::Error) => {
1203 let mut r = Reader::new(apdu);
1204 let err = BacnetError::decode(&mut r)?;
1205 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1206 return Err(remote_service_error(err));
1207 }
1208 }
1209 Some(ApduType::Reject) => {
1210 let mut r = Reader::new(apdu);
1211 let rej = RejectPdu::decode(&mut r)?;
1212 if rej.invoke_id == invoke_id {
1213 return Err(ClientError::RemoteReject { reason: rej.reason });
1214 }
1215 }
1216 Some(ApduType::Abort) => {
1217 let mut r = Reader::new(apdu);
1218 let abort = AbortPdu::decode(&mut r)?;
1219 if abort.invoke_id == invoke_id {
1220 return Err(ClientError::RemoteAbort {
1221 reason: abort.reason,
1222 server: abort.server,
1223 });
1224 }
1225 }
1226 _ => continue,
1227 }
1228 }
1229 #[cfg(feature = "tracing")]
1230 tracing::warn!(invoke_id = invoke_id, "request timed out");
1231 Err(ClientError::Timeout)
1232 }
1233
1234 pub async fn get_alarm_summary(
1236 &self,
1237 address: DataLinkAddress,
1238 ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1239 let invoke_id = self.next_invoke_id().await;
1240 let request = GetAlarmSummaryRequest { invoke_id };
1241 let tx = self.encode_with_growth(|w| {
1242 Npdu::new(0).encode(w)?;
1243 request.encode(w)
1244 })?;
1245 let payload = self
1246 .await_complex_ack_payload_or_error(
1247 address,
1248 &tx,
1249 invoke_id,
1250 SERVICE_GET_ALARM_SUMMARY,
1251 self.response_timeout,
1252 )
1253 .await?;
1254 let mut pr = Reader::new(&payload);
1255 let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1256 Ok(into_client_alarm_summary(parsed.summaries))
1257 }
1258
1259 pub async fn get_enrollment_summary(
1261 &self,
1262 address: DataLinkAddress,
1263 ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1264 let invoke_id = self.next_invoke_id().await;
1265 let request = GetEnrollmentSummaryRequest { invoke_id };
1266 let tx = self.encode_with_growth(|w| {
1267 Npdu::new(0).encode(w)?;
1268 request.encode(w)
1269 })?;
1270 let payload = self
1271 .await_complex_ack_payload_or_error(
1272 address,
1273 &tx,
1274 invoke_id,
1275 SERVICE_GET_ENROLLMENT_SUMMARY,
1276 self.response_timeout,
1277 )
1278 .await?;
1279 let mut pr = Reader::new(&payload);
1280 let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1281 Ok(into_client_enrollment_summary(parsed.summaries))
1282 }
1283
1284 pub async fn get_event_information(
1289 &self,
1290 address: DataLinkAddress,
1291 last_received_object_id: Option<ObjectId>,
1292 ) -> Result<EventInformationResult, ClientError> {
1293 let invoke_id = self.next_invoke_id().await;
1294 let request = GetEventInformationRequest {
1295 last_received_object_id,
1296 invoke_id,
1297 };
1298 let tx = self.encode_with_growth(|w| {
1299 Npdu::new(0).encode(w)?;
1300 request.encode(w)
1301 })?;
1302 let payload = self
1303 .await_complex_ack_payload_or_error(
1304 address,
1305 &tx,
1306 invoke_id,
1307 SERVICE_GET_EVENT_INFORMATION,
1308 self.response_timeout,
1309 )
1310 .await?;
1311 let mut pr = Reader::new(&payload);
1312 let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1313 Ok(EventInformationResult {
1314 summaries: into_client_event_information(parsed.summaries),
1315 more_events: parsed.more_events,
1316 })
1317 }
1318
1319 pub async fn acknowledge_alarm(
1321 &self,
1322 address: DataLinkAddress,
1323 mut request: AcknowledgeAlarmRequest<'_>,
1324 ) -> Result<(), ClientError> {
1325 request.invoke_id = self.next_invoke_id().await;
1326 let invoke_id = request.invoke_id;
1327 let tx = self.encode_with_growth(|w| {
1328 Npdu::new(0).encode(w)?;
1329 request.encode(w)
1330 })?;
1331 self.await_simple_ack_or_error(
1332 address,
1333 &tx,
1334 invoke_id,
1335 SERVICE_ACKNOWLEDGE_ALARM,
1336 self.response_timeout,
1337 )
1338 .await
1339 }
1340
1341 pub async fn atomic_read_file_stream(
1346 &self,
1347 address: DataLinkAddress,
1348 file_object_id: ObjectId,
1349 file_start_position: i32,
1350 requested_octet_count: u32,
1351 ) -> Result<AtomicReadFileResult, ClientError> {
1352 let invoke_id = self.next_invoke_id().await;
1353 let request = AtomicReadFileRequest::stream(
1354 file_object_id,
1355 file_start_position,
1356 requested_octet_count,
1357 invoke_id,
1358 );
1359 self.atomic_read_file(address, request).await
1360 }
1361
1362 pub async fn atomic_read_file_record(
1367 &self,
1368 address: DataLinkAddress,
1369 file_object_id: ObjectId,
1370 file_start_record: i32,
1371 requested_record_count: u32,
1372 ) -> Result<AtomicReadFileResult, ClientError> {
1373 let invoke_id = self.next_invoke_id().await;
1374 let request = AtomicReadFileRequest::record(
1375 file_object_id,
1376 file_start_record,
1377 requested_record_count,
1378 invoke_id,
1379 );
1380 self.atomic_read_file(address, request).await
1381 }
1382
1383 async fn atomic_read_file(
1384 &self,
1385 address: DataLinkAddress,
1386 request: AtomicReadFileRequest,
1387 ) -> Result<AtomicReadFileResult, ClientError> {
1388 let invoke_id = request.invoke_id;
1389 let tx = self.encode_with_growth(|w| {
1390 Npdu::new(0).encode(w)?;
1391 request.encode(w)
1392 })?;
1393 let payload = self
1394 .await_complex_ack_payload_or_error(
1395 address,
1396 &tx,
1397 invoke_id,
1398 SERVICE_ATOMIC_READ_FILE,
1399 self.response_timeout,
1400 )
1401 .await?;
1402 let mut pr = Reader::new(&payload);
1403 let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1404 Ok(into_client_atomic_read_result(parsed))
1405 }
1406
1407 pub async fn atomic_write_file_stream(
1410 &self,
1411 address: DataLinkAddress,
1412 file_object_id: ObjectId,
1413 file_start_position: i32,
1414 file_data: &[u8],
1415 ) -> Result<AtomicWriteFileResult, ClientError> {
1416 let invoke_id = self.next_invoke_id().await;
1417 let request = AtomicWriteFileRequest::stream(
1418 file_object_id,
1419 file_start_position,
1420 file_data,
1421 invoke_id,
1422 );
1423 self.atomic_write_file(address, request).await
1424 }
1425
1426 pub async fn atomic_write_file_record(
1430 &self,
1431 address: DataLinkAddress,
1432 file_object_id: ObjectId,
1433 file_start_record: i32,
1434 file_record_data: &[&[u8]],
1435 ) -> Result<AtomicWriteFileResult, ClientError> {
1436 let invoke_id = self.next_invoke_id().await;
1437 let request = AtomicWriteFileRequest::record(
1438 file_object_id,
1439 file_start_record,
1440 file_record_data,
1441 invoke_id,
1442 );
1443 self.atomic_write_file(address, request).await
1444 }
1445
1446 async fn atomic_write_file(
1447 &self,
1448 address: DataLinkAddress,
1449 request: AtomicWriteFileRequest<'_>,
1450 ) -> Result<AtomicWriteFileResult, ClientError> {
1451 let invoke_id = request.invoke_id;
1452 let tx = self.encode_with_growth(|w| {
1453 Npdu::new(0).encode(w)?;
1454 request.encode(w)
1455 })?;
1456 let payload = self
1457 .await_complex_ack_payload_or_error(
1458 address,
1459 &tx,
1460 invoke_id,
1461 SERVICE_ATOMIC_WRITE_FILE,
1462 self.response_timeout,
1463 )
1464 .await?;
1465 let mut pr = Reader::new(&payload);
1466 let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1467 Ok(into_client_atomic_write_result(parsed))
1468 }
1469
1470 pub async fn subscribe_cov(
1474 &self,
1475 address: DataLinkAddress,
1476 mut request: SubscribeCovRequest,
1477 ) -> Result<(), ClientError> {
1478 request.invoke_id = self.next_invoke_id().await;
1479 let invoke_id = request.invoke_id;
1480 let tx = self.encode_with_growth(|w| {
1481 Npdu::new(0).encode(w)?;
1482 request.encode(w)
1483 })?;
1484 self.await_simple_ack_or_error(
1485 address,
1486 &tx,
1487 invoke_id,
1488 SERVICE_SUBSCRIBE_COV,
1489 self.response_timeout,
1490 )
1491 .await
1492 }
1493
1494 pub async fn cancel_cov_subscription(
1497 &self,
1498 address: DataLinkAddress,
1499 subscriber_process_id: u32,
1500 monitored_object_id: ObjectId,
1501 ) -> Result<(), ClientError> {
1502 self.subscribe_cov(
1503 address,
1504 SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1505 )
1506 .await
1507 }
1508
1509 pub async fn subscribe_cov_property(
1514 &self,
1515 address: DataLinkAddress,
1516 mut request: SubscribeCovPropertyRequest,
1517 ) -> Result<(), ClientError> {
1518 request.invoke_id = self.next_invoke_id().await;
1519 let invoke_id = request.invoke_id;
1520 let tx = self.encode_with_growth(|w| {
1521 Npdu::new(0).encode(w)?;
1522 request.encode(w)
1523 })?;
1524 self.await_simple_ack_or_error(
1525 address,
1526 &tx,
1527 invoke_id,
1528 SERVICE_SUBSCRIBE_COV_PROPERTY,
1529 self.response_timeout,
1530 )
1531 .await
1532 }
1533
1534 pub async fn cancel_cov_property_subscription(
1540 &self,
1541 address: DataLinkAddress,
1542 subscriber_process_id: u32,
1543 monitored_object_id: ObjectId,
1544 monitored_property_id: PropertyId,
1545 monitored_property_array_index: Option<u32>,
1546 ) -> Result<(), ClientError> {
1547 self.subscribe_cov_property(
1548 address,
1549 SubscribeCovPropertyRequest::cancel(
1550 subscriber_process_id,
1551 monitored_object_id,
1552 monitored_property_id,
1553 monitored_property_array_index,
1554 0,
1555 ),
1556 )
1557 .await
1558 }
1559
1560 pub async fn read_range_by_position(
1565 &self,
1566 address: DataLinkAddress,
1567 object_id: ObjectId,
1568 property_id: PropertyId,
1569 array_index: Option<u32>,
1570 reference_index: i32,
1571 count: i16,
1572 ) -> Result<ReadRangeResult, ClientError> {
1573 let invoke_id = self.next_invoke_id().await;
1574 let req = ReadRangeRequest::by_position(
1575 object_id,
1576 property_id,
1577 array_index,
1578 reference_index,
1579 count,
1580 invoke_id,
1581 );
1582 self.read_range_with_request(address, req).await
1583 }
1584
1585 pub async fn read_range_by_sequence_number(
1590 &self,
1591 address: DataLinkAddress,
1592 object_id: ObjectId,
1593 property_id: PropertyId,
1594 array_index: Option<u32>,
1595 reference_sequence: u32,
1596 count: i16,
1597 ) -> Result<ReadRangeResult, ClientError> {
1598 let invoke_id = self.next_invoke_id().await;
1599 let req = ReadRangeRequest::by_sequence_number(
1600 object_id,
1601 property_id,
1602 array_index,
1603 reference_sequence,
1604 count,
1605 invoke_id,
1606 );
1607 self.read_range_with_request(address, req).await
1608 }
1609
1610 pub async fn read_range_by_time(
1615 &self,
1616 address: DataLinkAddress,
1617 object_id: ObjectId,
1618 property_id: PropertyId,
1619 array_index: Option<u32>,
1620 at: (Date, Time),
1621 count: i16,
1622 ) -> Result<ReadRangeResult, ClientError> {
1623 let (date, time) = at;
1624 let invoke_id = self.next_invoke_id().await;
1625 let req = ReadRangeRequest::by_time(
1626 object_id,
1627 property_id,
1628 array_index,
1629 date,
1630 time,
1631 count,
1632 invoke_id,
1633 );
1634 self.read_range_with_request(address, req).await
1635 }
1636
1637 async fn read_range_with_request(
1638 &self,
1639 address: DataLinkAddress,
1640 req: ReadRangeRequest,
1641 ) -> Result<ReadRangeResult, ClientError> {
1642 let invoke_id = req.invoke_id;
1643 let tx = self.encode_with_growth(|w| {
1644 Npdu::new(0).encode(w)?;
1645 req.encode(w)
1646 })?;
1647 let payload = self
1648 .await_complex_ack_payload_or_error(
1649 address,
1650 &tx,
1651 invoke_id,
1652 SERVICE_READ_RANGE,
1653 self.response_timeout,
1654 )
1655 .await?;
1656 let mut pr = Reader::new(&payload);
1657 let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1658 into_client_read_range(parsed)
1659 }
1660
1661 pub async fn recv_cov_notification(
1667 &self,
1668 wait: Duration,
1669 ) -> Result<Option<CovNotification>, ClientError> {
1670 let _io_lock = self.request_io_lock.lock().await;
1671 let deadline = tokio::time::Instant::now() + wait;
1672
1673 while tokio::time::Instant::now() < deadline {
1674 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1675 let mut rx = [0u8; 1500];
1676 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1677 let (n, source) = match recv {
1678 Ok(Ok(v)) => v,
1679 Ok(Err(e)) => return Err(e.into()),
1680 Err(_) => break,
1681 };
1682
1683 let apdu = extract_apdu(&rx[..n])?;
1684 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1685 match ApduType::from_u8(first >> 4) {
1686 Some(ApduType::UnconfirmedRequest) => {
1687 let mut r = Reader::new(apdu);
1688 let header = UnconfirmedRequestHeader::decode(&mut r)?;
1689 if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1690 continue;
1691 }
1692 let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1693 return Ok(Some(into_client_cov_notification(source, false, cov)?));
1694 }
1695 Some(ApduType::ConfirmedRequest) => {
1696 let mut r = Reader::new(apdu);
1697 let header = ConfirmedRequestHeader::decode(&mut r)?;
1698 if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1699 continue;
1700 }
1701 if header.segmented {
1702 return Err(ClientError::UnsupportedResponse);
1703 }
1704
1705 let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1706 self.send_simple_ack(
1707 source,
1708 header.invoke_id,
1709 SERVICE_CONFIRMED_COV_NOTIFICATION,
1710 )
1711 .await?;
1712 return Ok(Some(into_client_cov_notification(source, true, cov)?));
1713 }
1714 _ => continue,
1715 }
1716 }
1717
1718 Ok(None)
1719 }
1720
1721 pub async fn recv_event_notification(
1727 &self,
1728 wait: Duration,
1729 ) -> Result<Option<EventNotification>, ClientError> {
1730 let _io_lock = self.request_io_lock.lock().await;
1731 let deadline = tokio::time::Instant::now() + wait;
1732
1733 while tokio::time::Instant::now() < deadline {
1734 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1735 let mut rx = [0u8; 1500];
1736 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1737 let (n, source) = match recv {
1738 Ok(Ok(v)) => v,
1739 Ok(Err(e)) => return Err(e.into()),
1740 Err(_) => break,
1741 };
1742
1743 let apdu = extract_apdu(&rx[..n])?;
1744 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1745 match ApduType::from_u8(first >> 4) {
1746 Some(ApduType::UnconfirmedRequest) => {
1747 let mut r = Reader::new(apdu);
1748 let header = UnconfirmedRequestHeader::decode(&mut r)?;
1749 if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1750 continue;
1751 }
1752 let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1753 return Ok(Some(into_client_event_notification(
1754 source,
1755 false,
1756 notification,
1757 )));
1758 }
1759 Some(ApduType::ConfirmedRequest) => {
1760 let mut r = Reader::new(apdu);
1761 let header = ConfirmedRequestHeader::decode(&mut r)?;
1762 if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1763 continue;
1764 }
1765 if header.segmented {
1766 return Err(ClientError::UnsupportedResponse);
1767 }
1768 let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1769 self.send_simple_ack(
1770 source,
1771 header.invoke_id,
1772 SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1773 )
1774 .await?;
1775 return Ok(Some(into_client_event_notification(
1776 source,
1777 true,
1778 notification,
1779 )));
1780 }
1781 _ => continue,
1782 }
1783 }
1784
1785 Ok(None)
1786 }
1787
1788 pub async fn read_property(
1793 &self,
1794 address: DataLinkAddress,
1795 object_id: ObjectId,
1796 property_id: PropertyId,
1797 ) -> Result<ClientDataValue, ClientError> {
1798 let invoke_id = self.next_invoke_id().await;
1799 let req = ReadPropertyRequest {
1800 object_id,
1801 property_id,
1802 array_index: None,
1803 invoke_id,
1804 };
1805 let tx = self.encode_with_growth(|w| {
1806 Npdu::new(0).encode(w)?;
1807 req.encode(w)
1808 })?;
1809 let payload = self
1810 .await_complex_ack_payload_or_error(
1811 address,
1812 &tx,
1813 invoke_id,
1814 SERVICE_READ_PROPERTY,
1815 self.response_timeout,
1816 )
1817 .await?;
1818 let mut pr = Reader::new(&payload);
1819 let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
1820 into_client_value(parsed.value)
1821 }
1822
1823 pub async fn write_property(
1825 &self,
1826 address: DataLinkAddress,
1827 mut request: WritePropertyRequest<'_>,
1828 ) -> Result<(), ClientError> {
1829 request.invoke_id = self.next_invoke_id().await;
1830 let invoke_id = request.invoke_id;
1831 let tx = self.encode_with_growth(|w| {
1832 Npdu::new(0).encode(w)?;
1833 request.encode(w)
1834 })?;
1835 self.await_simple_ack_or_error(
1836 address,
1837 &tx,
1838 invoke_id,
1839 SERVICE_WRITE_PROPERTY,
1840 self.response_timeout,
1841 )
1842 .await
1843 }
1844
1845 pub async fn read_property_multiple(
1850 &self,
1851 address: DataLinkAddress,
1852 object_id: ObjectId,
1853 property_ids: &[PropertyId],
1854 ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
1855 let refs: Vec<PropertyReference> = property_ids
1856 .iter()
1857 .copied()
1858 .map(|property_id| PropertyReference {
1859 property_id,
1860 array_index: None,
1861 })
1862 .collect();
1863 let specs = [ReadAccessSpecification {
1864 object_id,
1865 properties: &refs,
1866 }];
1867
1868 let invoke_id = self.next_invoke_id().await;
1869 let req = ReadPropertyMultipleRequest {
1870 specs: &specs,
1871 invoke_id,
1872 };
1873
1874 let tx = self.encode_with_growth(|w| {
1875 Npdu::new(0).encode(w)?;
1876 req.encode(w)
1877 })?;
1878 let payload = self
1879 .await_complex_ack_payload_or_error(
1880 address,
1881 &tx,
1882 invoke_id,
1883 SERVICE_READ_PROPERTY_MULTIPLE,
1884 self.response_timeout,
1885 )
1886 .await?;
1887 let mut pr = Reader::new(&payload);
1888 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
1889 let mut out = Vec::new();
1890 for access in parsed.results {
1891 if access.object_id != object_id {
1892 continue;
1893 }
1894 for item in access.results {
1895 out.push((item.property_id, into_client_value(item.value)?));
1896 }
1897 }
1898 Ok(out)
1899 }
1900
1901 pub async fn write_property_multiple(
1904 &self,
1905 address: DataLinkAddress,
1906 object_id: ObjectId,
1907 properties: &[PropertyWriteSpec<'_>],
1908 ) -> Result<(), ClientError> {
1909 let invoke_id = self.next_invoke_id().await;
1910 let specs = [WriteAccessSpecification {
1911 object_id,
1912 properties,
1913 }];
1914 let req = WritePropertyMultipleRequest {
1915 specs: &specs,
1916 invoke_id,
1917 };
1918
1919 let tx = self.encode_with_growth(|w| {
1920 Npdu::new(0).encode(w)?;
1921 req.encode(w)
1922 })?;
1923 self.await_simple_ack_or_error(
1924 address,
1925 &tx,
1926 invoke_id,
1927 SERVICE_WRITE_PROPERTY_MULTIPLE,
1928 self.response_timeout,
1929 )
1930 .await
1931 }
1932
1933 pub async fn private_transfer(
1935 &self,
1936 address: DataLinkAddress,
1937 vendor_id: u32,
1938 service_number: u32,
1939 service_parameters: Option<&[u8]>,
1940 ) -> Result<PrivateTransferAck, ClientError> {
1941 let invoke_id = self.next_invoke_id().await;
1942 let req = ConfirmedPrivateTransferRequest {
1943 vendor_id,
1944 service_number,
1945 service_parameters,
1946 invoke_id,
1947 };
1948
1949 let tx = self.encode_with_growth(|w| {
1950 Npdu::new(0).encode(w)?;
1951 req.encode(w)
1952 })?;
1953 let payload = self
1954 .await_complex_ack_payload_or_error(
1955 address,
1956 &tx,
1957 invoke_id,
1958 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
1959 self.response_timeout,
1960 )
1961 .await?;
1962 let mut r = Reader::new(&payload);
1963 PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
1964 }
1965
1966 pub async fn read_many(
1972 &self,
1973 address: DataLinkAddress,
1974 requests: &[(ObjectId, PropertyId)],
1975 ) -> Result<HashMap<(ObjectId, PropertyId), ClientDataValue>, ClientError> {
1976 let mut grouped: Vec<(ObjectId, Vec<PropertyReference>)> = Vec::new();
1978 for &(oid, pid) in requests {
1979 if let Some(entry) = grouped.iter_mut().find(|(o, _)| *o == oid) {
1980 entry.1.push(PropertyReference {
1981 property_id: pid,
1982 array_index: None,
1983 });
1984 } else {
1985 grouped.push((
1986 oid,
1987 vec![PropertyReference {
1988 property_id: pid,
1989 array_index: None,
1990 }],
1991 ));
1992 }
1993 }
1994
1995 let specs: Vec<ReadAccessSpecification<'_>> = grouped
1996 .iter()
1997 .map(|(oid, props)| ReadAccessSpecification {
1998 object_id: *oid,
1999 properties: props,
2000 })
2001 .collect();
2002
2003 let invoke_id = self.next_invoke_id().await;
2004 let req = ReadPropertyMultipleRequest {
2005 specs: &specs,
2006 invoke_id,
2007 };
2008 let tx = self.encode_with_growth(|w| {
2009 Npdu::new(0).encode(w)?;
2010 req.encode(w)
2011 })?;
2012 let payload = self
2013 .await_complex_ack_payload_or_error(
2014 address,
2015 &tx,
2016 invoke_id,
2017 SERVICE_READ_PROPERTY_MULTIPLE,
2018 self.response_timeout,
2019 )
2020 .await?;
2021
2022 let mut pr = Reader::new(&payload);
2023 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2024 let mut out = HashMap::new();
2025 for access in parsed.results {
2026 for item in access.results {
2027 if let Ok(v) = into_client_value(item.value) {
2028 out.insert((access.object_id, item.property_id), v);
2029 }
2030 }
2031 }
2032 Ok(out)
2033 }
2034
2035 pub async fn write_many(
2042 &self,
2043 address: DataLinkAddress,
2044 writes: &[(ObjectId, PropertyId, ClientDataValue, Option<u8>)],
2045 ) -> Result<(), ClientError> {
2046 use rustbac_core::types::{BitString, DataValue as DV};
2047
2048 fn cv_to_dv(v: &ClientDataValue) -> DV<'_> {
2049 match v {
2050 ClientDataValue::Null => DV::Null,
2051 ClientDataValue::Boolean(b) => DV::Boolean(*b),
2052 ClientDataValue::Unsigned(n) => DV::Unsigned(*n),
2053 ClientDataValue::Signed(n) => DV::Signed(*n),
2054 ClientDataValue::Real(f) => DV::Real(*f),
2055 ClientDataValue::Double(f) => DV::Double(*f),
2056 ClientDataValue::OctetString(b) => DV::OctetString(b),
2057 ClientDataValue::CharacterString(s) => DV::CharacterString(s),
2058 ClientDataValue::BitString { unused_bits, data } => DV::BitString(BitString {
2059 unused_bits: *unused_bits,
2060 data,
2061 }),
2062 ClientDataValue::Enumerated(n) => DV::Enumerated(*n),
2063 ClientDataValue::Date(d) => DV::Date(*d),
2064 ClientDataValue::Time(t) => DV::Time(*t),
2065 ClientDataValue::ObjectId(o) => DV::ObjectId(*o),
2066 ClientDataValue::Constructed { tag_num, values } => DV::Constructed {
2067 tag_num: *tag_num,
2068 values: values.iter().map(cv_to_dv).collect(),
2069 },
2070 }
2071 }
2072
2073 let converted: Vec<(ObjectId, PropertyId, DV<'_>, Option<u8>)> = writes
2075 .iter()
2076 .map(|(oid, pid, val, prio)| (*oid, *pid, cv_to_dv(val), *prio))
2077 .collect();
2078
2079 let mut grouped: Vec<(ObjectId, Vec<PropertyWriteSpec<'_>>)> = Vec::new();
2081 for (oid, pid, val, prio) in &converted {
2082 let spec = PropertyWriteSpec {
2083 property_id: *pid,
2084 array_index: None,
2085 value: val.clone(),
2086 priority: *prio,
2087 };
2088 if let Some(entry) = grouped.iter_mut().find(|(o, _)| o == oid) {
2089 entry.1.push(spec);
2090 } else {
2091 grouped.push((*oid, vec![spec]));
2092 }
2093 }
2094
2095 let specs: Vec<WriteAccessSpecification<'_>> = grouped
2096 .iter()
2097 .map(|(oid, props)| WriteAccessSpecification {
2098 object_id: *oid,
2099 properties: props,
2100 })
2101 .collect();
2102
2103 let invoke_id = self.next_invoke_id().await;
2104 let req = WritePropertyMultipleRequest {
2105 specs: &specs,
2106 invoke_id,
2107 };
2108 let tx = self.encode_with_growth(|w| {
2109 Npdu::new(0).encode(w)?;
2110 req.encode(w)
2111 })?;
2112 self.await_simple_ack_or_error(
2113 address,
2114 &tx,
2115 invoke_id,
2116 SERVICE_WRITE_PROPERTY_MULTIPLE,
2117 self.response_timeout,
2118 )
2119 .await
2120 }
2121}
2122
2123fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
2124 let mut r = Reader::new(payload);
2125 let _npdu = Npdu::decode(&mut r)?;
2126 r.read_exact(r.remaining()).map_err(ClientError::from)
2127}
2128
2129fn remote_service_error(err: BacnetError) -> ClientError {
2130 ClientError::RemoteServiceError {
2131 service_choice: err.service_choice,
2132 error_class_raw: err.error_class,
2133 error_code_raw: err.error_code,
2134 error_class: err.error_class.and_then(ErrorClass::from_u32),
2135 error_code: err.error_code.and_then(ErrorCode::from_u32),
2136 }
2137}
2138
2139fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
2140 Ok(match value {
2141 DataValue::Null => ClientDataValue::Null,
2142 DataValue::Boolean(v) => ClientDataValue::Boolean(v),
2143 DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
2144 DataValue::Signed(v) => ClientDataValue::Signed(v),
2145 DataValue::Real(v) => ClientDataValue::Real(v),
2146 DataValue::Double(v) => ClientDataValue::Double(v),
2147 DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
2148 DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
2149 DataValue::BitString(v) => ClientDataValue::BitString {
2150 unused_bits: v.unused_bits,
2151 data: v.data.to_vec(),
2152 },
2153 DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
2154 DataValue::Date(v) => ClientDataValue::Date(v),
2155 DataValue::Time(v) => ClientDataValue::Time(v),
2156 DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
2157 DataValue::Constructed { tag_num, values } => {
2158 let mut children = Vec::with_capacity(values.len());
2159 for child in values {
2160 children.push(into_client_value(child)?);
2161 }
2162 ClientDataValue::Constructed {
2163 tag_num,
2164 values: children,
2165 }
2166 }
2167 })
2168}
2169
2170fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
2171 value
2172 .into_iter()
2173 .map(|item| AlarmSummaryItem {
2174 object_id: item.object_id,
2175 alarm_state_raw: item.alarm_state,
2176 alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2177 item.alarm_state,
2178 ),
2179 acknowledged_transitions: ClientBitString {
2180 unused_bits: item.acknowledged_transitions.unused_bits,
2181 data: item.acknowledged_transitions.data.to_vec(),
2182 },
2183 })
2184 .collect()
2185}
2186
2187fn into_client_enrollment_summary(
2188 value: Vec<CoreEnrollmentSummaryItem>,
2189) -> Vec<EnrollmentSummaryItem> {
2190 value
2191 .into_iter()
2192 .map(|item| EnrollmentSummaryItem {
2193 object_id: item.object_id,
2194 event_type: item.event_type,
2195 event_state_raw: item.event_state,
2196 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2197 item.event_state,
2198 ),
2199 priority: item.priority,
2200 notification_class: item.notification_class,
2201 })
2202 .collect()
2203}
2204
2205fn into_client_event_information(
2206 value: Vec<CoreEventSummaryItem<'_>>,
2207) -> Vec<EventInformationItem> {
2208 value
2209 .into_iter()
2210 .map(|item| EventInformationItem {
2211 object_id: item.object_id,
2212 event_state_raw: item.event_state,
2213 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2214 item.event_state,
2215 ),
2216 acknowledged_transitions: ClientBitString {
2217 unused_bits: item.acknowledged_transitions.unused_bits,
2218 data: item.acknowledged_transitions.data.to_vec(),
2219 },
2220 notify_type: item.notify_type,
2221 event_enable: ClientBitString {
2222 unused_bits: item.event_enable.unused_bits,
2223 data: item.event_enable.data.to_vec(),
2224 },
2225 event_priorities: item.event_priorities,
2226 })
2227 .collect()
2228}
2229
2230fn into_client_cov_notification(
2231 source: DataLinkAddress,
2232 confirmed: bool,
2233 value: CovNotificationRequest<'_>,
2234) -> Result<CovNotification, ClientError> {
2235 let mut values = Vec::with_capacity(value.values.len());
2236 for property in value.values {
2237 values.push(CovPropertyValue {
2238 property_id: property.property_id,
2239 array_index: property.array_index,
2240 value: into_client_value(property.value)?,
2241 priority: property.priority,
2242 });
2243 }
2244
2245 Ok(CovNotification {
2246 source,
2247 confirmed,
2248 subscriber_process_id: value.subscriber_process_id,
2249 initiating_device_id: value.initiating_device_id,
2250 monitored_object_id: value.monitored_object_id,
2251 time_remaining_seconds: value.time_remaining_seconds,
2252 values,
2253 })
2254}
2255
2256fn into_client_event_notification(
2257 source: DataLinkAddress,
2258 confirmed: bool,
2259 value: EventNotificationRequest<'_>,
2260) -> EventNotification {
2261 EventNotification {
2262 source,
2263 confirmed,
2264 process_id: value.process_id,
2265 initiating_device_id: value.initiating_device_id,
2266 event_object_id: value.event_object_id,
2267 timestamp: value.timestamp,
2268 notification_class: value.notification_class,
2269 priority: value.priority,
2270 event_type: value.event_type,
2271 message_text: value.message_text.map(str::to_string),
2272 notify_type: value.notify_type,
2273 ack_required: value.ack_required,
2274 from_state_raw: value.from_state,
2275 from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2276 value.from_state,
2277 ),
2278 to_state_raw: value.to_state,
2279 to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
2280 }
2281}
2282
2283fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
2284 let mut items = Vec::with_capacity(value.items.len());
2285 for item in value.items {
2286 items.push(into_client_value(item)?);
2287 }
2288 Ok(ReadRangeResult {
2289 object_id: value.object_id,
2290 property_id: value.property_id,
2291 array_index: value.array_index,
2292 result_flags: ClientBitString {
2293 unused_bits: value.result_flags.unused_bits,
2294 data: value.result_flags.data.to_vec(),
2295 },
2296 item_count: value.item_count,
2297 items,
2298 })
2299}
2300
2301fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
2302 match value.access_method {
2303 AtomicReadFileAckAccess::Stream {
2304 file_start_position,
2305 file_data,
2306 } => AtomicReadFileResult::Stream {
2307 end_of_file: value.end_of_file,
2308 file_start_position,
2309 file_data: file_data.to_vec(),
2310 },
2311 AtomicReadFileAckAccess::Record {
2312 file_start_record,
2313 returned_record_count,
2314 file_record_data,
2315 } => AtomicReadFileResult::Record {
2316 end_of_file: value.end_of_file,
2317 file_start_record,
2318 returned_record_count,
2319 file_record_data: file_record_data
2320 .into_iter()
2321 .map(|record| record.to_vec())
2322 .collect(),
2323 },
2324 }
2325}
2326
2327fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
2328 match value {
2329 AtomicWriteFileAck::Stream {
2330 file_start_position,
2331 } => AtomicWriteFileResult::Stream {
2332 file_start_position,
2333 },
2334 AtomicWriteFileAck::Record { file_start_record } => {
2335 AtomicWriteFileResult::Record { file_start_record }
2336 }
2337 }
2338}
2339
2340#[cfg(test)]
2341mod tests {
2342 use super::BacnetClient;
2343 use crate::{
2344 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
2345 EnrollmentSummaryItem, EventInformationItem, EventNotification,
2346 };
2347 use rustbac_core::apdu::{
2348 ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
2349 UnconfirmedRequestHeader,
2350 };
2351 use rustbac_core::encoding::{
2352 primitives::{
2353 decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
2354 encode_ctx_object_id, encode_ctx_unsigned,
2355 },
2356 reader::Reader,
2357 tag::{AppTag, Tag},
2358 writer::Writer,
2359 };
2360 use rustbac_core::npdu::Npdu;
2361 use rustbac_core::services::acknowledge_alarm::{
2362 AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
2363 };
2364 use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
2365 use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
2366 use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
2367 use rustbac_core::services::cov_notification::{
2368 SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
2369 };
2370 use rustbac_core::services::device_management::{
2371 DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
2372 SERVICE_REINITIALIZE_DEVICE,
2373 };
2374 use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
2375 use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
2376 use rustbac_core::services::event_notification::{
2377 SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
2378 };
2379 use rustbac_core::services::list_element::{
2380 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
2381 SERVICE_REMOVE_LIST_ELEMENT,
2382 };
2383 use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
2384 use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
2385 use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
2386 use rustbac_core::services::read_range::SERVICE_READ_RANGE;
2387 use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
2388 use rustbac_core::services::subscribe_cov_property::{
2389 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
2390 };
2391 use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
2392 use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
2393 use rustbac_core::services::write_property_multiple::{
2394 PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
2395 };
2396 use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
2397 use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
2398 use std::collections::VecDeque;
2399 use std::sync::Arc;
2400 use std::time::Duration;
2401 use tokio::sync::Mutex;
2402
2403 #[derive(Debug, Default)]
2404 struct MockState {
2405 sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
2406 recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
2407 }
2408
2409 #[derive(Debug, Clone)]
2410 struct MockDataLink {
2411 state: Arc<MockState>,
2412 }
2413
2414 impl MockDataLink {
2415 fn new() -> (Self, Arc<MockState>) {
2416 let state = Arc::new(MockState::default());
2417 (
2418 Self {
2419 state: state.clone(),
2420 },
2421 state,
2422 )
2423 }
2424 }
2425
2426 impl DataLink for MockDataLink {
2427 async fn send(
2428 &self,
2429 address: DataLinkAddress,
2430 payload: &[u8],
2431 ) -> Result<(), DataLinkError> {
2432 self.state
2433 .sent
2434 .lock()
2435 .await
2436 .push((address, payload.to_vec()));
2437 Ok(())
2438 }
2439
2440 async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
2441 let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
2442 return Err(DataLinkError::InvalidFrame);
2443 };
2444 if payload.len() > buf.len() {
2445 return Err(DataLinkError::FrameTooLarge);
2446 }
2447 buf[..payload.len()].copy_from_slice(&payload);
2448 Ok((payload.len(), addr))
2449 }
2450 }
2451
2452 fn with_npdu(apdu: &[u8]) -> Vec<u8> {
2453 let mut out = [0u8; 512];
2454 let mut w = Writer::new(&mut out);
2455 Npdu::new(0).encode(&mut w).unwrap();
2456 w.write_all(apdu).unwrap();
2457 w.as_written().to_vec()
2458 }
2459
2460 fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2461 let mut apdu_buf = [0u8; 256];
2462 let mut w = Writer::new(&mut apdu_buf);
2463 ComplexAckHeader {
2464 segmented: false,
2465 more_follows: false,
2466 invoke_id,
2467 sequence_number: None,
2468 proposed_window_size: None,
2469 service_choice: SERVICE_READ_RANGE,
2470 }
2471 .encode(&mut w)
2472 .unwrap();
2473 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2474 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
2475 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
2476 w.write_u8(5).unwrap();
2477 w.write_u8(0b1110_0000).unwrap();
2478 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
2479 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
2480 encode_app_real(&mut w, 42.0).unwrap();
2481 encode_app_real(&mut w, 43.0).unwrap();
2482 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
2483 w.as_written().to_vec()
2484 }
2485
2486 fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
2487 let mut apdu_buf = [0u8; 256];
2488 let mut w = Writer::new(&mut apdu_buf);
2489 ComplexAckHeader {
2490 segmented: false,
2491 more_follows: false,
2492 invoke_id,
2493 sequence_number: None,
2494 proposed_window_size: None,
2495 service_choice: SERVICE_ATOMIC_READ_FILE,
2496 }
2497 .encode(&mut w)
2498 .unwrap();
2499 Tag::Application {
2500 tag: AppTag::Boolean,
2501 len: if eof { 1 } else { 0 },
2502 }
2503 .encode(&mut w)
2504 .unwrap();
2505 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2506 Tag::Application {
2507 tag: AppTag::SignedInt,
2508 len: 1,
2509 }
2510 .encode(&mut w)
2511 .unwrap();
2512 w.write_u8(0).unwrap();
2513 Tag::Application {
2514 tag: AppTag::OctetString,
2515 len: data.len() as u32,
2516 }
2517 .encode(&mut w)
2518 .unwrap();
2519 w.write_all(data).unwrap();
2520 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2521 w.as_written().to_vec()
2522 }
2523
2524 fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
2525 let mut apdu_buf = [0u8; 256];
2526 let mut w = Writer::new(&mut apdu_buf);
2527 ComplexAckHeader {
2528 segmented: false,
2529 more_follows: false,
2530 invoke_id,
2531 sequence_number: None,
2532 proposed_window_size: None,
2533 service_choice: SERVICE_ATOMIC_READ_FILE,
2534 }
2535 .encode(&mut w)
2536 .unwrap();
2537 Tag::Application {
2538 tag: AppTag::Boolean,
2539 len: 0,
2540 }
2541 .encode(&mut w)
2542 .unwrap();
2543 Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
2544 Tag::Application {
2545 tag: AppTag::SignedInt,
2546 len: 1,
2547 }
2548 .encode(&mut w)
2549 .unwrap();
2550 w.write_u8(7).unwrap();
2551 Tag::Application {
2552 tag: AppTag::UnsignedInt,
2553 len: 1,
2554 }
2555 .encode(&mut w)
2556 .unwrap();
2557 w.write_u8(2).unwrap();
2558 Tag::Application {
2559 tag: AppTag::OctetString,
2560 len: 2,
2561 }
2562 .encode(&mut w)
2563 .unwrap();
2564 w.write_all(&[0x01, 0x02]).unwrap();
2565 Tag::Application {
2566 tag: AppTag::OctetString,
2567 len: 3,
2568 }
2569 .encode(&mut w)
2570 .unwrap();
2571 w.write_all(&[0x03, 0x04, 0x05]).unwrap();
2572 Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
2573 w.as_written().to_vec()
2574 }
2575
2576 fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
2577 let mut apdu_buf = [0u8; 64];
2578 let mut w = Writer::new(&mut apdu_buf);
2579 ComplexAckHeader {
2580 segmented: false,
2581 more_follows: false,
2582 invoke_id,
2583 sequence_number: None,
2584 proposed_window_size: None,
2585 service_choice: SERVICE_ATOMIC_WRITE_FILE,
2586 }
2587 .encode(&mut w)
2588 .unwrap();
2589 Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
2590 w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
2591 w.as_written().to_vec()
2592 }
2593
2594 fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
2595 let mut apdu_buf = [0u8; 64];
2596 let mut w = Writer::new(&mut apdu_buf);
2597 ComplexAckHeader {
2598 segmented: false,
2599 more_follows: false,
2600 invoke_id,
2601 sequence_number: None,
2602 proposed_window_size: None,
2603 service_choice: SERVICE_ATOMIC_WRITE_FILE,
2604 }
2605 .encode(&mut w)
2606 .unwrap();
2607 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2608 w.write_u8(start_record as u8).unwrap();
2609 w.as_written().to_vec()
2610 }
2611
2612 fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2613 let mut apdu_buf = [0u8; 64];
2614 let mut w = Writer::new(&mut apdu_buf);
2615 ComplexAckHeader {
2616 segmented: false,
2617 more_follows: false,
2618 invoke_id,
2619 sequence_number: None,
2620 proposed_window_size: None,
2621 service_choice: SERVICE_CREATE_OBJECT,
2622 }
2623 .encode(&mut w)
2624 .unwrap();
2625 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2626 w.as_written().to_vec()
2627 }
2628
2629 fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2630 let mut apdu_buf = [0u8; 128];
2631 let mut w = Writer::new(&mut apdu_buf);
2632 ComplexAckHeader {
2633 segmented: false,
2634 more_follows: false,
2635 invoke_id,
2636 sequence_number: None,
2637 proposed_window_size: None,
2638 service_choice: SERVICE_GET_ALARM_SUMMARY,
2639 }
2640 .encode(&mut w)
2641 .unwrap();
2642 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2643 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2644 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2645 w.write_u8(5).unwrap();
2646 w.write_u8(0b1110_0000).unwrap();
2647
2648 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
2649 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2650 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2651 w.write_u8(5).unwrap();
2652 w.write_u8(0b1100_0000).unwrap();
2653 w.as_written().to_vec()
2654 }
2655
2656 fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2657 let mut apdu_buf = [0u8; 160];
2658 let mut w = Writer::new(&mut apdu_buf);
2659 ComplexAckHeader {
2660 segmented: false,
2661 more_follows: false,
2662 invoke_id,
2663 sequence_number: None,
2664 proposed_window_size: None,
2665 service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
2666 }
2667 .encode(&mut w)
2668 .unwrap();
2669 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2670 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2671 encode_ctx_unsigned(&mut w, 2, 2).unwrap();
2672 encode_ctx_unsigned(&mut w, 3, 200).unwrap();
2673 encode_ctx_unsigned(&mut w, 4, 10).unwrap();
2674
2675 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
2676 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2677 encode_ctx_unsigned(&mut w, 2, 0).unwrap();
2678 encode_ctx_unsigned(&mut w, 3, 20).unwrap();
2679 encode_ctx_unsigned(&mut w, 4, 11).unwrap();
2680 w.as_written().to_vec()
2681 }
2682
2683 fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
2684 let mut apdu_buf = [0u8; 256];
2685 let mut w = Writer::new(&mut apdu_buf);
2686 ComplexAckHeader {
2687 segmented: false,
2688 more_follows: false,
2689 invoke_id,
2690 sequence_number: None,
2691 proposed_window_size: None,
2692 service_choice: SERVICE_GET_EVENT_INFORMATION,
2693 }
2694 .encode(&mut w)
2695 .unwrap();
2696 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2697 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2698 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2699 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2700 w.write_u8(5).unwrap();
2701 w.write_u8(0b1110_0000).unwrap();
2702 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
2703 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2704 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2705 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2706 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
2707 encode_ctx_unsigned(&mut w, 4, 0).unwrap();
2708 Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
2709 w.write_u8(5).unwrap();
2710 w.write_u8(0b1100_0000).unwrap();
2711 Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
2712 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
2713 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2714 encode_ctx_unsigned(&mut w, 2, 3).unwrap();
2715 Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
2716 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2717 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2718 w.write_u8(0).unwrap();
2719 w.as_written().to_vec()
2720 }
2721
2722 #[tokio::test]
2723 async fn who_has_object_name_collects_i_have() {
2724 let (dl, state) = MockDataLink::new();
2725 let client = BacnetClient::with_datalink(dl);
2726 let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
2727
2728 let mut apdu = [0u8; 128];
2729 let mut w = Writer::new(&mut apdu);
2730 UnconfirmedRequestHeader {
2731 service_choice: SERVICE_I_HAVE,
2732 }
2733 .encode(&mut w)
2734 .unwrap();
2735 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
2736 encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2737 encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
2738
2739 state
2740 .recv
2741 .lock()
2742 .await
2743 .push_back((with_npdu(w.as_written()), addr));
2744
2745 let results = client
2746 .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
2747 .await
2748 .unwrap();
2749 assert_eq!(results.len(), 1);
2750 assert_eq!(results[0].address, addr);
2751 assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
2752 assert_eq!(
2753 results[0].object_id,
2754 ObjectId::new(ObjectType::AnalogInput, 7)
2755 );
2756 assert_eq!(results[0].object_name, "Zone Temp");
2757
2758 let sent = state.sent.lock().await;
2759 assert_eq!(sent.len(), 1);
2760 let mut r = Reader::new(&sent[0].1);
2761 let _npdu = Npdu::decode(&mut r).unwrap();
2762 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2763 assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
2764 }
2765
2766 #[tokio::test]
2767 async fn device_communication_control_handles_simple_ack() {
2768 let (dl, state) = MockDataLink::new();
2769 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2770 let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
2771
2772 let mut apdu = [0u8; 32];
2773 let mut w = Writer::new(&mut apdu);
2774 SimpleAck {
2775 invoke_id: 1,
2776 service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
2777 }
2778 .encode(&mut w)
2779 .unwrap();
2780 state
2781 .recv
2782 .lock()
2783 .await
2784 .push_back((with_npdu(w.as_written()), addr));
2785
2786 client
2787 .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
2788 .await
2789 .unwrap();
2790
2791 let sent = state.sent.lock().await;
2792 assert_eq!(sent.len(), 1);
2793 let mut r = Reader::new(&sent[0].1);
2794 let _npdu = Npdu::decode(&mut r).unwrap();
2795 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2796 assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
2797 }
2798
2799 #[tokio::test]
2800 async fn reinitialize_device_handles_simple_ack() {
2801 let (dl, state) = MockDataLink::new();
2802 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2803 let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
2804
2805 let mut apdu = [0u8; 32];
2806 let mut w = Writer::new(&mut apdu);
2807 SimpleAck {
2808 invoke_id: 1,
2809 service_choice: SERVICE_REINITIALIZE_DEVICE,
2810 }
2811 .encode(&mut w)
2812 .unwrap();
2813 state
2814 .recv
2815 .lock()
2816 .await
2817 .push_back((with_npdu(w.as_written()), addr));
2818
2819 client
2820 .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
2821 .await
2822 .unwrap();
2823
2824 let sent = state.sent.lock().await;
2825 assert_eq!(sent.len(), 1);
2826 let mut r = Reader::new(&sent[0].1);
2827 let _npdu = Npdu::decode(&mut r).unwrap();
2828 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2829 assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
2830 }
2831
2832 #[tokio::test]
2833 async fn time_synchronize_sends_unconfirmed_request() {
2834 let (dl, state) = MockDataLink::new();
2835 let client = BacnetClient::with_datalink(dl);
2836 let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
2837
2838 client
2839 .time_synchronize(
2840 addr,
2841 Date {
2842 year_since_1900: 126,
2843 month: 2,
2844 day: 7,
2845 weekday: 6,
2846 },
2847 Time {
2848 hour: 10,
2849 minute: 11,
2850 second: 12,
2851 hundredths: 13,
2852 },
2853 false,
2854 )
2855 .await
2856 .unwrap();
2857
2858 let sent = state.sent.lock().await;
2859 assert_eq!(sent.len(), 1);
2860 let mut r = Reader::new(&sent[0].1);
2861 let _npdu = Npdu::decode(&mut r).unwrap();
2862 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2863 assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
2864 }
2865
2866 #[tokio::test]
2867 async fn get_alarm_summary_decodes_complex_ack() {
2868 let (dl, state) = MockDataLink::new();
2869 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2870 let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
2871
2872 state
2873 .recv
2874 .lock()
2875 .await
2876 .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
2877
2878 let summaries = client.get_alarm_summary(addr).await.unwrap();
2879 assert_eq!(summaries.len(), 2);
2880 assert_eq!(
2881 summaries[0],
2882 AlarmSummaryItem {
2883 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2884 alarm_state_raw: 1,
2885 alarm_state: Some(EventState::Fault),
2886 acknowledged_transitions: crate::ClientBitString {
2887 unused_bits: 5,
2888 data: vec![0b1110_0000],
2889 },
2890 }
2891 );
2892 assert_eq!(
2893 summaries[1],
2894 AlarmSummaryItem {
2895 object_id: ObjectId::new(ObjectType::BinaryInput, 2),
2896 alarm_state_raw: 0,
2897 alarm_state: Some(EventState::Normal),
2898 acknowledged_transitions: crate::ClientBitString {
2899 unused_bits: 5,
2900 data: vec![0b1100_0000],
2901 },
2902 }
2903 );
2904
2905 let sent = state.sent.lock().await;
2906 assert_eq!(sent.len(), 1);
2907 let mut r = Reader::new(&sent[0].1);
2908 let _npdu = Npdu::decode(&mut r).unwrap();
2909 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2910 assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
2911 }
2912
2913 #[tokio::test]
2914 async fn get_enrollment_summary_decodes_complex_ack() {
2915 let (dl, state) = MockDataLink::new();
2916 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2917 let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
2918
2919 state
2920 .recv
2921 .lock()
2922 .await
2923 .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
2924
2925 let summaries = client.get_enrollment_summary(addr).await.unwrap();
2926 assert_eq!(summaries.len(), 2);
2927 assert_eq!(
2928 summaries[0],
2929 EnrollmentSummaryItem {
2930 object_id: ObjectId::new(ObjectType::AnalogInput, 7),
2931 event_type: 1,
2932 event_state_raw: 2,
2933 event_state: Some(EventState::Offnormal),
2934 priority: 200,
2935 notification_class: 10,
2936 }
2937 );
2938 assert_eq!(
2939 summaries[1],
2940 EnrollmentSummaryItem {
2941 object_id: ObjectId::new(ObjectType::BinaryInput, 8),
2942 event_type: 0,
2943 event_state_raw: 0,
2944 event_state: Some(EventState::Normal),
2945 priority: 20,
2946 notification_class: 11,
2947 }
2948 );
2949
2950 let sent = state.sent.lock().await;
2951 assert_eq!(sent.len(), 1);
2952 let mut r = Reader::new(&sent[0].1);
2953 let _npdu = Npdu::decode(&mut r).unwrap();
2954 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2955 assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
2956 }
2957
2958 #[tokio::test]
2959 async fn get_event_information_decodes_complex_ack() {
2960 let (dl, state) = MockDataLink::new();
2961 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2962 let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
2963
2964 state
2965 .recv
2966 .lock()
2967 .await
2968 .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
2969
2970 let result = client.get_event_information(addr, None).await.unwrap();
2971 assert!(!result.more_events);
2972 assert_eq!(result.summaries.len(), 1);
2973 assert_eq!(
2974 result.summaries[0],
2975 EventInformationItem {
2976 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2977 event_state_raw: 2,
2978 event_state: Some(EventState::Offnormal),
2979 acknowledged_transitions: crate::ClientBitString {
2980 unused_bits: 5,
2981 data: vec![0b1110_0000],
2982 },
2983 notify_type: 0,
2984 event_enable: crate::ClientBitString {
2985 unused_bits: 5,
2986 data: vec![0b1100_0000],
2987 },
2988 event_priorities: [1, 2, 3],
2989 }
2990 );
2991 }
2992
2993 #[tokio::test]
2994 async fn acknowledge_alarm_handles_simple_ack() {
2995 let (dl, state) = MockDataLink::new();
2996 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2997 let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
2998
2999 let mut apdu = [0u8; 32];
3000 let mut w = Writer::new(&mut apdu);
3001 SimpleAck {
3002 invoke_id: 1,
3003 service_choice: SERVICE_ACKNOWLEDGE_ALARM,
3004 }
3005 .encode(&mut w)
3006 .unwrap();
3007 state
3008 .recv
3009 .lock()
3010 .await
3011 .push_back((with_npdu(w.as_written()), addr));
3012
3013 client
3014 .acknowledge_alarm(
3015 addr,
3016 AcknowledgeAlarmRequest {
3017 acknowledging_process_id: 10,
3018 event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
3019 event_state_acknowledged: EventState::Offnormal,
3020 event_time_stamp: TimeStamp::SequenceNumber(42),
3021 acknowledgment_source: "operator",
3022 time_of_acknowledgment: TimeStamp::DateTime {
3023 date: Date {
3024 year_since_1900: 126,
3025 month: 2,
3026 day: 7,
3027 weekday: 6,
3028 },
3029 time: Time {
3030 hour: 10,
3031 minute: 11,
3032 second: 12,
3033 hundredths: 13,
3034 },
3035 },
3036 invoke_id: 0,
3037 },
3038 )
3039 .await
3040 .unwrap();
3041
3042 let sent = state.sent.lock().await;
3043 assert_eq!(sent.len(), 1);
3044 let mut r = Reader::new(&sent[0].1);
3045 let _npdu = Npdu::decode(&mut r).unwrap();
3046 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3047 assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
3048 }
3049
3050 #[tokio::test]
3051 async fn create_object_by_type_decodes_complex_ack() {
3052 let (dl, state) = MockDataLink::new();
3053 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3054 let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
3055 let created = ObjectId::new(ObjectType::AnalogValue, 42);
3056
3057 state
3058 .recv
3059 .lock()
3060 .await
3061 .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
3062
3063 let result = client
3064 .create_object_by_type(addr, ObjectType::AnalogValue)
3065 .await
3066 .unwrap();
3067 assert_eq!(result, created);
3068
3069 let sent = state.sent.lock().await;
3070 let mut r = Reader::new(&sent[0].1);
3071 let _npdu = Npdu::decode(&mut r).unwrap();
3072 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3073 assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
3074 }
3075
3076 #[tokio::test]
3077 async fn delete_object_handles_simple_ack() {
3078 let (dl, state) = MockDataLink::new();
3079 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3080 let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
3081
3082 let mut apdu = [0u8; 32];
3083 let mut w = Writer::new(&mut apdu);
3084 SimpleAck {
3085 invoke_id: 1,
3086 service_choice: SERVICE_DELETE_OBJECT,
3087 }
3088 .encode(&mut w)
3089 .unwrap();
3090 state
3091 .recv
3092 .lock()
3093 .await
3094 .push_back((with_npdu(w.as_written()), addr));
3095
3096 client
3097 .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
3098 .await
3099 .unwrap();
3100 }
3101
3102 #[tokio::test]
3103 async fn add_list_element_handles_simple_ack() {
3104 let (dl, state) = MockDataLink::new();
3105 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3106 let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
3107
3108 let mut apdu = [0u8; 32];
3109 let mut w = Writer::new(&mut apdu);
3110 SimpleAck {
3111 invoke_id: 1,
3112 service_choice: SERVICE_ADD_LIST_ELEMENT,
3113 }
3114 .encode(&mut w)
3115 .unwrap();
3116 state
3117 .recv
3118 .lock()
3119 .await
3120 .push_back((with_npdu(w.as_written()), addr));
3121
3122 let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
3123 client
3124 .add_list_element(
3125 addr,
3126 AddListElementRequest {
3127 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
3128 property_id: PropertyId::Proprietary(512),
3129 array_index: None,
3130 elements: &values,
3131 invoke_id: 0,
3132 },
3133 )
3134 .await
3135 .unwrap();
3136 }
3137
3138 #[tokio::test]
3139 async fn remove_list_element_handles_simple_ack() {
3140 let (dl, state) = MockDataLink::new();
3141 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3142 let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
3143
3144 let mut apdu = [0u8; 32];
3145 let mut w = Writer::new(&mut apdu);
3146 SimpleAck {
3147 invoke_id: 1,
3148 service_choice: SERVICE_REMOVE_LIST_ELEMENT,
3149 }
3150 .encode(&mut w)
3151 .unwrap();
3152 state
3153 .recv
3154 .lock()
3155 .await
3156 .push_back((with_npdu(w.as_written()), addr));
3157
3158 let values = [DataValue::Unsigned(1)];
3159 client
3160 .remove_list_element(
3161 addr,
3162 RemoveListElementRequest {
3163 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
3164 property_id: PropertyId::Proprietary(513),
3165 array_index: None,
3166 elements: &values,
3167 invoke_id: 0,
3168 },
3169 )
3170 .await
3171 .unwrap();
3172 }
3173
3174 #[tokio::test]
3175 async fn atomic_read_file_stream_decodes_complex_ack() {
3176 let (dl, state) = MockDataLink::new();
3177 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3178 let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
3179 let file_object = ObjectId::new(ObjectType::File, 2);
3180
3181 state.recv.lock().await.push_back((
3182 with_npdu(&atomic_read_file_stream_ack_apdu(
3183 1,
3184 true,
3185 &[0xAA, 0xBB, 0xCC],
3186 )),
3187 addr,
3188 ));
3189
3190 let result = client
3191 .atomic_read_file_stream(addr, file_object, 0, 3)
3192 .await
3193 .unwrap();
3194
3195 assert_eq!(
3196 result,
3197 AtomicReadFileResult::Stream {
3198 end_of_file: true,
3199 file_start_position: 0,
3200 file_data: vec![0xAA, 0xBB, 0xCC],
3201 }
3202 );
3203
3204 let sent = state.sent.lock().await;
3205 assert_eq!(sent.len(), 1);
3206 let mut r = Reader::new(&sent[0].1);
3207 let _npdu = Npdu::decode(&mut r).unwrap();
3208 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3209 assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
3210 }
3211
3212 #[tokio::test]
3213 async fn atomic_read_file_record_decodes_complex_ack() {
3214 let (dl, state) = MockDataLink::new();
3215 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3216 let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
3217 let file_object = ObjectId::new(ObjectType::File, 5);
3218
3219 state
3220 .recv
3221 .lock()
3222 .await
3223 .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
3224
3225 let result = client
3226 .atomic_read_file_record(addr, file_object, 7, 2)
3227 .await
3228 .unwrap();
3229
3230 assert_eq!(
3231 result,
3232 AtomicReadFileResult::Record {
3233 end_of_file: false,
3234 file_start_record: 7,
3235 returned_record_count: 2,
3236 file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
3237 }
3238 );
3239 }
3240
3241 #[tokio::test]
3242 async fn atomic_write_file_stream_decodes_complex_ack() {
3243 let (dl, state) = MockDataLink::new();
3244 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3245 let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
3246 let file_object = ObjectId::new(ObjectType::File, 3);
3247
3248 state
3249 .recv
3250 .lock()
3251 .await
3252 .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
3253
3254 let result = client
3255 .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
3256 .await
3257 .unwrap();
3258
3259 assert_eq!(
3260 result,
3261 AtomicWriteFileResult::Stream {
3262 file_start_position: 128
3263 }
3264 );
3265 }
3266
3267 #[tokio::test]
3268 async fn atomic_write_file_record_decodes_complex_ack() {
3269 let (dl, state) = MockDataLink::new();
3270 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3271 let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
3272 let file_object = ObjectId::new(ObjectType::File, 9);
3273 let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
3274
3275 state
3276 .recv
3277 .lock()
3278 .await
3279 .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
3280
3281 let result = client
3282 .atomic_write_file_record(addr, file_object, 7, &records)
3283 .await
3284 .unwrap();
3285
3286 assert_eq!(
3287 result,
3288 AtomicWriteFileResult::Record {
3289 file_start_record: 7
3290 }
3291 );
3292 }
3293
3294 #[tokio::test]
3295 async fn read_properties_decodes_complex_ack() {
3296 let (dl, state) = MockDataLink::new();
3297 let client = BacnetClient::with_datalink(dl);
3298 let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
3299 let object_id = ObjectId::new(ObjectType::Device, 1);
3300
3301 let mut apdu_buf = [0u8; 256];
3302 let mut w = Writer::new(&mut apdu_buf);
3303 ComplexAckHeader {
3304 segmented: false,
3305 more_follows: false,
3306 invoke_id: 1,
3307 sequence_number: None,
3308 proposed_window_size: None,
3309 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3310 }
3311 .encode(&mut w)
3312 .unwrap();
3313 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
3314 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3315 .encode(&mut w)
3316 .unwrap();
3317 encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
3318 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3319 .encode(&mut w)
3320 .unwrap();
3321 encode_app_real(&mut w, 55.5).unwrap();
3322 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3323 .encode(&mut w)
3324 .unwrap();
3325 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3326 .encode(&mut w)
3327 .unwrap();
3328
3329 state
3330 .recv
3331 .lock()
3332 .await
3333 .push_back((with_npdu(w.as_written()), addr));
3334
3335 let values = client
3336 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3337 .await
3338 .unwrap();
3339 assert_eq!(values.len(), 1);
3340 assert_eq!(values[0].0, PropertyId::PresentValue);
3341 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
3342
3343 let sent = state.sent.lock().await;
3344 assert_eq!(sent.len(), 1);
3345 let mut r = Reader::new(&sent[0].1);
3346 let _npdu = Npdu::decode(&mut r).unwrap();
3347 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3348 assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
3349 }
3350
3351 #[tokio::test]
3352 async fn read_property_multiple_reassembles_segmented_complex_ack() {
3353 let (dl, state) = MockDataLink::new();
3354 let client = BacnetClient::with_datalink(dl);
3355 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3356 let object_id = ObjectId::new(ObjectType::Device, 1);
3357
3358 let mut payload_buf = [0u8; 256];
3359 let mut pw = Writer::new(&mut payload_buf);
3360 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3361 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3362 .encode(&mut pw)
3363 .unwrap();
3364 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3365 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3366 .encode(&mut pw)
3367 .unwrap();
3368 encode_app_real(&mut pw, 66.0).unwrap();
3369 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3370 .encode(&mut pw)
3371 .unwrap();
3372 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3373 .encode(&mut pw)
3374 .unwrap();
3375 let payload = pw.as_written();
3376 let split = payload.len() / 2;
3377
3378 let mut apdu1 = [0u8; 256];
3379 let mut w1 = Writer::new(&mut apdu1);
3380 ComplexAckHeader {
3381 segmented: true,
3382 more_follows: true,
3383 invoke_id: 1,
3384 sequence_number: Some(0),
3385 proposed_window_size: Some(1),
3386 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3387 }
3388 .encode(&mut w1)
3389 .unwrap();
3390 w1.write_all(&payload[..split]).unwrap();
3391
3392 let mut apdu2 = [0u8; 256];
3393 let mut w2 = Writer::new(&mut apdu2);
3394 ComplexAckHeader {
3395 segmented: true,
3396 more_follows: false,
3397 invoke_id: 1,
3398 sequence_number: Some(1),
3399 proposed_window_size: Some(1),
3400 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3401 }
3402 .encode(&mut w2)
3403 .unwrap();
3404 w2.write_all(&payload[split..]).unwrap();
3405
3406 state
3407 .recv
3408 .lock()
3409 .await
3410 .push_back((with_npdu(w1.as_written()), addr));
3411 state
3412 .recv
3413 .lock()
3414 .await
3415 .push_back((with_npdu(w2.as_written()), addr));
3416
3417 let values = client
3418 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3419 .await
3420 .unwrap();
3421 assert_eq!(values.len(), 1);
3422 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3423
3424 let sent = state.sent.lock().await;
3425 assert!(sent.len() >= 3);
3426
3427 let mut saw_segment_ack = 0usize;
3428 for (_, frame) in sent.iter().skip(1) {
3429 let mut r = Reader::new(frame);
3430 let _npdu = Npdu::decode(&mut r).unwrap();
3431 let apdu = r.read_exact(r.remaining()).unwrap();
3432 if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
3433 let mut sr = Reader::new(apdu);
3434 let sack = SegmentAck::decode(&mut sr).unwrap();
3435 assert_eq!(sack.invoke_id, 1);
3436 saw_segment_ack += 1;
3437 }
3438 }
3439 assert!(saw_segment_ack >= 1);
3440 }
3441
3442 #[tokio::test]
3443 async fn read_property_multiple_tolerates_duplicate_segment() {
3444 let (dl, state) = MockDataLink::new();
3445 let client = BacnetClient::with_datalink(dl);
3446 let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
3447 let object_id = ObjectId::new(ObjectType::Device, 1);
3448
3449 let mut payload_buf = [0u8; 256];
3450 let mut pw = Writer::new(&mut payload_buf);
3451 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3452 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3453 .encode(&mut pw)
3454 .unwrap();
3455 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3456 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3457 .encode(&mut pw)
3458 .unwrap();
3459 encode_app_real(&mut pw, 66.0).unwrap();
3460 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3461 .encode(&mut pw)
3462 .unwrap();
3463 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3464 .encode(&mut pw)
3465 .unwrap();
3466 let payload = pw.as_written();
3467 let split = payload.len() / 2;
3468
3469 let mut apdu1 = [0u8; 256];
3470 let mut w1 = Writer::new(&mut apdu1);
3471 ComplexAckHeader {
3472 segmented: true,
3473 more_follows: true,
3474 invoke_id: 1,
3475 sequence_number: Some(0),
3476 proposed_window_size: Some(1),
3477 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3478 }
3479 .encode(&mut w1)
3480 .unwrap();
3481 w1.write_all(&payload[..split]).unwrap();
3482
3483 let mut dup = [0u8; 256];
3484 let mut wd = Writer::new(&mut dup);
3485 ComplexAckHeader {
3486 segmented: true,
3487 more_follows: true,
3488 invoke_id: 1,
3489 sequence_number: Some(0),
3490 proposed_window_size: Some(1),
3491 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3492 }
3493 .encode(&mut wd)
3494 .unwrap();
3495 wd.write_all(&payload[..split]).unwrap();
3496
3497 let mut apdu2 = [0u8; 256];
3498 let mut w2 = Writer::new(&mut apdu2);
3499 ComplexAckHeader {
3500 segmented: true,
3501 more_follows: false,
3502 invoke_id: 1,
3503 sequence_number: Some(1),
3504 proposed_window_size: Some(1),
3505 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3506 }
3507 .encode(&mut w2)
3508 .unwrap();
3509 w2.write_all(&payload[split..]).unwrap();
3510
3511 {
3512 let mut recv = state.recv.lock().await;
3513 recv.push_back((with_npdu(w1.as_written()), addr));
3514 recv.push_back((with_npdu(wd.as_written()), addr));
3515 recv.push_back((with_npdu(w2.as_written()), addr));
3516 }
3517
3518 let values = client
3519 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3520 .await
3521 .unwrap();
3522 assert_eq!(values.len(), 1);
3523 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3524 }
3525
3526 #[tokio::test]
3527 async fn write_properties_handles_simple_ack() {
3528 let (dl, state) = MockDataLink::new();
3529 let client = BacnetClient::with_datalink(dl);
3530 let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
3531 let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
3532
3533 let mut apdu_buf = [0u8; 32];
3534 let mut w = Writer::new(&mut apdu_buf);
3535 SimpleAck {
3536 invoke_id: 1,
3537 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3538 }
3539 .encode(&mut w)
3540 .unwrap();
3541 state
3542 .recv
3543 .lock()
3544 .await
3545 .push_back((with_npdu(w.as_written()), addr));
3546
3547 let writes = [PropertyWriteSpec {
3548 property_id: PropertyId::PresentValue,
3549 array_index: None,
3550 value: DataValue::Real(12.5),
3551 priority: Some(8),
3552 }];
3553 client
3554 .write_property_multiple(addr, object_id, &writes)
3555 .await
3556 .unwrap();
3557
3558 let sent = state.sent.lock().await;
3559 assert_eq!(sent.len(), 1);
3560 let mut r = Reader::new(&sent[0].1);
3561 let _npdu = Npdu::decode(&mut r).unwrap();
3562 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3563 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3564 }
3565
3566 #[tokio::test]
3567 async fn subscribe_cov_handles_simple_ack() {
3568 let (dl, state) = MockDataLink::new();
3569 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3570 let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
3571
3572 let mut apdu_buf = [0u8; 32];
3573 let mut w = Writer::new(&mut apdu_buf);
3574 SimpleAck {
3575 invoke_id: 1,
3576 service_choice: SERVICE_SUBSCRIBE_COV,
3577 }
3578 .encode(&mut w)
3579 .unwrap();
3580 state
3581 .recv
3582 .lock()
3583 .await
3584 .push_back((with_npdu(w.as_written()), addr));
3585
3586 client
3587 .subscribe_cov(
3588 addr,
3589 SubscribeCovRequest {
3590 subscriber_process_id: 10,
3591 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3592 issue_confirmed_notifications: Some(false),
3593 lifetime_seconds: Some(300),
3594 invoke_id: 0,
3595 },
3596 )
3597 .await
3598 .unwrap();
3599
3600 let sent = state.sent.lock().await;
3601 assert_eq!(sent.len(), 1);
3602 let mut r = Reader::new(&sent[0].1);
3603 let _npdu = Npdu::decode(&mut r).unwrap();
3604 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3605 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
3606 }
3607
3608 #[tokio::test]
3609 async fn subscribe_cov_property_handles_simple_ack() {
3610 let (dl, state) = MockDataLink::new();
3611 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3612 let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
3613
3614 let mut apdu_buf = [0u8; 32];
3615 let mut w = Writer::new(&mut apdu_buf);
3616 SimpleAck {
3617 invoke_id: 1,
3618 service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
3619 }
3620 .encode(&mut w)
3621 .unwrap();
3622 state
3623 .recv
3624 .lock()
3625 .await
3626 .push_back((with_npdu(w.as_written()), addr));
3627
3628 client
3629 .subscribe_cov_property(
3630 addr,
3631 SubscribeCovPropertyRequest {
3632 subscriber_process_id: 22,
3633 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3634 issue_confirmed_notifications: Some(true),
3635 lifetime_seconds: Some(120),
3636 monitored_property_id: PropertyId::PresentValue,
3637 monitored_property_array_index: None,
3638 cov_increment: Some(0.1),
3639 invoke_id: 0,
3640 },
3641 )
3642 .await
3643 .unwrap();
3644
3645 let sent = state.sent.lock().await;
3646 assert_eq!(sent.len(), 1);
3647 let mut r = Reader::new(&sent[0].1);
3648 let _npdu = Npdu::decode(&mut r).unwrap();
3649 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3650 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
3651 }
3652
3653 #[tokio::test]
3654 async fn read_range_by_position_decodes_complex_ack() {
3655 let (dl, state) = MockDataLink::new();
3656 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3657 let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
3658 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3659
3660 let mut apdu_buf = [0u8; 256];
3661 let mut w = Writer::new(&mut apdu_buf);
3662 ComplexAckHeader {
3663 segmented: false,
3664 more_follows: false,
3665 invoke_id: 1,
3666 sequence_number: None,
3667 proposed_window_size: None,
3668 service_choice: SERVICE_READ_RANGE,
3669 }
3670 .encode(&mut w)
3671 .unwrap();
3672 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3673 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3674 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3675 w.write_u8(5).unwrap();
3676 w.write_u8(0b1110_0000).unwrap();
3677 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3678 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3679 encode_app_real(&mut w, 42.0).unwrap();
3680 encode_app_real(&mut w, 43.0).unwrap();
3681 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3682
3683 state
3684 .recv
3685 .lock()
3686 .await
3687 .push_back((with_npdu(w.as_written()), addr));
3688
3689 let result = client
3690 .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
3691 .await
3692 .unwrap();
3693 assert_eq!(result.object_id, object_id);
3694 assert_eq!(result.item_count, 2);
3695 assert_eq!(result.items.len(), 2);
3696 assert!(matches!(
3697 result.items[0],
3698 ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
3699 ));
3700 }
3701
3702 #[tokio::test]
3703 async fn read_range_by_sequence_number_encodes_range_selector() {
3704 let (dl, state) = MockDataLink::new();
3705 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3706 let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
3707 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3708
3709 state
3710 .recv
3711 .lock()
3712 .await
3713 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3714
3715 let _ = client
3716 .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
3717 .await
3718 .unwrap();
3719
3720 let sent = state.sent.lock().await;
3721 let mut r = Reader::new(&sent[0].1);
3722 let _npdu = Npdu::decode(&mut r).unwrap();
3723 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3724 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3725 match Tag::decode(&mut r).unwrap() {
3726 Tag::Context { tag_num: 0, len: 4 } => {
3727 let _ = r.read_exact(4).unwrap();
3728 }
3729 other => panic!("unexpected object id tag: {other:?}"),
3730 }
3731 match Tag::decode(&mut r).unwrap() {
3732 Tag::Context { tag_num: 1, len } => {
3733 let _ = decode_unsigned(&mut r, len as usize).unwrap();
3734 }
3735 other => panic!("unexpected property tag: {other:?}"),
3736 }
3737 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
3738 match Tag::decode(&mut r).unwrap() {
3739 Tag::Application {
3740 tag: AppTag::UnsignedInt,
3741 len,
3742 } => {
3743 assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
3744 }
3745 other => panic!("unexpected ref seq tag: {other:?}"),
3746 }
3747 match Tag::decode(&mut r).unwrap() {
3748 Tag::Application {
3749 tag: AppTag::SignedInt,
3750 len,
3751 } => {
3752 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3753 }
3754 other => panic!("unexpected count tag: {other:?}"),
3755 }
3756 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
3757 }
3758
3759 #[tokio::test]
3760 async fn read_range_by_time_encodes_range_selector() {
3761 let (dl, state) = MockDataLink::new();
3762 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3763 let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
3764 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3765 let date = Date {
3766 year_since_1900: 126,
3767 month: 2,
3768 day: 7,
3769 weekday: 6,
3770 };
3771 let time = Time {
3772 hour: 10,
3773 minute: 11,
3774 second: 12,
3775 hundredths: 13,
3776 };
3777
3778 state
3779 .recv
3780 .lock()
3781 .await
3782 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3783
3784 let _ = client
3785 .read_range_by_time(
3786 addr,
3787 object_id,
3788 PropertyId::PresentValue,
3789 None,
3790 (date, time),
3791 2,
3792 )
3793 .await
3794 .unwrap();
3795
3796 let sent = state.sent.lock().await;
3797 let mut r = Reader::new(&sent[0].1);
3798 let _npdu = Npdu::decode(&mut r).unwrap();
3799 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3800 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3801 match Tag::decode(&mut r).unwrap() {
3802 Tag::Context { tag_num: 0, len: 4 } => {
3803 let _ = r.read_exact(4).unwrap();
3804 }
3805 other => panic!("unexpected object id tag: {other:?}"),
3806 }
3807 match Tag::decode(&mut r).unwrap() {
3808 Tag::Context { tag_num: 1, len } => {
3809 let _ = decode_unsigned(&mut r, len as usize).unwrap();
3810 }
3811 other => panic!("unexpected property tag: {other:?}"),
3812 }
3813 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
3814 match Tag::decode(&mut r).unwrap() {
3815 Tag::Application {
3816 tag: AppTag::Date,
3817 len: 4,
3818 } => {
3819 let raw = r.read_exact(4).unwrap();
3820 assert_eq!(
3821 raw,
3822 &[date.year_since_1900, date.month, date.day, date.weekday]
3823 );
3824 }
3825 other => panic!("unexpected date tag: {other:?}"),
3826 }
3827 match Tag::decode(&mut r).unwrap() {
3828 Tag::Application {
3829 tag: AppTag::Time,
3830 len: 4,
3831 } => {
3832 let raw = r.read_exact(4).unwrap();
3833 assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
3834 }
3835 other => panic!("unexpected time tag: {other:?}"),
3836 }
3837 match Tag::decode(&mut r).unwrap() {
3838 Tag::Application {
3839 tag: AppTag::SignedInt,
3840 len,
3841 } => {
3842 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3843 }
3844 other => panic!("unexpected count tag: {other:?}"),
3845 }
3846 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
3847 }
3848
3849 #[tokio::test]
3850 async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
3851 let (dl, state) = MockDataLink::new();
3852 let client = BacnetClient::with_datalink(dl);
3853 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3854
3855 let mut apdu = [0u8; 256];
3856 let mut w = Writer::new(&mut apdu);
3857 UnconfirmedRequestHeader {
3858 service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3859 }
3860 .encode(&mut w)
3861 .unwrap();
3862 encode_ctx_unsigned(&mut w, 0, 17).unwrap();
3863 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3864 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3865 encode_ctx_unsigned(&mut w, 3, 60).unwrap();
3866 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3867 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3868 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3869 encode_app_real(&mut w, 73.25).unwrap();
3870 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3871 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3872
3873 state
3874 .recv
3875 .lock()
3876 .await
3877 .push_back((with_npdu(w.as_written()), addr));
3878
3879 let notification = client
3880 .recv_cov_notification(Duration::from_secs(1))
3881 .await
3882 .unwrap()
3883 .unwrap();
3884 assert!(!notification.confirmed);
3885 assert_eq!(notification.subscriber_process_id, 17);
3886 assert_eq!(notification.values.len(), 1);
3887 assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
3888 assert!(matches!(
3889 notification.values[0].value,
3890 ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
3891 ));
3892
3893 let sent = state.sent.lock().await;
3894 assert!(sent.is_empty());
3895 }
3896
3897 #[tokio::test]
3898 async fn recv_confirmed_cov_notification_sends_simple_ack() {
3899 let (dl, state) = MockDataLink::new();
3900 let client = BacnetClient::with_datalink(dl);
3901 let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
3902
3903 let mut apdu = [0u8; 256];
3904 let mut w = Writer::new(&mut apdu);
3905 ConfirmedRequestHeader {
3906 segmented: false,
3907 more_follows: false,
3908 segmented_response_accepted: false,
3909 max_segments: 0,
3910 max_apdu: 5,
3911 invoke_id: 9,
3912 sequence_number: None,
3913 proposed_window_size: None,
3914 service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
3915 }
3916 .encode(&mut w)
3917 .unwrap();
3918 encode_ctx_unsigned(&mut w, 0, 18).unwrap();
3919 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3920 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
3921 encode_ctx_unsigned(&mut w, 3, 120).unwrap();
3922 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3923 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3924 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3925 encode_app_real(&mut w, 55.0).unwrap();
3926 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3927 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3928
3929 state
3930 .recv
3931 .lock()
3932 .await
3933 .push_back((with_npdu(w.as_written()), addr));
3934
3935 let notification = client
3936 .recv_cov_notification(Duration::from_secs(1))
3937 .await
3938 .unwrap()
3939 .unwrap();
3940 assert!(notification.confirmed);
3941 assert_eq!(notification.values.len(), 1);
3942
3943 let sent = state.sent.lock().await;
3944 assert_eq!(sent.len(), 1);
3945 let mut r = Reader::new(&sent[0].1);
3946 let _npdu = Npdu::decode(&mut r).unwrap();
3947 let ack = SimpleAck::decode(&mut r).unwrap();
3948 assert_eq!(ack.invoke_id, 9);
3949 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
3950 }
3951
3952 #[tokio::test]
3953 async fn recv_unconfirmed_event_notification_returns_decoded_value() {
3954 let (dl, state) = MockDataLink::new();
3955 let client = BacnetClient::with_datalink(dl);
3956 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
3957
3958 let mut apdu = [0u8; 256];
3959 let mut w = Writer::new(&mut apdu);
3960 UnconfirmedRequestHeader {
3961 service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3962 }
3963 .encode(&mut w)
3964 .unwrap();
3965 encode_ctx_unsigned(&mut w, 0, 99).unwrap();
3966 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3967 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
3968 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3969 encode_ctx_unsigned(&mut w, 1, 55).unwrap();
3970 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3971 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
3972 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
3973 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
3974 encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
3975 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
3976 Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
3977 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
3978 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
3979 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
3980 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3981 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3982 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3983 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
3984
3985 state
3986 .recv
3987 .lock()
3988 .await
3989 .push_back((with_npdu(w.as_written()), addr));
3990
3991 let notification: EventNotification = client
3992 .recv_event_notification(Duration::from_secs(1))
3993 .await
3994 .unwrap()
3995 .unwrap();
3996 assert!(!notification.confirmed);
3997 assert_eq!(notification.process_id, 99);
3998 assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
3999 assert_eq!(notification.ack_required, Some(true));
4000 assert_eq!(notification.from_state, Some(EventState::Offnormal));
4001 assert_eq!(notification.to_state, Some(EventState::Normal));
4002 assert_eq!(notification.notify_type, 0);
4003
4004 let sent = state.sent.lock().await;
4005 assert!(sent.is_empty());
4006 }
4007
4008 #[tokio::test]
4009 async fn recv_confirmed_event_notification_sends_simple_ack() {
4010 let (dl, state) = MockDataLink::new();
4011 let client = BacnetClient::with_datalink(dl);
4012 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
4013
4014 let mut apdu = [0u8; 256];
4015 let mut w = Writer::new(&mut apdu);
4016 ConfirmedRequestHeader {
4017 segmented: false,
4018 more_follows: false,
4019 segmented_response_accepted: false,
4020 max_segments: 0,
4021 max_apdu: 5,
4022 invoke_id: 11,
4023 sequence_number: None,
4024 proposed_window_size: None,
4025 service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
4026 }
4027 .encode(&mut w)
4028 .unwrap();
4029 encode_ctx_unsigned(&mut w, 0, 100).unwrap();
4030 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4031 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
4032 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4033 encode_ctx_unsigned(&mut w, 1, 56).unwrap();
4034 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4035 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
4036 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
4037 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
4038 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
4039 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
4040 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
4041 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
4042 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
4043 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
4044 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
4045 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
4046
4047 state
4048 .recv
4049 .lock()
4050 .await
4051 .push_back((with_npdu(w.as_written()), addr));
4052
4053 let notification = client
4054 .recv_event_notification(Duration::from_secs(1))
4055 .await
4056 .unwrap()
4057 .unwrap();
4058 assert!(notification.confirmed);
4059
4060 let sent = state.sent.lock().await;
4061 assert_eq!(sent.len(), 1);
4062 let mut r = Reader::new(&sent[0].1);
4063 let _npdu = Npdu::decode(&mut r).unwrap();
4064 let ack = SimpleAck::decode(&mut r).unwrap();
4065 assert_eq!(ack.invoke_id, 11);
4066 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
4067 }
4068
4069 #[tokio::test]
4070 async fn write_property_multiple_segments_large_request() {
4071 let (dl, state) = MockDataLink::new();
4072 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4073 let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
4074 let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
4075
4076 {
4077 let mut recv = state.recv.lock().await;
4078 for seq in 0u8..=254 {
4079 let mut apdu = [0u8; 16];
4080 let mut w = Writer::new(&mut apdu);
4081 SegmentAck {
4082 negative_ack: false,
4083 sent_by_server: true,
4084 invoke_id: 1,
4085 sequence_number: seq,
4086 actual_window_size: 1,
4087 }
4088 .encode(&mut w)
4089 .unwrap();
4090 recv.push_back((with_npdu(w.as_written()), addr));
4091 }
4092
4093 let mut apdu = [0u8; 16];
4094 let mut w = Writer::new(&mut apdu);
4095 SimpleAck {
4096 invoke_id: 1,
4097 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4098 }
4099 .encode(&mut w)
4100 .unwrap();
4101 recv.push_back((with_npdu(w.as_written()), addr));
4102 }
4103
4104 let writes: Vec<PropertyWriteSpec> = (0..180)
4105 .map(|_| PropertyWriteSpec {
4106 property_id: PropertyId::Description,
4107 array_index: None,
4108 value: DataValue::CharacterString(
4109 "rustbac segmented write test payload................................................................",
4110 ),
4111 priority: None,
4112 })
4113 .collect();
4114
4115 client
4116 .write_property_multiple(addr, object_id, &writes)
4117 .await
4118 .unwrap();
4119
4120 let sent = state.sent.lock().await;
4121 assert!(sent.len() > 1);
4122
4123 let mut seqs = Vec::new();
4124 let mut saw_more_follows = false;
4125 let mut saw_last = false;
4126 for (_, frame) in sent.iter() {
4127 let mut r = Reader::new(frame);
4128 let _npdu = Npdu::decode(&mut r).unwrap();
4129 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4130 assert!(hdr.segmented);
4131 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4132 if hdr.more_follows {
4133 saw_more_follows = true;
4134 } else {
4135 saw_last = true;
4136 }
4137 seqs.push(hdr.sequence_number.unwrap());
4138 }
4139
4140 assert!(saw_more_follows);
4141 assert!(saw_last);
4142 for (idx, seq) in seqs.iter().enumerate() {
4143 assert_eq!(*seq as usize, idx);
4144 }
4145 }
4146
4147 #[tokio::test]
4148 async fn write_property_multiple_uses_configured_segment_window() {
4149 let (dl, state) = MockDataLink::new();
4150 let client = BacnetClient::with_datalink(dl)
4151 .with_response_timeout(Duration::from_secs(1))
4152 .with_segmented_request_window_size(4);
4153 let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
4154 let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
4155
4156 {
4157 let mut recv = state.recv.lock().await;
4158 for seq in 0u8..=254 {
4159 let mut apdu = [0u8; 16];
4160 let mut w = Writer::new(&mut apdu);
4161 SegmentAck {
4162 negative_ack: false,
4163 sent_by_server: true,
4164 invoke_id: 1,
4165 sequence_number: seq,
4166 actual_window_size: 4,
4167 }
4168 .encode(&mut w)
4169 .unwrap();
4170 recv.push_back((with_npdu(w.as_written()), addr));
4171 }
4172
4173 let mut apdu = [0u8; 16];
4174 let mut w = Writer::new(&mut apdu);
4175 SimpleAck {
4176 invoke_id: 1,
4177 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4178 }
4179 .encode(&mut w)
4180 .unwrap();
4181 recv.push_back((with_npdu(w.as_written()), addr));
4182 }
4183
4184 let writes: Vec<PropertyWriteSpec> = (0..180)
4185 .map(|_| PropertyWriteSpec {
4186 property_id: PropertyId::Description,
4187 array_index: None,
4188 value: DataValue::CharacterString(
4189 "rustbac segmented write test payload................................................................",
4190 ),
4191 priority: None,
4192 })
4193 .collect();
4194
4195 client
4196 .write_property_multiple(addr, object_id, &writes)
4197 .await
4198 .unwrap();
4199
4200 let sent = state.sent.lock().await;
4201 assert!(sent.len() > 4);
4202 for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
4203 let mut r = Reader::new(frame);
4204 let _npdu = Npdu::decode(&mut r).unwrap();
4205 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4206 assert!(hdr.segmented);
4207 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4208 assert_eq!(hdr.sequence_number, Some(idx as u8));
4209 assert_eq!(hdr.proposed_window_size, Some(4));
4210 }
4211 }
4212
4213 #[tokio::test]
4214 async fn write_property_multiple_adapts_window_to_peer_ack_window() {
4215 let (dl, state) = MockDataLink::new();
4216 let client = BacnetClient::with_datalink(dl)
4217 .with_response_timeout(Duration::from_secs(1))
4218 .with_segmented_request_window_size(4);
4219 let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
4220 let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
4221
4222 {
4223 let mut recv = state.recv.lock().await;
4224 for seq in 0u8..=254 {
4225 let mut apdu = [0u8; 16];
4226 let mut w = Writer::new(&mut apdu);
4227 SegmentAck {
4228 negative_ack: false,
4229 sent_by_server: true,
4230 invoke_id: 1,
4231 sequence_number: seq,
4232 actual_window_size: 2,
4233 }
4234 .encode(&mut w)
4235 .unwrap();
4236 recv.push_back((with_npdu(w.as_written()), addr));
4237 }
4238
4239 let mut apdu = [0u8; 16];
4240 let mut w = Writer::new(&mut apdu);
4241 SimpleAck {
4242 invoke_id: 1,
4243 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4244 }
4245 .encode(&mut w)
4246 .unwrap();
4247 recv.push_back((with_npdu(w.as_written()), addr));
4248 }
4249
4250 let writes: Vec<PropertyWriteSpec> = (0..180)
4251 .map(|_| PropertyWriteSpec {
4252 property_id: PropertyId::Description,
4253 array_index: None,
4254 value: DataValue::CharacterString(
4255 "rustbac segmented write test payload................................................................",
4256 ),
4257 priority: None,
4258 })
4259 .collect();
4260
4261 client
4262 .write_property_multiple(addr, object_id, &writes)
4263 .await
4264 .unwrap();
4265
4266 let sent = state.sent.lock().await;
4267 let mut saw_adapted_window = false;
4268 for (_, frame) in sent.iter() {
4269 let mut r = Reader::new(frame);
4270 let _npdu = Npdu::decode(&mut r).unwrap();
4271 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4272 if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
4273 saw_adapted_window = true;
4274 break;
4275 }
4276 }
4277 assert!(saw_adapted_window);
4278 }
4279
4280 #[tokio::test]
4281 async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
4282 let (dl, state) = MockDataLink::new();
4283 let client = BacnetClient::with_datalink(dl)
4284 .with_response_timeout(Duration::from_secs(1))
4285 .with_segmented_request_window_size(1)
4286 .with_segmented_request_retries(1);
4287 let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
4288 let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
4289
4290 {
4291 let mut recv = state.recv.lock().await;
4292
4293 let mut nack_apdu = [0u8; 16];
4294 let mut nack_w = Writer::new(&mut nack_apdu);
4295 SegmentAck {
4296 negative_ack: true,
4297 sent_by_server: true,
4298 invoke_id: 1,
4299 sequence_number: 0,
4300 actual_window_size: 1,
4301 }
4302 .encode(&mut nack_w)
4303 .unwrap();
4304 recv.push_back((with_npdu(nack_w.as_written()), addr));
4305
4306 for seq in 0u8..=254 {
4307 let mut apdu = [0u8; 16];
4308 let mut w = Writer::new(&mut apdu);
4309 SegmentAck {
4310 negative_ack: false,
4311 sent_by_server: true,
4312 invoke_id: 1,
4313 sequence_number: seq,
4314 actual_window_size: 1,
4315 }
4316 .encode(&mut w)
4317 .unwrap();
4318 recv.push_back((with_npdu(w.as_written()), addr));
4319 }
4320
4321 let mut apdu = [0u8; 16];
4322 let mut w = Writer::new(&mut apdu);
4323 SimpleAck {
4324 invoke_id: 1,
4325 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4326 }
4327 .encode(&mut w)
4328 .unwrap();
4329 recv.push_back((with_npdu(w.as_written()), addr));
4330 }
4331
4332 let writes: Vec<PropertyWriteSpec> = (0..180)
4333 .map(|_| PropertyWriteSpec {
4334 property_id: PropertyId::Description,
4335 array_index: None,
4336 value: DataValue::CharacterString(
4337 "rustbac segmented write test payload................................................................",
4338 ),
4339 priority: None,
4340 })
4341 .collect();
4342
4343 client
4344 .write_property_multiple(addr, object_id, &writes)
4345 .await
4346 .unwrap();
4347
4348 let sent = state.sent.lock().await;
4349 let mut seq0_frames = 0usize;
4350 for (_, frame) in sent.iter() {
4351 let mut r = Reader::new(frame);
4352 let _npdu = Npdu::decode(&mut r).unwrap();
4353 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4354 if hdr.sequence_number == Some(0) {
4355 seq0_frames += 1;
4356 }
4357 }
4358 assert!(seq0_frames >= 2);
4359 }
4360
4361 #[tokio::test]
4362 async fn read_property_ignores_invalid_frames_until_valid_response() {
4363 let (dl, state) = MockDataLink::new();
4364 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4365 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4366 let state_for_task = state.clone();
4367
4368 tokio::spawn(async move {
4369 tokio::time::sleep(Duration::from_millis(20)).await;
4370 let mut apdu = [0u8; 128];
4371 let mut w = Writer::new(&mut apdu);
4372 ComplexAckHeader {
4373 segmented: false,
4374 more_follows: false,
4375 invoke_id: 1,
4376 sequence_number: None,
4377 proposed_window_size: None,
4378 service_choice: SERVICE_READ_PROPERTY,
4379 }
4380 .encode(&mut w)
4381 .unwrap();
4382 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4383 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4384 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4385 encode_app_real(&mut w, 77.0).unwrap();
4386 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4387 state_for_task
4388 .recv
4389 .lock()
4390 .await
4391 .push_back((with_npdu(w.as_written()), addr));
4392 });
4393
4394 let value = client
4395 .read_property(
4396 addr,
4397 ObjectId::new(ObjectType::Device, 1),
4398 PropertyId::PresentValue,
4399 )
4400 .await
4401 .unwrap();
4402 assert!(matches!(
4403 value,
4404 ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
4405 ));
4406 }
4407
4408 #[tokio::test]
4409 async fn read_property_maps_reject() {
4410 let (dl, state) = MockDataLink::new();
4411 let client = BacnetClient::with_datalink(dl);
4412 let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
4413
4414 let mut apdu = [0u8; 8];
4415 let mut w = Writer::new(&mut apdu);
4416 w.write_u8((ApduType::Reject as u8) << 4).unwrap();
4417 w.write_u8(1).unwrap(); w.write_u8(2).unwrap(); state
4420 .recv
4421 .lock()
4422 .await
4423 .push_back((with_npdu(w.as_written()), addr));
4424
4425 let err = client
4426 .read_property(
4427 addr,
4428 ObjectId::new(ObjectType::Device, 1),
4429 PropertyId::ObjectName,
4430 )
4431 .await
4432 .unwrap_err();
4433 assert!(matches!(
4434 err,
4435 crate::ClientError::RemoteReject { reason: 2 }
4436 ));
4437 }
4438
4439 #[tokio::test]
4440 async fn read_property_maps_remote_error_details() {
4441 let (dl, state) = MockDataLink::new();
4442 let client = BacnetClient::with_datalink(dl);
4443 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
4444
4445 let mut apdu = [0u8; 16];
4446 let mut w = Writer::new(&mut apdu);
4447 w.write_u8((ApduType::Error as u8) << 4).unwrap();
4448 w.write_u8(1).unwrap(); w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
4450 .unwrap();
4451 Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
4452 w.write_u8(2).unwrap(); Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
4454 w.write_u8(32).unwrap(); state
4457 .recv
4458 .lock()
4459 .await
4460 .push_back((with_npdu(w.as_written()), addr));
4461
4462 let err = client
4463 .read_property(
4464 addr,
4465 ObjectId::new(ObjectType::Device, 1),
4466 PropertyId::ObjectName,
4467 )
4468 .await
4469 .unwrap_err();
4470 assert!(matches!(
4471 err,
4472 crate::ClientError::RemoteServiceError {
4473 service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
4474 error_class_raw: Some(2),
4475 error_code_raw: Some(32),
4476 error_class: Some(rustbac_core::types::ErrorClass::Property),
4477 error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
4478 }
4479 ));
4480 }
4481
4482 #[tokio::test]
4483 async fn write_property_maps_abort() {
4484 let (dl, state) = MockDataLink::new();
4485 let client = BacnetClient::with_datalink(dl);
4486 let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
4487
4488 let mut apdu = [0u8; 8];
4489 let mut w = Writer::new(&mut apdu);
4490 w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); w.write_u8(1).unwrap(); w.write_u8(9).unwrap(); state
4494 .recv
4495 .lock()
4496 .await
4497 .push_back((with_npdu(w.as_written()), addr));
4498
4499 let req = rustbac_core::services::write_property::WritePropertyRequest {
4500 object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
4501 property_id: PropertyId::PresentValue,
4502 value: DataValue::Real(10.0),
4503 priority: Some(8),
4504 ..Default::default()
4505 };
4506 let err = client.write_property(addr, req).await.unwrap_err();
4507 assert!(matches!(
4508 err,
4509 crate::ClientError::RemoteAbort {
4510 reason: 9,
4511 server: true
4512 }
4513 ));
4514 }
4515
4516 #[tokio::test]
4517 async fn read_property_multiple_returns_owned_string() {
4518 let (dl, state) = MockDataLink::new();
4519 let client = BacnetClient::with_datalink(dl);
4520 let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
4521 let object_id = ObjectId::new(ObjectType::Device, 1);
4522
4523 let mut apdu_buf = [0u8; 256];
4524 let mut w = Writer::new(&mut apdu_buf);
4525 ComplexAckHeader {
4526 segmented: false,
4527 more_follows: false,
4528 invoke_id: 1,
4529 sequence_number: None,
4530 proposed_window_size: None,
4531 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4532 }
4533 .encode(&mut w)
4534 .unwrap();
4535 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4536 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4537 .encode(&mut w)
4538 .unwrap();
4539 encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
4540 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4541 .encode(&mut w)
4542 .unwrap();
4543 rustbac_core::services::value_codec::encode_application_data_value(
4544 &mut w,
4545 &DataValue::CharacterString("AHU-1"),
4546 )
4547 .unwrap();
4548 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4549 .encode(&mut w)
4550 .unwrap();
4551 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4552 .encode(&mut w)
4553 .unwrap();
4554
4555 state
4556 .recv
4557 .lock()
4558 .await
4559 .push_back((with_npdu(w.as_written()), addr));
4560
4561 let values = client
4562 .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
4563 .await
4564 .unwrap();
4565 assert_eq!(values.len(), 1);
4566 assert_eq!(values[0].0, PropertyId::ObjectName);
4567 assert!(matches!(
4568 &values[0].1,
4569 ClientDataValue::CharacterString(s) if s == "AHU-1"
4570 ));
4571 }
4572
4573 #[tokio::test]
4574 async fn new_sc_rejects_invalid_endpoint() {
4575 let err = BacnetClient::new_sc("not a url").await.unwrap_err();
4576 assert!(matches!(err, crate::ClientError::DataLink(_)));
4577 }
4578}