1use crate::{
2 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3 ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4 DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5 EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9 AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10 SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{reader::Reader, writer::Writer};
13use rustbac_core::npdu::Npdu;
14use rustbac_core::services::acknowledge_alarm::{
15 AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
16};
17use rustbac_core::services::alarm_summary::{
18 AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
19 SERVICE_GET_ALARM_SUMMARY,
20};
21use rustbac_core::services::atomic_read_file::{
22 AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
23};
24use rustbac_core::services::atomic_write_file::{
25 AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
26};
27use rustbac_core::services::cov_notification::{
28 CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
29 SERVICE_UNCONFIRMED_COV_NOTIFICATION,
30};
31use rustbac_core::services::device_management::{
32 DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
33 ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
34};
35use rustbac_core::services::enrollment_summary::{
36 EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
37 GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
38};
39use rustbac_core::services::event_information::{
40 EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
41 SERVICE_GET_EVENT_INFORMATION,
42};
43use rustbac_core::services::event_notification::{
44 EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
45 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
46};
47use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
48use rustbac_core::services::list_element::{
49 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
50 SERVICE_REMOVE_LIST_ELEMENT,
51};
52use rustbac_core::services::object_management::{
53 CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
54 SERVICE_DELETE_OBJECT,
55};
56use rustbac_core::services::private_transfer::{
57 ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
58 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
59};
60use rustbac_core::services::read_property::{
61 ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
62};
63use rustbac_core::services::read_property_multiple::{
64 PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
65 ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
66};
67use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
68use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
69use rustbac_core::services::subscribe_cov_property::{
70 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
71};
72use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
73use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
74use rustbac_core::services::who_is::WhoIsRequest;
75use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
76use rustbac_core::services::write_property_multiple::{
77 PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
78 SERVICE_WRITE_PROPERTY_MULTIPLE,
79};
80use rustbac_core::types::{DataValue, Date, ErrorClass, ErrorCode, ObjectId, PropertyId, Time};
81use rustbac_core::EncodeError;
82use rustbac_datalink::bip::transport::{
83 BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
84};
85use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
86use std::collections::{HashMap, HashSet};
87use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
88use std::sync::RwLock;
89use std::time::Duration;
90use tokio::sync::Mutex;
91use tokio::task::JoinHandle;
92use tokio::time::{timeout, Instant};
93
/// Lower bound, in bytes, applied to the per-segment service payload size
/// when an outgoing confirmed request is split into segments.
const MIN_SEGMENT_DATA_LEN: usize = 32;
/// Upper bound, in bytes (1 MiB), on the reassembled payload of a ComplexAck;
/// exceeding it is reported as `ClientError::ResponseTooLarge`.
const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1024 * 1024;
96
/// BACnet client that sends requests and receives responses over a
/// [`DataLink`] transport.
#[derive(Debug)]
pub struct BacnetClient<D: DataLink> {
    /// Transport used to send and receive frames.
    datalink: D,
    /// Next invoke ID to hand out; advanced by `next_invoke_id`.
    invoke_id: Mutex<u8>,
    /// Held for the duration of a confirmed request/response exchange and of
    /// the BBMD table operations.
    request_io_lock: Mutex<()>,
    /// Default time budget for a confirmed request, from send to final response.
    response_timeout: Duration,
    /// Window size proposed when sending a segmented request.
    segmented_request_window_size: u8,
    /// Number of times a window of request segments is resent after a timeout
    /// or negative SegmentAck.
    segmented_request_retries: u8,
    /// Maximum wait for a SegmentAck after sending a window of segments.
    segment_ack_timeout: Duration,
    /// Max APDU value per peer address, recorded from I-Am replies seen by
    /// `who_is` and consulted when deciding how to segment a request.
    capability_cache: std::sync::Arc<RwLock<HashMap<DataLinkAddress, usize>>>,
}
124
/// Handle to the background task spawned by
/// `BacnetClient::start_foreign_device_renewal`.
///
/// The task is aborted when `stop` is called or when the handle is dropped.
#[derive(Debug)]
pub struct ForeignDeviceRenewal {
    /// Join handle of the spawned renewal loop.
    task: JoinHandle<()>,
}
134
impl ForeignDeviceRenewal {
    /// Aborts the renewal task and consumes the handle.
    pub fn stop(self) {
        self.task.abort();
    }
}
141
impl Drop for ForeignDeviceRenewal {
    /// Aborts the renewal task so it does not keep running after the handle
    /// goes away.
    fn drop(&mut self) {
        self.task.abort();
    }
}
147
148impl BacnetClient<BacnetIpTransport> {
149 pub async fn new() -> Result<Self, ClientError> {
153 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
154 let datalink = BacnetIpTransport::bind(bind_addr).await?;
155 Ok(Self {
156 datalink,
157 invoke_id: Mutex::new(1),
158 request_io_lock: Mutex::new(()),
159 response_timeout: Duration::from_secs(3),
160 segmented_request_window_size: 16,
161 segmented_request_retries: 2,
162 segment_ack_timeout: Duration::from_millis(500),
163 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
164 })
165 }
166
167 pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
174 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
175 let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
176 datalink.register_foreign_device(ttl_seconds).await?;
177 Ok(Self {
178 datalink,
179 invoke_id: Mutex::new(1),
180 request_io_lock: Mutex::new(()),
181 response_timeout: Duration::from_secs(3),
182 segmented_request_window_size: 16,
183 segmented_request_retries: 2,
184 segment_ack_timeout: Duration::from_millis(500),
185 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
186 })
187 }
188
189 pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
192 let _io = self.request_io_lock.lock().await;
193 self.datalink.register_foreign_device(ttl_seconds).await?;
194 Ok(())
195 }
196
197 pub async fn read_broadcast_distribution_table(
199 &self,
200 ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
201 let _io = self.request_io_lock.lock().await;
202 self.datalink
203 .read_broadcast_distribution_table()
204 .await
205 .map_err(ClientError::from)
206 }
207
208 pub async fn write_broadcast_distribution_table(
210 &self,
211 entries: &[BroadcastDistributionEntry],
212 ) -> Result<(), ClientError> {
213 let _io = self.request_io_lock.lock().await;
214 self.datalink
215 .write_broadcast_distribution_table(entries)
216 .await?;
217 Ok(())
218 }
219
220 pub async fn read_foreign_device_table(
222 &self,
223 ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
224 let _io = self.request_io_lock.lock().await;
225 self.datalink
226 .read_foreign_device_table()
227 .await
228 .map_err(ClientError::from)
229 }
230
231 pub async fn delete_foreign_device_table_entry(
233 &self,
234 address: SocketAddrV4,
235 ) -> Result<(), ClientError> {
236 let _io = self.request_io_lock.lock().await;
237 self.datalink
238 .delete_foreign_device_table_entry(address)
239 .await?;
240 Ok(())
241 }
242
243 pub fn start_foreign_device_renewal(
249 &self,
250 ttl_seconds: u16,
251 ) -> Result<ForeignDeviceRenewal, ClientError> {
252 if ttl_seconds == 0 {
253 return Err(EncodeError::InvalidLength.into());
254 }
255
256 let datalink = self.datalink.clone();
257 let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
258 let interval = Duration::from_secs(refresh_seconds.max(1));
259 let task = tokio::spawn(async move {
260 loop {
261 tokio::time::sleep(interval).await;
262 if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
263 log::warn!("foreign device renewal send failed: {err}");
264 }
265 }
266 });
267 Ok(ForeignDeviceRenewal { task })
268 }
269}
270
271impl BacnetClient<BacnetScTransport> {
272 pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
275 let datalink = BacnetScTransport::connect(endpoint).await?;
276 Ok(Self::with_datalink(datalink))
277 }
278}
279
280impl<D: DataLink> BacnetClient<D> {
281 pub fn with_datalink(datalink: D) -> Self {
285 Self {
286 datalink,
287 invoke_id: Mutex::new(1),
288 request_io_lock: Mutex::new(()),
289 response_timeout: Duration::from_secs(3),
290 segmented_request_window_size: 16,
291 segmented_request_retries: 2,
292 segment_ack_timeout: Duration::from_millis(500),
293 capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
294 }
295 }
296
    /// Builder-style setter: replaces the response timeout used by confirmed
    /// requests and returns the updated client.
    pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
        self.response_timeout = timeout;
        self
    }
302
303 pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
306 self.segmented_request_window_size = window_size.max(1);
307 self
308 }
309
    /// Builder-style setter: sets how many times a window of request segments
    /// is resent after a SegmentAck timeout or negative SegmentAck.
    pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
        self.segmented_request_retries = retries;
        self
    }
316
317 pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
320 self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
321 self
322 }
323
324 async fn next_invoke_id(&self) -> u8 {
325 let mut lock = self.invoke_id.lock().await;
326 let id = *lock;
327 *lock = lock.wrapping_add(1);
328 if *lock == 0 {
329 *lock = 1;
330 }
331 id
332 }
333
334 async fn send_segment_ack(
335 &self,
336 address: DataLinkAddress,
337 invoke_id: u8,
338 sequence_number: u8,
339 window_size: u8,
340 ) -> Result<(), ClientError> {
341 let mut tx = [0u8; 64];
342 let mut w = Writer::new(&mut tx);
343 Npdu::new(0).encode(&mut w)?;
344 SegmentAck {
345 negative_ack: false,
346 sent_by_server: false,
347 invoke_id,
348 sequence_number,
349 actual_window_size: window_size,
350 }
351 .encode(&mut w)?;
352 self.datalink.send(address, w.as_written()).await?;
353 Ok(())
354 }
355
356 async fn recv_ignoring_invalid_frame(
357 &self,
358 buf: &mut [u8],
359 deadline: Instant,
360 ) -> Result<(usize, DataLinkAddress), ClientError> {
361 loop {
362 let remaining = deadline.saturating_duration_since(Instant::now());
363 if remaining.is_zero() {
364 return Err(ClientError::Timeout);
365 }
366
367 match timeout(remaining, self.datalink.recv(buf)).await {
368 Err(_) => return Err(ClientError::Timeout),
369 Ok(Err(DataLinkError::InvalidFrame)) => continue,
370 Ok(Err(e)) => return Err(e.into()),
371 Ok(Ok(v)) => return Ok(v),
372 }
373 }
374 }
375
376 async fn send_simple_ack(
377 &self,
378 address: DataLinkAddress,
379 invoke_id: u8,
380 service_choice: u8,
381 ) -> Result<(), ClientError> {
382 let mut tx = [0u8; 64];
383 let mut w = Writer::new(&mut tx);
384 Npdu::new(0).encode(&mut w)?;
385 SimpleAck {
386 invoke_id,
387 service_choice,
388 }
389 .encode(&mut w)?;
390 self.datalink.send(address, w.as_written()).await?;
391 Ok(())
392 }
393
394 fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
395 where
396 F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
397 {
398 for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
399 let mut buf = vec![0u8; size];
400 let mut w = Writer::new(&mut buf);
401 match encode(&mut w) {
402 Ok(()) => {
403 let written_len = w.as_written().len();
404 buf.truncate(written_len);
405 return Ok(buf);
406 }
407 Err(EncodeError::BufferTooSmall) => continue,
408 Err(e) => return Err(e.into()),
409 }
410 }
411 Err(ClientError::SegmentedRequestTooLarge)
412 }
413
414 const fn max_apdu_octets(max_apdu_code: u8) -> usize {
415 match max_apdu_code & 0x0f {
416 0 => 50,
417 1 => 128,
418 2 => 206,
419 3 => 480,
420 4 => 1024,
421 5 => 1476,
422 _ => 480,
423 }
424 }
425
426 async fn await_segment_ack(
427 &self,
428 address: DataLinkAddress,
429 invoke_id: u8,
430 service_choice: u8,
431 expected_sequence: u8,
432 deadline: Instant,
433 ) -> Result<SegmentAck, ClientError> {
434 loop {
435 let remaining = deadline.saturating_duration_since(Instant::now());
436 if remaining.is_zero() {
437 return Err(ClientError::Timeout);
438 }
439
440 let mut rx = [0u8; 1500];
441 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
442 let (n, src) = match recv {
443 Err(_) => return Err(ClientError::Timeout),
444 Ok(Err(DataLinkError::InvalidFrame)) => continue,
445 Ok(Err(e)) => return Err(e.into()),
446 Ok(Ok(v)) => v,
447 };
448 if src != address {
449 continue;
450 }
451
452 let Ok(apdu) = extract_apdu(&rx[..n]) else {
453 continue;
454 };
455 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
456 match ApduType::from_u8(first >> 4) {
457 Some(ApduType::SegmentAck) => {
458 let mut r = Reader::new(apdu);
459 let ack = SegmentAck::decode(&mut r)?;
460 if ack.invoke_id != invoke_id || !ack.sent_by_server {
461 continue;
462 }
463 if ack.negative_ack {
464 return Err(ClientError::SegmentNegativeAck {
465 sequence_number: ack.sequence_number,
466 });
467 }
468 if ack.sequence_number == expected_sequence {
469 return Ok(ack);
470 }
471 }
472 Some(ApduType::Error) => {
473 let mut r = Reader::new(apdu);
474 let err = BacnetError::decode(&mut r)?;
475 if err.invoke_id == invoke_id && err.service_choice == service_choice {
476 return Err(remote_service_error(err));
477 }
478 }
479 Some(ApduType::Reject) => {
480 let mut r = Reader::new(apdu);
481 let rej = RejectPdu::decode(&mut r)?;
482 if rej.invoke_id == invoke_id {
483 return Err(ClientError::RemoteReject { reason: rej.reason });
484 }
485 }
486 Some(ApduType::Abort) => {
487 let mut r = Reader::new(apdu);
488 let abort = AbortPdu::decode(&mut r)?;
489 if abort.invoke_id == invoke_id {
490 return Err(ClientError::RemoteAbort {
491 reason: abort.reason,
492 server: abort.server,
493 });
494 }
495 }
496 _ => continue,
497 }
498 }
499 }
500
501 async fn send_confirmed_request(
502 &self,
503 address: DataLinkAddress,
504 frame: &[u8],
505 deadline: Instant,
506 ) -> Result<(), ClientError> {
507 let mut pr = Reader::new(frame);
508 let _npdu = Npdu::decode(&mut pr)?;
509 let npdu_len = frame.len() - pr.remaining();
510 let npdu_bytes = &frame[..npdu_len];
511 let apdu = &frame[npdu_len..];
512
513 let mut ar = Reader::new(apdu);
514 let header = ConfirmedRequestHeader::decode(&mut ar)?;
515 let service_payload = ar.read_exact(ar.remaining())?;
516
517 let peer_max_apdu = self
520 .capability_cache
521 .read()
522 .ok()
523 .and_then(|c| c.get(&address).copied())
524 .unwrap_or_else(|| Self::max_apdu_octets(header.max_apdu));
525 let segment_data_len = peer_max_apdu.saturating_sub(5).max(MIN_SEGMENT_DATA_LEN);
526 let segment_count = service_payload.len().div_ceil(segment_data_len);
527
528 if segment_count <= 1 {
529 self.datalink.send(address, frame).await?;
530 return Ok(());
531 }
532
533 if segment_count > usize::from(u8::MAX) + 1 {
534 return Err(ClientError::SegmentedRequestTooLarge);
535 }
536
537 let configured_window_size = self.segmented_request_window_size.max(1);
538 let mut window_size = configured_window_size;
539 let mut peer_window_ceiling = configured_window_size;
540 let mut batch_start = 0usize;
541 while batch_start < segment_count {
542 let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
543 let expected_sequence = (batch_end - 1) as u8;
544
545 let mut frames = Vec::with_capacity(batch_end - batch_start);
546 for segment_index in batch_start..batch_end {
547 let seq = segment_index as u8;
548 let more_follows = segment_index + 1 < segment_count;
549 let start = segment_index * segment_data_len;
550 let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
551 let segment = &service_payload[start..end];
552
553 let seg_header = ConfirmedRequestHeader {
554 segmented: true,
555 more_follows,
556 segmented_response_accepted: header.segmented_response_accepted,
557 max_segments: header.max_segments,
558 max_apdu: header.max_apdu,
559 invoke_id: header.invoke_id,
560 sequence_number: Some(seq),
561 proposed_window_size: Some(window_size),
562 service_choice: header.service_choice,
563 };
564
565 let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
566 let written_len = {
567 let mut w = Writer::new(&mut tx);
568 w.write_all(npdu_bytes)?;
569 seg_header.encode(&mut w)?;
570 w.write_all(segment)?;
571 w.as_written().len()
572 };
573 tx.truncate(written_len);
574 frames.push(tx);
575 }
576
577 let mut retries_remaining = self.segmented_request_retries;
578 loop {
579 for frame in &frames {
580 self.datalink.send(address, frame).await?;
581 }
582
583 if batch_end == segment_count {
584 break;
585 }
586
587 let remaining = deadline.saturating_duration_since(Instant::now());
588 if remaining.is_zero() {
589 return Err(ClientError::Timeout);
590 }
591 let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
592 match self
593 .await_segment_ack(
594 address,
595 header.invoke_id,
596 header.service_choice,
597 expected_sequence,
598 ack_wait_deadline,
599 )
600 .await
601 {
602 Ok(ack) => {
603 peer_window_ceiling =
604 peer_window_ceiling.min(ack.actual_window_size.max(1));
605 window_size = window_size
606 .saturating_add(1)
607 .min(configured_window_size)
608 .min(peer_window_ceiling)
609 .max(1);
610 break;
611 }
612 Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
613 if retries_remaining > 0 =>
614 {
615 retries_remaining -= 1;
616 window_size = window_size.saturating_div(2).max(1);
617 continue;
618 }
619 Err(e) => return Err(e),
620 }
621 }
622
623 batch_start = batch_end;
624 }
625
626 Ok(())
627 }
628
    /// Returns the complete service payload of a ComplexAck, reassembling the
    /// remaining segments when the first PDU is segmented.
    ///
    /// `first_header` / `first_payload` are the already-decoded first PDU. For
    /// an unsegmented ack the payload is returned as-is. Otherwise every
    /// accepted segment is acknowledged with a SegmentAck and appended until a
    /// segment without `more_follows` arrives.
    ///
    /// # Errors
    /// * `ClientError::ResponseTooLarge` when the payload would exceed
    ///   `MAX_COMPLEX_ACK_REASSEMBLY_BYTES`.
    /// * `ClientError::UnsupportedResponse` when a segmented header has no
    ///   sequence number, a follow-up ComplexAck is not segmented, or an APDU
    ///   is empty.
    /// * `ClientError::Timeout` when `deadline` passes.
    /// * The mapped remote error for a matching Error, Reject or Abort PDU.
    /// * Decode and transport errors are propagated.
    async fn collect_complex_ack_payload(
        &self,
        address: DataLinkAddress,
        invoke_id: u8,
        service_choice: u8,
        first_header: ComplexAckHeader,
        first_payload: &[u8],
        deadline: Instant,
    ) -> Result<Vec<u8>, ClientError> {
        let mut payload = first_payload.to_vec();
        if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
            return Err(ClientError::ResponseTooLarge {
                limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
            });
        }
        if !first_header.segmented {
            return Ok(payload);
        }

        let mut last_seq = first_header
            .sequence_number
            .ok_or(ClientError::UnsupportedResponse)?;
        // Echo the sender's proposed window size; fall back to 1 if absent.
        let mut window_size = first_header.proposed_window_size.unwrap_or(1);
        // Acknowledge the first segment so the sender continues.
        self.send_segment_ack(address, invoke_id, last_seq, window_size)
            .await?;
        let mut more_follows = first_header.more_follows;

        while more_follows {
            let mut rx = [0u8; 1500];
            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
            if src != address {
                continue;
            }

            let Ok(apdu) = extract_apdu(&rx[..n]) else {
                continue;
            };
            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
            match ApduType::from_u8(first >> 4) {
                Some(ApduType::ComplexAck) => {
                    let mut r = Reader::new(apdu);
                    let seg = ComplexAckHeader::decode(&mut r)?;
                    // Segment of some other transaction: ignore.
                    if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
                        continue;
                    }
                    if !seg.segmented {
                        return Err(ClientError::UnsupportedResponse);
                    }
                    let seq = seg
                        .sequence_number
                        .ok_or(ClientError::UnsupportedResponse)?;
                    // Duplicate of the last accepted segment: repeat the ack
                    // without appending the data again.
                    if seq == last_seq {
                        self.send_segment_ack(address, invoke_id, last_seq, window_size)
                            .await?;
                        continue;
                    }
                    // Anything other than the next sequence number is dropped.
                    if seq != last_seq.wrapping_add(1) {
                        continue;
                    }

                    let seg_payload = r.read_exact(r.remaining())?;
                    // Enforce the reassembly cap before growing the buffer.
                    if payload.len().saturating_add(seg_payload.len())
                        > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
                    {
                        return Err(ClientError::ResponseTooLarge {
                            limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
                        });
                    }
                    payload.extend_from_slice(seg_payload);

                    last_seq = seq;
                    more_follows = seg.more_follows;
                    window_size = seg.proposed_window_size.unwrap_or(window_size);
                    self.send_segment_ack(address, invoke_id, last_seq, window_size)
                        .await?;
                }
                Some(ApduType::Error) => {
                    let mut r = Reader::new(apdu);
                    let err = BacnetError::decode(&mut r)?;
                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
                        return Err(remote_service_error(err));
                    }
                }
                Some(ApduType::Reject) => {
                    let mut r = Reader::new(apdu);
                    let rej = RejectPdu::decode(&mut r)?;
                    if rej.invoke_id == invoke_id {
                        return Err(ClientError::RemoteReject { reason: rej.reason });
                    }
                }
                Some(ApduType::Abort) => {
                    let mut r = Reader::new(apdu);
                    let abort = AbortPdu::decode(&mut r)?;
                    if abort.invoke_id == invoke_id {
                        return Err(ClientError::RemoteAbort {
                            reason: abort.reason,
                            server: abort.server,
                        });
                    }
                }
                _ => continue,
            }
        }

        Ok(payload)
    }
736
737 pub async fn who_is(
743 &self,
744 range: Option<(u32, u32)>,
745 wait: Duration,
746 ) -> Result<Vec<DiscoveredDevice>, ClientError> {
747 let req = match range {
750 Some((low, high)) => WhoIsRequest {
751 low_limit: Some(low),
752 high_limit: Some(high),
753 },
754 None => WhoIsRequest::global(),
755 };
756
757 let mut tx = [0u8; 128];
758 let mut w = Writer::new(&mut tx);
759 Npdu::new(0).encode(&mut w)?;
760 req.encode(&mut w)?;
761
762 self.datalink
763 .send(
764 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
765 w.as_written(),
766 )
767 .await?;
768
769 let mut devices = Vec::new();
770 let mut seen = HashSet::new();
771 let deadline = tokio::time::Instant::now() + wait;
772
773 while tokio::time::Instant::now() < deadline {
774 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
775 let mut rx = [0u8; 1500];
776 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
777 match recv {
778 Ok(Ok((n, src))) => {
779 let Ok(apdu) = extract_apdu(&rx[..n]) else {
780 continue;
781 };
782 let mut r = Reader::new(apdu);
783 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
784 continue;
785 };
786 if unconfirmed.service_choice != SERVICE_I_AM {
787 continue;
788 }
789 let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
790 continue;
791 };
792 if seen.insert(i_am.device_id) {
793 devices.push(DiscoveredDevice {
794 address: src,
795 device_id: Some(i_am.device_id),
796 });
797 if let Ok(mut cache) = self.capability_cache.write() {
800 cache.insert(src, i_am.max_apdu as usize);
801 }
802 }
803 }
804 Ok(Err(DataLinkError::InvalidFrame)) => continue,
805 Ok(Err(e)) => return Err(e.into()),
806 Err(_) => break,
807 }
808 }
809
810 Ok(devices)
811 }
812
813 pub async fn who_has_object_id(
818 &self,
819 range: Option<(u32, u32)>,
820 object_id: ObjectId,
821 wait: Duration,
822 ) -> Result<Vec<DiscoveredObject>, ClientError> {
823 let req = WhoHasRequest {
824 low_limit: range.map(|(low, _)| low),
825 high_limit: range.map(|(_, high)| high),
826 object: WhoHasObject::ObjectId(object_id),
827 };
828 self.who_has(req, wait).await
829 }
830
831 pub async fn who_has_object_name(
835 &self,
836 range: Option<(u32, u32)>,
837 object_name: &str,
838 wait: Duration,
839 ) -> Result<Vec<DiscoveredObject>, ClientError> {
840 let req = WhoHasRequest {
841 low_limit: range.map(|(low, _)| low),
842 high_limit: range.map(|(_, high)| high),
843 object: WhoHasObject::ObjectName(object_name),
844 };
845 self.who_has(req, wait).await
846 }
847
848 async fn who_has(
849 &self,
850 request: WhoHasRequest<'_>,
851 wait: Duration,
852 ) -> Result<Vec<DiscoveredObject>, ClientError> {
853 let tx = self.encode_with_growth(|w| {
855 Npdu::new(0).encode(w)?;
856 request.encode(w)
857 })?;
858 self.datalink
859 .send(
860 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
861 &tx,
862 )
863 .await?;
864
865 let mut objects = Vec::new();
866 let mut seen = HashSet::new();
867 let deadline = tokio::time::Instant::now() + wait;
868
869 while tokio::time::Instant::now() < deadline {
870 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
871 let mut rx = [0u8; 1500];
872 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
873 match recv {
874 Ok(Ok((n, src))) => {
875 let Ok(apdu) = extract_apdu(&rx[..n]) else {
876 continue;
877 };
878 let mut r = Reader::new(apdu);
879 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
880 continue;
881 };
882 if unconfirmed.service_choice != SERVICE_I_HAVE {
883 continue;
884 }
885 let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
886 continue;
887 };
888 if !seen.insert((src, i_have.object_id.raw())) {
889 continue;
890 }
891 objects.push(DiscoveredObject {
892 address: src,
893 device_id: i_have.device_id,
894 object_id: i_have.object_id,
895 object_name: i_have.object_name.to_string(),
896 });
897 }
898 Ok(Err(DataLinkError::InvalidFrame)) => continue,
899 Ok(Err(e)) => return Err(e.into()),
900 Err(_) => break,
901 }
902 }
903
904 Ok(objects)
905 }
906
907 pub async fn device_communication_control(
913 &self,
914 address: DataLinkAddress,
915 time_duration_seconds: Option<u16>,
916 enable_disable: DeviceCommunicationState,
917 password: Option<&str>,
918 ) -> Result<(), ClientError> {
919 let invoke_id = self.next_invoke_id().await;
920 let request = DeviceCommunicationControlRequest {
921 time_duration_seconds,
922 enable_disable,
923 password,
924 invoke_id,
925 };
926 let tx = self.encode_with_growth(|w| {
927 Npdu::new(0).encode(w)?;
928 request.encode(w)
929 })?;
930 self.await_simple_ack_or_error(
931 address,
932 &tx,
933 invoke_id,
934 SERVICE_DEVICE_COMMUNICATION_CONTROL,
935 self.response_timeout,
936 )
937 .await
938 }
939
940 pub async fn reinitialize_device(
944 &self,
945 address: DataLinkAddress,
946 state: ReinitializeState,
947 password: Option<&str>,
948 ) -> Result<(), ClientError> {
949 let invoke_id = self.next_invoke_id().await;
950 let request = ReinitializeDeviceRequest {
951 state,
952 password,
953 invoke_id,
954 };
955 let tx = self.encode_with_growth(|w| {
956 Npdu::new(0).encode(w)?;
957 request.encode(w)
958 })?;
959 self.await_simple_ack_or_error(
960 address,
961 &tx,
962 invoke_id,
963 SERVICE_REINITIALIZE_DEVICE,
964 self.response_timeout,
965 )
966 .await
967 }
968
969 pub async fn time_synchronize(
974 &self,
975 address: DataLinkAddress,
976 date: Date,
977 time: Time,
978 utc: bool,
979 ) -> Result<(), ClientError> {
980 let request = if utc {
981 TimeSynchronizationRequest::utc(date, time)
982 } else {
983 TimeSynchronizationRequest::local(date, time)
984 };
985 let tx = self.encode_with_growth(|w| {
986 Npdu::new(0).encode(w)?;
987 request.encode(w)
988 })?;
989 self.datalink.send(address, &tx).await?;
990 Ok(())
991 }
992
993 pub async fn create_object_by_type(
996 &self,
997 address: DataLinkAddress,
998 object_type: rustbac_core::types::ObjectType,
999 ) -> Result<ObjectId, ClientError> {
1000 self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
1001 .await
1002 }
1003
1004 pub async fn create_object(
1008 &self,
1009 address: DataLinkAddress,
1010 mut request: CreateObjectRequest,
1011 ) -> Result<ObjectId, ClientError> {
1012 request.invoke_id = self.next_invoke_id().await;
1013 let invoke_id = request.invoke_id;
1014 let tx = self.encode_with_growth(|w| {
1015 Npdu::new(0).encode(w)?;
1016 request.encode(w)
1017 })?;
1018 let payload = self
1019 .await_complex_ack_payload_or_error(
1020 address,
1021 &tx,
1022 invoke_id,
1023 SERVICE_CREATE_OBJECT,
1024 self.response_timeout,
1025 )
1026 .await?;
1027 let mut pr = Reader::new(&payload);
1028 let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
1029 Ok(parsed.object_id)
1030 }
1031
1032 pub async fn delete_object(
1034 &self,
1035 address: DataLinkAddress,
1036 object_id: ObjectId,
1037 ) -> Result<(), ClientError> {
1038 let invoke_id = self.next_invoke_id().await;
1039 let request = DeleteObjectRequest {
1040 object_id,
1041 invoke_id,
1042 };
1043 let tx = self.encode_with_growth(|w| {
1044 Npdu::new(0).encode(w)?;
1045 request.encode(w)
1046 })?;
1047 self.await_simple_ack_or_error(
1048 address,
1049 &tx,
1050 invoke_id,
1051 SERVICE_DELETE_OBJECT,
1052 self.response_timeout,
1053 )
1054 .await
1055 }
1056
1057 pub async fn add_list_element(
1059 &self,
1060 address: DataLinkAddress,
1061 mut request: AddListElementRequest<'_>,
1062 ) -> Result<(), ClientError> {
1063 request.invoke_id = self.next_invoke_id().await;
1064 let invoke_id = request.invoke_id;
1065 let tx = self.encode_with_growth(|w| {
1066 Npdu::new(0).encode(w)?;
1067 request.encode(w)
1068 })?;
1069 self.await_simple_ack_or_error(
1070 address,
1071 &tx,
1072 invoke_id,
1073 SERVICE_ADD_LIST_ELEMENT,
1074 self.response_timeout,
1075 )
1076 .await
1077 }
1078
1079 pub async fn remove_list_element(
1081 &self,
1082 address: DataLinkAddress,
1083 mut request: RemoveListElementRequest<'_>,
1084 ) -> Result<(), ClientError> {
1085 request.invoke_id = self.next_invoke_id().await;
1086 let invoke_id = request.invoke_id;
1087 let tx = self.encode_with_growth(|w| {
1088 Npdu::new(0).encode(w)?;
1089 request.encode(w)
1090 })?;
1091 self.await_simple_ack_or_error(
1092 address,
1093 &tx,
1094 invoke_id,
1095 SERVICE_REMOVE_LIST_ELEMENT,
1096 self.response_timeout,
1097 )
1098 .await
1099 }
1100
1101 async fn await_simple_ack_or_error(
1102 &self,
1103 address: DataLinkAddress,
1104 tx: &[u8],
1105 invoke_id: u8,
1106 service_choice: u8,
1107 timeout_window: Duration,
1108 ) -> Result<(), ClientError> {
1109 #[cfg(feature = "tracing")]
1110 tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1111 let _io_lock = self.request_io_lock.lock().await;
1112 let deadline = tokio::time::Instant::now() + timeout_window;
1113 self.send_confirmed_request(address, tx, deadline).await?;
1114 while tokio::time::Instant::now() < deadline {
1115 let mut rx = [0u8; 1500];
1116 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1117 if src != address {
1118 continue;
1119 }
1120 let apdu = extract_apdu(&rx[..n])?;
1121 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1122 match ApduType::from_u8(first >> 4) {
1123 Some(ApduType::SimpleAck) => {
1124 let mut r = Reader::new(apdu);
1125 let ack = SimpleAck::decode(&mut r)?;
1126 if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1127 return Ok(());
1128 }
1129 }
1130 Some(ApduType::Error) => {
1131 let mut r = Reader::new(apdu);
1132 let err = BacnetError::decode(&mut r)?;
1133 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1134 return Err(remote_service_error(err));
1135 }
1136 }
1137 Some(ApduType::Reject) => {
1138 let mut r = Reader::new(apdu);
1139 let rej = RejectPdu::decode(&mut r)?;
1140 if rej.invoke_id == invoke_id {
1141 return Err(ClientError::RemoteReject { reason: rej.reason });
1142 }
1143 }
1144 Some(ApduType::Abort) => {
1145 let mut r = Reader::new(apdu);
1146 let abort = AbortPdu::decode(&mut r)?;
1147 if abort.invoke_id == invoke_id {
1148 return Err(ClientError::RemoteAbort {
1149 reason: abort.reason,
1150 server: abort.server,
1151 });
1152 }
1153 }
1154 _ => continue,
1155 }
1156 }
1157 #[cfg(feature = "tracing")]
1158 tracing::warn!(invoke_id = invoke_id, "request timed out");
1159 Err(ClientError::Timeout)
1160 }
1161
/// Sends an already-encoded confirmed request and waits for the matching ComplexACK.
///
/// `tx` is sent to `address` via `send_confirmed_request`; incoming frames are then
/// read until `timeout_window` (measured from just after the I/O lock is acquired)
/// has elapsed. Frames from other sources and replies whose invoke id (and, for
/// ComplexACK / Error, service choice) do not match are skipped.
///
/// On a matching ComplexACK the header and the bytes following it are handed to
/// `collect_complex_ack_payload`, whose result is returned unchanged.
///
/// # Errors
/// - `ClientError::RemoteServiceError` (built by `remote_service_error`) for a matching Error PDU.
/// - `ClientError::RemoteReject` / `ClientError::RemoteAbort` for a Reject / Abort PDU
///   carrying the same invoke id.
/// - `ClientError::UnsupportedResponse` when a frame from `address` has an empty APDU.
/// - `ClientError::Timeout` when the loop ends without a matching reply.
/// - Any error propagated from sending, receiving, or decoding a frame from `address`.
async fn await_complex_ack_payload_or_error(
    &self,
    address: DataLinkAddress,
    tx: &[u8],
    invoke_id: u8,
    service_choice: u8,
    timeout_window: Duration,
) -> Result<Vec<u8>, ClientError> {
    #[cfg(feature = "tracing")]
    tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
    // The guard lives until this function returns, so the whole exchange runs
    // under `request_io_lock`.
    let _io_lock = self.request_io_lock.lock().await;
    let deadline = tokio::time::Instant::now() + timeout_window;
    self.send_confirmed_request(address, tx, deadline).await?;
    while tokio::time::Instant::now() < deadline {
        let mut rx = [0u8; 1500];
        let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
        // Only the device we addressed can answer this request.
        if src != address {
            continue;
        }

        let apdu = extract_apdu(&rx[..n])?;
        let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
        // The PDU type is read from the high nibble of the first APDU octet.
        match ApduType::from_u8(first >> 4) {
            Some(ApduType::ComplexAck) => {
                let mut r = Reader::new(apdu);
                let ack = ComplexAckHeader::decode(&mut r)?;
                if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
                    continue;
                }
                // Everything after the header is passed on as the first payload chunk.
                return self
                    .collect_complex_ack_payload(
                        address,
                        invoke_id,
                        service_choice,
                        ack,
                        r.read_exact(r.remaining())?,
                        deadline,
                    )
                    .await;
            }
            Some(ApduType::Error) => {
                let mut r = Reader::new(apdu);
                let err = BacnetError::decode(&mut r)?;
                if err.invoke_id == invoke_id && err.service_choice == service_choice {
                    return Err(remote_service_error(err));
                }
            }
            Some(ApduType::Reject) => {
                let mut r = Reader::new(apdu);
                let rej = RejectPdu::decode(&mut r)?;
                // Reject PDUs are matched on invoke id only.
                if rej.invoke_id == invoke_id {
                    return Err(ClientError::RemoteReject { reason: rej.reason });
                }
            }
            Some(ApduType::Abort) => {
                let mut r = Reader::new(apdu);
                let abort = AbortPdu::decode(&mut r)?;
                // Abort PDUs are matched on invoke id only.
                if abort.invoke_id == invoke_id {
                    return Err(ClientError::RemoteAbort {
                        reason: abort.reason,
                        server: abort.server,
                    });
                }
            }
            // Any other (or unrecognised) PDU type is not a reply to this request.
            _ => continue,
        }
    }
    #[cfg(feature = "tracing")]
    tracing::warn!(invoke_id = invoke_id, "request timed out");
    Err(ClientError::Timeout)
}
1233
1234 pub async fn get_alarm_summary(
1236 &self,
1237 address: DataLinkAddress,
1238 ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1239 let invoke_id = self.next_invoke_id().await;
1240 let request = GetAlarmSummaryRequest { invoke_id };
1241 let tx = self.encode_with_growth(|w| {
1242 Npdu::new(0).encode(w)?;
1243 request.encode(w)
1244 })?;
1245 let payload = self
1246 .await_complex_ack_payload_or_error(
1247 address,
1248 &tx,
1249 invoke_id,
1250 SERVICE_GET_ALARM_SUMMARY,
1251 self.response_timeout,
1252 )
1253 .await?;
1254 let mut pr = Reader::new(&payload);
1255 let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1256 Ok(into_client_alarm_summary(parsed.summaries))
1257 }
1258
1259 pub async fn get_enrollment_summary(
1261 &self,
1262 address: DataLinkAddress,
1263 ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1264 let invoke_id = self.next_invoke_id().await;
1265 let request = GetEnrollmentSummaryRequest { invoke_id };
1266 let tx = self.encode_with_growth(|w| {
1267 Npdu::new(0).encode(w)?;
1268 request.encode(w)
1269 })?;
1270 let payload = self
1271 .await_complex_ack_payload_or_error(
1272 address,
1273 &tx,
1274 invoke_id,
1275 SERVICE_GET_ENROLLMENT_SUMMARY,
1276 self.response_timeout,
1277 )
1278 .await?;
1279 let mut pr = Reader::new(&payload);
1280 let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1281 Ok(into_client_enrollment_summary(parsed.summaries))
1282 }
1283
1284 pub async fn get_event_information(
1289 &self,
1290 address: DataLinkAddress,
1291 last_received_object_id: Option<ObjectId>,
1292 ) -> Result<EventInformationResult, ClientError> {
1293 let invoke_id = self.next_invoke_id().await;
1294 let request = GetEventInformationRequest {
1295 last_received_object_id,
1296 invoke_id,
1297 };
1298 let tx = self.encode_with_growth(|w| {
1299 Npdu::new(0).encode(w)?;
1300 request.encode(w)
1301 })?;
1302 let payload = self
1303 .await_complex_ack_payload_or_error(
1304 address,
1305 &tx,
1306 invoke_id,
1307 SERVICE_GET_EVENT_INFORMATION,
1308 self.response_timeout,
1309 )
1310 .await?;
1311 let mut pr = Reader::new(&payload);
1312 let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1313 Ok(EventInformationResult {
1314 summaries: into_client_event_information(parsed.summaries),
1315 more_events: parsed.more_events,
1316 })
1317 }
1318
1319 pub async fn acknowledge_alarm(
1321 &self,
1322 address: DataLinkAddress,
1323 mut request: AcknowledgeAlarmRequest<'_>,
1324 ) -> Result<(), ClientError> {
1325 request.invoke_id = self.next_invoke_id().await;
1326 let invoke_id = request.invoke_id;
1327 let tx = self.encode_with_growth(|w| {
1328 Npdu::new(0).encode(w)?;
1329 request.encode(w)
1330 })?;
1331 self.await_simple_ack_or_error(
1332 address,
1333 &tx,
1334 invoke_id,
1335 SERVICE_ACKNOWLEDGE_ALARM,
1336 self.response_timeout,
1337 )
1338 .await
1339 }
1340
1341 pub async fn atomic_read_file_stream(
1346 &self,
1347 address: DataLinkAddress,
1348 file_object_id: ObjectId,
1349 file_start_position: i32,
1350 requested_octet_count: u32,
1351 ) -> Result<AtomicReadFileResult, ClientError> {
1352 let invoke_id = self.next_invoke_id().await;
1353 let request = AtomicReadFileRequest::stream(
1354 file_object_id,
1355 file_start_position,
1356 requested_octet_count,
1357 invoke_id,
1358 );
1359 self.atomic_read_file(address, request).await
1360 }
1361
1362 pub async fn atomic_read_file_record(
1367 &self,
1368 address: DataLinkAddress,
1369 file_object_id: ObjectId,
1370 file_start_record: i32,
1371 requested_record_count: u32,
1372 ) -> Result<AtomicReadFileResult, ClientError> {
1373 let invoke_id = self.next_invoke_id().await;
1374 let request = AtomicReadFileRequest::record(
1375 file_object_id,
1376 file_start_record,
1377 requested_record_count,
1378 invoke_id,
1379 );
1380 self.atomic_read_file(address, request).await
1381 }
1382
1383 async fn atomic_read_file(
1384 &self,
1385 address: DataLinkAddress,
1386 request: AtomicReadFileRequest,
1387 ) -> Result<AtomicReadFileResult, ClientError> {
1388 let invoke_id = request.invoke_id;
1389 let tx = self.encode_with_growth(|w| {
1390 Npdu::new(0).encode(w)?;
1391 request.encode(w)
1392 })?;
1393 let payload = self
1394 .await_complex_ack_payload_or_error(
1395 address,
1396 &tx,
1397 invoke_id,
1398 SERVICE_ATOMIC_READ_FILE,
1399 self.response_timeout,
1400 )
1401 .await?;
1402 let mut pr = Reader::new(&payload);
1403 let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1404 Ok(into_client_atomic_read_result(parsed))
1405 }
1406
1407 pub async fn atomic_write_file_stream(
1410 &self,
1411 address: DataLinkAddress,
1412 file_object_id: ObjectId,
1413 file_start_position: i32,
1414 file_data: &[u8],
1415 ) -> Result<AtomicWriteFileResult, ClientError> {
1416 let invoke_id = self.next_invoke_id().await;
1417 let request = AtomicWriteFileRequest::stream(
1418 file_object_id,
1419 file_start_position,
1420 file_data,
1421 invoke_id,
1422 );
1423 self.atomic_write_file(address, request).await
1424 }
1425
1426 pub async fn atomic_write_file_record(
1430 &self,
1431 address: DataLinkAddress,
1432 file_object_id: ObjectId,
1433 file_start_record: i32,
1434 file_record_data: &[&[u8]],
1435 ) -> Result<AtomicWriteFileResult, ClientError> {
1436 let invoke_id = self.next_invoke_id().await;
1437 let request = AtomicWriteFileRequest::record(
1438 file_object_id,
1439 file_start_record,
1440 file_record_data,
1441 invoke_id,
1442 );
1443 self.atomic_write_file(address, request).await
1444 }
1445
1446 async fn atomic_write_file(
1447 &self,
1448 address: DataLinkAddress,
1449 request: AtomicWriteFileRequest<'_>,
1450 ) -> Result<AtomicWriteFileResult, ClientError> {
1451 let invoke_id = request.invoke_id;
1452 let tx = self.encode_with_growth(|w| {
1453 Npdu::new(0).encode(w)?;
1454 request.encode(w)
1455 })?;
1456 let payload = self
1457 .await_complex_ack_payload_or_error(
1458 address,
1459 &tx,
1460 invoke_id,
1461 SERVICE_ATOMIC_WRITE_FILE,
1462 self.response_timeout,
1463 )
1464 .await?;
1465 let mut pr = Reader::new(&payload);
1466 let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1467 Ok(into_client_atomic_write_result(parsed))
1468 }
1469
1470 pub async fn subscribe_cov(
1474 &self,
1475 address: DataLinkAddress,
1476 mut request: SubscribeCovRequest,
1477 ) -> Result<(), ClientError> {
1478 request.invoke_id = self.next_invoke_id().await;
1479 let invoke_id = request.invoke_id;
1480 let tx = self.encode_with_growth(|w| {
1481 Npdu::new(0).encode(w)?;
1482 request.encode(w)
1483 })?;
1484 self.await_simple_ack_or_error(
1485 address,
1486 &tx,
1487 invoke_id,
1488 SERVICE_SUBSCRIBE_COV,
1489 self.response_timeout,
1490 )
1491 .await
1492 }
1493
1494 pub async fn cancel_cov_subscription(
1497 &self,
1498 address: DataLinkAddress,
1499 subscriber_process_id: u32,
1500 monitored_object_id: ObjectId,
1501 ) -> Result<(), ClientError> {
1502 self.subscribe_cov(
1503 address,
1504 SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1505 )
1506 .await
1507 }
1508
1509 pub async fn subscribe_cov_property(
1514 &self,
1515 address: DataLinkAddress,
1516 mut request: SubscribeCovPropertyRequest,
1517 ) -> Result<(), ClientError> {
1518 request.invoke_id = self.next_invoke_id().await;
1519 let invoke_id = request.invoke_id;
1520 let tx = self.encode_with_growth(|w| {
1521 Npdu::new(0).encode(w)?;
1522 request.encode(w)
1523 })?;
1524 self.await_simple_ack_or_error(
1525 address,
1526 &tx,
1527 invoke_id,
1528 SERVICE_SUBSCRIBE_COV_PROPERTY,
1529 self.response_timeout,
1530 )
1531 .await
1532 }
1533
1534 pub async fn cancel_cov_property_subscription(
1540 &self,
1541 address: DataLinkAddress,
1542 subscriber_process_id: u32,
1543 monitored_object_id: ObjectId,
1544 monitored_property_id: PropertyId,
1545 monitored_property_array_index: Option<u32>,
1546 ) -> Result<(), ClientError> {
1547 self.subscribe_cov_property(
1548 address,
1549 SubscribeCovPropertyRequest::cancel(
1550 subscriber_process_id,
1551 monitored_object_id,
1552 monitored_property_id,
1553 monitored_property_array_index,
1554 0,
1555 ),
1556 )
1557 .await
1558 }
1559
1560 pub async fn read_range_by_position(
1565 &self,
1566 address: DataLinkAddress,
1567 object_id: ObjectId,
1568 property_id: PropertyId,
1569 array_index: Option<u32>,
1570 reference_index: i32,
1571 count: i16,
1572 ) -> Result<ReadRangeResult, ClientError> {
1573 let invoke_id = self.next_invoke_id().await;
1574 let req = ReadRangeRequest::by_position(
1575 object_id,
1576 property_id,
1577 array_index,
1578 reference_index,
1579 count,
1580 invoke_id,
1581 );
1582 self.read_range_with_request(address, req).await
1583 }
1584
1585 pub async fn read_range_by_sequence_number(
1590 &self,
1591 address: DataLinkAddress,
1592 object_id: ObjectId,
1593 property_id: PropertyId,
1594 array_index: Option<u32>,
1595 reference_sequence: u32,
1596 count: i16,
1597 ) -> Result<ReadRangeResult, ClientError> {
1598 let invoke_id = self.next_invoke_id().await;
1599 let req = ReadRangeRequest::by_sequence_number(
1600 object_id,
1601 property_id,
1602 array_index,
1603 reference_sequence,
1604 count,
1605 invoke_id,
1606 );
1607 self.read_range_with_request(address, req).await
1608 }
1609
1610 pub async fn read_range_by_time(
1615 &self,
1616 address: DataLinkAddress,
1617 object_id: ObjectId,
1618 property_id: PropertyId,
1619 array_index: Option<u32>,
1620 at: (Date, Time),
1621 count: i16,
1622 ) -> Result<ReadRangeResult, ClientError> {
1623 let (date, time) = at;
1624 let invoke_id = self.next_invoke_id().await;
1625 let req = ReadRangeRequest::by_time(
1626 object_id,
1627 property_id,
1628 array_index,
1629 date,
1630 time,
1631 count,
1632 invoke_id,
1633 );
1634 self.read_range_with_request(address, req).await
1635 }
1636
1637 async fn read_range_with_request(
1638 &self,
1639 address: DataLinkAddress,
1640 req: ReadRangeRequest,
1641 ) -> Result<ReadRangeResult, ClientError> {
1642 let invoke_id = req.invoke_id;
1643 let tx = self.encode_with_growth(|w| {
1644 Npdu::new(0).encode(w)?;
1645 req.encode(w)
1646 })?;
1647 let payload = self
1648 .await_complex_ack_payload_or_error(
1649 address,
1650 &tx,
1651 invoke_id,
1652 SERVICE_READ_RANGE,
1653 self.response_timeout,
1654 )
1655 .await?;
1656 let mut pr = Reader::new(&payload);
1657 let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1658 into_client_read_range(parsed)
1659 }
1660
/// Waits up to `wait` for a COV notification from any source.
///
/// Returns `Ok(Some(..))` for the first unconfirmed or confirmed COV notification
/// received, and `Ok(None)` when the wait elapses without one. A confirmed
/// notification is answered with a SimpleACK before it is returned. Frames with
/// other PDU types or other service choices are skipped.
///
/// The request I/O lock is held for the whole call.
///
/// # Errors
/// - Datalink receive errors (converted with `into`).
/// - NPDU / header / notification decoding errors for any received frame.
/// - `ClientError::UnsupportedResponse` for an empty APDU or a segmented
///   confirmed COV notification.
/// - Errors from sending the SimpleACK or converting the notification.
pub async fn recv_cov_notification(
    &self,
    wait: Duration,
) -> Result<Option<CovNotification>, ClientError> {
    // Guard is kept until return, so no request exchange can interleave with this wait.
    let _io_lock = self.request_io_lock.lock().await;
    let deadline = tokio::time::Instant::now() + wait;

    while tokio::time::Instant::now() < deadline {
        // Bound each receive by whatever is left of the overall wait.
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        let mut rx = [0u8; 1500];
        let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
        let (n, source) = match recv {
            Ok(Ok(v)) => v,
            Ok(Err(e)) => return Err(e.into()),
            // Timer fired first: fall through to `Ok(None)`.
            Err(_) => break,
        };

        // NOTE(review): unlike the confirmed-request path, this does not go through
        // `recv_ignoring_invalid_frame`, so an undecodable frame ends the wait with
        // an error — confirm that this is intended.
        let apdu = extract_apdu(&rx[..n])?;
        let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
        // The PDU type is read from the high nibble of the first APDU octet.
        match ApduType::from_u8(first >> 4) {
            Some(ApduType::UnconfirmedRequest) => {
                let mut r = Reader::new(apdu);
                let header = UnconfirmedRequestHeader::decode(&mut r)?;
                if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
                    continue;
                }
                let cov = CovNotificationRequest::decode_after_header(&mut r)?;
                return Ok(Some(into_client_cov_notification(source, false, cov)?));
            }
            Some(ApduType::ConfirmedRequest) => {
                let mut r = Reader::new(apdu);
                let header = ConfirmedRequestHeader::decode(&mut r)?;
                if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
                    continue;
                }
                // Segmented notifications are not reassembled here.
                if header.segmented {
                    return Err(ClientError::UnsupportedResponse);
                }

                // Decode first so that only a well-formed notification gets acknowledged.
                let cov = CovNotificationRequest::decode_after_header(&mut r)?;
                self.send_simple_ack(
                    source,
                    header.invoke_id,
                    SERVICE_CONFIRMED_COV_NOTIFICATION,
                )
                .await?;
                return Ok(Some(into_client_cov_notification(source, true, cov)?));
            }
            _ => continue,
        }
    }

    Ok(None)
}
1720
/// Waits up to `wait` for an event notification from any source.
///
/// Returns `Ok(Some(..))` for the first unconfirmed or confirmed event
/// notification received, and `Ok(None)` when the wait elapses without one. A
/// confirmed notification is answered with a SimpleACK before it is returned.
/// Frames with other PDU types or other service choices are skipped.
///
/// The request I/O lock is held for the whole call.
///
/// # Errors
/// - Datalink receive errors (converted with `into`).
/// - NPDU / header / notification decoding errors for any received frame.
/// - `ClientError::UnsupportedResponse` for an empty APDU or a segmented
///   confirmed event notification.
/// - Errors from sending the SimpleACK.
pub async fn recv_event_notification(
    &self,
    wait: Duration,
) -> Result<Option<EventNotification>, ClientError> {
    // Guard is kept until return, so no request exchange can interleave with this wait.
    let _io_lock = self.request_io_lock.lock().await;
    let deadline = tokio::time::Instant::now() + wait;

    while tokio::time::Instant::now() < deadline {
        // Bound each receive by whatever is left of the overall wait.
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        let mut rx = [0u8; 1500];
        let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
        let (n, source) = match recv {
            Ok(Ok(v)) => v,
            Ok(Err(e)) => return Err(e.into()),
            // Timer fired first: fall through to `Ok(None)`.
            Err(_) => break,
        };

        // NOTE(review): unlike the confirmed-request path, this does not go through
        // `recv_ignoring_invalid_frame`, so an undecodable frame ends the wait with
        // an error — confirm that this is intended.
        let apdu = extract_apdu(&rx[..n])?;
        let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
        // The PDU type is read from the high nibble of the first APDU octet.
        match ApduType::from_u8(first >> 4) {
            Some(ApduType::UnconfirmedRequest) => {
                let mut r = Reader::new(apdu);
                let header = UnconfirmedRequestHeader::decode(&mut r)?;
                if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
                    continue;
                }
                let notification = EventNotificationRequest::decode_after_header(&mut r)?;
                return Ok(Some(into_client_event_notification(
                    source,
                    false,
                    notification,
                )));
            }
            Some(ApduType::ConfirmedRequest) => {
                let mut r = Reader::new(apdu);
                let header = ConfirmedRequestHeader::decode(&mut r)?;
                if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
                    continue;
                }
                // Segmented notifications are not reassembled here.
                if header.segmented {
                    return Err(ClientError::UnsupportedResponse);
                }
                // Decode first so that only a well-formed notification gets acknowledged.
                let notification = EventNotificationRequest::decode_after_header(&mut r)?;
                self.send_simple_ack(
                    source,
                    header.invoke_id,
                    SERVICE_CONFIRMED_EVENT_NOTIFICATION,
                )
                .await?;
                return Ok(Some(into_client_event_notification(
                    source,
                    true,
                    notification,
                )));
            }
            _ => continue,
        }
    }

    Ok(None)
}
1787
1788 pub async fn read_property(
1793 &self,
1794 address: DataLinkAddress,
1795 object_id: ObjectId,
1796 property_id: PropertyId,
1797 ) -> Result<ClientDataValue, ClientError> {
1798 let invoke_id = self.next_invoke_id().await;
1799 let req = ReadPropertyRequest {
1800 object_id,
1801 property_id,
1802 array_index: None,
1803 invoke_id,
1804 };
1805 let tx = self.encode_with_growth(|w| {
1806 Npdu::new(0).encode(w)?;
1807 req.encode(w)
1808 })?;
1809 let payload = self
1810 .await_complex_ack_payload_or_error(
1811 address,
1812 &tx,
1813 invoke_id,
1814 SERVICE_READ_PROPERTY,
1815 self.response_timeout,
1816 )
1817 .await?;
1818 let mut pr = Reader::new(&payload);
1819 let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
1820 into_client_value(parsed.value)
1821 }
1822
1823 pub async fn write_property(
1825 &self,
1826 address: DataLinkAddress,
1827 mut request: WritePropertyRequest<'_>,
1828 ) -> Result<(), ClientError> {
1829 request.invoke_id = self.next_invoke_id().await;
1830 let invoke_id = request.invoke_id;
1831 let tx = self.encode_with_growth(|w| {
1832 Npdu::new(0).encode(w)?;
1833 request.encode(w)
1834 })?;
1835 self.await_simple_ack_or_error(
1836 address,
1837 &tx,
1838 invoke_id,
1839 SERVICE_WRITE_PROPERTY,
1840 self.response_timeout,
1841 )
1842 .await
1843 }
1844
1845 pub async fn read_property_multiple(
1850 &self,
1851 address: DataLinkAddress,
1852 object_id: ObjectId,
1853 property_ids: &[PropertyId],
1854 ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
1855 let refs: Vec<PropertyReference> = property_ids
1856 .iter()
1857 .copied()
1858 .map(|property_id| PropertyReference {
1859 property_id,
1860 array_index: None,
1861 })
1862 .collect();
1863 let specs = [ReadAccessSpecification {
1864 object_id,
1865 properties: &refs,
1866 }];
1867
1868 let invoke_id = self.next_invoke_id().await;
1869 let req = ReadPropertyMultipleRequest {
1870 specs: &specs,
1871 invoke_id,
1872 };
1873
1874 let tx = self.encode_with_growth(|w| {
1875 Npdu::new(0).encode(w)?;
1876 req.encode(w)
1877 })?;
1878 let payload = self
1879 .await_complex_ack_payload_or_error(
1880 address,
1881 &tx,
1882 invoke_id,
1883 SERVICE_READ_PROPERTY_MULTIPLE,
1884 self.response_timeout,
1885 )
1886 .await?;
1887 let mut pr = Reader::new(&payload);
1888 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
1889 let mut out = Vec::new();
1890 for access in parsed.results {
1891 if access.object_id != object_id {
1892 continue;
1893 }
1894 for item in access.results {
1895 out.push((item.property_id, into_client_value(item.value)?));
1896 }
1897 }
1898 Ok(out)
1899 }
1900
1901 pub async fn write_property_multiple(
1904 &self,
1905 address: DataLinkAddress,
1906 object_id: ObjectId,
1907 properties: &[PropertyWriteSpec<'_>],
1908 ) -> Result<(), ClientError> {
1909 let invoke_id = self.next_invoke_id().await;
1910 let specs = [WriteAccessSpecification {
1911 object_id,
1912 properties,
1913 }];
1914 let req = WritePropertyMultipleRequest {
1915 specs: &specs,
1916 invoke_id,
1917 };
1918
1919 let tx = self.encode_with_growth(|w| {
1920 Npdu::new(0).encode(w)?;
1921 req.encode(w)
1922 })?;
1923 self.await_simple_ack_or_error(
1924 address,
1925 &tx,
1926 invoke_id,
1927 SERVICE_WRITE_PROPERTY_MULTIPLE,
1928 self.response_timeout,
1929 )
1930 .await
1931 }
1932
1933 pub async fn private_transfer(
1935 &self,
1936 address: DataLinkAddress,
1937 vendor_id: u32,
1938 service_number: u32,
1939 service_parameters: Option<&[u8]>,
1940 ) -> Result<PrivateTransferAck, ClientError> {
1941 let invoke_id = self.next_invoke_id().await;
1942 let req = ConfirmedPrivateTransferRequest {
1943 vendor_id,
1944 service_number,
1945 service_parameters,
1946 invoke_id,
1947 };
1948
1949 let tx = self.encode_with_growth(|w| {
1950 Npdu::new(0).encode(w)?;
1951 req.encode(w)
1952 })?;
1953 let payload = self
1954 .await_complex_ack_payload_or_error(
1955 address,
1956 &tx,
1957 invoke_id,
1958 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
1959 self.response_timeout,
1960 )
1961 .await?;
1962 let mut r = Reader::new(&payload);
1963 PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
1964 }
1965
1966 pub async fn read_many(
1972 &self,
1973 address: DataLinkAddress,
1974 requests: &[(ObjectId, PropertyId)],
1975 ) -> Result<HashMap<(ObjectId, PropertyId), ClientDataValue>, ClientError> {
1976 let mut grouped: Vec<(ObjectId, Vec<PropertyReference>)> = Vec::new();
1978 for &(oid, pid) in requests {
1979 if let Some(entry) = grouped.iter_mut().find(|(o, _)| *o == oid) {
1980 entry.1.push(PropertyReference { property_id: pid, array_index: None });
1981 } else {
1982 grouped.push((oid, vec![PropertyReference { property_id: pid, array_index: None }]));
1983 }
1984 }
1985
1986 let specs: Vec<ReadAccessSpecification<'_>> = grouped
1987 .iter()
1988 .map(|(oid, props)| ReadAccessSpecification { object_id: *oid, properties: props })
1989 .collect();
1990
1991 let invoke_id = self.next_invoke_id().await;
1992 let req = ReadPropertyMultipleRequest { specs: &specs, invoke_id };
1993 let tx = self.encode_with_growth(|w| {
1994 Npdu::new(0).encode(w)?;
1995 req.encode(w)
1996 })?;
1997 let payload = self
1998 .await_complex_ack_payload_or_error(
1999 address, &tx, invoke_id, SERVICE_READ_PROPERTY_MULTIPLE, self.response_timeout,
2000 )
2001 .await?;
2002
2003 let mut pr = Reader::new(&payload);
2004 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2005 let mut out = HashMap::new();
2006 for access in parsed.results {
2007 for item in access.results {
2008 if let Ok(v) = into_client_value(item.value) {
2009 out.insert((access.object_id, item.property_id), v);
2010 }
2011 }
2012 }
2013 Ok(out)
2014 }
2015
2016 pub async fn write_many(
2023 &self,
2024 address: DataLinkAddress,
2025 writes: &[(ObjectId, PropertyId, ClientDataValue, Option<u8>)],
2026 ) -> Result<(), ClientError> {
2027 use rustbac_core::types::{BitString, DataValue as DV};
2028
2029 fn cv_to_dv(v: &ClientDataValue) -> DV<'_> {
2030 match v {
2031 ClientDataValue::Null => DV::Null,
2032 ClientDataValue::Boolean(b) => DV::Boolean(*b),
2033 ClientDataValue::Unsigned(n) => DV::Unsigned(*n),
2034 ClientDataValue::Signed(n) => DV::Signed(*n),
2035 ClientDataValue::Real(f) => DV::Real(*f),
2036 ClientDataValue::Double(f) => DV::Double(*f),
2037 ClientDataValue::OctetString(b) => DV::OctetString(b),
2038 ClientDataValue::CharacterString(s) => DV::CharacterString(s),
2039 ClientDataValue::BitString { unused_bits, data } => {
2040 DV::BitString(BitString { unused_bits: *unused_bits, data })
2041 }
2042 ClientDataValue::Enumerated(n) => DV::Enumerated(*n),
2043 ClientDataValue::Date(d) => DV::Date(*d),
2044 ClientDataValue::Time(t) => DV::Time(*t),
2045 ClientDataValue::ObjectId(o) => DV::ObjectId(*o),
2046 ClientDataValue::Constructed { tag_num, values } => DV::Constructed {
2047 tag_num: *tag_num,
2048 values: values.iter().map(cv_to_dv).collect(),
2049 },
2050 }
2051 }
2052
2053 let converted: Vec<(ObjectId, PropertyId, DV<'_>, Option<u8>)> = writes
2055 .iter()
2056 .map(|(oid, pid, val, prio)| (*oid, *pid, cv_to_dv(val), *prio))
2057 .collect();
2058
2059 let mut grouped: Vec<(ObjectId, Vec<PropertyWriteSpec<'_>>)> = Vec::new();
2061 for (oid, pid, val, prio) in &converted {
2062 let spec = PropertyWriteSpec {
2063 property_id: *pid,
2064 array_index: None,
2065 value: val.clone(),
2066 priority: *prio,
2067 };
2068 if let Some(entry) = grouped.iter_mut().find(|(o, _)| o == oid) {
2069 entry.1.push(spec);
2070 } else {
2071 grouped.push((*oid, vec![spec]));
2072 }
2073 }
2074
2075 let specs: Vec<WriteAccessSpecification<'_>> = grouped
2076 .iter()
2077 .map(|(oid, props)| WriteAccessSpecification { object_id: *oid, properties: props })
2078 .collect();
2079
2080 let invoke_id = self.next_invoke_id().await;
2081 let req = WritePropertyMultipleRequest { specs: &specs, invoke_id };
2082 let tx = self.encode_with_growth(|w| {
2083 Npdu::new(0).encode(w)?;
2084 req.encode(w)
2085 })?;
2086 self.await_simple_ack_or_error(
2087 address, &tx, invoke_id, SERVICE_WRITE_PROPERTY_MULTIPLE, self.response_timeout,
2088 )
2089 .await
2090 }
2091}
2092
2093fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
2094 let mut r = Reader::new(payload);
2095 let _npdu = Npdu::decode(&mut r)?;
2096 r.read_exact(r.remaining()).map_err(ClientError::from)
2097}
2098
2099fn remote_service_error(err: BacnetError) -> ClientError {
2100 ClientError::RemoteServiceError {
2101 service_choice: err.service_choice,
2102 error_class_raw: err.error_class,
2103 error_code_raw: err.error_code,
2104 error_class: err.error_class.and_then(ErrorClass::from_u32),
2105 error_code: err.error_code.and_then(ErrorCode::from_u32),
2106 }
2107}
2108
2109fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
2110 Ok(match value {
2111 DataValue::Null => ClientDataValue::Null,
2112 DataValue::Boolean(v) => ClientDataValue::Boolean(v),
2113 DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
2114 DataValue::Signed(v) => ClientDataValue::Signed(v),
2115 DataValue::Real(v) => ClientDataValue::Real(v),
2116 DataValue::Double(v) => ClientDataValue::Double(v),
2117 DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
2118 DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
2119 DataValue::BitString(v) => ClientDataValue::BitString {
2120 unused_bits: v.unused_bits,
2121 data: v.data.to_vec(),
2122 },
2123 DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
2124 DataValue::Date(v) => ClientDataValue::Date(v),
2125 DataValue::Time(v) => ClientDataValue::Time(v),
2126 DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
2127 DataValue::Constructed { tag_num, values } => {
2128 let mut children = Vec::with_capacity(values.len());
2129 for child in values {
2130 children.push(into_client_value(child)?);
2131 }
2132 ClientDataValue::Constructed {
2133 tag_num,
2134 values: children,
2135 }
2136 }
2137 })
2138}
2139
2140fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
2141 value
2142 .into_iter()
2143 .map(|item| AlarmSummaryItem {
2144 object_id: item.object_id,
2145 alarm_state_raw: item.alarm_state,
2146 alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2147 item.alarm_state,
2148 ),
2149 acknowledged_transitions: ClientBitString {
2150 unused_bits: item.acknowledged_transitions.unused_bits,
2151 data: item.acknowledged_transitions.data.to_vec(),
2152 },
2153 })
2154 .collect()
2155}
2156
2157fn into_client_enrollment_summary(
2158 value: Vec<CoreEnrollmentSummaryItem>,
2159) -> Vec<EnrollmentSummaryItem> {
2160 value
2161 .into_iter()
2162 .map(|item| EnrollmentSummaryItem {
2163 object_id: item.object_id,
2164 event_type: item.event_type,
2165 event_state_raw: item.event_state,
2166 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2167 item.event_state,
2168 ),
2169 priority: item.priority,
2170 notification_class: item.notification_class,
2171 })
2172 .collect()
2173}
2174
2175fn into_client_event_information(
2176 value: Vec<CoreEventSummaryItem<'_>>,
2177) -> Vec<EventInformationItem> {
2178 value
2179 .into_iter()
2180 .map(|item| EventInformationItem {
2181 object_id: item.object_id,
2182 event_state_raw: item.event_state,
2183 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2184 item.event_state,
2185 ),
2186 acknowledged_transitions: ClientBitString {
2187 unused_bits: item.acknowledged_transitions.unused_bits,
2188 data: item.acknowledged_transitions.data.to_vec(),
2189 },
2190 notify_type: item.notify_type,
2191 event_enable: ClientBitString {
2192 unused_bits: item.event_enable.unused_bits,
2193 data: item.event_enable.data.to_vec(),
2194 },
2195 event_priorities: item.event_priorities,
2196 })
2197 .collect()
2198}
2199
2200fn into_client_cov_notification(
2201 source: DataLinkAddress,
2202 confirmed: bool,
2203 value: CovNotificationRequest<'_>,
2204) -> Result<CovNotification, ClientError> {
2205 let mut values = Vec::with_capacity(value.values.len());
2206 for property in value.values {
2207 values.push(CovPropertyValue {
2208 property_id: property.property_id,
2209 array_index: property.array_index,
2210 value: into_client_value(property.value)?,
2211 priority: property.priority,
2212 });
2213 }
2214
2215 Ok(CovNotification {
2216 source,
2217 confirmed,
2218 subscriber_process_id: value.subscriber_process_id,
2219 initiating_device_id: value.initiating_device_id,
2220 monitored_object_id: value.monitored_object_id,
2221 time_remaining_seconds: value.time_remaining_seconds,
2222 values,
2223 })
2224}
2225
2226fn into_client_event_notification(
2227 source: DataLinkAddress,
2228 confirmed: bool,
2229 value: EventNotificationRequest<'_>,
2230) -> EventNotification {
2231 EventNotification {
2232 source,
2233 confirmed,
2234 process_id: value.process_id,
2235 initiating_device_id: value.initiating_device_id,
2236 event_object_id: value.event_object_id,
2237 timestamp: value.timestamp,
2238 notification_class: value.notification_class,
2239 priority: value.priority,
2240 event_type: value.event_type,
2241 message_text: value.message_text.map(str::to_string),
2242 notify_type: value.notify_type,
2243 ack_required: value.ack_required,
2244 from_state_raw: value.from_state,
2245 from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2246 value.from_state,
2247 ),
2248 to_state_raw: value.to_state,
2249 to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
2250 }
2251}
2252
2253fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
2254 let mut items = Vec::with_capacity(value.items.len());
2255 for item in value.items {
2256 items.push(into_client_value(item)?);
2257 }
2258 Ok(ReadRangeResult {
2259 object_id: value.object_id,
2260 property_id: value.property_id,
2261 array_index: value.array_index,
2262 result_flags: ClientBitString {
2263 unused_bits: value.result_flags.unused_bits,
2264 data: value.result_flags.data.to_vec(),
2265 },
2266 item_count: value.item_count,
2267 items,
2268 })
2269}
2270
2271fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
2272 match value.access_method {
2273 AtomicReadFileAckAccess::Stream {
2274 file_start_position,
2275 file_data,
2276 } => AtomicReadFileResult::Stream {
2277 end_of_file: value.end_of_file,
2278 file_start_position,
2279 file_data: file_data.to_vec(),
2280 },
2281 AtomicReadFileAckAccess::Record {
2282 file_start_record,
2283 returned_record_count,
2284 file_record_data,
2285 } => AtomicReadFileResult::Record {
2286 end_of_file: value.end_of_file,
2287 file_start_record,
2288 returned_record_count,
2289 file_record_data: file_record_data
2290 .into_iter()
2291 .map(|record| record.to_vec())
2292 .collect(),
2293 },
2294 }
2295}
2296
2297fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
2298 match value {
2299 AtomicWriteFileAck::Stream {
2300 file_start_position,
2301 } => AtomicWriteFileResult::Stream {
2302 file_start_position,
2303 },
2304 AtomicWriteFileAck::Record { file_start_record } => {
2305 AtomicWriteFileResult::Record { file_start_record }
2306 }
2307 }
2308}
2309
2310#[cfg(test)]
2311mod tests {
2312 use super::BacnetClient;
2313 use crate::{
2314 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
2315 EnrollmentSummaryItem, EventInformationItem, EventNotification,
2316 };
2317 use rustbac_core::apdu::{
2318 ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
2319 UnconfirmedRequestHeader,
2320 };
2321 use rustbac_core::encoding::{
2322 primitives::{
2323 decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
2324 encode_ctx_object_id, encode_ctx_unsigned,
2325 },
2326 reader::Reader,
2327 tag::{AppTag, Tag},
2328 writer::Writer,
2329 };
2330 use rustbac_core::npdu::Npdu;
2331 use rustbac_core::services::acknowledge_alarm::{
2332 AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
2333 };
2334 use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
2335 use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
2336 use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
2337 use rustbac_core::services::cov_notification::{
2338 SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
2339 };
2340 use rustbac_core::services::device_management::{
2341 DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
2342 SERVICE_REINITIALIZE_DEVICE,
2343 };
2344 use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
2345 use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
2346 use rustbac_core::services::event_notification::{
2347 SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
2348 };
2349 use rustbac_core::services::list_element::{
2350 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
2351 SERVICE_REMOVE_LIST_ELEMENT,
2352 };
2353 use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
2354 use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
2355 use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
2356 use rustbac_core::services::read_range::SERVICE_READ_RANGE;
2357 use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
2358 use rustbac_core::services::subscribe_cov_property::{
2359 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
2360 };
2361 use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
2362 use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
2363 use rustbac_core::services::write_property_multiple::{
2364 PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
2365 };
2366 use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
2367 use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
2368 use std::collections::VecDeque;
2369 use std::sync::Arc;
2370 use std::time::Duration;
2371 use tokio::sync::Mutex;
2372
2373 #[derive(Debug, Default)]
2374 struct MockState {
2375 sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
2376 recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
2377 }
2378
2379 #[derive(Debug, Clone)]
2380 struct MockDataLink {
2381 state: Arc<MockState>,
2382 }
2383
2384 impl MockDataLink {
2385 fn new() -> (Self, Arc<MockState>) {
2386 let state = Arc::new(MockState::default());
2387 (
2388 Self {
2389 state: state.clone(),
2390 },
2391 state,
2392 )
2393 }
2394 }
2395
2396 impl DataLink for MockDataLink {
2397 async fn send(
2398 &self,
2399 address: DataLinkAddress,
2400 payload: &[u8],
2401 ) -> Result<(), DataLinkError> {
2402 self.state
2403 .sent
2404 .lock()
2405 .await
2406 .push((address, payload.to_vec()));
2407 Ok(())
2408 }
2409
2410 async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
2411 let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
2412 return Err(DataLinkError::InvalidFrame);
2413 };
2414 if payload.len() > buf.len() {
2415 return Err(DataLinkError::FrameTooLarge);
2416 }
2417 buf[..payload.len()].copy_from_slice(&payload);
2418 Ok((payload.len(), addr))
2419 }
2420 }
2421
2422 fn with_npdu(apdu: &[u8]) -> Vec<u8> {
2423 let mut out = [0u8; 512];
2424 let mut w = Writer::new(&mut out);
2425 Npdu::new(0).encode(&mut w).unwrap();
2426 w.write_all(apdu).unwrap();
2427 w.as_written().to_vec()
2428 }
2429
2430 fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2431 let mut apdu_buf = [0u8; 256];
2432 let mut w = Writer::new(&mut apdu_buf);
2433 ComplexAckHeader {
2434 segmented: false,
2435 more_follows: false,
2436 invoke_id,
2437 sequence_number: None,
2438 proposed_window_size: None,
2439 service_choice: SERVICE_READ_RANGE,
2440 }
2441 .encode(&mut w)
2442 .unwrap();
2443 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2444 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
2445 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
2446 w.write_u8(5).unwrap();
2447 w.write_u8(0b1110_0000).unwrap();
2448 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
2449 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
2450 encode_app_real(&mut w, 42.0).unwrap();
2451 encode_app_real(&mut w, 43.0).unwrap();
2452 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
2453 w.as_written().to_vec()
2454 }
2455
2456 fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
2457 let mut apdu_buf = [0u8; 256];
2458 let mut w = Writer::new(&mut apdu_buf);
2459 ComplexAckHeader {
2460 segmented: false,
2461 more_follows: false,
2462 invoke_id,
2463 sequence_number: None,
2464 proposed_window_size: None,
2465 service_choice: SERVICE_ATOMIC_READ_FILE,
2466 }
2467 .encode(&mut w)
2468 .unwrap();
2469 Tag::Application {
2470 tag: AppTag::Boolean,
2471 len: if eof { 1 } else { 0 },
2472 }
2473 .encode(&mut w)
2474 .unwrap();
2475 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2476 Tag::Application {
2477 tag: AppTag::SignedInt,
2478 len: 1,
2479 }
2480 .encode(&mut w)
2481 .unwrap();
2482 w.write_u8(0).unwrap();
2483 Tag::Application {
2484 tag: AppTag::OctetString,
2485 len: data.len() as u32,
2486 }
2487 .encode(&mut w)
2488 .unwrap();
2489 w.write_all(data).unwrap();
2490 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2491 w.as_written().to_vec()
2492 }
2493
2494 fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
2495 let mut apdu_buf = [0u8; 256];
2496 let mut w = Writer::new(&mut apdu_buf);
2497 ComplexAckHeader {
2498 segmented: false,
2499 more_follows: false,
2500 invoke_id,
2501 sequence_number: None,
2502 proposed_window_size: None,
2503 service_choice: SERVICE_ATOMIC_READ_FILE,
2504 }
2505 .encode(&mut w)
2506 .unwrap();
2507 Tag::Application {
2508 tag: AppTag::Boolean,
2509 len: 0,
2510 }
2511 .encode(&mut w)
2512 .unwrap();
2513 Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
2514 Tag::Application {
2515 tag: AppTag::SignedInt,
2516 len: 1,
2517 }
2518 .encode(&mut w)
2519 .unwrap();
2520 w.write_u8(7).unwrap();
2521 Tag::Application {
2522 tag: AppTag::UnsignedInt,
2523 len: 1,
2524 }
2525 .encode(&mut w)
2526 .unwrap();
2527 w.write_u8(2).unwrap();
2528 Tag::Application {
2529 tag: AppTag::OctetString,
2530 len: 2,
2531 }
2532 .encode(&mut w)
2533 .unwrap();
2534 w.write_all(&[0x01, 0x02]).unwrap();
2535 Tag::Application {
2536 tag: AppTag::OctetString,
2537 len: 3,
2538 }
2539 .encode(&mut w)
2540 .unwrap();
2541 w.write_all(&[0x03, 0x04, 0x05]).unwrap();
2542 Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
2543 w.as_written().to_vec()
2544 }
2545
2546 fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
2547 let mut apdu_buf = [0u8; 64];
2548 let mut w = Writer::new(&mut apdu_buf);
2549 ComplexAckHeader {
2550 segmented: false,
2551 more_follows: false,
2552 invoke_id,
2553 sequence_number: None,
2554 proposed_window_size: None,
2555 service_choice: SERVICE_ATOMIC_WRITE_FILE,
2556 }
2557 .encode(&mut w)
2558 .unwrap();
2559 Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
2560 w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
2561 w.as_written().to_vec()
2562 }
2563
2564 fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
2565 let mut apdu_buf = [0u8; 64];
2566 let mut w = Writer::new(&mut apdu_buf);
2567 ComplexAckHeader {
2568 segmented: false,
2569 more_follows: false,
2570 invoke_id,
2571 sequence_number: None,
2572 proposed_window_size: None,
2573 service_choice: SERVICE_ATOMIC_WRITE_FILE,
2574 }
2575 .encode(&mut w)
2576 .unwrap();
2577 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2578 w.write_u8(start_record as u8).unwrap();
2579 w.as_written().to_vec()
2580 }
2581
2582 fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2583 let mut apdu_buf = [0u8; 64];
2584 let mut w = Writer::new(&mut apdu_buf);
2585 ComplexAckHeader {
2586 segmented: false,
2587 more_follows: false,
2588 invoke_id,
2589 sequence_number: None,
2590 proposed_window_size: None,
2591 service_choice: SERVICE_CREATE_OBJECT,
2592 }
2593 .encode(&mut w)
2594 .unwrap();
2595 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2596 w.as_written().to_vec()
2597 }
2598
2599 fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2600 let mut apdu_buf = [0u8; 128];
2601 let mut w = Writer::new(&mut apdu_buf);
2602 ComplexAckHeader {
2603 segmented: false,
2604 more_follows: false,
2605 invoke_id,
2606 sequence_number: None,
2607 proposed_window_size: None,
2608 service_choice: SERVICE_GET_ALARM_SUMMARY,
2609 }
2610 .encode(&mut w)
2611 .unwrap();
2612 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2613 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2614 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2615 w.write_u8(5).unwrap();
2616 w.write_u8(0b1110_0000).unwrap();
2617
2618 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
2619 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2620 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2621 w.write_u8(5).unwrap();
2622 w.write_u8(0b1100_0000).unwrap();
2623 w.as_written().to_vec()
2624 }
2625
2626 fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2627 let mut apdu_buf = [0u8; 160];
2628 let mut w = Writer::new(&mut apdu_buf);
2629 ComplexAckHeader {
2630 segmented: false,
2631 more_follows: false,
2632 invoke_id,
2633 sequence_number: None,
2634 proposed_window_size: None,
2635 service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
2636 }
2637 .encode(&mut w)
2638 .unwrap();
2639 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2640 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2641 encode_ctx_unsigned(&mut w, 2, 2).unwrap();
2642 encode_ctx_unsigned(&mut w, 3, 200).unwrap();
2643 encode_ctx_unsigned(&mut w, 4, 10).unwrap();
2644
2645 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
2646 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2647 encode_ctx_unsigned(&mut w, 2, 0).unwrap();
2648 encode_ctx_unsigned(&mut w, 3, 20).unwrap();
2649 encode_ctx_unsigned(&mut w, 4, 11).unwrap();
2650 w.as_written().to_vec()
2651 }
2652
2653 fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
2654 let mut apdu_buf = [0u8; 256];
2655 let mut w = Writer::new(&mut apdu_buf);
2656 ComplexAckHeader {
2657 segmented: false,
2658 more_follows: false,
2659 invoke_id,
2660 sequence_number: None,
2661 proposed_window_size: None,
2662 service_choice: SERVICE_GET_EVENT_INFORMATION,
2663 }
2664 .encode(&mut w)
2665 .unwrap();
2666 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2667 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2668 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2669 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2670 w.write_u8(5).unwrap();
2671 w.write_u8(0b1110_0000).unwrap();
2672 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
2673 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2674 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2675 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2676 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
2677 encode_ctx_unsigned(&mut w, 4, 0).unwrap();
2678 Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
2679 w.write_u8(5).unwrap();
2680 w.write_u8(0b1100_0000).unwrap();
2681 Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
2682 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
2683 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2684 encode_ctx_unsigned(&mut w, 2, 3).unwrap();
2685 Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
2686 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2687 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2688 w.write_u8(0).unwrap();
2689 w.as_written().to_vec()
2690 }
2691
2692 #[tokio::test]
2693 async fn who_has_object_name_collects_i_have() {
2694 let (dl, state) = MockDataLink::new();
2695 let client = BacnetClient::with_datalink(dl);
2696 let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
2697
2698 let mut apdu = [0u8; 128];
2699 let mut w = Writer::new(&mut apdu);
2700 UnconfirmedRequestHeader {
2701 service_choice: SERVICE_I_HAVE,
2702 }
2703 .encode(&mut w)
2704 .unwrap();
2705 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
2706 encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2707 encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
2708
2709 state
2710 .recv
2711 .lock()
2712 .await
2713 .push_back((with_npdu(w.as_written()), addr));
2714
2715 let results = client
2716 .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
2717 .await
2718 .unwrap();
2719 assert_eq!(results.len(), 1);
2720 assert_eq!(results[0].address, addr);
2721 assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
2722 assert_eq!(
2723 results[0].object_id,
2724 ObjectId::new(ObjectType::AnalogInput, 7)
2725 );
2726 assert_eq!(results[0].object_name, "Zone Temp");
2727
2728 let sent = state.sent.lock().await;
2729 assert_eq!(sent.len(), 1);
2730 let mut r = Reader::new(&sent[0].1);
2731 let _npdu = Npdu::decode(&mut r).unwrap();
2732 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2733 assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
2734 }
2735
2736 #[tokio::test]
2737 async fn device_communication_control_handles_simple_ack() {
2738 let (dl, state) = MockDataLink::new();
2739 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2740 let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
2741
2742 let mut apdu = [0u8; 32];
2743 let mut w = Writer::new(&mut apdu);
2744 SimpleAck {
2745 invoke_id: 1,
2746 service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
2747 }
2748 .encode(&mut w)
2749 .unwrap();
2750 state
2751 .recv
2752 .lock()
2753 .await
2754 .push_back((with_npdu(w.as_written()), addr));
2755
2756 client
2757 .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
2758 .await
2759 .unwrap();
2760
2761 let sent = state.sent.lock().await;
2762 assert_eq!(sent.len(), 1);
2763 let mut r = Reader::new(&sent[0].1);
2764 let _npdu = Npdu::decode(&mut r).unwrap();
2765 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2766 assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
2767 }
2768
2769 #[tokio::test]
2770 async fn reinitialize_device_handles_simple_ack() {
2771 let (dl, state) = MockDataLink::new();
2772 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2773 let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
2774
2775 let mut apdu = [0u8; 32];
2776 let mut w = Writer::new(&mut apdu);
2777 SimpleAck {
2778 invoke_id: 1,
2779 service_choice: SERVICE_REINITIALIZE_DEVICE,
2780 }
2781 .encode(&mut w)
2782 .unwrap();
2783 state
2784 .recv
2785 .lock()
2786 .await
2787 .push_back((with_npdu(w.as_written()), addr));
2788
2789 client
2790 .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
2791 .await
2792 .unwrap();
2793
2794 let sent = state.sent.lock().await;
2795 assert_eq!(sent.len(), 1);
2796 let mut r = Reader::new(&sent[0].1);
2797 let _npdu = Npdu::decode(&mut r).unwrap();
2798 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2799 assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
2800 }
2801
2802 #[tokio::test]
2803 async fn time_synchronize_sends_unconfirmed_request() {
2804 let (dl, state) = MockDataLink::new();
2805 let client = BacnetClient::with_datalink(dl);
2806 let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
2807
2808 client
2809 .time_synchronize(
2810 addr,
2811 Date {
2812 year_since_1900: 126,
2813 month: 2,
2814 day: 7,
2815 weekday: 6,
2816 },
2817 Time {
2818 hour: 10,
2819 minute: 11,
2820 second: 12,
2821 hundredths: 13,
2822 },
2823 false,
2824 )
2825 .await
2826 .unwrap();
2827
2828 let sent = state.sent.lock().await;
2829 assert_eq!(sent.len(), 1);
2830 let mut r = Reader::new(&sent[0].1);
2831 let _npdu = Npdu::decode(&mut r).unwrap();
2832 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2833 assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
2834 }
2835
2836 #[tokio::test]
2837 async fn get_alarm_summary_decodes_complex_ack() {
2838 let (dl, state) = MockDataLink::new();
2839 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2840 let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
2841
2842 state
2843 .recv
2844 .lock()
2845 .await
2846 .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
2847
2848 let summaries = client.get_alarm_summary(addr).await.unwrap();
2849 assert_eq!(summaries.len(), 2);
2850 assert_eq!(
2851 summaries[0],
2852 AlarmSummaryItem {
2853 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2854 alarm_state_raw: 1,
2855 alarm_state: Some(EventState::Fault),
2856 acknowledged_transitions: crate::ClientBitString {
2857 unused_bits: 5,
2858 data: vec![0b1110_0000],
2859 },
2860 }
2861 );
2862 assert_eq!(
2863 summaries[1],
2864 AlarmSummaryItem {
2865 object_id: ObjectId::new(ObjectType::BinaryInput, 2),
2866 alarm_state_raw: 0,
2867 alarm_state: Some(EventState::Normal),
2868 acknowledged_transitions: crate::ClientBitString {
2869 unused_bits: 5,
2870 data: vec![0b1100_0000],
2871 },
2872 }
2873 );
2874
2875 let sent = state.sent.lock().await;
2876 assert_eq!(sent.len(), 1);
2877 let mut r = Reader::new(&sent[0].1);
2878 let _npdu = Npdu::decode(&mut r).unwrap();
2879 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2880 assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
2881 }
2882
2883 #[tokio::test]
2884 async fn get_enrollment_summary_decodes_complex_ack() {
2885 let (dl, state) = MockDataLink::new();
2886 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2887 let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
2888
2889 state
2890 .recv
2891 .lock()
2892 .await
2893 .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
2894
2895 let summaries = client.get_enrollment_summary(addr).await.unwrap();
2896 assert_eq!(summaries.len(), 2);
2897 assert_eq!(
2898 summaries[0],
2899 EnrollmentSummaryItem {
2900 object_id: ObjectId::new(ObjectType::AnalogInput, 7),
2901 event_type: 1,
2902 event_state_raw: 2,
2903 event_state: Some(EventState::Offnormal),
2904 priority: 200,
2905 notification_class: 10,
2906 }
2907 );
2908 assert_eq!(
2909 summaries[1],
2910 EnrollmentSummaryItem {
2911 object_id: ObjectId::new(ObjectType::BinaryInput, 8),
2912 event_type: 0,
2913 event_state_raw: 0,
2914 event_state: Some(EventState::Normal),
2915 priority: 20,
2916 notification_class: 11,
2917 }
2918 );
2919
2920 let sent = state.sent.lock().await;
2921 assert_eq!(sent.len(), 1);
2922 let mut r = Reader::new(&sent[0].1);
2923 let _npdu = Npdu::decode(&mut r).unwrap();
2924 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2925 assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
2926 }
2927
2928 #[tokio::test]
2929 async fn get_event_information_decodes_complex_ack() {
2930 let (dl, state) = MockDataLink::new();
2931 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2932 let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
2933
2934 state
2935 .recv
2936 .lock()
2937 .await
2938 .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
2939
2940 let result = client.get_event_information(addr, None).await.unwrap();
2941 assert!(!result.more_events);
2942 assert_eq!(result.summaries.len(), 1);
2943 assert_eq!(
2944 result.summaries[0],
2945 EventInformationItem {
2946 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2947 event_state_raw: 2,
2948 event_state: Some(EventState::Offnormal),
2949 acknowledged_transitions: crate::ClientBitString {
2950 unused_bits: 5,
2951 data: vec![0b1110_0000],
2952 },
2953 notify_type: 0,
2954 event_enable: crate::ClientBitString {
2955 unused_bits: 5,
2956 data: vec![0b1100_0000],
2957 },
2958 event_priorities: [1, 2, 3],
2959 }
2960 );
2961 }
2962
2963 #[tokio::test]
2964 async fn acknowledge_alarm_handles_simple_ack() {
2965 let (dl, state) = MockDataLink::new();
2966 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2967 let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
2968
2969 let mut apdu = [0u8; 32];
2970 let mut w = Writer::new(&mut apdu);
2971 SimpleAck {
2972 invoke_id: 1,
2973 service_choice: SERVICE_ACKNOWLEDGE_ALARM,
2974 }
2975 .encode(&mut w)
2976 .unwrap();
2977 state
2978 .recv
2979 .lock()
2980 .await
2981 .push_back((with_npdu(w.as_written()), addr));
2982
2983 client
2984 .acknowledge_alarm(
2985 addr,
2986 AcknowledgeAlarmRequest {
2987 acknowledging_process_id: 10,
2988 event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2989 event_state_acknowledged: EventState::Offnormal,
2990 event_time_stamp: TimeStamp::SequenceNumber(42),
2991 acknowledgment_source: "operator",
2992 time_of_acknowledgment: TimeStamp::DateTime {
2993 date: Date {
2994 year_since_1900: 126,
2995 month: 2,
2996 day: 7,
2997 weekday: 6,
2998 },
2999 time: Time {
3000 hour: 10,
3001 minute: 11,
3002 second: 12,
3003 hundredths: 13,
3004 },
3005 },
3006 invoke_id: 0,
3007 },
3008 )
3009 .await
3010 .unwrap();
3011
3012 let sent = state.sent.lock().await;
3013 assert_eq!(sent.len(), 1);
3014 let mut r = Reader::new(&sent[0].1);
3015 let _npdu = Npdu::decode(&mut r).unwrap();
3016 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3017 assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
3018 }
3019
3020 #[tokio::test]
3021 async fn create_object_by_type_decodes_complex_ack() {
3022 let (dl, state) = MockDataLink::new();
3023 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3024 let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
3025 let created = ObjectId::new(ObjectType::AnalogValue, 42);
3026
3027 state
3028 .recv
3029 .lock()
3030 .await
3031 .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
3032
3033 let result = client
3034 .create_object_by_type(addr, ObjectType::AnalogValue)
3035 .await
3036 .unwrap();
3037 assert_eq!(result, created);
3038
3039 let sent = state.sent.lock().await;
3040 let mut r = Reader::new(&sent[0].1);
3041 let _npdu = Npdu::decode(&mut r).unwrap();
3042 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3043 assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
3044 }
3045
3046 #[tokio::test]
3047 async fn delete_object_handles_simple_ack() {
3048 let (dl, state) = MockDataLink::new();
3049 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3050 let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
3051
3052 let mut apdu = [0u8; 32];
3053 let mut w = Writer::new(&mut apdu);
3054 SimpleAck {
3055 invoke_id: 1,
3056 service_choice: SERVICE_DELETE_OBJECT,
3057 }
3058 .encode(&mut w)
3059 .unwrap();
3060 state
3061 .recv
3062 .lock()
3063 .await
3064 .push_back((with_npdu(w.as_written()), addr));
3065
3066 client
3067 .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
3068 .await
3069 .unwrap();
3070 }
3071
3072 #[tokio::test]
3073 async fn add_list_element_handles_simple_ack() {
3074 let (dl, state) = MockDataLink::new();
3075 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3076 let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
3077
3078 let mut apdu = [0u8; 32];
3079 let mut w = Writer::new(&mut apdu);
3080 SimpleAck {
3081 invoke_id: 1,
3082 service_choice: SERVICE_ADD_LIST_ELEMENT,
3083 }
3084 .encode(&mut w)
3085 .unwrap();
3086 state
3087 .recv
3088 .lock()
3089 .await
3090 .push_back((with_npdu(w.as_written()), addr));
3091
3092 let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
3093 client
3094 .add_list_element(
3095 addr,
3096 AddListElementRequest {
3097 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
3098 property_id: PropertyId::Proprietary(512),
3099 array_index: None,
3100 elements: &values,
3101 invoke_id: 0,
3102 },
3103 )
3104 .await
3105 .unwrap();
3106 }
3107
3108 #[tokio::test]
3109 async fn remove_list_element_handles_simple_ack() {
3110 let (dl, state) = MockDataLink::new();
3111 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3112 let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
3113
3114 let mut apdu = [0u8; 32];
3115 let mut w = Writer::new(&mut apdu);
3116 SimpleAck {
3117 invoke_id: 1,
3118 service_choice: SERVICE_REMOVE_LIST_ELEMENT,
3119 }
3120 .encode(&mut w)
3121 .unwrap();
3122 state
3123 .recv
3124 .lock()
3125 .await
3126 .push_back((with_npdu(w.as_written()), addr));
3127
3128 let values = [DataValue::Unsigned(1)];
3129 client
3130 .remove_list_element(
3131 addr,
3132 RemoveListElementRequest {
3133 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
3134 property_id: PropertyId::Proprietary(513),
3135 array_index: None,
3136 elements: &values,
3137 invoke_id: 0,
3138 },
3139 )
3140 .await
3141 .unwrap();
3142 }
3143
3144 #[tokio::test]
3145 async fn atomic_read_file_stream_decodes_complex_ack() {
3146 let (dl, state) = MockDataLink::new();
3147 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3148 let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
3149 let file_object = ObjectId::new(ObjectType::File, 2);
3150
3151 state.recv.lock().await.push_back((
3152 with_npdu(&atomic_read_file_stream_ack_apdu(
3153 1,
3154 true,
3155 &[0xAA, 0xBB, 0xCC],
3156 )),
3157 addr,
3158 ));
3159
3160 let result = client
3161 .atomic_read_file_stream(addr, file_object, 0, 3)
3162 .await
3163 .unwrap();
3164
3165 assert_eq!(
3166 result,
3167 AtomicReadFileResult::Stream {
3168 end_of_file: true,
3169 file_start_position: 0,
3170 file_data: vec![0xAA, 0xBB, 0xCC],
3171 }
3172 );
3173
3174 let sent = state.sent.lock().await;
3175 assert_eq!(sent.len(), 1);
3176 let mut r = Reader::new(&sent[0].1);
3177 let _npdu = Npdu::decode(&mut r).unwrap();
3178 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3179 assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
3180 }
3181
3182 #[tokio::test]
3183 async fn atomic_read_file_record_decodes_complex_ack() {
3184 let (dl, state) = MockDataLink::new();
3185 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3186 let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
3187 let file_object = ObjectId::new(ObjectType::File, 5);
3188
3189 state
3190 .recv
3191 .lock()
3192 .await
3193 .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
3194
3195 let result = client
3196 .atomic_read_file_record(addr, file_object, 7, 2)
3197 .await
3198 .unwrap();
3199
3200 assert_eq!(
3201 result,
3202 AtomicReadFileResult::Record {
3203 end_of_file: false,
3204 file_start_record: 7,
3205 returned_record_count: 2,
3206 file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
3207 }
3208 );
3209 }
3210
3211 #[tokio::test]
3212 async fn atomic_write_file_stream_decodes_complex_ack() {
3213 let (dl, state) = MockDataLink::new();
3214 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3215 let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
3216 let file_object = ObjectId::new(ObjectType::File, 3);
3217
3218 state
3219 .recv
3220 .lock()
3221 .await
3222 .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
3223
3224 let result = client
3225 .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
3226 .await
3227 .unwrap();
3228
3229 assert_eq!(
3230 result,
3231 AtomicWriteFileResult::Stream {
3232 file_start_position: 128
3233 }
3234 );
3235 }
3236
3237 #[tokio::test]
3238 async fn atomic_write_file_record_decodes_complex_ack() {
3239 let (dl, state) = MockDataLink::new();
3240 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3241 let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
3242 let file_object = ObjectId::new(ObjectType::File, 9);
3243 let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
3244
3245 state
3246 .recv
3247 .lock()
3248 .await
3249 .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
3250
3251 let result = client
3252 .atomic_write_file_record(addr, file_object, 7, &records)
3253 .await
3254 .unwrap();
3255
3256 assert_eq!(
3257 result,
3258 AtomicWriteFileResult::Record {
3259 file_start_record: 7
3260 }
3261 );
3262 }
3263
3264 #[tokio::test]
3265 async fn read_properties_decodes_complex_ack() {
3266 let (dl, state) = MockDataLink::new();
3267 let client = BacnetClient::with_datalink(dl);
3268 let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
3269 let object_id = ObjectId::new(ObjectType::Device, 1);
3270
3271 let mut apdu_buf = [0u8; 256];
3272 let mut w = Writer::new(&mut apdu_buf);
3273 ComplexAckHeader {
3274 segmented: false,
3275 more_follows: false,
3276 invoke_id: 1,
3277 sequence_number: None,
3278 proposed_window_size: None,
3279 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3280 }
3281 .encode(&mut w)
3282 .unwrap();
3283 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
3284 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3285 .encode(&mut w)
3286 .unwrap();
3287 encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
3288 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3289 .encode(&mut w)
3290 .unwrap();
3291 encode_app_real(&mut w, 55.5).unwrap();
3292 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3293 .encode(&mut w)
3294 .unwrap();
3295 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3296 .encode(&mut w)
3297 .unwrap();
3298
3299 state
3300 .recv
3301 .lock()
3302 .await
3303 .push_back((with_npdu(w.as_written()), addr));
3304
3305 let values = client
3306 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3307 .await
3308 .unwrap();
3309 assert_eq!(values.len(), 1);
3310 assert_eq!(values[0].0, PropertyId::PresentValue);
3311 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
3312
3313 let sent = state.sent.lock().await;
3314 assert_eq!(sent.len(), 1);
3315 let mut r = Reader::new(&sent[0].1);
3316 let _npdu = Npdu::decode(&mut r).unwrap();
3317 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3318 assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
3319 }
3320
3321 #[tokio::test]
3322 async fn read_property_multiple_reassembles_segmented_complex_ack() {
3323 let (dl, state) = MockDataLink::new();
3324 let client = BacnetClient::with_datalink(dl);
3325 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3326 let object_id = ObjectId::new(ObjectType::Device, 1);
3327
3328 let mut payload_buf = [0u8; 256];
3329 let mut pw = Writer::new(&mut payload_buf);
3330 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3331 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3332 .encode(&mut pw)
3333 .unwrap();
3334 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3335 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3336 .encode(&mut pw)
3337 .unwrap();
3338 encode_app_real(&mut pw, 66.0).unwrap();
3339 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3340 .encode(&mut pw)
3341 .unwrap();
3342 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3343 .encode(&mut pw)
3344 .unwrap();
3345 let payload = pw.as_written();
3346 let split = payload.len() / 2;
3347
3348 let mut apdu1 = [0u8; 256];
3349 let mut w1 = Writer::new(&mut apdu1);
3350 ComplexAckHeader {
3351 segmented: true,
3352 more_follows: true,
3353 invoke_id: 1,
3354 sequence_number: Some(0),
3355 proposed_window_size: Some(1),
3356 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3357 }
3358 .encode(&mut w1)
3359 .unwrap();
3360 w1.write_all(&payload[..split]).unwrap();
3361
3362 let mut apdu2 = [0u8; 256];
3363 let mut w2 = Writer::new(&mut apdu2);
3364 ComplexAckHeader {
3365 segmented: true,
3366 more_follows: false,
3367 invoke_id: 1,
3368 sequence_number: Some(1),
3369 proposed_window_size: Some(1),
3370 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3371 }
3372 .encode(&mut w2)
3373 .unwrap();
3374 w2.write_all(&payload[split..]).unwrap();
3375
3376 state
3377 .recv
3378 .lock()
3379 .await
3380 .push_back((with_npdu(w1.as_written()), addr));
3381 state
3382 .recv
3383 .lock()
3384 .await
3385 .push_back((with_npdu(w2.as_written()), addr));
3386
3387 let values = client
3388 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3389 .await
3390 .unwrap();
3391 assert_eq!(values.len(), 1);
3392 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3393
3394 let sent = state.sent.lock().await;
3395 assert!(sent.len() >= 3);
3396
3397 let mut saw_segment_ack = 0usize;
3398 for (_, frame) in sent.iter().skip(1) {
3399 let mut r = Reader::new(frame);
3400 let _npdu = Npdu::decode(&mut r).unwrap();
3401 let apdu = r.read_exact(r.remaining()).unwrap();
3402 if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
3403 let mut sr = Reader::new(apdu);
3404 let sack = SegmentAck::decode(&mut sr).unwrap();
3405 assert_eq!(sack.invoke_id, 1);
3406 saw_segment_ack += 1;
3407 }
3408 }
3409 assert!(saw_segment_ack >= 1);
3410 }
3411
3412 #[tokio::test]
3413 async fn read_property_multiple_tolerates_duplicate_segment() {
3414 let (dl, state) = MockDataLink::new();
3415 let client = BacnetClient::with_datalink(dl);
3416 let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
3417 let object_id = ObjectId::new(ObjectType::Device, 1);
3418
3419 let mut payload_buf = [0u8; 256];
3420 let mut pw = Writer::new(&mut payload_buf);
3421 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3422 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3423 .encode(&mut pw)
3424 .unwrap();
3425 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3426 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3427 .encode(&mut pw)
3428 .unwrap();
3429 encode_app_real(&mut pw, 66.0).unwrap();
3430 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3431 .encode(&mut pw)
3432 .unwrap();
3433 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3434 .encode(&mut pw)
3435 .unwrap();
3436 let payload = pw.as_written();
3437 let split = payload.len() / 2;
3438
3439 let mut apdu1 = [0u8; 256];
3440 let mut w1 = Writer::new(&mut apdu1);
3441 ComplexAckHeader {
3442 segmented: true,
3443 more_follows: true,
3444 invoke_id: 1,
3445 sequence_number: Some(0),
3446 proposed_window_size: Some(1),
3447 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3448 }
3449 .encode(&mut w1)
3450 .unwrap();
3451 w1.write_all(&payload[..split]).unwrap();
3452
3453 let mut dup = [0u8; 256];
3454 let mut wd = Writer::new(&mut dup);
3455 ComplexAckHeader {
3456 segmented: true,
3457 more_follows: true,
3458 invoke_id: 1,
3459 sequence_number: Some(0),
3460 proposed_window_size: Some(1),
3461 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3462 }
3463 .encode(&mut wd)
3464 .unwrap();
3465 wd.write_all(&payload[..split]).unwrap();
3466
3467 let mut apdu2 = [0u8; 256];
3468 let mut w2 = Writer::new(&mut apdu2);
3469 ComplexAckHeader {
3470 segmented: true,
3471 more_follows: false,
3472 invoke_id: 1,
3473 sequence_number: Some(1),
3474 proposed_window_size: Some(1),
3475 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3476 }
3477 .encode(&mut w2)
3478 .unwrap();
3479 w2.write_all(&payload[split..]).unwrap();
3480
3481 {
3482 let mut recv = state.recv.lock().await;
3483 recv.push_back((with_npdu(w1.as_written()), addr));
3484 recv.push_back((with_npdu(wd.as_written()), addr));
3485 recv.push_back((with_npdu(w2.as_written()), addr));
3486 }
3487
3488 let values = client
3489 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3490 .await
3491 .unwrap();
3492 assert_eq!(values.len(), 1);
3493 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3494 }
3495
3496 #[tokio::test]
3497 async fn write_properties_handles_simple_ack() {
3498 let (dl, state) = MockDataLink::new();
3499 let client = BacnetClient::with_datalink(dl);
3500 let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
3501 let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
3502
3503 let mut apdu_buf = [0u8; 32];
3504 let mut w = Writer::new(&mut apdu_buf);
3505 SimpleAck {
3506 invoke_id: 1,
3507 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3508 }
3509 .encode(&mut w)
3510 .unwrap();
3511 state
3512 .recv
3513 .lock()
3514 .await
3515 .push_back((with_npdu(w.as_written()), addr));
3516
3517 let writes = [PropertyWriteSpec {
3518 property_id: PropertyId::PresentValue,
3519 array_index: None,
3520 value: DataValue::Real(12.5),
3521 priority: Some(8),
3522 }];
3523 client
3524 .write_property_multiple(addr, object_id, &writes)
3525 .await
3526 .unwrap();
3527
3528 let sent = state.sent.lock().await;
3529 assert_eq!(sent.len(), 1);
3530 let mut r = Reader::new(&sent[0].1);
3531 let _npdu = Npdu::decode(&mut r).unwrap();
3532 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3533 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3534 }
3535
3536 #[tokio::test]
3537 async fn subscribe_cov_handles_simple_ack() {
3538 let (dl, state) = MockDataLink::new();
3539 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3540 let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
3541
3542 let mut apdu_buf = [0u8; 32];
3543 let mut w = Writer::new(&mut apdu_buf);
3544 SimpleAck {
3545 invoke_id: 1,
3546 service_choice: SERVICE_SUBSCRIBE_COV,
3547 }
3548 .encode(&mut w)
3549 .unwrap();
3550 state
3551 .recv
3552 .lock()
3553 .await
3554 .push_back((with_npdu(w.as_written()), addr));
3555
3556 client
3557 .subscribe_cov(
3558 addr,
3559 SubscribeCovRequest {
3560 subscriber_process_id: 10,
3561 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3562 issue_confirmed_notifications: Some(false),
3563 lifetime_seconds: Some(300),
3564 invoke_id: 0,
3565 },
3566 )
3567 .await
3568 .unwrap();
3569
3570 let sent = state.sent.lock().await;
3571 assert_eq!(sent.len(), 1);
3572 let mut r = Reader::new(&sent[0].1);
3573 let _npdu = Npdu::decode(&mut r).unwrap();
3574 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3575 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
3576 }
3577
3578 #[tokio::test]
3579 async fn subscribe_cov_property_handles_simple_ack() {
3580 let (dl, state) = MockDataLink::new();
3581 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3582 let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
3583
3584 let mut apdu_buf = [0u8; 32];
3585 let mut w = Writer::new(&mut apdu_buf);
3586 SimpleAck {
3587 invoke_id: 1,
3588 service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
3589 }
3590 .encode(&mut w)
3591 .unwrap();
3592 state
3593 .recv
3594 .lock()
3595 .await
3596 .push_back((with_npdu(w.as_written()), addr));
3597
3598 client
3599 .subscribe_cov_property(
3600 addr,
3601 SubscribeCovPropertyRequest {
3602 subscriber_process_id: 22,
3603 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3604 issue_confirmed_notifications: Some(true),
3605 lifetime_seconds: Some(120),
3606 monitored_property_id: PropertyId::PresentValue,
3607 monitored_property_array_index: None,
3608 cov_increment: Some(0.1),
3609 invoke_id: 0,
3610 },
3611 )
3612 .await
3613 .unwrap();
3614
3615 let sent = state.sent.lock().await;
3616 assert_eq!(sent.len(), 1);
3617 let mut r = Reader::new(&sent[0].1);
3618 let _npdu = Npdu::decode(&mut r).unwrap();
3619 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3620 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
3621 }
3622
3623 #[tokio::test]
3624 async fn read_range_by_position_decodes_complex_ack() {
3625 let (dl, state) = MockDataLink::new();
3626 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3627 let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
3628 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3629
3630 let mut apdu_buf = [0u8; 256];
3631 let mut w = Writer::new(&mut apdu_buf);
3632 ComplexAckHeader {
3633 segmented: false,
3634 more_follows: false,
3635 invoke_id: 1,
3636 sequence_number: None,
3637 proposed_window_size: None,
3638 service_choice: SERVICE_READ_RANGE,
3639 }
3640 .encode(&mut w)
3641 .unwrap();
3642 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3643 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3644 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3645 w.write_u8(5).unwrap();
3646 w.write_u8(0b1110_0000).unwrap();
3647 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3648 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3649 encode_app_real(&mut w, 42.0).unwrap();
3650 encode_app_real(&mut w, 43.0).unwrap();
3651 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3652
3653 state
3654 .recv
3655 .lock()
3656 .await
3657 .push_back((with_npdu(w.as_written()), addr));
3658
3659 let result = client
3660 .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
3661 .await
3662 .unwrap();
3663 assert_eq!(result.object_id, object_id);
3664 assert_eq!(result.item_count, 2);
3665 assert_eq!(result.items.len(), 2);
3666 assert!(matches!(
3667 result.items[0],
3668 ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
3669 ));
3670 }
3671
3672 #[tokio::test]
3673 async fn read_range_by_sequence_number_encodes_range_selector() {
3674 let (dl, state) = MockDataLink::new();
3675 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3676 let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
3677 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3678
3679 state
3680 .recv
3681 .lock()
3682 .await
3683 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3684
3685 let _ = client
3686 .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
3687 .await
3688 .unwrap();
3689
3690 let sent = state.sent.lock().await;
3691 let mut r = Reader::new(&sent[0].1);
3692 let _npdu = Npdu::decode(&mut r).unwrap();
3693 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3694 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3695 match Tag::decode(&mut r).unwrap() {
3696 Tag::Context { tag_num: 0, len: 4 } => {
3697 let _ = r.read_exact(4).unwrap();
3698 }
3699 other => panic!("unexpected object id tag: {other:?}"),
3700 }
3701 match Tag::decode(&mut r).unwrap() {
3702 Tag::Context { tag_num: 1, len } => {
3703 let _ = decode_unsigned(&mut r, len as usize).unwrap();
3704 }
3705 other => panic!("unexpected property tag: {other:?}"),
3706 }
3707 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
3708 match Tag::decode(&mut r).unwrap() {
3709 Tag::Application {
3710 tag: AppTag::UnsignedInt,
3711 len,
3712 } => {
3713 assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
3714 }
3715 other => panic!("unexpected ref seq tag: {other:?}"),
3716 }
3717 match Tag::decode(&mut r).unwrap() {
3718 Tag::Application {
3719 tag: AppTag::SignedInt,
3720 len,
3721 } => {
3722 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3723 }
3724 other => panic!("unexpected count tag: {other:?}"),
3725 }
3726 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
3727 }
3728
3729 #[tokio::test]
3730 async fn read_range_by_time_encodes_range_selector() {
3731 let (dl, state) = MockDataLink::new();
3732 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3733 let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
3734 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3735 let date = Date {
3736 year_since_1900: 126,
3737 month: 2,
3738 day: 7,
3739 weekday: 6,
3740 };
3741 let time = Time {
3742 hour: 10,
3743 minute: 11,
3744 second: 12,
3745 hundredths: 13,
3746 };
3747
3748 state
3749 .recv
3750 .lock()
3751 .await
3752 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3753
3754 let _ = client
3755 .read_range_by_time(
3756 addr,
3757 object_id,
3758 PropertyId::PresentValue,
3759 None,
3760 (date, time),
3761 2,
3762 )
3763 .await
3764 .unwrap();
3765
3766 let sent = state.sent.lock().await;
3767 let mut r = Reader::new(&sent[0].1);
3768 let _npdu = Npdu::decode(&mut r).unwrap();
3769 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3770 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3771 match Tag::decode(&mut r).unwrap() {
3772 Tag::Context { tag_num: 0, len: 4 } => {
3773 let _ = r.read_exact(4).unwrap();
3774 }
3775 other => panic!("unexpected object id tag: {other:?}"),
3776 }
3777 match Tag::decode(&mut r).unwrap() {
3778 Tag::Context { tag_num: 1, len } => {
3779 let _ = decode_unsigned(&mut r, len as usize).unwrap();
3780 }
3781 other => panic!("unexpected property tag: {other:?}"),
3782 }
3783 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
3784 match Tag::decode(&mut r).unwrap() {
3785 Tag::Application {
3786 tag: AppTag::Date,
3787 len: 4,
3788 } => {
3789 let raw = r.read_exact(4).unwrap();
3790 assert_eq!(
3791 raw,
3792 &[date.year_since_1900, date.month, date.day, date.weekday]
3793 );
3794 }
3795 other => panic!("unexpected date tag: {other:?}"),
3796 }
3797 match Tag::decode(&mut r).unwrap() {
3798 Tag::Application {
3799 tag: AppTag::Time,
3800 len: 4,
3801 } => {
3802 let raw = r.read_exact(4).unwrap();
3803 assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
3804 }
3805 other => panic!("unexpected time tag: {other:?}"),
3806 }
3807 match Tag::decode(&mut r).unwrap() {
3808 Tag::Application {
3809 tag: AppTag::SignedInt,
3810 len,
3811 } => {
3812 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3813 }
3814 other => panic!("unexpected count tag: {other:?}"),
3815 }
3816 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
3817 }
3818
3819 #[tokio::test]
3820 async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
3821 let (dl, state) = MockDataLink::new();
3822 let client = BacnetClient::with_datalink(dl);
3823 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3824
3825 let mut apdu = [0u8; 256];
3826 let mut w = Writer::new(&mut apdu);
3827 UnconfirmedRequestHeader {
3828 service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3829 }
3830 .encode(&mut w)
3831 .unwrap();
3832 encode_ctx_unsigned(&mut w, 0, 17).unwrap();
3833 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3834 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3835 encode_ctx_unsigned(&mut w, 3, 60).unwrap();
3836 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3837 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3838 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3839 encode_app_real(&mut w, 73.25).unwrap();
3840 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3841 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3842
3843 state
3844 .recv
3845 .lock()
3846 .await
3847 .push_back((with_npdu(w.as_written()), addr));
3848
3849 let notification = client
3850 .recv_cov_notification(Duration::from_secs(1))
3851 .await
3852 .unwrap()
3853 .unwrap();
3854 assert!(!notification.confirmed);
3855 assert_eq!(notification.subscriber_process_id, 17);
3856 assert_eq!(notification.values.len(), 1);
3857 assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
3858 assert!(matches!(
3859 notification.values[0].value,
3860 ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
3861 ));
3862
3863 let sent = state.sent.lock().await;
3864 assert!(sent.is_empty());
3865 }
3866
3867 #[tokio::test]
3868 async fn recv_confirmed_cov_notification_sends_simple_ack() {
3869 let (dl, state) = MockDataLink::new();
3870 let client = BacnetClient::with_datalink(dl);
3871 let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
3872
3873 let mut apdu = [0u8; 256];
3874 let mut w = Writer::new(&mut apdu);
3875 ConfirmedRequestHeader {
3876 segmented: false,
3877 more_follows: false,
3878 segmented_response_accepted: false,
3879 max_segments: 0,
3880 max_apdu: 5,
3881 invoke_id: 9,
3882 sequence_number: None,
3883 proposed_window_size: None,
3884 service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
3885 }
3886 .encode(&mut w)
3887 .unwrap();
3888 encode_ctx_unsigned(&mut w, 0, 18).unwrap();
3889 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3890 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
3891 encode_ctx_unsigned(&mut w, 3, 120).unwrap();
3892 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3893 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3894 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3895 encode_app_real(&mut w, 55.0).unwrap();
3896 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3897 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3898
3899 state
3900 .recv
3901 .lock()
3902 .await
3903 .push_back((with_npdu(w.as_written()), addr));
3904
3905 let notification = client
3906 .recv_cov_notification(Duration::from_secs(1))
3907 .await
3908 .unwrap()
3909 .unwrap();
3910 assert!(notification.confirmed);
3911 assert_eq!(notification.values.len(), 1);
3912
3913 let sent = state.sent.lock().await;
3914 assert_eq!(sent.len(), 1);
3915 let mut r = Reader::new(&sent[0].1);
3916 let _npdu = Npdu::decode(&mut r).unwrap();
3917 let ack = SimpleAck::decode(&mut r).unwrap();
3918 assert_eq!(ack.invoke_id, 9);
3919 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
3920 }
3921
3922 #[tokio::test]
3923 async fn recv_unconfirmed_event_notification_returns_decoded_value() {
3924 let (dl, state) = MockDataLink::new();
3925 let client = BacnetClient::with_datalink(dl);
3926 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
3927
3928 let mut apdu = [0u8; 256];
3929 let mut w = Writer::new(&mut apdu);
3930 UnconfirmedRequestHeader {
3931 service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3932 }
3933 .encode(&mut w)
3934 .unwrap();
3935 encode_ctx_unsigned(&mut w, 0, 99).unwrap();
3936 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3937 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
3938 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3939 encode_ctx_unsigned(&mut w, 1, 55).unwrap();
3940 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3941 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
3942 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
3943 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
3944 encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
3945 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
3946 Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
3947 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
3948 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
3949 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
3950 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3951 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3952 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3953 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
3954
3955 state
3956 .recv
3957 .lock()
3958 .await
3959 .push_back((with_npdu(w.as_written()), addr));
3960
3961 let notification: EventNotification = client
3962 .recv_event_notification(Duration::from_secs(1))
3963 .await
3964 .unwrap()
3965 .unwrap();
3966 assert!(!notification.confirmed);
3967 assert_eq!(notification.process_id, 99);
3968 assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
3969 assert_eq!(notification.ack_required, Some(true));
3970 assert_eq!(notification.from_state, Some(EventState::Offnormal));
3971 assert_eq!(notification.to_state, Some(EventState::Normal));
3972 assert_eq!(notification.notify_type, 0);
3973
3974 let sent = state.sent.lock().await;
3975 assert!(sent.is_empty());
3976 }
3977
3978 #[tokio::test]
3979 async fn recv_confirmed_event_notification_sends_simple_ack() {
3980 let (dl, state) = MockDataLink::new();
3981 let client = BacnetClient::with_datalink(dl);
3982 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
3983
3984 let mut apdu = [0u8; 256];
3985 let mut w = Writer::new(&mut apdu);
3986 ConfirmedRequestHeader {
3987 segmented: false,
3988 more_follows: false,
3989 segmented_response_accepted: false,
3990 max_segments: 0,
3991 max_apdu: 5,
3992 invoke_id: 11,
3993 sequence_number: None,
3994 proposed_window_size: None,
3995 service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
3996 }
3997 .encode(&mut w)
3998 .unwrap();
3999 encode_ctx_unsigned(&mut w, 0, 100).unwrap();
4000 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4001 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
4002 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4003 encode_ctx_unsigned(&mut w, 1, 56).unwrap();
4004 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4005 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
4006 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
4007 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
4008 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
4009 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
4010 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
4011 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
4012 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
4013 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
4014 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
4015 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
4016
4017 state
4018 .recv
4019 .lock()
4020 .await
4021 .push_back((with_npdu(w.as_written()), addr));
4022
4023 let notification = client
4024 .recv_event_notification(Duration::from_secs(1))
4025 .await
4026 .unwrap()
4027 .unwrap();
4028 assert!(notification.confirmed);
4029
4030 let sent = state.sent.lock().await;
4031 assert_eq!(sent.len(), 1);
4032 let mut r = Reader::new(&sent[0].1);
4033 let _npdu = Npdu::decode(&mut r).unwrap();
4034 let ack = SimpleAck::decode(&mut r).unwrap();
4035 assert_eq!(ack.invoke_id, 11);
4036 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
4037 }
4038
4039 #[tokio::test]
4040 async fn write_property_multiple_segments_large_request() {
4041 let (dl, state) = MockDataLink::new();
4042 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4043 let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
4044 let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
4045
4046 {
4047 let mut recv = state.recv.lock().await;
4048 for seq in 0u8..=254 {
4049 let mut apdu = [0u8; 16];
4050 let mut w = Writer::new(&mut apdu);
4051 SegmentAck {
4052 negative_ack: false,
4053 sent_by_server: true,
4054 invoke_id: 1,
4055 sequence_number: seq,
4056 actual_window_size: 1,
4057 }
4058 .encode(&mut w)
4059 .unwrap();
4060 recv.push_back((with_npdu(w.as_written()), addr));
4061 }
4062
4063 let mut apdu = [0u8; 16];
4064 let mut w = Writer::new(&mut apdu);
4065 SimpleAck {
4066 invoke_id: 1,
4067 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4068 }
4069 .encode(&mut w)
4070 .unwrap();
4071 recv.push_back((with_npdu(w.as_written()), addr));
4072 }
4073
4074 let writes: Vec<PropertyWriteSpec> = (0..180)
4075 .map(|_| PropertyWriteSpec {
4076 property_id: PropertyId::Description,
4077 array_index: None,
4078 value: DataValue::CharacterString(
4079 "rustbac segmented write test payload................................................................",
4080 ),
4081 priority: None,
4082 })
4083 .collect();
4084
4085 client
4086 .write_property_multiple(addr, object_id, &writes)
4087 .await
4088 .unwrap();
4089
4090 let sent = state.sent.lock().await;
4091 assert!(sent.len() > 1);
4092
4093 let mut seqs = Vec::new();
4094 let mut saw_more_follows = false;
4095 let mut saw_last = false;
4096 for (_, frame) in sent.iter() {
4097 let mut r = Reader::new(frame);
4098 let _npdu = Npdu::decode(&mut r).unwrap();
4099 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4100 assert!(hdr.segmented);
4101 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4102 if hdr.more_follows {
4103 saw_more_follows = true;
4104 } else {
4105 saw_last = true;
4106 }
4107 seqs.push(hdr.sequence_number.unwrap());
4108 }
4109
4110 assert!(saw_more_follows);
4111 assert!(saw_last);
4112 for (idx, seq) in seqs.iter().enumerate() {
4113 assert_eq!(*seq as usize, idx);
4114 }
4115 }
4116
4117 #[tokio::test]
4118 async fn write_property_multiple_uses_configured_segment_window() {
4119 let (dl, state) = MockDataLink::new();
4120 let client = BacnetClient::with_datalink(dl)
4121 .with_response_timeout(Duration::from_secs(1))
4122 .with_segmented_request_window_size(4);
4123 let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
4124 let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
4125
4126 {
4127 let mut recv = state.recv.lock().await;
4128 for seq in 0u8..=254 {
4129 let mut apdu = [0u8; 16];
4130 let mut w = Writer::new(&mut apdu);
4131 SegmentAck {
4132 negative_ack: false,
4133 sent_by_server: true,
4134 invoke_id: 1,
4135 sequence_number: seq,
4136 actual_window_size: 4,
4137 }
4138 .encode(&mut w)
4139 .unwrap();
4140 recv.push_back((with_npdu(w.as_written()), addr));
4141 }
4142
4143 let mut apdu = [0u8; 16];
4144 let mut w = Writer::new(&mut apdu);
4145 SimpleAck {
4146 invoke_id: 1,
4147 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4148 }
4149 .encode(&mut w)
4150 .unwrap();
4151 recv.push_back((with_npdu(w.as_written()), addr));
4152 }
4153
4154 let writes: Vec<PropertyWriteSpec> = (0..180)
4155 .map(|_| PropertyWriteSpec {
4156 property_id: PropertyId::Description,
4157 array_index: None,
4158 value: DataValue::CharacterString(
4159 "rustbac segmented write test payload................................................................",
4160 ),
4161 priority: None,
4162 })
4163 .collect();
4164
4165 client
4166 .write_property_multiple(addr, object_id, &writes)
4167 .await
4168 .unwrap();
4169
4170 let sent = state.sent.lock().await;
4171 assert!(sent.len() > 4);
4172 for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
4173 let mut r = Reader::new(frame);
4174 let _npdu = Npdu::decode(&mut r).unwrap();
4175 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4176 assert!(hdr.segmented);
4177 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4178 assert_eq!(hdr.sequence_number, Some(idx as u8));
4179 assert_eq!(hdr.proposed_window_size, Some(4));
4180 }
4181 }
4182
4183 #[tokio::test]
4184 async fn write_property_multiple_adapts_window_to_peer_ack_window() {
4185 let (dl, state) = MockDataLink::new();
4186 let client = BacnetClient::with_datalink(dl)
4187 .with_response_timeout(Duration::from_secs(1))
4188 .with_segmented_request_window_size(4);
4189 let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
4190 let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
4191
4192 {
4193 let mut recv = state.recv.lock().await;
4194 for seq in 0u8..=254 {
4195 let mut apdu = [0u8; 16];
4196 let mut w = Writer::new(&mut apdu);
4197 SegmentAck {
4198 negative_ack: false,
4199 sent_by_server: true,
4200 invoke_id: 1,
4201 sequence_number: seq,
4202 actual_window_size: 2,
4203 }
4204 .encode(&mut w)
4205 .unwrap();
4206 recv.push_back((with_npdu(w.as_written()), addr));
4207 }
4208
4209 let mut apdu = [0u8; 16];
4210 let mut w = Writer::new(&mut apdu);
4211 SimpleAck {
4212 invoke_id: 1,
4213 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4214 }
4215 .encode(&mut w)
4216 .unwrap();
4217 recv.push_back((with_npdu(w.as_written()), addr));
4218 }
4219
4220 let writes: Vec<PropertyWriteSpec> = (0..180)
4221 .map(|_| PropertyWriteSpec {
4222 property_id: PropertyId::Description,
4223 array_index: None,
4224 value: DataValue::CharacterString(
4225 "rustbac segmented write test payload................................................................",
4226 ),
4227 priority: None,
4228 })
4229 .collect();
4230
4231 client
4232 .write_property_multiple(addr, object_id, &writes)
4233 .await
4234 .unwrap();
4235
4236 let sent = state.sent.lock().await;
4237 let mut saw_adapted_window = false;
4238 for (_, frame) in sent.iter() {
4239 let mut r = Reader::new(frame);
4240 let _npdu = Npdu::decode(&mut r).unwrap();
4241 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4242 if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
4243 saw_adapted_window = true;
4244 break;
4245 }
4246 }
4247 assert!(saw_adapted_window);
4248 }
4249
4250 #[tokio::test]
4251 async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
4252 let (dl, state) = MockDataLink::new();
4253 let client = BacnetClient::with_datalink(dl)
4254 .with_response_timeout(Duration::from_secs(1))
4255 .with_segmented_request_window_size(1)
4256 .with_segmented_request_retries(1);
4257 let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
4258 let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
4259
4260 {
4261 let mut recv = state.recv.lock().await;
4262
4263 let mut nack_apdu = [0u8; 16];
4264 let mut nack_w = Writer::new(&mut nack_apdu);
4265 SegmentAck {
4266 negative_ack: true,
4267 sent_by_server: true,
4268 invoke_id: 1,
4269 sequence_number: 0,
4270 actual_window_size: 1,
4271 }
4272 .encode(&mut nack_w)
4273 .unwrap();
4274 recv.push_back((with_npdu(nack_w.as_written()), addr));
4275
4276 for seq in 0u8..=254 {
4277 let mut apdu = [0u8; 16];
4278 let mut w = Writer::new(&mut apdu);
4279 SegmentAck {
4280 negative_ack: false,
4281 sent_by_server: true,
4282 invoke_id: 1,
4283 sequence_number: seq,
4284 actual_window_size: 1,
4285 }
4286 .encode(&mut w)
4287 .unwrap();
4288 recv.push_back((with_npdu(w.as_written()), addr));
4289 }
4290
4291 let mut apdu = [0u8; 16];
4292 let mut w = Writer::new(&mut apdu);
4293 SimpleAck {
4294 invoke_id: 1,
4295 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4296 }
4297 .encode(&mut w)
4298 .unwrap();
4299 recv.push_back((with_npdu(w.as_written()), addr));
4300 }
4301
4302 let writes: Vec<PropertyWriteSpec> = (0..180)
4303 .map(|_| PropertyWriteSpec {
4304 property_id: PropertyId::Description,
4305 array_index: None,
4306 value: DataValue::CharacterString(
4307 "rustbac segmented write test payload................................................................",
4308 ),
4309 priority: None,
4310 })
4311 .collect();
4312
4313 client
4314 .write_property_multiple(addr, object_id, &writes)
4315 .await
4316 .unwrap();
4317
4318 let sent = state.sent.lock().await;
4319 let mut seq0_frames = 0usize;
4320 for (_, frame) in sent.iter() {
4321 let mut r = Reader::new(frame);
4322 let _npdu = Npdu::decode(&mut r).unwrap();
4323 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4324 if hdr.sequence_number == Some(0) {
4325 seq0_frames += 1;
4326 }
4327 }
4328 assert!(seq0_frames >= 2);
4329 }
4330
4331 #[tokio::test]
4332 async fn read_property_ignores_invalid_frames_until_valid_response() {
4333 let (dl, state) = MockDataLink::new();
4334 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4335 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4336 let state_for_task = state.clone();
4337
4338 tokio::spawn(async move {
4339 tokio::time::sleep(Duration::from_millis(20)).await;
4340 let mut apdu = [0u8; 128];
4341 let mut w = Writer::new(&mut apdu);
4342 ComplexAckHeader {
4343 segmented: false,
4344 more_follows: false,
4345 invoke_id: 1,
4346 sequence_number: None,
4347 proposed_window_size: None,
4348 service_choice: SERVICE_READ_PROPERTY,
4349 }
4350 .encode(&mut w)
4351 .unwrap();
4352 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4353 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4354 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4355 encode_app_real(&mut w, 77.0).unwrap();
4356 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4357 state_for_task
4358 .recv
4359 .lock()
4360 .await
4361 .push_back((with_npdu(w.as_written()), addr));
4362 });
4363
4364 let value = client
4365 .read_property(
4366 addr,
4367 ObjectId::new(ObjectType::Device, 1),
4368 PropertyId::PresentValue,
4369 )
4370 .await
4371 .unwrap();
4372 assert!(matches!(
4373 value,
4374 ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
4375 ));
4376 }
4377
4378 #[tokio::test]
4379 async fn read_property_maps_reject() {
4380 let (dl, state) = MockDataLink::new();
4381 let client = BacnetClient::with_datalink(dl);
4382 let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
4383
4384 let mut apdu = [0u8; 8];
4385 let mut w = Writer::new(&mut apdu);
4386 w.write_u8((ApduType::Reject as u8) << 4).unwrap();
4387 w.write_u8(1).unwrap(); w.write_u8(2).unwrap(); state
4390 .recv
4391 .lock()
4392 .await
4393 .push_back((with_npdu(w.as_written()), addr));
4394
4395 let err = client
4396 .read_property(
4397 addr,
4398 ObjectId::new(ObjectType::Device, 1),
4399 PropertyId::ObjectName,
4400 )
4401 .await
4402 .unwrap_err();
4403 assert!(matches!(
4404 err,
4405 crate::ClientError::RemoteReject { reason: 2 }
4406 ));
4407 }
4408
4409 #[tokio::test]
4410 async fn read_property_maps_remote_error_details() {
4411 let (dl, state) = MockDataLink::new();
4412 let client = BacnetClient::with_datalink(dl);
4413 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
4414
4415 let mut apdu = [0u8; 16];
4416 let mut w = Writer::new(&mut apdu);
4417 w.write_u8((ApduType::Error as u8) << 4).unwrap();
4418 w.write_u8(1).unwrap(); w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
4420 .unwrap();
4421 Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
4422 w.write_u8(2).unwrap(); Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
4424 w.write_u8(32).unwrap(); state
4427 .recv
4428 .lock()
4429 .await
4430 .push_back((with_npdu(w.as_written()), addr));
4431
4432 let err = client
4433 .read_property(
4434 addr,
4435 ObjectId::new(ObjectType::Device, 1),
4436 PropertyId::ObjectName,
4437 )
4438 .await
4439 .unwrap_err();
4440 assert!(matches!(
4441 err,
4442 crate::ClientError::RemoteServiceError {
4443 service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
4444 error_class_raw: Some(2),
4445 error_code_raw: Some(32),
4446 error_class: Some(rustbac_core::types::ErrorClass::Property),
4447 error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
4448 }
4449 ));
4450 }
4451
4452 #[tokio::test]
4453 async fn write_property_maps_abort() {
4454 let (dl, state) = MockDataLink::new();
4455 let client = BacnetClient::with_datalink(dl);
4456 let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
4457
4458 let mut apdu = [0u8; 8];
4459 let mut w = Writer::new(&mut apdu);
4460 w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); w.write_u8(1).unwrap(); w.write_u8(9).unwrap(); state
4464 .recv
4465 .lock()
4466 .await
4467 .push_back((with_npdu(w.as_written()), addr));
4468
4469 let req = rustbac_core::services::write_property::WritePropertyRequest {
4470 object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
4471 property_id: PropertyId::PresentValue,
4472 value: DataValue::Real(10.0),
4473 priority: Some(8),
4474 ..Default::default()
4475 };
4476 let err = client.write_property(addr, req).await.unwrap_err();
4477 assert!(matches!(
4478 err,
4479 crate::ClientError::RemoteAbort {
4480 reason: 9,
4481 server: true
4482 }
4483 ));
4484 }
4485
4486 #[tokio::test]
4487 async fn read_property_multiple_returns_owned_string() {
4488 let (dl, state) = MockDataLink::new();
4489 let client = BacnetClient::with_datalink(dl);
4490 let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
4491 let object_id = ObjectId::new(ObjectType::Device, 1);
4492
4493 let mut apdu_buf = [0u8; 256];
4494 let mut w = Writer::new(&mut apdu_buf);
4495 ComplexAckHeader {
4496 segmented: false,
4497 more_follows: false,
4498 invoke_id: 1,
4499 sequence_number: None,
4500 proposed_window_size: None,
4501 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4502 }
4503 .encode(&mut w)
4504 .unwrap();
4505 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4506 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4507 .encode(&mut w)
4508 .unwrap();
4509 encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
4510 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4511 .encode(&mut w)
4512 .unwrap();
4513 rustbac_core::services::value_codec::encode_application_data_value(
4514 &mut w,
4515 &DataValue::CharacterString("AHU-1"),
4516 )
4517 .unwrap();
4518 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4519 .encode(&mut w)
4520 .unwrap();
4521 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4522 .encode(&mut w)
4523 .unwrap();
4524
4525 state
4526 .recv
4527 .lock()
4528 .await
4529 .push_back((with_npdu(w.as_written()), addr));
4530
4531 let values = client
4532 .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
4533 .await
4534 .unwrap();
4535 assert_eq!(values.len(), 1);
4536 assert_eq!(values[0].0, PropertyId::ObjectName);
4537 assert!(matches!(
4538 &values[0].1,
4539 ClientDataValue::CharacterString(s) if s == "AHU-1"
4540 ));
4541 }
4542
4543 #[tokio::test]
4544 async fn new_sc_rejects_invalid_endpoint() {
4545 let err = BacnetClient::new_sc("not a url").await.unwrap_err();
4546 assert!(matches!(err, crate::ClientError::DataLink(_)));
4547 }
4548}