1use crate::{
2 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3 ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4 DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5 EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9 AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10 SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{reader::Reader, writer::Writer};
13use rustbac_core::npdu::Npdu;
14use rustbac_core::services::acknowledge_alarm::{
15 AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
16};
17use rustbac_core::services::alarm_summary::{
18 AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
19 SERVICE_GET_ALARM_SUMMARY,
20};
21use rustbac_core::services::atomic_read_file::{
22 AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
23};
24use rustbac_core::services::atomic_write_file::{
25 AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
26};
27use rustbac_core::services::cov_notification::{
28 CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
29 SERVICE_UNCONFIRMED_COV_NOTIFICATION,
30};
31use rustbac_core::services::device_management::{
32 DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
33 ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
34};
35use rustbac_core::services::enrollment_summary::{
36 EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
37 GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
38};
39use rustbac_core::services::event_information::{
40 EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
41 SERVICE_GET_EVENT_INFORMATION,
42};
43use rustbac_core::services::event_notification::{
44 EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
45 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
46};
47use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
48use rustbac_core::services::list_element::{
49 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
50 SERVICE_REMOVE_LIST_ELEMENT,
51};
52use rustbac_core::services::object_management::{
53 CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
54 SERVICE_DELETE_OBJECT,
55};
56use rustbac_core::services::private_transfer::{
57 ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
58 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
59};
60use rustbac_core::services::read_property::{
61 ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
62};
63use rustbac_core::services::read_property_multiple::{
64 PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
65 ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
66};
67use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
68use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
69use rustbac_core::services::subscribe_cov_property::{
70 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
71};
72use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
73use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
74use rustbac_core::services::who_is::WhoIsRequest;
75use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
76use rustbac_core::services::write_property_multiple::{
77 PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
78 SERVICE_WRITE_PROPERTY_MULTIPLE,
79};
80use rustbac_core::types::{DataValue, Date, ErrorClass, ErrorCode, ObjectId, PropertyId, Time};
81use rustbac_core::EncodeError;
82use rustbac_datalink::bip::transport::{
83 BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
84};
85use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
86use std::collections::HashSet;
87use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
88use std::time::Duration;
89use tokio::sync::Mutex;
90use tokio::task::JoinHandle;
91use tokio::time::{timeout, Instant};
92
/// Lower bound, in bytes, on the amount of service data placed in each segment
/// when a confirmed request is split into segments.
const MIN_SEGMENT_DATA_LEN: usize = 32;
/// Upper bound, in bytes (1 MiB), on the payload reassembled from a ComplexAck;
/// responses that would exceed it are rejected.
const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1024 * 1024;
95
/// Asynchronous BACnet client that sends requests and receives replies over a
/// datalink of type `D`.
#[derive(Debug)]
pub struct BacnetClient<D: DataLink> {
    /// Transport used to send and receive frames.
    datalink: D,
    /// Next invoke id to hand out to a confirmed request.
    invoke_id: Mutex<u8>,
    /// Held for the duration of a request/response exchange so that exchanges
    /// issued through the same client do not interleave on the datalink.
    request_io_lock: Mutex<()>,
    /// How long a confirmed request waits for its reply.
    response_timeout: Duration,
    /// Window size proposed when a confirmed request has to be segmented.
    segmented_request_window_size: u8,
    /// How many times a window of segments is re-sent after a timeout or a
    /// negative SegmentAck.
    segmented_request_retries: u8,
    /// How long to wait for a SegmentAck after sending a window of segments.
    segment_ack_timeout: Duration,
}
106
/// Handle to a spawned background task that keeps a foreign-device
/// registration alive.
#[derive(Debug)]
pub struct ForeignDeviceRenewal {
    /// Join handle of the spawned renewal task.
    task: JoinHandle<()>,
}
111
112impl ForeignDeviceRenewal {
113 pub fn stop(self) {
114 self.task.abort();
115 }
116}
117
118impl Drop for ForeignDeviceRenewal {
119 fn drop(&mut self) {
120 self.task.abort();
121 }
122}
123
124impl BacnetClient<BacnetIpTransport> {
125 pub async fn new() -> Result<Self, ClientError> {
126 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
127 let datalink = BacnetIpTransport::bind(bind_addr).await?;
128 Ok(Self {
129 datalink,
130 invoke_id: Mutex::new(1),
131 request_io_lock: Mutex::new(()),
132 response_timeout: Duration::from_secs(3),
133 segmented_request_window_size: 1,
134 segmented_request_retries: 2,
135 segment_ack_timeout: Duration::from_millis(500),
136 })
137 }
138
139 pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
140 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
141 let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
142 datalink.register_foreign_device(ttl_seconds).await?;
143 Ok(Self {
144 datalink,
145 invoke_id: Mutex::new(1),
146 request_io_lock: Mutex::new(()),
147 response_timeout: Duration::from_secs(3),
148 segmented_request_window_size: 1,
149 segmented_request_retries: 2,
150 segment_ack_timeout: Duration::from_millis(500),
151 })
152 }
153
154 pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
155 self.datalink.register_foreign_device(ttl_seconds).await?;
156 Ok(())
157 }
158
159 pub async fn read_broadcast_distribution_table(
160 &self,
161 ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
162 self.datalink
163 .read_broadcast_distribution_table()
164 .await
165 .map_err(ClientError::from)
166 }
167
168 pub async fn write_broadcast_distribution_table(
169 &self,
170 entries: &[BroadcastDistributionEntry],
171 ) -> Result<(), ClientError> {
172 self.datalink
173 .write_broadcast_distribution_table(entries)
174 .await?;
175 Ok(())
176 }
177
178 pub async fn read_foreign_device_table(
179 &self,
180 ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
181 self.datalink
182 .read_foreign_device_table()
183 .await
184 .map_err(ClientError::from)
185 }
186
187 pub async fn delete_foreign_device_table_entry(
188 &self,
189 address: SocketAddrV4,
190 ) -> Result<(), ClientError> {
191 self.datalink
192 .delete_foreign_device_table_entry(address)
193 .await?;
194 Ok(())
195 }
196
197 pub fn start_foreign_device_renewal(
198 &self,
199 ttl_seconds: u16,
200 ) -> Result<ForeignDeviceRenewal, ClientError> {
201 if ttl_seconds == 0 {
202 return Err(EncodeError::InvalidLength.into());
203 }
204
205 let datalink = self.datalink.clone();
206 let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
207 let interval = Duration::from_secs(refresh_seconds.max(1));
208 let task = tokio::spawn(async move {
209 loop {
210 tokio::time::sleep(interval).await;
211 if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
212 log::warn!("foreign device renewal send failed: {err}");
213 }
214 }
215 });
216 Ok(ForeignDeviceRenewal { task })
217 }
218}
219
220impl BacnetClient<BacnetScTransport> {
221 pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
222 let datalink = BacnetScTransport::connect(endpoint).await?;
223 Ok(Self::with_datalink(datalink))
224 }
225}
226
227impl<D: DataLink> BacnetClient<D> {
228 pub fn with_datalink(datalink: D) -> Self {
229 Self {
230 datalink,
231 invoke_id: Mutex::new(1),
232 request_io_lock: Mutex::new(()),
233 response_timeout: Duration::from_secs(3),
234 segmented_request_window_size: 1,
235 segmented_request_retries: 2,
236 segment_ack_timeout: Duration::from_millis(500),
237 }
238 }
239
240 pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
241 self.response_timeout = timeout;
242 self
243 }
244
245 pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
246 self.segmented_request_window_size = window_size.max(1);
247 self
248 }
249
250 pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
251 self.segmented_request_retries = retries;
252 self
253 }
254
255 pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
256 self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
257 self
258 }
259
260 async fn next_invoke_id(&self) -> u8 {
261 let mut lock = self.invoke_id.lock().await;
262 let id = *lock;
263 *lock = lock.wrapping_add(1);
264 if *lock == 0 {
265 *lock = 1;
266 }
267 id
268 }
269
270 async fn send_segment_ack(
271 &self,
272 address: DataLinkAddress,
273 invoke_id: u8,
274 sequence_number: u8,
275 window_size: u8,
276 ) -> Result<(), ClientError> {
277 let mut tx = [0u8; 64];
278 let mut w = Writer::new(&mut tx);
279 Npdu::new(0).encode(&mut w)?;
280 SegmentAck {
281 negative_ack: false,
282 sent_by_server: false,
283 invoke_id,
284 sequence_number,
285 actual_window_size: window_size,
286 }
287 .encode(&mut w)?;
288 self.datalink.send(address, w.as_written()).await?;
289 Ok(())
290 }
291
292 async fn recv_ignoring_invalid_frame(
293 &self,
294 buf: &mut [u8],
295 deadline: Instant,
296 ) -> Result<(usize, DataLinkAddress), ClientError> {
297 loop {
298 let remaining = deadline.saturating_duration_since(Instant::now());
299 if remaining.is_zero() {
300 return Err(ClientError::Timeout);
301 }
302
303 match timeout(remaining, self.datalink.recv(buf)).await {
304 Err(_) => return Err(ClientError::Timeout),
305 Ok(Err(DataLinkError::InvalidFrame)) => continue,
306 Ok(Err(e)) => return Err(e.into()),
307 Ok(Ok(v)) => return Ok(v),
308 }
309 }
310 }
311
312 async fn send_simple_ack(
313 &self,
314 address: DataLinkAddress,
315 invoke_id: u8,
316 service_choice: u8,
317 ) -> Result<(), ClientError> {
318 let mut tx = [0u8; 64];
319 let mut w = Writer::new(&mut tx);
320 Npdu::new(0).encode(&mut w)?;
321 SimpleAck {
322 invoke_id,
323 service_choice,
324 }
325 .encode(&mut w)?;
326 self.datalink.send(address, w.as_written()).await?;
327 Ok(())
328 }
329
330 fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
331 where
332 F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
333 {
334 for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
335 let mut buf = vec![0u8; size];
336 let mut w = Writer::new(&mut buf);
337 match encode(&mut w) {
338 Ok(()) => {
339 let written_len = w.as_written().len();
340 buf.truncate(written_len);
341 return Ok(buf);
342 }
343 Err(EncodeError::BufferTooSmall) => continue,
344 Err(e) => return Err(e.into()),
345 }
346 }
347 Err(ClientError::SegmentedRequestTooLarge)
348 }
349
/// Maps the low nibble of a max-APDU code to a size in octets.
///
/// Codes 0 through 5 map to 50, 128, 206, 480, 1024 and 1476; every other
/// code falls back to 480.
const fn max_apdu_octets(max_apdu_code: u8) -> usize {
    const FALLBACK_OCTETS: usize = 480;
    const OCTETS_BY_CODE: [usize; 6] = [50, 128, 206, 480, 1024, 1476];

    let code = (max_apdu_code & 0x0f) as usize;
    if code < OCTETS_BY_CODE.len() {
        OCTETS_BY_CODE[code]
    } else {
        FALLBACK_OCTETS
    }
}
361
362 async fn await_segment_ack(
363 &self,
364 address: DataLinkAddress,
365 invoke_id: u8,
366 service_choice: u8,
367 expected_sequence: u8,
368 deadline: Instant,
369 ) -> Result<SegmentAck, ClientError> {
370 loop {
371 let remaining = deadline.saturating_duration_since(Instant::now());
372 if remaining.is_zero() {
373 return Err(ClientError::Timeout);
374 }
375
376 let mut rx = [0u8; 1500];
377 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
378 let (n, src) = match recv {
379 Err(_) => return Err(ClientError::Timeout),
380 Ok(Err(DataLinkError::InvalidFrame)) => continue,
381 Ok(Err(e)) => return Err(e.into()),
382 Ok(Ok(v)) => v,
383 };
384 if src != address {
385 continue;
386 }
387
388 let Ok(apdu) = extract_apdu(&rx[..n]) else {
389 continue;
390 };
391 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
392 match ApduType::from_u8(first >> 4) {
393 Some(ApduType::SegmentAck) => {
394 let mut r = Reader::new(apdu);
395 let ack = SegmentAck::decode(&mut r)?;
396 if ack.invoke_id != invoke_id || !ack.sent_by_server {
397 continue;
398 }
399 if ack.negative_ack {
400 return Err(ClientError::SegmentNegativeAck {
401 sequence_number: ack.sequence_number,
402 });
403 }
404 if ack.sequence_number == expected_sequence {
405 return Ok(ack);
406 }
407 }
408 Some(ApduType::Error) => {
409 let mut r = Reader::new(apdu);
410 let err = BacnetError::decode(&mut r)?;
411 if err.invoke_id == invoke_id && err.service_choice == service_choice {
412 return Err(remote_service_error(err));
413 }
414 }
415 Some(ApduType::Reject) => {
416 let mut r = Reader::new(apdu);
417 let rej = RejectPdu::decode(&mut r)?;
418 if rej.invoke_id == invoke_id {
419 return Err(ClientError::RemoteReject { reason: rej.reason });
420 }
421 }
422 Some(ApduType::Abort) => {
423 let mut r = Reader::new(apdu);
424 let abort = AbortPdu::decode(&mut r)?;
425 if abort.invoke_id == invoke_id {
426 return Err(ClientError::RemoteAbort {
427 reason: abort.reason,
428 server: abort.server,
429 });
430 }
431 }
432 _ => continue,
433 }
434 }
435 }
436
/// Sends an already-encoded confirmed request to `address`, splitting it
/// into segments when its service payload does not fit in one.
///
/// `frame` must contain an NPDU followed by a confirmed-request APDU. When a
/// single segment suffices, `frame` is sent unchanged. Otherwise the payload
/// is cut into segments that are sent in windows; after every window except
/// the last one the method waits for a matching SegmentAck, re-sending the
/// window on timeout or negative ack while retries remain.
///
/// # Errors
/// - Decode/encode errors if `frame` cannot be parsed or a segment cannot be
///   written.
/// - `ClientError::SegmentedRequestTooLarge` if more than 256 segments would
///   be required.
/// - `ClientError::Timeout` if `deadline` passes, or a SegmentAck is still
///   missing after the retries are used up.
/// - Any other error returned by `await_segment_ack` or the datalink.
async fn send_confirmed_request(
    &self,
    address: DataLinkAddress,
    frame: &[u8],
    deadline: Instant,
) -> Result<(), ClientError> {
    // Split the frame into the NPDU bytes (re-used verbatim in front of every
    // segment) and the APDU that follows them.
    let mut pr = Reader::new(frame);
    let _npdu = Npdu::decode(&mut pr)?;
    let npdu_len = frame.len() - pr.remaining();
    let npdu_bytes = &frame[..npdu_len];
    let apdu = &frame[npdu_len..];

    // Everything after the confirmed-request header is the service payload.
    let mut ar = Reader::new(apdu);
    let header = ConfirmedRequestHeader::decode(&mut ar)?;
    let service_payload = ar.read_exact(ar.remaining())?;

    // Payload bytes per segment: the size selected by the header's `max_apdu`
    // code minus 5 octets (presumably header overhead — TODO confirm), but
    // never less than MIN_SEGMENT_DATA_LEN.
    let segment_data_len = Self::max_apdu_octets(header.max_apdu)
        .saturating_sub(5)
        .max(MIN_SEGMENT_DATA_LEN);
    let segment_count = service_payload.len().div_ceil(segment_data_len);

    // Zero or one segment: the original frame goes out as-is.
    if segment_count <= 1 {
        self.datalink.send(address, frame).await?;
        return Ok(());
    }

    // Sequence numbers are stored in a u8, so at most 256 segments fit.
    if segment_count > usize::from(u8::MAX) + 1 {
        return Err(ClientError::SegmentedRequestTooLarge);
    }

    let configured_window_size = self.segmented_request_window_size.max(1);
    let mut window_size = configured_window_size;
    // Smallest window size the peer has reported so far in its SegmentAcks.
    let mut peer_window_ceiling = configured_window_size;
    let mut batch_start = 0usize;
    while batch_start < segment_count {
        let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
        // The ack we wait for names the last segment of this window.
        let expected_sequence = (batch_end - 1) as u8;

        // Pre-encode every frame of the window so a retry can re-send them
        // without encoding again.
        let mut frames = Vec::with_capacity(batch_end - batch_start);
        for segment_index in batch_start..batch_end {
            let seq = segment_index as u8;
            let more_follows = segment_index + 1 < segment_count;
            let start = segment_index * segment_data_len;
            let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
            let segment = &service_payload[start..end];

            let seg_header = ConfirmedRequestHeader {
                segmented: true,
                more_follows,
                segmented_response_accepted: header.segmented_response_accepted,
                max_segments: header.max_segments,
                max_apdu: header.max_apdu,
                invoke_id: header.invoke_id,
                sequence_number: Some(seq),
                proposed_window_size: Some(window_size),
                service_choice: header.service_choice,
            };

            // 16 spare bytes leave room for the encoded segment header.
            let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
            let written_len = {
                let mut w = Writer::new(&mut tx);
                w.write_all(npdu_bytes)?;
                seg_header.encode(&mut w)?;
                w.write_all(segment)?;
                w.as_written().len()
            };
            tx.truncate(written_len);
            frames.push(tx);
        }

        let mut retries_remaining = self.segmented_request_retries;
        loop {
            for frame in &frames {
                self.datalink.send(address, frame).await?;
            }

            // No SegmentAck is awaited here after the final window.
            if batch_end == segment_count {
                break;
            }

            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return Err(ClientError::Timeout);
            }
            // Wait at most `segment_ack_timeout`, and never past the overall deadline.
            let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
            match self
                .await_segment_ack(
                    address,
                    header.invoke_id,
                    header.service_choice,
                    expected_sequence,
                    ack_wait_deadline,
                )
                .await
            {
                Ok(ack) => {
                    // Grow the window by one, capped by our configuration and
                    // by the smallest window the peer has reported.
                    peer_window_ceiling =
                        peer_window_ceiling.min(ack.actual_window_size.max(1));
                    window_size = window_size
                        .saturating_add(1)
                        .min(configured_window_size)
                        .min(peer_window_ceiling)
                        .max(1);
                    break;
                }
                Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
                    if retries_remaining > 0 =>
                {
                    // Re-send the same frames; the halved window size only
                    // affects windows built after this one.
                    retries_remaining -= 1;
                    window_size = window_size.saturating_div(2).max(1);
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        batch_start = batch_end;
    }

    Ok(())
}
558
/// Returns the complete service payload of a ComplexAck, reassembling the
/// remaining segments when the first APDU is marked as segmented.
///
/// `first_header` / `first_payload` come from the ComplexAck that was already
/// received. For a segmented response, each accepted segment is acknowledged
/// with a SegmentAck; a repeat of the last accepted segment gets its ack
/// re-sent, and any other out-of-order segment is ignored.
///
/// # Errors
/// - `ClientError::ResponseTooLarge` if the payload would exceed
///   `MAX_COMPLEX_ACK_REASSEMBLY_BYTES`.
/// - `ClientError::UnsupportedResponse` if a segmented header lacks a
///   sequence number, an unsegmented ComplexAck arrives mid-stream, or an
///   APDU is empty.
/// - The remote Error / Reject / Abort addressed to this request.
/// - `ClientError::Timeout` once `deadline` has passed, plus decode and
///   datalink errors.
async fn collect_complex_ack_payload(
    &self,
    address: DataLinkAddress,
    invoke_id: u8,
    service_choice: u8,
    first_header: ComplexAckHeader,
    first_payload: &[u8],
    deadline: Instant,
) -> Result<Vec<u8>, ClientError> {
    let mut payload = first_payload.to_vec();
    if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
        return Err(ClientError::ResponseTooLarge {
            limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
        });
    }
    // Unsegmented response: the first payload is the whole payload.
    if !first_header.segmented {
        return Ok(payload);
    }

    // Acknowledge the first segment before waiting for the rest.
    let mut last_seq = first_header
        .sequence_number
        .ok_or(ClientError::UnsupportedResponse)?;
    let mut window_size = first_header.proposed_window_size.unwrap_or(1);
    self.send_segment_ack(address, invoke_id, last_seq, window_size)
        .await?;
    let mut more_follows = first_header.more_follows;

    while more_follows {
        let mut rx = [0u8; 1500];
        let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
        if src != address {
            continue;
        }

        let Ok(apdu) = extract_apdu(&rx[..n]) else {
            continue;
        };
        let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
        // The PDU type is carried in the high nibble of the first octet.
        match ApduType::from_u8(first >> 4) {
            Some(ApduType::ComplexAck) => {
                let mut r = Reader::new(apdu);
                let seg = ComplexAckHeader::decode(&mut r)?;
                if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
                    continue;
                }
                if !seg.segmented {
                    return Err(ClientError::UnsupportedResponse);
                }
                let seq = seg
                    .sequence_number
                    .ok_or(ClientError::UnsupportedResponse)?;
                // Duplicate of the segment we already accepted: re-send its ack.
                if seq == last_seq {
                    self.send_segment_ack(address, invoke_id, last_seq, window_size)
                        .await?;
                    continue;
                }
                // Anything other than the next segment in order is ignored.
                if seq != last_seq.wrapping_add(1) {
                    continue;
                }

                let seg_payload = r.read_exact(r.remaining())?;
                // Enforce the reassembly cap before growing the buffer.
                if payload.len().saturating_add(seg_payload.len())
                    > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
                {
                    return Err(ClientError::ResponseTooLarge {
                        limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
                    });
                }
                payload.extend_from_slice(seg_payload);

                last_seq = seq;
                more_follows = seg.more_follows;
                window_size = seg.proposed_window_size.unwrap_or(window_size);
                self.send_segment_ack(address, invoke_id, last_seq, window_size)
                    .await?;
            }
            Some(ApduType::Error) => {
                let mut r = Reader::new(apdu);
                let err = BacnetError::decode(&mut r)?;
                if err.invoke_id == invoke_id && err.service_choice == service_choice {
                    return Err(remote_service_error(err));
                }
            }
            Some(ApduType::Reject) => {
                let mut r = Reader::new(apdu);
                let rej = RejectPdu::decode(&mut r)?;
                if rej.invoke_id == invoke_id {
                    return Err(ClientError::RemoteReject { reason: rej.reason });
                }
            }
            Some(ApduType::Abort) => {
                let mut r = Reader::new(apdu);
                let abort = AbortPdu::decode(&mut r)?;
                if abort.invoke_id == invoke_id {
                    return Err(ClientError::RemoteAbort {
                        reason: abort.reason,
                        server: abort.server,
                    });
                }
            }
            _ => continue,
        }
    }

    Ok(payload)
}
666
667 pub async fn who_is(
668 &self,
669 range: Option<(u32, u32)>,
670 wait: Duration,
671 ) -> Result<Vec<DiscoveredDevice>, ClientError> {
672 let _io_lock = self.request_io_lock.lock().await;
673 let req = match range {
674 Some((low, high)) => WhoIsRequest {
675 low_limit: Some(low),
676 high_limit: Some(high),
677 },
678 None => WhoIsRequest::global(),
679 };
680
681 let mut tx = [0u8; 128];
682 let mut w = Writer::new(&mut tx);
683 Npdu::new(0).encode(&mut w)?;
684 req.encode(&mut w)?;
685
686 self.datalink
687 .send(
688 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
689 w.as_written(),
690 )
691 .await?;
692
693 let mut devices = Vec::new();
694 let mut seen = HashSet::new();
695 let deadline = tokio::time::Instant::now() + wait;
696
697 while tokio::time::Instant::now() < deadline {
698 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
699 let mut rx = [0u8; 1500];
700 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
701 match recv {
702 Ok(Ok((n, src))) => {
703 let Ok(apdu) = extract_apdu(&rx[..n]) else {
704 continue;
705 };
706 let mut r = Reader::new(apdu);
707 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
708 continue;
709 };
710 if unconfirmed.service_choice != SERVICE_I_AM {
711 continue;
712 }
713 let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
714 continue;
715 };
716 if seen.insert(src) {
717 devices.push(DiscoveredDevice {
718 address: src,
719 device_id: Some(i_am.device_id),
720 });
721 }
722 }
723 Ok(Err(DataLinkError::InvalidFrame)) => continue,
724 Ok(Err(e)) => return Err(e.into()),
725 Err(_) => break,
726 }
727 }
728
729 Ok(devices)
730 }
731
732 pub async fn who_has_object_id(
733 &self,
734 range: Option<(u32, u32)>,
735 object_id: ObjectId,
736 wait: Duration,
737 ) -> Result<Vec<DiscoveredObject>, ClientError> {
738 let req = WhoHasRequest {
739 low_limit: range.map(|(low, _)| low),
740 high_limit: range.map(|(_, high)| high),
741 object: WhoHasObject::ObjectId(object_id),
742 };
743 self.who_has(req, wait).await
744 }
745
746 pub async fn who_has_object_name(
747 &self,
748 range: Option<(u32, u32)>,
749 object_name: &str,
750 wait: Duration,
751 ) -> Result<Vec<DiscoveredObject>, ClientError> {
752 let req = WhoHasRequest {
753 low_limit: range.map(|(low, _)| low),
754 high_limit: range.map(|(_, high)| high),
755 object: WhoHasObject::ObjectName(object_name),
756 };
757 self.who_has(req, wait).await
758 }
759
760 async fn who_has(
761 &self,
762 request: WhoHasRequest<'_>,
763 wait: Duration,
764 ) -> Result<Vec<DiscoveredObject>, ClientError> {
765 let _io_lock = self.request_io_lock.lock().await;
766 let tx = self.encode_with_growth(|w| {
767 Npdu::new(0).encode(w)?;
768 request.encode(w)
769 })?;
770 self.datalink
771 .send(
772 DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
773 &tx,
774 )
775 .await?;
776
777 let mut objects = Vec::new();
778 let mut seen = HashSet::new();
779 let deadline = tokio::time::Instant::now() + wait;
780
781 while tokio::time::Instant::now() < deadline {
782 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
783 let mut rx = [0u8; 1500];
784 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
785 match recv {
786 Ok(Ok((n, src))) => {
787 let Ok(apdu) = extract_apdu(&rx[..n]) else {
788 continue;
789 };
790 let mut r = Reader::new(apdu);
791 let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
792 continue;
793 };
794 if unconfirmed.service_choice != SERVICE_I_HAVE {
795 continue;
796 }
797 let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
798 continue;
799 };
800 if !seen.insert((src, i_have.object_id.raw())) {
801 continue;
802 }
803 objects.push(DiscoveredObject {
804 address: src,
805 device_id: i_have.device_id,
806 object_id: i_have.object_id,
807 object_name: i_have.object_name.to_string(),
808 });
809 }
810 Ok(Err(DataLinkError::InvalidFrame)) => continue,
811 Ok(Err(e)) => return Err(e.into()),
812 Err(_) => break,
813 }
814 }
815
816 Ok(objects)
817 }
818
819 pub async fn device_communication_control(
820 &self,
821 address: DataLinkAddress,
822 time_duration_seconds: Option<u16>,
823 enable_disable: DeviceCommunicationState,
824 password: Option<&str>,
825 ) -> Result<(), ClientError> {
826 let invoke_id = self.next_invoke_id().await;
827 let request = DeviceCommunicationControlRequest {
828 time_duration_seconds,
829 enable_disable,
830 password,
831 invoke_id,
832 };
833 let tx = self.encode_with_growth(|w| {
834 Npdu::new(0).encode(w)?;
835 request.encode(w)
836 })?;
837 self.await_simple_ack_or_error(
838 address,
839 &tx,
840 invoke_id,
841 SERVICE_DEVICE_COMMUNICATION_CONTROL,
842 self.response_timeout,
843 )
844 .await
845 }
846
847 pub async fn reinitialize_device(
848 &self,
849 address: DataLinkAddress,
850 state: ReinitializeState,
851 password: Option<&str>,
852 ) -> Result<(), ClientError> {
853 let invoke_id = self.next_invoke_id().await;
854 let request = ReinitializeDeviceRequest {
855 state,
856 password,
857 invoke_id,
858 };
859 let tx = self.encode_with_growth(|w| {
860 Npdu::new(0).encode(w)?;
861 request.encode(w)
862 })?;
863 self.await_simple_ack_or_error(
864 address,
865 &tx,
866 invoke_id,
867 SERVICE_REINITIALIZE_DEVICE,
868 self.response_timeout,
869 )
870 .await
871 }
872
873 pub async fn time_synchronize(
874 &self,
875 address: DataLinkAddress,
876 date: Date,
877 time: Time,
878 utc: bool,
879 ) -> Result<(), ClientError> {
880 let request = if utc {
881 TimeSynchronizationRequest::utc(date, time)
882 } else {
883 TimeSynchronizationRequest::local(date, time)
884 };
885 let tx = self.encode_with_growth(|w| {
886 Npdu::new(0).encode(w)?;
887 request.encode(w)
888 })?;
889 self.datalink.send(address, &tx).await?;
890 Ok(())
891 }
892
893 pub async fn create_object_by_type(
894 &self,
895 address: DataLinkAddress,
896 object_type: rustbac_core::types::ObjectType,
897 ) -> Result<ObjectId, ClientError> {
898 self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
899 .await
900 }
901
902 pub async fn create_object(
903 &self,
904 address: DataLinkAddress,
905 mut request: CreateObjectRequest,
906 ) -> Result<ObjectId, ClientError> {
907 request.invoke_id = self.next_invoke_id().await;
908 let invoke_id = request.invoke_id;
909 let tx = self.encode_with_growth(|w| {
910 Npdu::new(0).encode(w)?;
911 request.encode(w)
912 })?;
913 let payload = self
914 .await_complex_ack_payload_or_error(
915 address,
916 &tx,
917 invoke_id,
918 SERVICE_CREATE_OBJECT,
919 self.response_timeout,
920 )
921 .await?;
922 let mut pr = Reader::new(&payload);
923 let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
924 Ok(parsed.object_id)
925 }
926
927 pub async fn delete_object(
928 &self,
929 address: DataLinkAddress,
930 object_id: ObjectId,
931 ) -> Result<(), ClientError> {
932 let invoke_id = self.next_invoke_id().await;
933 let request = DeleteObjectRequest {
934 object_id,
935 invoke_id,
936 };
937 let tx = self.encode_with_growth(|w| {
938 Npdu::new(0).encode(w)?;
939 request.encode(w)
940 })?;
941 self.await_simple_ack_or_error(
942 address,
943 &tx,
944 invoke_id,
945 SERVICE_DELETE_OBJECT,
946 self.response_timeout,
947 )
948 .await
949 }
950
951 pub async fn add_list_element(
952 &self,
953 address: DataLinkAddress,
954 mut request: AddListElementRequest<'_>,
955 ) -> Result<(), ClientError> {
956 request.invoke_id = self.next_invoke_id().await;
957 let invoke_id = request.invoke_id;
958 let tx = self.encode_with_growth(|w| {
959 Npdu::new(0).encode(w)?;
960 request.encode(w)
961 })?;
962 self.await_simple_ack_or_error(
963 address,
964 &tx,
965 invoke_id,
966 SERVICE_ADD_LIST_ELEMENT,
967 self.response_timeout,
968 )
969 .await
970 }
971
972 pub async fn remove_list_element(
973 &self,
974 address: DataLinkAddress,
975 mut request: RemoveListElementRequest<'_>,
976 ) -> Result<(), ClientError> {
977 request.invoke_id = self.next_invoke_id().await;
978 let invoke_id = request.invoke_id;
979 let tx = self.encode_with_growth(|w| {
980 Npdu::new(0).encode(w)?;
981 request.encode(w)
982 })?;
983 self.await_simple_ack_or_error(
984 address,
985 &tx,
986 invoke_id,
987 SERVICE_REMOVE_LIST_ELEMENT,
988 self.response_timeout,
989 )
990 .await
991 }
992
993 async fn await_simple_ack_or_error(
994 &self,
995 address: DataLinkAddress,
996 tx: &[u8],
997 invoke_id: u8,
998 service_choice: u8,
999 timeout_window: Duration,
1000 ) -> Result<(), ClientError> {
1001 let _io_lock = self.request_io_lock.lock().await;
1002 let deadline = tokio::time::Instant::now() + timeout_window;
1003 self.send_confirmed_request(address, tx, deadline).await?;
1004 while tokio::time::Instant::now() < deadline {
1005 let mut rx = [0u8; 1500];
1006 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1007 if src != address {
1008 continue;
1009 }
1010 let apdu = extract_apdu(&rx[..n])?;
1011 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1012 match ApduType::from_u8(first >> 4) {
1013 Some(ApduType::SimpleAck) => {
1014 let mut r = Reader::new(apdu);
1015 let ack = SimpleAck::decode(&mut r)?;
1016 if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1017 return Ok(());
1018 }
1019 }
1020 Some(ApduType::Error) => {
1021 let mut r = Reader::new(apdu);
1022 let err = BacnetError::decode(&mut r)?;
1023 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1024 return Err(remote_service_error(err));
1025 }
1026 }
1027 Some(ApduType::Reject) => {
1028 let mut r = Reader::new(apdu);
1029 let rej = RejectPdu::decode(&mut r)?;
1030 if rej.invoke_id == invoke_id {
1031 return Err(ClientError::RemoteReject { reason: rej.reason });
1032 }
1033 }
1034 Some(ApduType::Abort) => {
1035 let mut r = Reader::new(apdu);
1036 let abort = AbortPdu::decode(&mut r)?;
1037 if abort.invoke_id == invoke_id {
1038 return Err(ClientError::RemoteAbort {
1039 reason: abort.reason,
1040 server: abort.server,
1041 });
1042 }
1043 }
1044 _ => continue,
1045 }
1046 }
1047 Err(ClientError::Timeout)
1048 }
1049
1050 async fn await_complex_ack_payload_or_error(
1051 &self,
1052 address: DataLinkAddress,
1053 tx: &[u8],
1054 invoke_id: u8,
1055 service_choice: u8,
1056 timeout_window: Duration,
1057 ) -> Result<Vec<u8>, ClientError> {
1058 let _io_lock = self.request_io_lock.lock().await;
1059 let deadline = tokio::time::Instant::now() + timeout_window;
1060 self.send_confirmed_request(address, tx, deadline).await?;
1061 while tokio::time::Instant::now() < deadline {
1062 let mut rx = [0u8; 1500];
1063 let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1064 if src != address {
1065 continue;
1066 }
1067
1068 let apdu = extract_apdu(&rx[..n])?;
1069 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1070 match ApduType::from_u8(first >> 4) {
1071 Some(ApduType::ComplexAck) => {
1072 let mut r = Reader::new(apdu);
1073 let ack = ComplexAckHeader::decode(&mut r)?;
1074 if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1075 continue;
1076 }
1077 return self
1078 .collect_complex_ack_payload(
1079 address,
1080 invoke_id,
1081 service_choice,
1082 ack,
1083 r.read_exact(r.remaining())?,
1084 deadline,
1085 )
1086 .await;
1087 }
1088 Some(ApduType::Error) => {
1089 let mut r = Reader::new(apdu);
1090 let err = BacnetError::decode(&mut r)?;
1091 if err.invoke_id == invoke_id && err.service_choice == service_choice {
1092 return Err(remote_service_error(err));
1093 }
1094 }
1095 Some(ApduType::Reject) => {
1096 let mut r = Reader::new(apdu);
1097 let rej = RejectPdu::decode(&mut r)?;
1098 if rej.invoke_id == invoke_id {
1099 return Err(ClientError::RemoteReject { reason: rej.reason });
1100 }
1101 }
1102 Some(ApduType::Abort) => {
1103 let mut r = Reader::new(apdu);
1104 let abort = AbortPdu::decode(&mut r)?;
1105 if abort.invoke_id == invoke_id {
1106 return Err(ClientError::RemoteAbort {
1107 reason: abort.reason,
1108 server: abort.server,
1109 });
1110 }
1111 }
1112 _ => continue,
1113 }
1114 }
1115 Err(ClientError::Timeout)
1116 }
1117
1118 pub async fn get_alarm_summary(
1119 &self,
1120 address: DataLinkAddress,
1121 ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1122 let invoke_id = self.next_invoke_id().await;
1123 let request = GetAlarmSummaryRequest { invoke_id };
1124 let tx = self.encode_with_growth(|w| {
1125 Npdu::new(0).encode(w)?;
1126 request.encode(w)
1127 })?;
1128 let payload = self
1129 .await_complex_ack_payload_or_error(
1130 address,
1131 &tx,
1132 invoke_id,
1133 SERVICE_GET_ALARM_SUMMARY,
1134 self.response_timeout,
1135 )
1136 .await?;
1137 let mut pr = Reader::new(&payload);
1138 let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1139 Ok(into_client_alarm_summary(parsed.summaries))
1140 }
1141
1142 pub async fn get_enrollment_summary(
1143 &self,
1144 address: DataLinkAddress,
1145 ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1146 let invoke_id = self.next_invoke_id().await;
1147 let request = GetEnrollmentSummaryRequest { invoke_id };
1148 let tx = self.encode_with_growth(|w| {
1149 Npdu::new(0).encode(w)?;
1150 request.encode(w)
1151 })?;
1152 let payload = self
1153 .await_complex_ack_payload_or_error(
1154 address,
1155 &tx,
1156 invoke_id,
1157 SERVICE_GET_ENROLLMENT_SUMMARY,
1158 self.response_timeout,
1159 )
1160 .await?;
1161 let mut pr = Reader::new(&payload);
1162 let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1163 Ok(into_client_enrollment_summary(parsed.summaries))
1164 }
1165
1166 pub async fn get_event_information(
1167 &self,
1168 address: DataLinkAddress,
1169 last_received_object_id: Option<ObjectId>,
1170 ) -> Result<EventInformationResult, ClientError> {
1171 let invoke_id = self.next_invoke_id().await;
1172 let request = GetEventInformationRequest {
1173 last_received_object_id,
1174 invoke_id,
1175 };
1176 let tx = self.encode_with_growth(|w| {
1177 Npdu::new(0).encode(w)?;
1178 request.encode(w)
1179 })?;
1180 let payload = self
1181 .await_complex_ack_payload_or_error(
1182 address,
1183 &tx,
1184 invoke_id,
1185 SERVICE_GET_EVENT_INFORMATION,
1186 self.response_timeout,
1187 )
1188 .await?;
1189 let mut pr = Reader::new(&payload);
1190 let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1191 Ok(EventInformationResult {
1192 summaries: into_client_event_information(parsed.summaries),
1193 more_events: parsed.more_events,
1194 })
1195 }
1196
1197 pub async fn acknowledge_alarm(
1198 &self,
1199 address: DataLinkAddress,
1200 mut request: AcknowledgeAlarmRequest<'_>,
1201 ) -> Result<(), ClientError> {
1202 request.invoke_id = self.next_invoke_id().await;
1203 let invoke_id = request.invoke_id;
1204 let tx = self.encode_with_growth(|w| {
1205 Npdu::new(0).encode(w)?;
1206 request.encode(w)
1207 })?;
1208 self.await_simple_ack_or_error(
1209 address,
1210 &tx,
1211 invoke_id,
1212 SERVICE_ACKNOWLEDGE_ALARM,
1213 self.response_timeout,
1214 )
1215 .await
1216 }
1217
1218 pub async fn atomic_read_file_stream(
1219 &self,
1220 address: DataLinkAddress,
1221 file_object_id: ObjectId,
1222 file_start_position: i32,
1223 requested_octet_count: u32,
1224 ) -> Result<AtomicReadFileResult, ClientError> {
1225 let invoke_id = self.next_invoke_id().await;
1226 let request = AtomicReadFileRequest::stream(
1227 file_object_id,
1228 file_start_position,
1229 requested_octet_count,
1230 invoke_id,
1231 );
1232 self.atomic_read_file(address, request).await
1233 }
1234
1235 pub async fn atomic_read_file_record(
1236 &self,
1237 address: DataLinkAddress,
1238 file_object_id: ObjectId,
1239 file_start_record: i32,
1240 requested_record_count: u32,
1241 ) -> Result<AtomicReadFileResult, ClientError> {
1242 let invoke_id = self.next_invoke_id().await;
1243 let request = AtomicReadFileRequest::record(
1244 file_object_id,
1245 file_start_record,
1246 requested_record_count,
1247 invoke_id,
1248 );
1249 self.atomic_read_file(address, request).await
1250 }
1251
1252 async fn atomic_read_file(
1253 &self,
1254 address: DataLinkAddress,
1255 request: AtomicReadFileRequest,
1256 ) -> Result<AtomicReadFileResult, ClientError> {
1257 let invoke_id = request.invoke_id;
1258 let tx = self.encode_with_growth(|w| {
1259 Npdu::new(0).encode(w)?;
1260 request.encode(w)
1261 })?;
1262 let payload = self
1263 .await_complex_ack_payload_or_error(
1264 address,
1265 &tx,
1266 invoke_id,
1267 SERVICE_ATOMIC_READ_FILE,
1268 self.response_timeout,
1269 )
1270 .await?;
1271 let mut pr = Reader::new(&payload);
1272 let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1273 Ok(into_client_atomic_read_result(parsed))
1274 }
1275
1276 pub async fn atomic_write_file_stream(
1277 &self,
1278 address: DataLinkAddress,
1279 file_object_id: ObjectId,
1280 file_start_position: i32,
1281 file_data: &[u8],
1282 ) -> Result<AtomicWriteFileResult, ClientError> {
1283 let invoke_id = self.next_invoke_id().await;
1284 let request = AtomicWriteFileRequest::stream(
1285 file_object_id,
1286 file_start_position,
1287 file_data,
1288 invoke_id,
1289 );
1290 self.atomic_write_file(address, request).await
1291 }
1292
1293 pub async fn atomic_write_file_record(
1294 &self,
1295 address: DataLinkAddress,
1296 file_object_id: ObjectId,
1297 file_start_record: i32,
1298 file_record_data: &[&[u8]],
1299 ) -> Result<AtomicWriteFileResult, ClientError> {
1300 let invoke_id = self.next_invoke_id().await;
1301 let request = AtomicWriteFileRequest::record(
1302 file_object_id,
1303 file_start_record,
1304 file_record_data,
1305 invoke_id,
1306 );
1307 self.atomic_write_file(address, request).await
1308 }
1309
1310 async fn atomic_write_file(
1311 &self,
1312 address: DataLinkAddress,
1313 request: AtomicWriteFileRequest<'_>,
1314 ) -> Result<AtomicWriteFileResult, ClientError> {
1315 let invoke_id = request.invoke_id;
1316 let tx = self.encode_with_growth(|w| {
1317 Npdu::new(0).encode(w)?;
1318 request.encode(w)
1319 })?;
1320 let payload = self
1321 .await_complex_ack_payload_or_error(
1322 address,
1323 &tx,
1324 invoke_id,
1325 SERVICE_ATOMIC_WRITE_FILE,
1326 self.response_timeout,
1327 )
1328 .await?;
1329 let mut pr = Reader::new(&payload);
1330 let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1331 Ok(into_client_atomic_write_result(parsed))
1332 }
1333
1334 pub async fn subscribe_cov(
1335 &self,
1336 address: DataLinkAddress,
1337 mut request: SubscribeCovRequest,
1338 ) -> Result<(), ClientError> {
1339 request.invoke_id = self.next_invoke_id().await;
1340 let invoke_id = request.invoke_id;
1341 let tx = self.encode_with_growth(|w| {
1342 Npdu::new(0).encode(w)?;
1343 request.encode(w)
1344 })?;
1345 self.await_simple_ack_or_error(
1346 address,
1347 &tx,
1348 invoke_id,
1349 SERVICE_SUBSCRIBE_COV,
1350 self.response_timeout,
1351 )
1352 .await
1353 }
1354
1355 pub async fn cancel_cov_subscription(
1356 &self,
1357 address: DataLinkAddress,
1358 subscriber_process_id: u32,
1359 monitored_object_id: ObjectId,
1360 ) -> Result<(), ClientError> {
1361 self.subscribe_cov(
1362 address,
1363 SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1364 )
1365 .await
1366 }
1367
1368 pub async fn subscribe_cov_property(
1369 &self,
1370 address: DataLinkAddress,
1371 mut request: SubscribeCovPropertyRequest,
1372 ) -> Result<(), ClientError> {
1373 request.invoke_id = self.next_invoke_id().await;
1374 let invoke_id = request.invoke_id;
1375 let tx = self.encode_with_growth(|w| {
1376 Npdu::new(0).encode(w)?;
1377 request.encode(w)
1378 })?;
1379 self.await_simple_ack_or_error(
1380 address,
1381 &tx,
1382 invoke_id,
1383 SERVICE_SUBSCRIBE_COV_PROPERTY,
1384 self.response_timeout,
1385 )
1386 .await
1387 }
1388
1389 pub async fn cancel_cov_property_subscription(
1390 &self,
1391 address: DataLinkAddress,
1392 subscriber_process_id: u32,
1393 monitored_object_id: ObjectId,
1394 monitored_property_id: PropertyId,
1395 monitored_property_array_index: Option<u32>,
1396 ) -> Result<(), ClientError> {
1397 self.subscribe_cov_property(
1398 address,
1399 SubscribeCovPropertyRequest::cancel(
1400 subscriber_process_id,
1401 monitored_object_id,
1402 monitored_property_id,
1403 monitored_property_array_index,
1404 0,
1405 ),
1406 )
1407 .await
1408 }
1409
1410 pub async fn read_range_by_position(
1411 &self,
1412 address: DataLinkAddress,
1413 object_id: ObjectId,
1414 property_id: PropertyId,
1415 array_index: Option<u32>,
1416 reference_index: i32,
1417 count: i16,
1418 ) -> Result<ReadRangeResult, ClientError> {
1419 let invoke_id = self.next_invoke_id().await;
1420 let req = ReadRangeRequest::by_position(
1421 object_id,
1422 property_id,
1423 array_index,
1424 reference_index,
1425 count,
1426 invoke_id,
1427 );
1428 self.read_range_with_request(address, req).await
1429 }
1430
1431 pub async fn read_range_by_sequence_number(
1432 &self,
1433 address: DataLinkAddress,
1434 object_id: ObjectId,
1435 property_id: PropertyId,
1436 array_index: Option<u32>,
1437 reference_sequence: u32,
1438 count: i16,
1439 ) -> Result<ReadRangeResult, ClientError> {
1440 let invoke_id = self.next_invoke_id().await;
1441 let req = ReadRangeRequest::by_sequence_number(
1442 object_id,
1443 property_id,
1444 array_index,
1445 reference_sequence,
1446 count,
1447 invoke_id,
1448 );
1449 self.read_range_with_request(address, req).await
1450 }
1451
1452 pub async fn read_range_by_time(
1453 &self,
1454 address: DataLinkAddress,
1455 object_id: ObjectId,
1456 property_id: PropertyId,
1457 array_index: Option<u32>,
1458 at: (Date, Time),
1459 count: i16,
1460 ) -> Result<ReadRangeResult, ClientError> {
1461 let (date, time) = at;
1462 let invoke_id = self.next_invoke_id().await;
1463 let req = ReadRangeRequest::by_time(
1464 object_id,
1465 property_id,
1466 array_index,
1467 date,
1468 time,
1469 count,
1470 invoke_id,
1471 );
1472 self.read_range_with_request(address, req).await
1473 }
1474
1475 async fn read_range_with_request(
1476 &self,
1477 address: DataLinkAddress,
1478 req: ReadRangeRequest,
1479 ) -> Result<ReadRangeResult, ClientError> {
1480 let invoke_id = req.invoke_id;
1481 let tx = self.encode_with_growth(|w| {
1482 Npdu::new(0).encode(w)?;
1483 req.encode(w)
1484 })?;
1485 let payload = self
1486 .await_complex_ack_payload_or_error(
1487 address,
1488 &tx,
1489 invoke_id,
1490 SERVICE_READ_RANGE,
1491 self.response_timeout,
1492 )
1493 .await?;
1494 let mut pr = Reader::new(&payload);
1495 let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1496 into_client_read_range(parsed)
1497 }
1498
1499 pub async fn recv_cov_notification(
1500 &self,
1501 wait: Duration,
1502 ) -> Result<Option<CovNotification>, ClientError> {
1503 let _io_lock = self.request_io_lock.lock().await;
1504 let deadline = tokio::time::Instant::now() + wait;
1505
1506 while tokio::time::Instant::now() < deadline {
1507 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1508 let mut rx = [0u8; 1500];
1509 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1510 let (n, source) = match recv {
1511 Ok(Ok(v)) => v,
1512 Ok(Err(e)) => return Err(e.into()),
1513 Err(_) => break,
1514 };
1515
1516 let apdu = extract_apdu(&rx[..n])?;
1517 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1518 match ApduType::from_u8(first >> 4) {
1519 Some(ApduType::UnconfirmedRequest) => {
1520 let mut r = Reader::new(apdu);
1521 let header = UnconfirmedRequestHeader::decode(&mut r)?;
1522 if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1523 continue;
1524 }
1525 let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1526 return Ok(Some(into_client_cov_notification(source, false, cov)?));
1527 }
1528 Some(ApduType::ConfirmedRequest) => {
1529 let mut r = Reader::new(apdu);
1530 let header = ConfirmedRequestHeader::decode(&mut r)?;
1531 if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1532 continue;
1533 }
1534 if header.segmented {
1535 return Err(ClientError::UnsupportedResponse);
1536 }
1537
1538 let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1539 self.send_simple_ack(
1540 source,
1541 header.invoke_id,
1542 SERVICE_CONFIRMED_COV_NOTIFICATION,
1543 )
1544 .await?;
1545 return Ok(Some(into_client_cov_notification(source, true, cov)?));
1546 }
1547 _ => continue,
1548 }
1549 }
1550
1551 Ok(None)
1552 }
1553
1554 pub async fn recv_event_notification(
1555 &self,
1556 wait: Duration,
1557 ) -> Result<Option<EventNotification>, ClientError> {
1558 let _io_lock = self.request_io_lock.lock().await;
1559 let deadline = tokio::time::Instant::now() + wait;
1560
1561 while tokio::time::Instant::now() < deadline {
1562 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1563 let mut rx = [0u8; 1500];
1564 let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1565 let (n, source) = match recv {
1566 Ok(Ok(v)) => v,
1567 Ok(Err(e)) => return Err(e.into()),
1568 Err(_) => break,
1569 };
1570
1571 let apdu = extract_apdu(&rx[..n])?;
1572 let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1573 match ApduType::from_u8(first >> 4) {
1574 Some(ApduType::UnconfirmedRequest) => {
1575 let mut r = Reader::new(apdu);
1576 let header = UnconfirmedRequestHeader::decode(&mut r)?;
1577 if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1578 continue;
1579 }
1580 let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1581 return Ok(Some(into_client_event_notification(
1582 source,
1583 false,
1584 notification,
1585 )));
1586 }
1587 Some(ApduType::ConfirmedRequest) => {
1588 let mut r = Reader::new(apdu);
1589 let header = ConfirmedRequestHeader::decode(&mut r)?;
1590 if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1591 continue;
1592 }
1593 if header.segmented {
1594 return Err(ClientError::UnsupportedResponse);
1595 }
1596 let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1597 self.send_simple_ack(
1598 source,
1599 header.invoke_id,
1600 SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1601 )
1602 .await?;
1603 return Ok(Some(into_client_event_notification(
1604 source,
1605 true,
1606 notification,
1607 )));
1608 }
1609 _ => continue,
1610 }
1611 }
1612
1613 Ok(None)
1614 }
1615
1616 pub async fn read_property(
1617 &self,
1618 address: DataLinkAddress,
1619 object_id: ObjectId,
1620 property_id: PropertyId,
1621 ) -> Result<ClientDataValue, ClientError> {
1622 let invoke_id = self.next_invoke_id().await;
1623 let req = ReadPropertyRequest {
1624 object_id,
1625 property_id,
1626 array_index: None,
1627 invoke_id,
1628 };
1629 let tx = self.encode_with_growth(|w| {
1630 Npdu::new(0).encode(w)?;
1631 req.encode(w)
1632 })?;
1633 let payload = self
1634 .await_complex_ack_payload_or_error(
1635 address,
1636 &tx,
1637 invoke_id,
1638 SERVICE_READ_PROPERTY,
1639 self.response_timeout,
1640 )
1641 .await?;
1642 let mut pr = Reader::new(&payload);
1643 let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
1644 into_client_value(parsed.value)
1645 }
1646
1647 pub async fn write_property(
1648 &self,
1649 address: DataLinkAddress,
1650 mut request: WritePropertyRequest<'_>,
1651 ) -> Result<(), ClientError> {
1652 request.invoke_id = self.next_invoke_id().await;
1653 let invoke_id = request.invoke_id;
1654 let tx = self.encode_with_growth(|w| {
1655 Npdu::new(0).encode(w)?;
1656 request.encode(w)
1657 })?;
1658 self.await_simple_ack_or_error(
1659 address,
1660 &tx,
1661 invoke_id,
1662 SERVICE_WRITE_PROPERTY,
1663 self.response_timeout,
1664 )
1665 .await
1666 }
1667
1668 pub async fn read_property_multiple(
1669 &self,
1670 address: DataLinkAddress,
1671 object_id: ObjectId,
1672 property_ids: &[PropertyId],
1673 ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
1674 let refs: Vec<PropertyReference> = property_ids
1675 .iter()
1676 .copied()
1677 .map(|property_id| PropertyReference {
1678 property_id,
1679 array_index: None,
1680 })
1681 .collect();
1682 let specs = [ReadAccessSpecification {
1683 object_id,
1684 properties: &refs,
1685 }];
1686
1687 let invoke_id = self.next_invoke_id().await;
1688 let req = ReadPropertyMultipleRequest {
1689 specs: &specs,
1690 invoke_id,
1691 };
1692
1693 let tx = self.encode_with_growth(|w| {
1694 Npdu::new(0).encode(w)?;
1695 req.encode(w)
1696 })?;
1697 let payload = self
1698 .await_complex_ack_payload_or_error(
1699 address,
1700 &tx,
1701 invoke_id,
1702 SERVICE_READ_PROPERTY_MULTIPLE,
1703 self.response_timeout,
1704 )
1705 .await?;
1706 let mut pr = Reader::new(&payload);
1707 let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
1708 let mut out = Vec::new();
1709 for access in parsed.results {
1710 if access.object_id != object_id {
1711 continue;
1712 }
1713 for item in access.results {
1714 out.push((item.property_id, into_client_value(item.value)?));
1715 }
1716 }
1717 Ok(out)
1718 }
1719
1720 pub async fn write_property_multiple(
1721 &self,
1722 address: DataLinkAddress,
1723 object_id: ObjectId,
1724 properties: &[PropertyWriteSpec<'_>],
1725 ) -> Result<(), ClientError> {
1726 let invoke_id = self.next_invoke_id().await;
1727 let specs = [WriteAccessSpecification {
1728 object_id,
1729 properties,
1730 }];
1731 let req = WritePropertyMultipleRequest {
1732 specs: &specs,
1733 invoke_id,
1734 };
1735
1736 let tx = self.encode_with_growth(|w| {
1737 Npdu::new(0).encode(w)?;
1738 req.encode(w)
1739 })?;
1740 self.await_simple_ack_or_error(
1741 address,
1742 &tx,
1743 invoke_id,
1744 SERVICE_WRITE_PROPERTY_MULTIPLE,
1745 self.response_timeout,
1746 )
1747 .await
1748 }
1749
1750 pub async fn private_transfer(
1752 &self,
1753 address: DataLinkAddress,
1754 vendor_id: u32,
1755 service_number: u32,
1756 service_parameters: Option<&[u8]>,
1757 ) -> Result<PrivateTransferAck, ClientError> {
1758 let invoke_id = self.next_invoke_id().await;
1759 let req = ConfirmedPrivateTransferRequest {
1760 vendor_id,
1761 service_number,
1762 service_parameters,
1763 invoke_id,
1764 };
1765
1766 let tx = self.encode_with_growth(|w| {
1767 Npdu::new(0).encode(w)?;
1768 req.encode(w)
1769 })?;
1770 let payload = self
1771 .await_complex_ack_payload_or_error(
1772 address,
1773 &tx,
1774 invoke_id,
1775 SERVICE_CONFIRMED_PRIVATE_TRANSFER,
1776 self.response_timeout,
1777 )
1778 .await?;
1779 let mut r = Reader::new(&payload);
1780 PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
1781 }
1782}
1783
1784fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
1785 let mut r = Reader::new(payload);
1786 let _npdu = Npdu::decode(&mut r)?;
1787 r.read_exact(r.remaining()).map_err(ClientError::from)
1788}
1789
1790fn remote_service_error(err: BacnetError) -> ClientError {
1791 ClientError::RemoteServiceError {
1792 service_choice: err.service_choice,
1793 error_class_raw: err.error_class,
1794 error_code_raw: err.error_code,
1795 error_class: err.error_class.and_then(ErrorClass::from_u32),
1796 error_code: err.error_code.and_then(ErrorCode::from_u32),
1797 }
1798}
1799
1800fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
1801 Ok(match value {
1802 DataValue::Null => ClientDataValue::Null,
1803 DataValue::Boolean(v) => ClientDataValue::Boolean(v),
1804 DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
1805 DataValue::Signed(v) => ClientDataValue::Signed(v),
1806 DataValue::Real(v) => ClientDataValue::Real(v),
1807 DataValue::Double(v) => ClientDataValue::Double(v),
1808 DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
1809 DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
1810 DataValue::BitString(v) => ClientDataValue::BitString {
1811 unused_bits: v.unused_bits,
1812 data: v.data.to_vec(),
1813 },
1814 DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
1815 DataValue::Date(v) => ClientDataValue::Date(v),
1816 DataValue::Time(v) => ClientDataValue::Time(v),
1817 DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
1818 DataValue::Constructed { tag_num, values } => {
1819 let mut children = Vec::with_capacity(values.len());
1820 for child in values {
1821 children.push(into_client_value(child)?);
1822 }
1823 ClientDataValue::Constructed {
1824 tag_num,
1825 values: children,
1826 }
1827 }
1828 })
1829}
1830
1831fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
1832 value
1833 .into_iter()
1834 .map(|item| AlarmSummaryItem {
1835 object_id: item.object_id,
1836 alarm_state_raw: item.alarm_state,
1837 alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
1838 item.alarm_state,
1839 ),
1840 acknowledged_transitions: ClientBitString {
1841 unused_bits: item.acknowledged_transitions.unused_bits,
1842 data: item.acknowledged_transitions.data.to_vec(),
1843 },
1844 })
1845 .collect()
1846}
1847
1848fn into_client_enrollment_summary(
1849 value: Vec<CoreEnrollmentSummaryItem>,
1850) -> Vec<EnrollmentSummaryItem> {
1851 value
1852 .into_iter()
1853 .map(|item| EnrollmentSummaryItem {
1854 object_id: item.object_id,
1855 event_type: item.event_type,
1856 event_state_raw: item.event_state,
1857 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
1858 item.event_state,
1859 ),
1860 priority: item.priority,
1861 notification_class: item.notification_class,
1862 })
1863 .collect()
1864}
1865
1866fn into_client_event_information(
1867 value: Vec<CoreEventSummaryItem<'_>>,
1868) -> Vec<EventInformationItem> {
1869 value
1870 .into_iter()
1871 .map(|item| EventInformationItem {
1872 object_id: item.object_id,
1873 event_state_raw: item.event_state,
1874 event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
1875 item.event_state,
1876 ),
1877 acknowledged_transitions: ClientBitString {
1878 unused_bits: item.acknowledged_transitions.unused_bits,
1879 data: item.acknowledged_transitions.data.to_vec(),
1880 },
1881 notify_type: item.notify_type,
1882 event_enable: ClientBitString {
1883 unused_bits: item.event_enable.unused_bits,
1884 data: item.event_enable.data.to_vec(),
1885 },
1886 event_priorities: item.event_priorities,
1887 })
1888 .collect()
1889}
1890
1891fn into_client_cov_notification(
1892 source: DataLinkAddress,
1893 confirmed: bool,
1894 value: CovNotificationRequest<'_>,
1895) -> Result<CovNotification, ClientError> {
1896 let mut values = Vec::with_capacity(value.values.len());
1897 for property in value.values {
1898 values.push(CovPropertyValue {
1899 property_id: property.property_id,
1900 array_index: property.array_index,
1901 value: into_client_value(property.value)?,
1902 priority: property.priority,
1903 });
1904 }
1905
1906 Ok(CovNotification {
1907 source,
1908 confirmed,
1909 subscriber_process_id: value.subscriber_process_id,
1910 initiating_device_id: value.initiating_device_id,
1911 monitored_object_id: value.monitored_object_id,
1912 time_remaining_seconds: value.time_remaining_seconds,
1913 values,
1914 })
1915}
1916
1917fn into_client_event_notification(
1918 source: DataLinkAddress,
1919 confirmed: bool,
1920 value: EventNotificationRequest<'_>,
1921) -> EventNotification {
1922 EventNotification {
1923 source,
1924 confirmed,
1925 process_id: value.process_id,
1926 initiating_device_id: value.initiating_device_id,
1927 event_object_id: value.event_object_id,
1928 timestamp: value.timestamp,
1929 notification_class: value.notification_class,
1930 priority: value.priority,
1931 event_type: value.event_type,
1932 message_text: value.message_text.map(str::to_string),
1933 notify_type: value.notify_type,
1934 ack_required: value.ack_required,
1935 from_state_raw: value.from_state,
1936 from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
1937 value.from_state,
1938 ),
1939 to_state_raw: value.to_state,
1940 to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
1941 }
1942}
1943
1944fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
1945 let mut items = Vec::with_capacity(value.items.len());
1946 for item in value.items {
1947 items.push(into_client_value(item)?);
1948 }
1949 Ok(ReadRangeResult {
1950 object_id: value.object_id,
1951 property_id: value.property_id,
1952 array_index: value.array_index,
1953 result_flags: ClientBitString {
1954 unused_bits: value.result_flags.unused_bits,
1955 data: value.result_flags.data.to_vec(),
1956 },
1957 item_count: value.item_count,
1958 items,
1959 })
1960}
1961
1962fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
1963 match value.access_method {
1964 AtomicReadFileAckAccess::Stream {
1965 file_start_position,
1966 file_data,
1967 } => AtomicReadFileResult::Stream {
1968 end_of_file: value.end_of_file,
1969 file_start_position,
1970 file_data: file_data.to_vec(),
1971 },
1972 AtomicReadFileAckAccess::Record {
1973 file_start_record,
1974 returned_record_count,
1975 file_record_data,
1976 } => AtomicReadFileResult::Record {
1977 end_of_file: value.end_of_file,
1978 file_start_record,
1979 returned_record_count,
1980 file_record_data: file_record_data
1981 .into_iter()
1982 .map(|record| record.to_vec())
1983 .collect(),
1984 },
1985 }
1986}
1987
1988fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
1989 match value {
1990 AtomicWriteFileAck::Stream {
1991 file_start_position,
1992 } => AtomicWriteFileResult::Stream {
1993 file_start_position,
1994 },
1995 AtomicWriteFileAck::Record { file_start_record } => {
1996 AtomicWriteFileResult::Record { file_start_record }
1997 }
1998 }
1999}
2000
2001#[cfg(test)]
2002mod tests {
2003 use super::BacnetClient;
2004 use crate::{
2005 AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
2006 EnrollmentSummaryItem, EventInformationItem, EventNotification,
2007 };
2008 use rustbac_core::apdu::{
2009 ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
2010 UnconfirmedRequestHeader,
2011 };
2012 use rustbac_core::encoding::{
2013 primitives::{
2014 decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
2015 encode_ctx_object_id, encode_ctx_unsigned,
2016 },
2017 reader::Reader,
2018 tag::{AppTag, Tag},
2019 writer::Writer,
2020 };
2021 use rustbac_core::npdu::Npdu;
2022 use rustbac_core::services::acknowledge_alarm::{
2023 AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
2024 };
2025 use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
2026 use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
2027 use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
2028 use rustbac_core::services::cov_notification::{
2029 SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
2030 };
2031 use rustbac_core::services::device_management::{
2032 DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
2033 SERVICE_REINITIALIZE_DEVICE,
2034 };
2035 use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
2036 use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
2037 use rustbac_core::services::event_notification::{
2038 SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
2039 };
2040 use rustbac_core::services::list_element::{
2041 AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
2042 SERVICE_REMOVE_LIST_ELEMENT,
2043 };
2044 use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
2045 use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
2046 use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
2047 use rustbac_core::services::read_range::SERVICE_READ_RANGE;
2048 use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
2049 use rustbac_core::services::subscribe_cov_property::{
2050 SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
2051 };
2052 use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
2053 use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
2054 use rustbac_core::services::write_property_multiple::{
2055 PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
2056 };
2057 use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
2058 use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
2059 use std::collections::VecDeque;
2060 use std::sync::Arc;
2061 use std::time::Duration;
2062 use tokio::sync::Mutex;
2063
/// State shared between a `MockDataLink` and the test that drives it.
#[derive(Debug, Default)]
struct MockState {
    /// Every `(destination, payload)` pair passed to `MockDataLink::send`,
    /// in call order.
    sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
    /// Queued `(payload, source)` frames; `MockDataLink::recv` pops them
    /// from the front.
    recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
}
2069
/// In-memory `DataLink` used by the tests; all traffic goes through the
/// shared `MockState`.
#[derive(Debug, Clone)]
struct MockDataLink {
    /// Handle to the recorded sends and the queue of frames to receive.
    state: Arc<MockState>,
}
2074
2075 impl MockDataLink {
2076 fn new() -> (Self, Arc<MockState>) {
2077 let state = Arc::new(MockState::default());
2078 (
2079 Self {
2080 state: state.clone(),
2081 },
2082 state,
2083 )
2084 }
2085 }
2086
2087 impl DataLink for MockDataLink {
2088 async fn send(
2089 &self,
2090 address: DataLinkAddress,
2091 payload: &[u8],
2092 ) -> Result<(), DataLinkError> {
2093 self.state
2094 .sent
2095 .lock()
2096 .await
2097 .push((address, payload.to_vec()));
2098 Ok(())
2099 }
2100
2101 async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
2102 let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
2103 return Err(DataLinkError::InvalidFrame);
2104 };
2105 if payload.len() > buf.len() {
2106 return Err(DataLinkError::FrameTooLarge);
2107 }
2108 buf[..payload.len()].copy_from_slice(&payload);
2109 Ok((payload.len(), addr))
2110 }
2111 }
2112
2113 fn with_npdu(apdu: &[u8]) -> Vec<u8> {
2114 let mut out = [0u8; 512];
2115 let mut w = Writer::new(&mut out);
2116 Npdu::new(0).encode(&mut w).unwrap();
2117 w.write_all(apdu).unwrap();
2118 w.as_written().to_vec()
2119 }
2120
2121 fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2122 let mut apdu_buf = [0u8; 256];
2123 let mut w = Writer::new(&mut apdu_buf);
2124 ComplexAckHeader {
2125 segmented: false,
2126 more_follows: false,
2127 invoke_id,
2128 sequence_number: None,
2129 proposed_window_size: None,
2130 service_choice: SERVICE_READ_RANGE,
2131 }
2132 .encode(&mut w)
2133 .unwrap();
2134 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2135 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
2136 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
2137 w.write_u8(5).unwrap();
2138 w.write_u8(0b1110_0000).unwrap();
2139 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
2140 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
2141 encode_app_real(&mut w, 42.0).unwrap();
2142 encode_app_real(&mut w, 43.0).unwrap();
2143 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
2144 w.as_written().to_vec()
2145 }
2146
2147 fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
2148 let mut apdu_buf = [0u8; 256];
2149 let mut w = Writer::new(&mut apdu_buf);
2150 ComplexAckHeader {
2151 segmented: false,
2152 more_follows: false,
2153 invoke_id,
2154 sequence_number: None,
2155 proposed_window_size: None,
2156 service_choice: SERVICE_ATOMIC_READ_FILE,
2157 }
2158 .encode(&mut w)
2159 .unwrap();
2160 Tag::Application {
2161 tag: AppTag::Boolean,
2162 len: if eof { 1 } else { 0 },
2163 }
2164 .encode(&mut w)
2165 .unwrap();
2166 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2167 Tag::Application {
2168 tag: AppTag::SignedInt,
2169 len: 1,
2170 }
2171 .encode(&mut w)
2172 .unwrap();
2173 w.write_u8(0).unwrap();
2174 Tag::Application {
2175 tag: AppTag::OctetString,
2176 len: data.len() as u32,
2177 }
2178 .encode(&mut w)
2179 .unwrap();
2180 w.write_all(data).unwrap();
2181 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2182 w.as_written().to_vec()
2183 }
2184
2185 fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
2186 let mut apdu_buf = [0u8; 256];
2187 let mut w = Writer::new(&mut apdu_buf);
2188 ComplexAckHeader {
2189 segmented: false,
2190 more_follows: false,
2191 invoke_id,
2192 sequence_number: None,
2193 proposed_window_size: None,
2194 service_choice: SERVICE_ATOMIC_READ_FILE,
2195 }
2196 .encode(&mut w)
2197 .unwrap();
2198 Tag::Application {
2199 tag: AppTag::Boolean,
2200 len: 0,
2201 }
2202 .encode(&mut w)
2203 .unwrap();
2204 Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
2205 Tag::Application {
2206 tag: AppTag::SignedInt,
2207 len: 1,
2208 }
2209 .encode(&mut w)
2210 .unwrap();
2211 w.write_u8(7).unwrap();
2212 Tag::Application {
2213 tag: AppTag::UnsignedInt,
2214 len: 1,
2215 }
2216 .encode(&mut w)
2217 .unwrap();
2218 w.write_u8(2).unwrap();
2219 Tag::Application {
2220 tag: AppTag::OctetString,
2221 len: 2,
2222 }
2223 .encode(&mut w)
2224 .unwrap();
2225 w.write_all(&[0x01, 0x02]).unwrap();
2226 Tag::Application {
2227 tag: AppTag::OctetString,
2228 len: 3,
2229 }
2230 .encode(&mut w)
2231 .unwrap();
2232 w.write_all(&[0x03, 0x04, 0x05]).unwrap();
2233 Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
2234 w.as_written().to_vec()
2235 }
2236
2237 fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
2238 let mut apdu_buf = [0u8; 64];
2239 let mut w = Writer::new(&mut apdu_buf);
2240 ComplexAckHeader {
2241 segmented: false,
2242 more_follows: false,
2243 invoke_id,
2244 sequence_number: None,
2245 proposed_window_size: None,
2246 service_choice: SERVICE_ATOMIC_WRITE_FILE,
2247 }
2248 .encode(&mut w)
2249 .unwrap();
2250 Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
2251 w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
2252 w.as_written().to_vec()
2253 }
2254
2255 fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
2256 let mut apdu_buf = [0u8; 64];
2257 let mut w = Writer::new(&mut apdu_buf);
2258 ComplexAckHeader {
2259 segmented: false,
2260 more_follows: false,
2261 invoke_id,
2262 sequence_number: None,
2263 proposed_window_size: None,
2264 service_choice: SERVICE_ATOMIC_WRITE_FILE,
2265 }
2266 .encode(&mut w)
2267 .unwrap();
2268 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2269 w.write_u8(start_record as u8).unwrap();
2270 w.as_written().to_vec()
2271 }
2272
2273 fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2274 let mut apdu_buf = [0u8; 64];
2275 let mut w = Writer::new(&mut apdu_buf);
2276 ComplexAckHeader {
2277 segmented: false,
2278 more_follows: false,
2279 invoke_id,
2280 sequence_number: None,
2281 proposed_window_size: None,
2282 service_choice: SERVICE_CREATE_OBJECT,
2283 }
2284 .encode(&mut w)
2285 .unwrap();
2286 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2287 w.as_written().to_vec()
2288 }
2289
2290 fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2291 let mut apdu_buf = [0u8; 128];
2292 let mut w = Writer::new(&mut apdu_buf);
2293 ComplexAckHeader {
2294 segmented: false,
2295 more_follows: false,
2296 invoke_id,
2297 sequence_number: None,
2298 proposed_window_size: None,
2299 service_choice: SERVICE_GET_ALARM_SUMMARY,
2300 }
2301 .encode(&mut w)
2302 .unwrap();
2303 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2304 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2305 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2306 w.write_u8(5).unwrap();
2307 w.write_u8(0b1110_0000).unwrap();
2308
2309 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
2310 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2311 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2312 w.write_u8(5).unwrap();
2313 w.write_u8(0b1100_0000).unwrap();
2314 w.as_written().to_vec()
2315 }
2316
2317 fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2318 let mut apdu_buf = [0u8; 160];
2319 let mut w = Writer::new(&mut apdu_buf);
2320 ComplexAckHeader {
2321 segmented: false,
2322 more_follows: false,
2323 invoke_id,
2324 sequence_number: None,
2325 proposed_window_size: None,
2326 service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
2327 }
2328 .encode(&mut w)
2329 .unwrap();
2330 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2331 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2332 encode_ctx_unsigned(&mut w, 2, 2).unwrap();
2333 encode_ctx_unsigned(&mut w, 3, 200).unwrap();
2334 encode_ctx_unsigned(&mut w, 4, 10).unwrap();
2335
2336 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
2337 encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2338 encode_ctx_unsigned(&mut w, 2, 0).unwrap();
2339 encode_ctx_unsigned(&mut w, 3, 20).unwrap();
2340 encode_ctx_unsigned(&mut w, 4, 11).unwrap();
2341 w.as_written().to_vec()
2342 }
2343
2344 fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
2345 let mut apdu_buf = [0u8; 256];
2346 let mut w = Writer::new(&mut apdu_buf);
2347 ComplexAckHeader {
2348 segmented: false,
2349 more_follows: false,
2350 invoke_id,
2351 sequence_number: None,
2352 proposed_window_size: None,
2353 service_choice: SERVICE_GET_EVENT_INFORMATION,
2354 }
2355 .encode(&mut w)
2356 .unwrap();
2357 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2358 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2359 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2360 Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2361 w.write_u8(5).unwrap();
2362 w.write_u8(0b1110_0000).unwrap();
2363 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
2364 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2365 encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2366 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2367 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
2368 encode_ctx_unsigned(&mut w, 4, 0).unwrap();
2369 Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
2370 w.write_u8(5).unwrap();
2371 w.write_u8(0b1100_0000).unwrap();
2372 Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
2373 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
2374 encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2375 encode_ctx_unsigned(&mut w, 2, 3).unwrap();
2376 Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
2377 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2378 Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2379 w.write_u8(0).unwrap();
2380 w.as_written().to_vec()
2381 }
2382
2383 #[tokio::test]
2384 async fn who_has_object_name_collects_i_have() {
2385 let (dl, state) = MockDataLink::new();
2386 let client = BacnetClient::with_datalink(dl);
2387 let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
2388
2389 let mut apdu = [0u8; 128];
2390 let mut w = Writer::new(&mut apdu);
2391 UnconfirmedRequestHeader {
2392 service_choice: SERVICE_I_HAVE,
2393 }
2394 .encode(&mut w)
2395 .unwrap();
2396 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
2397 encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2398 encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
2399
2400 state
2401 .recv
2402 .lock()
2403 .await
2404 .push_back((with_npdu(w.as_written()), addr));
2405
2406 let results = client
2407 .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
2408 .await
2409 .unwrap();
2410 assert_eq!(results.len(), 1);
2411 assert_eq!(results[0].address, addr);
2412 assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
2413 assert_eq!(
2414 results[0].object_id,
2415 ObjectId::new(ObjectType::AnalogInput, 7)
2416 );
2417 assert_eq!(results[0].object_name, "Zone Temp");
2418
2419 let sent = state.sent.lock().await;
2420 assert_eq!(sent.len(), 1);
2421 let mut r = Reader::new(&sent[0].1);
2422 let _npdu = Npdu::decode(&mut r).unwrap();
2423 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2424 assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
2425 }
2426
2427 #[tokio::test]
2428 async fn device_communication_control_handles_simple_ack() {
2429 let (dl, state) = MockDataLink::new();
2430 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2431 let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
2432
2433 let mut apdu = [0u8; 32];
2434 let mut w = Writer::new(&mut apdu);
2435 SimpleAck {
2436 invoke_id: 1,
2437 service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
2438 }
2439 .encode(&mut w)
2440 .unwrap();
2441 state
2442 .recv
2443 .lock()
2444 .await
2445 .push_back((with_npdu(w.as_written()), addr));
2446
2447 client
2448 .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
2449 .await
2450 .unwrap();
2451
2452 let sent = state.sent.lock().await;
2453 assert_eq!(sent.len(), 1);
2454 let mut r = Reader::new(&sent[0].1);
2455 let _npdu = Npdu::decode(&mut r).unwrap();
2456 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2457 assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
2458 }
2459
2460 #[tokio::test]
2461 async fn reinitialize_device_handles_simple_ack() {
2462 let (dl, state) = MockDataLink::new();
2463 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2464 let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
2465
2466 let mut apdu = [0u8; 32];
2467 let mut w = Writer::new(&mut apdu);
2468 SimpleAck {
2469 invoke_id: 1,
2470 service_choice: SERVICE_REINITIALIZE_DEVICE,
2471 }
2472 .encode(&mut w)
2473 .unwrap();
2474 state
2475 .recv
2476 .lock()
2477 .await
2478 .push_back((with_npdu(w.as_written()), addr));
2479
2480 client
2481 .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
2482 .await
2483 .unwrap();
2484
2485 let sent = state.sent.lock().await;
2486 assert_eq!(sent.len(), 1);
2487 let mut r = Reader::new(&sent[0].1);
2488 let _npdu = Npdu::decode(&mut r).unwrap();
2489 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2490 assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
2491 }
2492
2493 #[tokio::test]
2494 async fn time_synchronize_sends_unconfirmed_request() {
2495 let (dl, state) = MockDataLink::new();
2496 let client = BacnetClient::with_datalink(dl);
2497 let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
2498
2499 client
2500 .time_synchronize(
2501 addr,
2502 Date {
2503 year_since_1900: 126,
2504 month: 2,
2505 day: 7,
2506 weekday: 6,
2507 },
2508 Time {
2509 hour: 10,
2510 minute: 11,
2511 second: 12,
2512 hundredths: 13,
2513 },
2514 false,
2515 )
2516 .await
2517 .unwrap();
2518
2519 let sent = state.sent.lock().await;
2520 assert_eq!(sent.len(), 1);
2521 let mut r = Reader::new(&sent[0].1);
2522 let _npdu = Npdu::decode(&mut r).unwrap();
2523 let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2524 assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
2525 }
2526
2527 #[tokio::test]
2528 async fn get_alarm_summary_decodes_complex_ack() {
2529 let (dl, state) = MockDataLink::new();
2530 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2531 let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
2532
2533 state
2534 .recv
2535 .lock()
2536 .await
2537 .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
2538
2539 let summaries = client.get_alarm_summary(addr).await.unwrap();
2540 assert_eq!(summaries.len(), 2);
2541 assert_eq!(
2542 summaries[0],
2543 AlarmSummaryItem {
2544 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2545 alarm_state_raw: 1,
2546 alarm_state: Some(EventState::Fault),
2547 acknowledged_transitions: crate::ClientBitString {
2548 unused_bits: 5,
2549 data: vec![0b1110_0000],
2550 },
2551 }
2552 );
2553 assert_eq!(
2554 summaries[1],
2555 AlarmSummaryItem {
2556 object_id: ObjectId::new(ObjectType::BinaryInput, 2),
2557 alarm_state_raw: 0,
2558 alarm_state: Some(EventState::Normal),
2559 acknowledged_transitions: crate::ClientBitString {
2560 unused_bits: 5,
2561 data: vec![0b1100_0000],
2562 },
2563 }
2564 );
2565
2566 let sent = state.sent.lock().await;
2567 assert_eq!(sent.len(), 1);
2568 let mut r = Reader::new(&sent[0].1);
2569 let _npdu = Npdu::decode(&mut r).unwrap();
2570 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2571 assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
2572 }
2573
2574 #[tokio::test]
2575 async fn get_enrollment_summary_decodes_complex_ack() {
2576 let (dl, state) = MockDataLink::new();
2577 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2578 let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
2579
2580 state
2581 .recv
2582 .lock()
2583 .await
2584 .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
2585
2586 let summaries = client.get_enrollment_summary(addr).await.unwrap();
2587 assert_eq!(summaries.len(), 2);
2588 assert_eq!(
2589 summaries[0],
2590 EnrollmentSummaryItem {
2591 object_id: ObjectId::new(ObjectType::AnalogInput, 7),
2592 event_type: 1,
2593 event_state_raw: 2,
2594 event_state: Some(EventState::Offnormal),
2595 priority: 200,
2596 notification_class: 10,
2597 }
2598 );
2599 assert_eq!(
2600 summaries[1],
2601 EnrollmentSummaryItem {
2602 object_id: ObjectId::new(ObjectType::BinaryInput, 8),
2603 event_type: 0,
2604 event_state_raw: 0,
2605 event_state: Some(EventState::Normal),
2606 priority: 20,
2607 notification_class: 11,
2608 }
2609 );
2610
2611 let sent = state.sent.lock().await;
2612 assert_eq!(sent.len(), 1);
2613 let mut r = Reader::new(&sent[0].1);
2614 let _npdu = Npdu::decode(&mut r).unwrap();
2615 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2616 assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
2617 }
2618
2619 #[tokio::test]
2620 async fn get_event_information_decodes_complex_ack() {
2621 let (dl, state) = MockDataLink::new();
2622 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2623 let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
2624
2625 state
2626 .recv
2627 .lock()
2628 .await
2629 .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
2630
2631 let result = client.get_event_information(addr, None).await.unwrap();
2632 assert!(!result.more_events);
2633 assert_eq!(result.summaries.len(), 1);
2634 assert_eq!(
2635 result.summaries[0],
2636 EventInformationItem {
2637 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2638 event_state_raw: 2,
2639 event_state: Some(EventState::Offnormal),
2640 acknowledged_transitions: crate::ClientBitString {
2641 unused_bits: 5,
2642 data: vec![0b1110_0000],
2643 },
2644 notify_type: 0,
2645 event_enable: crate::ClientBitString {
2646 unused_bits: 5,
2647 data: vec![0b1100_0000],
2648 },
2649 event_priorities: [1, 2, 3],
2650 }
2651 );
2652 }
2653
2654 #[tokio::test]
2655 async fn acknowledge_alarm_handles_simple_ack() {
2656 let (dl, state) = MockDataLink::new();
2657 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2658 let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
2659
2660 let mut apdu = [0u8; 32];
2661 let mut w = Writer::new(&mut apdu);
2662 SimpleAck {
2663 invoke_id: 1,
2664 service_choice: SERVICE_ACKNOWLEDGE_ALARM,
2665 }
2666 .encode(&mut w)
2667 .unwrap();
2668 state
2669 .recv
2670 .lock()
2671 .await
2672 .push_back((with_npdu(w.as_written()), addr));
2673
2674 client
2675 .acknowledge_alarm(
2676 addr,
2677 AcknowledgeAlarmRequest {
2678 acknowledging_process_id: 10,
2679 event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2680 event_state_acknowledged: EventState::Offnormal,
2681 event_time_stamp: TimeStamp::SequenceNumber(42),
2682 acknowledgment_source: "operator",
2683 time_of_acknowledgment: TimeStamp::DateTime {
2684 date: Date {
2685 year_since_1900: 126,
2686 month: 2,
2687 day: 7,
2688 weekday: 6,
2689 },
2690 time: Time {
2691 hour: 10,
2692 minute: 11,
2693 second: 12,
2694 hundredths: 13,
2695 },
2696 },
2697 invoke_id: 0,
2698 },
2699 )
2700 .await
2701 .unwrap();
2702
2703 let sent = state.sent.lock().await;
2704 assert_eq!(sent.len(), 1);
2705 let mut r = Reader::new(&sent[0].1);
2706 let _npdu = Npdu::decode(&mut r).unwrap();
2707 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2708 assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
2709 }
2710
2711 #[tokio::test]
2712 async fn create_object_by_type_decodes_complex_ack() {
2713 let (dl, state) = MockDataLink::new();
2714 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2715 let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
2716 let created = ObjectId::new(ObjectType::AnalogValue, 42);
2717
2718 state
2719 .recv
2720 .lock()
2721 .await
2722 .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
2723
2724 let result = client
2725 .create_object_by_type(addr, ObjectType::AnalogValue)
2726 .await
2727 .unwrap();
2728 assert_eq!(result, created);
2729
2730 let sent = state.sent.lock().await;
2731 let mut r = Reader::new(&sent[0].1);
2732 let _npdu = Npdu::decode(&mut r).unwrap();
2733 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2734 assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
2735 }
2736
2737 #[tokio::test]
2738 async fn delete_object_handles_simple_ack() {
2739 let (dl, state) = MockDataLink::new();
2740 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2741 let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
2742
2743 let mut apdu = [0u8; 32];
2744 let mut w = Writer::new(&mut apdu);
2745 SimpleAck {
2746 invoke_id: 1,
2747 service_choice: SERVICE_DELETE_OBJECT,
2748 }
2749 .encode(&mut w)
2750 .unwrap();
2751 state
2752 .recv
2753 .lock()
2754 .await
2755 .push_back((with_npdu(w.as_written()), addr));
2756
2757 client
2758 .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
2759 .await
2760 .unwrap();
2761 }
2762
2763 #[tokio::test]
2764 async fn add_list_element_handles_simple_ack() {
2765 let (dl, state) = MockDataLink::new();
2766 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2767 let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
2768
2769 let mut apdu = [0u8; 32];
2770 let mut w = Writer::new(&mut apdu);
2771 SimpleAck {
2772 invoke_id: 1,
2773 service_choice: SERVICE_ADD_LIST_ELEMENT,
2774 }
2775 .encode(&mut w)
2776 .unwrap();
2777 state
2778 .recv
2779 .lock()
2780 .await
2781 .push_back((with_npdu(w.as_written()), addr));
2782
2783 let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
2784 client
2785 .add_list_element(
2786 addr,
2787 AddListElementRequest {
2788 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
2789 property_id: PropertyId::Proprietary(512),
2790 array_index: None,
2791 elements: &values,
2792 invoke_id: 0,
2793 },
2794 )
2795 .await
2796 .unwrap();
2797 }
2798
2799 #[tokio::test]
2800 async fn remove_list_element_handles_simple_ack() {
2801 let (dl, state) = MockDataLink::new();
2802 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2803 let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
2804
2805 let mut apdu = [0u8; 32];
2806 let mut w = Writer::new(&mut apdu);
2807 SimpleAck {
2808 invoke_id: 1,
2809 service_choice: SERVICE_REMOVE_LIST_ELEMENT,
2810 }
2811 .encode(&mut w)
2812 .unwrap();
2813 state
2814 .recv
2815 .lock()
2816 .await
2817 .push_back((with_npdu(w.as_written()), addr));
2818
2819 let values = [DataValue::Unsigned(1)];
2820 client
2821 .remove_list_element(
2822 addr,
2823 RemoveListElementRequest {
2824 object_id: ObjectId::new(ObjectType::AnalogValue, 1),
2825 property_id: PropertyId::Proprietary(513),
2826 array_index: None,
2827 elements: &values,
2828 invoke_id: 0,
2829 },
2830 )
2831 .await
2832 .unwrap();
2833 }
2834
2835 #[tokio::test]
2836 async fn atomic_read_file_stream_decodes_complex_ack() {
2837 let (dl, state) = MockDataLink::new();
2838 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2839 let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
2840 let file_object = ObjectId::new(ObjectType::File, 2);
2841
2842 state.recv.lock().await.push_back((
2843 with_npdu(&atomic_read_file_stream_ack_apdu(
2844 1,
2845 true,
2846 &[0xAA, 0xBB, 0xCC],
2847 )),
2848 addr,
2849 ));
2850
2851 let result = client
2852 .atomic_read_file_stream(addr, file_object, 0, 3)
2853 .await
2854 .unwrap();
2855
2856 assert_eq!(
2857 result,
2858 AtomicReadFileResult::Stream {
2859 end_of_file: true,
2860 file_start_position: 0,
2861 file_data: vec![0xAA, 0xBB, 0xCC],
2862 }
2863 );
2864
2865 let sent = state.sent.lock().await;
2866 assert_eq!(sent.len(), 1);
2867 let mut r = Reader::new(&sent[0].1);
2868 let _npdu = Npdu::decode(&mut r).unwrap();
2869 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2870 assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
2871 }
2872
2873 #[tokio::test]
2874 async fn atomic_read_file_record_decodes_complex_ack() {
2875 let (dl, state) = MockDataLink::new();
2876 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2877 let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
2878 let file_object = ObjectId::new(ObjectType::File, 5);
2879
2880 state
2881 .recv
2882 .lock()
2883 .await
2884 .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
2885
2886 let result = client
2887 .atomic_read_file_record(addr, file_object, 7, 2)
2888 .await
2889 .unwrap();
2890
2891 assert_eq!(
2892 result,
2893 AtomicReadFileResult::Record {
2894 end_of_file: false,
2895 file_start_record: 7,
2896 returned_record_count: 2,
2897 file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
2898 }
2899 );
2900 }
2901
2902 #[tokio::test]
2903 async fn atomic_write_file_stream_decodes_complex_ack() {
2904 let (dl, state) = MockDataLink::new();
2905 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2906 let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
2907 let file_object = ObjectId::new(ObjectType::File, 3);
2908
2909 state
2910 .recv
2911 .lock()
2912 .await
2913 .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
2914
2915 let result = client
2916 .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
2917 .await
2918 .unwrap();
2919
2920 assert_eq!(
2921 result,
2922 AtomicWriteFileResult::Stream {
2923 file_start_position: 128
2924 }
2925 );
2926 }
2927
2928 #[tokio::test]
2929 async fn atomic_write_file_record_decodes_complex_ack() {
2930 let (dl, state) = MockDataLink::new();
2931 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2932 let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
2933 let file_object = ObjectId::new(ObjectType::File, 9);
2934 let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
2935
2936 state
2937 .recv
2938 .lock()
2939 .await
2940 .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
2941
2942 let result = client
2943 .atomic_write_file_record(addr, file_object, 7, &records)
2944 .await
2945 .unwrap();
2946
2947 assert_eq!(
2948 result,
2949 AtomicWriteFileResult::Record {
2950 file_start_record: 7
2951 }
2952 );
2953 }
2954
2955 #[tokio::test]
2956 async fn read_properties_decodes_complex_ack() {
2957 let (dl, state) = MockDataLink::new();
2958 let client = BacnetClient::with_datalink(dl);
2959 let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
2960 let object_id = ObjectId::new(ObjectType::Device, 1);
2961
2962 let mut apdu_buf = [0u8; 256];
2963 let mut w = Writer::new(&mut apdu_buf);
2964 ComplexAckHeader {
2965 segmented: false,
2966 more_follows: false,
2967 invoke_id: 1,
2968 sequence_number: None,
2969 proposed_window_size: None,
2970 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
2971 }
2972 .encode(&mut w)
2973 .unwrap();
2974 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
2975 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
2976 .encode(&mut w)
2977 .unwrap();
2978 encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
2979 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
2980 .encode(&mut w)
2981 .unwrap();
2982 encode_app_real(&mut w, 55.5).unwrap();
2983 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
2984 .encode(&mut w)
2985 .unwrap();
2986 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
2987 .encode(&mut w)
2988 .unwrap();
2989
2990 state
2991 .recv
2992 .lock()
2993 .await
2994 .push_back((with_npdu(w.as_written()), addr));
2995
2996 let values = client
2997 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
2998 .await
2999 .unwrap();
3000 assert_eq!(values.len(), 1);
3001 assert_eq!(values[0].0, PropertyId::PresentValue);
3002 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
3003
3004 let sent = state.sent.lock().await;
3005 assert_eq!(sent.len(), 1);
3006 let mut r = Reader::new(&sent[0].1);
3007 let _npdu = Npdu::decode(&mut r).unwrap();
3008 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3009 assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
3010 }
3011
3012 #[tokio::test]
3013 async fn read_property_multiple_reassembles_segmented_complex_ack() {
3014 let (dl, state) = MockDataLink::new();
3015 let client = BacnetClient::with_datalink(dl);
3016 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3017 let object_id = ObjectId::new(ObjectType::Device, 1);
3018
3019 let mut payload_buf = [0u8; 256];
3020 let mut pw = Writer::new(&mut payload_buf);
3021 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3022 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3023 .encode(&mut pw)
3024 .unwrap();
3025 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3026 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3027 .encode(&mut pw)
3028 .unwrap();
3029 encode_app_real(&mut pw, 66.0).unwrap();
3030 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3031 .encode(&mut pw)
3032 .unwrap();
3033 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3034 .encode(&mut pw)
3035 .unwrap();
3036 let payload = pw.as_written();
3037 let split = payload.len() / 2;
3038
3039 let mut apdu1 = [0u8; 256];
3040 let mut w1 = Writer::new(&mut apdu1);
3041 ComplexAckHeader {
3042 segmented: true,
3043 more_follows: true,
3044 invoke_id: 1,
3045 sequence_number: Some(0),
3046 proposed_window_size: Some(1),
3047 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3048 }
3049 .encode(&mut w1)
3050 .unwrap();
3051 w1.write_all(&payload[..split]).unwrap();
3052
3053 let mut apdu2 = [0u8; 256];
3054 let mut w2 = Writer::new(&mut apdu2);
3055 ComplexAckHeader {
3056 segmented: true,
3057 more_follows: false,
3058 invoke_id: 1,
3059 sequence_number: Some(1),
3060 proposed_window_size: Some(1),
3061 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3062 }
3063 .encode(&mut w2)
3064 .unwrap();
3065 w2.write_all(&payload[split..]).unwrap();
3066
3067 state
3068 .recv
3069 .lock()
3070 .await
3071 .push_back((with_npdu(w1.as_written()), addr));
3072 state
3073 .recv
3074 .lock()
3075 .await
3076 .push_back((with_npdu(w2.as_written()), addr));
3077
3078 let values = client
3079 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3080 .await
3081 .unwrap();
3082 assert_eq!(values.len(), 1);
3083 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3084
3085 let sent = state.sent.lock().await;
3086 assert!(sent.len() >= 3);
3087
3088 let mut saw_segment_ack = 0usize;
3089 for (_, frame) in sent.iter().skip(1) {
3090 let mut r = Reader::new(frame);
3091 let _npdu = Npdu::decode(&mut r).unwrap();
3092 let apdu = r.read_exact(r.remaining()).unwrap();
3093 if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
3094 let mut sr = Reader::new(apdu);
3095 let sack = SegmentAck::decode(&mut sr).unwrap();
3096 assert_eq!(sack.invoke_id, 1);
3097 saw_segment_ack += 1;
3098 }
3099 }
3100 assert!(saw_segment_ack >= 1);
3101 }
3102
3103 #[tokio::test]
3104 async fn read_property_multiple_tolerates_duplicate_segment() {
3105 let (dl, state) = MockDataLink::new();
3106 let client = BacnetClient::with_datalink(dl);
3107 let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
3108 let object_id = ObjectId::new(ObjectType::Device, 1);
3109
3110 let mut payload_buf = [0u8; 256];
3111 let mut pw = Writer::new(&mut payload_buf);
3112 encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3113 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3114 .encode(&mut pw)
3115 .unwrap();
3116 encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3117 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3118 .encode(&mut pw)
3119 .unwrap();
3120 encode_app_real(&mut pw, 66.0).unwrap();
3121 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3122 .encode(&mut pw)
3123 .unwrap();
3124 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3125 .encode(&mut pw)
3126 .unwrap();
3127 let payload = pw.as_written();
3128 let split = payload.len() / 2;
3129
3130 let mut apdu1 = [0u8; 256];
3131 let mut w1 = Writer::new(&mut apdu1);
3132 ComplexAckHeader {
3133 segmented: true,
3134 more_follows: true,
3135 invoke_id: 1,
3136 sequence_number: Some(0),
3137 proposed_window_size: Some(1),
3138 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3139 }
3140 .encode(&mut w1)
3141 .unwrap();
3142 w1.write_all(&payload[..split]).unwrap();
3143
3144 let mut dup = [0u8; 256];
3145 let mut wd = Writer::new(&mut dup);
3146 ComplexAckHeader {
3147 segmented: true,
3148 more_follows: true,
3149 invoke_id: 1,
3150 sequence_number: Some(0),
3151 proposed_window_size: Some(1),
3152 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3153 }
3154 .encode(&mut wd)
3155 .unwrap();
3156 wd.write_all(&payload[..split]).unwrap();
3157
3158 let mut apdu2 = [0u8; 256];
3159 let mut w2 = Writer::new(&mut apdu2);
3160 ComplexAckHeader {
3161 segmented: true,
3162 more_follows: false,
3163 invoke_id: 1,
3164 sequence_number: Some(1),
3165 proposed_window_size: Some(1),
3166 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3167 }
3168 .encode(&mut w2)
3169 .unwrap();
3170 w2.write_all(&payload[split..]).unwrap();
3171
3172 {
3173 let mut recv = state.recv.lock().await;
3174 recv.push_back((with_npdu(w1.as_written()), addr));
3175 recv.push_back((with_npdu(wd.as_written()), addr));
3176 recv.push_back((with_npdu(w2.as_written()), addr));
3177 }
3178
3179 let values = client
3180 .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3181 .await
3182 .unwrap();
3183 assert_eq!(values.len(), 1);
3184 assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3185 }
3186
3187 #[tokio::test]
3188 async fn write_properties_handles_simple_ack() {
3189 let (dl, state) = MockDataLink::new();
3190 let client = BacnetClient::with_datalink(dl);
3191 let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
3192 let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
3193
3194 let mut apdu_buf = [0u8; 32];
3195 let mut w = Writer::new(&mut apdu_buf);
3196 SimpleAck {
3197 invoke_id: 1,
3198 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3199 }
3200 .encode(&mut w)
3201 .unwrap();
3202 state
3203 .recv
3204 .lock()
3205 .await
3206 .push_back((with_npdu(w.as_written()), addr));
3207
3208 let writes = [PropertyWriteSpec {
3209 property_id: PropertyId::PresentValue,
3210 array_index: None,
3211 value: DataValue::Real(12.5),
3212 priority: Some(8),
3213 }];
3214 client
3215 .write_property_multiple(addr, object_id, &writes)
3216 .await
3217 .unwrap();
3218
3219 let sent = state.sent.lock().await;
3220 assert_eq!(sent.len(), 1);
3221 let mut r = Reader::new(&sent[0].1);
3222 let _npdu = Npdu::decode(&mut r).unwrap();
3223 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3224 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3225 }
3226
3227 #[tokio::test]
3228 async fn subscribe_cov_handles_simple_ack() {
3229 let (dl, state) = MockDataLink::new();
3230 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3231 let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
3232
3233 let mut apdu_buf = [0u8; 32];
3234 let mut w = Writer::new(&mut apdu_buf);
3235 SimpleAck {
3236 invoke_id: 1,
3237 service_choice: SERVICE_SUBSCRIBE_COV,
3238 }
3239 .encode(&mut w)
3240 .unwrap();
3241 state
3242 .recv
3243 .lock()
3244 .await
3245 .push_back((with_npdu(w.as_written()), addr));
3246
3247 client
3248 .subscribe_cov(
3249 addr,
3250 SubscribeCovRequest {
3251 subscriber_process_id: 10,
3252 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3253 issue_confirmed_notifications: Some(false),
3254 lifetime_seconds: Some(300),
3255 invoke_id: 0,
3256 },
3257 )
3258 .await
3259 .unwrap();
3260
3261 let sent = state.sent.lock().await;
3262 assert_eq!(sent.len(), 1);
3263 let mut r = Reader::new(&sent[0].1);
3264 let _npdu = Npdu::decode(&mut r).unwrap();
3265 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3266 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
3267 }
3268
3269 #[tokio::test]
3270 async fn subscribe_cov_property_handles_simple_ack() {
3271 let (dl, state) = MockDataLink::new();
3272 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3273 let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
3274
3275 let mut apdu_buf = [0u8; 32];
3276 let mut w = Writer::new(&mut apdu_buf);
3277 SimpleAck {
3278 invoke_id: 1,
3279 service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
3280 }
3281 .encode(&mut w)
3282 .unwrap();
3283 state
3284 .recv
3285 .lock()
3286 .await
3287 .push_back((with_npdu(w.as_written()), addr));
3288
3289 client
3290 .subscribe_cov_property(
3291 addr,
3292 SubscribeCovPropertyRequest {
3293 subscriber_process_id: 22,
3294 monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3295 issue_confirmed_notifications: Some(true),
3296 lifetime_seconds: Some(120),
3297 monitored_property_id: PropertyId::PresentValue,
3298 monitored_property_array_index: None,
3299 cov_increment: Some(0.1),
3300 invoke_id: 0,
3301 },
3302 )
3303 .await
3304 .unwrap();
3305
3306 let sent = state.sent.lock().await;
3307 assert_eq!(sent.len(), 1);
3308 let mut r = Reader::new(&sent[0].1);
3309 let _npdu = Npdu::decode(&mut r).unwrap();
3310 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3311 assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
3312 }
3313
3314 #[tokio::test]
3315 async fn read_range_by_position_decodes_complex_ack() {
3316 let (dl, state) = MockDataLink::new();
3317 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3318 let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
3319 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3320
3321 let mut apdu_buf = [0u8; 256];
3322 let mut w = Writer::new(&mut apdu_buf);
3323 ComplexAckHeader {
3324 segmented: false,
3325 more_follows: false,
3326 invoke_id: 1,
3327 sequence_number: None,
3328 proposed_window_size: None,
3329 service_choice: SERVICE_READ_RANGE,
3330 }
3331 .encode(&mut w)
3332 .unwrap();
3333 encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3334 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3335 Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3336 w.write_u8(5).unwrap();
3337 w.write_u8(0b1110_0000).unwrap();
3338 encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3339 Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3340 encode_app_real(&mut w, 42.0).unwrap();
3341 encode_app_real(&mut w, 43.0).unwrap();
3342 Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3343
3344 state
3345 .recv
3346 .lock()
3347 .await
3348 .push_back((with_npdu(w.as_written()), addr));
3349
3350 let result = client
3351 .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
3352 .await
3353 .unwrap();
3354 assert_eq!(result.object_id, object_id);
3355 assert_eq!(result.item_count, 2);
3356 assert_eq!(result.items.len(), 2);
3357 assert!(matches!(
3358 result.items[0],
3359 ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
3360 ));
3361 }
3362
3363 #[tokio::test]
3364 async fn read_range_by_sequence_number_encodes_range_selector() {
3365 let (dl, state) = MockDataLink::new();
3366 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3367 let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
3368 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3369
3370 state
3371 .recv
3372 .lock()
3373 .await
3374 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3375
3376 let _ = client
3377 .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
3378 .await
3379 .unwrap();
3380
3381 let sent = state.sent.lock().await;
3382 let mut r = Reader::new(&sent[0].1);
3383 let _npdu = Npdu::decode(&mut r).unwrap();
3384 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3385 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3386 match Tag::decode(&mut r).unwrap() {
3387 Tag::Context { tag_num: 0, len: 4 } => {
3388 let _ = r.read_exact(4).unwrap();
3389 }
3390 other => panic!("unexpected object id tag: {other:?}"),
3391 }
3392 match Tag::decode(&mut r).unwrap() {
3393 Tag::Context { tag_num: 1, len } => {
3394 let _ = decode_unsigned(&mut r, len as usize).unwrap();
3395 }
3396 other => panic!("unexpected property tag: {other:?}"),
3397 }
3398 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
3399 match Tag::decode(&mut r).unwrap() {
3400 Tag::Application {
3401 tag: AppTag::UnsignedInt,
3402 len,
3403 } => {
3404 assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
3405 }
3406 other => panic!("unexpected ref seq tag: {other:?}"),
3407 }
3408 match Tag::decode(&mut r).unwrap() {
3409 Tag::Application {
3410 tag: AppTag::SignedInt,
3411 len,
3412 } => {
3413 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3414 }
3415 other => panic!("unexpected count tag: {other:?}"),
3416 }
3417 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
3418 }
3419
3420 #[tokio::test]
3421 async fn read_range_by_time_encodes_range_selector() {
3422 let (dl, state) = MockDataLink::new();
3423 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3424 let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
3425 let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3426 let date = Date {
3427 year_since_1900: 126,
3428 month: 2,
3429 day: 7,
3430 weekday: 6,
3431 };
3432 let time = Time {
3433 hour: 10,
3434 minute: 11,
3435 second: 12,
3436 hundredths: 13,
3437 };
3438
3439 state
3440 .recv
3441 .lock()
3442 .await
3443 .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3444
3445 let _ = client
3446 .read_range_by_time(
3447 addr,
3448 object_id,
3449 PropertyId::PresentValue,
3450 None,
3451 (date, time),
3452 2,
3453 )
3454 .await
3455 .unwrap();
3456
3457 let sent = state.sent.lock().await;
3458 let mut r = Reader::new(&sent[0].1);
3459 let _npdu = Npdu::decode(&mut r).unwrap();
3460 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3461 assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3462 match Tag::decode(&mut r).unwrap() {
3463 Tag::Context { tag_num: 0, len: 4 } => {
3464 let _ = r.read_exact(4).unwrap();
3465 }
3466 other => panic!("unexpected object id tag: {other:?}"),
3467 }
3468 match Tag::decode(&mut r).unwrap() {
3469 Tag::Context { tag_num: 1, len } => {
3470 let _ = decode_unsigned(&mut r, len as usize).unwrap();
3471 }
3472 other => panic!("unexpected property tag: {other:?}"),
3473 }
3474 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
3475 match Tag::decode(&mut r).unwrap() {
3476 Tag::Application {
3477 tag: AppTag::Date,
3478 len: 4,
3479 } => {
3480 let raw = r.read_exact(4).unwrap();
3481 assert_eq!(
3482 raw,
3483 &[date.year_since_1900, date.month, date.day, date.weekday]
3484 );
3485 }
3486 other => panic!("unexpected date tag: {other:?}"),
3487 }
3488 match Tag::decode(&mut r).unwrap() {
3489 Tag::Application {
3490 tag: AppTag::Time,
3491 len: 4,
3492 } => {
3493 let raw = r.read_exact(4).unwrap();
3494 assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
3495 }
3496 other => panic!("unexpected time tag: {other:?}"),
3497 }
3498 match Tag::decode(&mut r).unwrap() {
3499 Tag::Application {
3500 tag: AppTag::SignedInt,
3501 len,
3502 } => {
3503 assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3504 }
3505 other => panic!("unexpected count tag: {other:?}"),
3506 }
3507 assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
3508 }
3509
3510 #[tokio::test]
3511 async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
3512 let (dl, state) = MockDataLink::new();
3513 let client = BacnetClient::with_datalink(dl);
3514 let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3515
3516 let mut apdu = [0u8; 256];
3517 let mut w = Writer::new(&mut apdu);
3518 UnconfirmedRequestHeader {
3519 service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3520 }
3521 .encode(&mut w)
3522 .unwrap();
3523 encode_ctx_unsigned(&mut w, 0, 17).unwrap();
3524 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3525 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3526 encode_ctx_unsigned(&mut w, 3, 60).unwrap();
3527 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3528 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3529 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3530 encode_app_real(&mut w, 73.25).unwrap();
3531 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3532 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3533
3534 state
3535 .recv
3536 .lock()
3537 .await
3538 .push_back((with_npdu(w.as_written()), addr));
3539
3540 let notification = client
3541 .recv_cov_notification(Duration::from_secs(1))
3542 .await
3543 .unwrap()
3544 .unwrap();
3545 assert!(!notification.confirmed);
3546 assert_eq!(notification.subscriber_process_id, 17);
3547 assert_eq!(notification.values.len(), 1);
3548 assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
3549 assert!(matches!(
3550 notification.values[0].value,
3551 ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
3552 ));
3553
3554 let sent = state.sent.lock().await;
3555 assert!(sent.is_empty());
3556 }
3557
3558 #[tokio::test]
3559 async fn recv_confirmed_cov_notification_sends_simple_ack() {
3560 let (dl, state) = MockDataLink::new();
3561 let client = BacnetClient::with_datalink(dl);
3562 let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
3563
3564 let mut apdu = [0u8; 256];
3565 let mut w = Writer::new(&mut apdu);
3566 ConfirmedRequestHeader {
3567 segmented: false,
3568 more_follows: false,
3569 segmented_response_accepted: false,
3570 max_segments: 0,
3571 max_apdu: 5,
3572 invoke_id: 9,
3573 sequence_number: None,
3574 proposed_window_size: None,
3575 service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
3576 }
3577 .encode(&mut w)
3578 .unwrap();
3579 encode_ctx_unsigned(&mut w, 0, 18).unwrap();
3580 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3581 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
3582 encode_ctx_unsigned(&mut w, 3, 120).unwrap();
3583 Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3584 encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3585 Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3586 encode_app_real(&mut w, 55.0).unwrap();
3587 Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3588 Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3589
3590 state
3591 .recv
3592 .lock()
3593 .await
3594 .push_back((with_npdu(w.as_written()), addr));
3595
3596 let notification = client
3597 .recv_cov_notification(Duration::from_secs(1))
3598 .await
3599 .unwrap()
3600 .unwrap();
3601 assert!(notification.confirmed);
3602 assert_eq!(notification.values.len(), 1);
3603
3604 let sent = state.sent.lock().await;
3605 assert_eq!(sent.len(), 1);
3606 let mut r = Reader::new(&sent[0].1);
3607 let _npdu = Npdu::decode(&mut r).unwrap();
3608 let ack = SimpleAck::decode(&mut r).unwrap();
3609 assert_eq!(ack.invoke_id, 9);
3610 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
3611 }
3612
3613 #[tokio::test]
3614 async fn recv_unconfirmed_event_notification_returns_decoded_value() {
3615 let (dl, state) = MockDataLink::new();
3616 let client = BacnetClient::with_datalink(dl);
3617 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
3618
3619 let mut apdu = [0u8; 256];
3620 let mut w = Writer::new(&mut apdu);
3621 UnconfirmedRequestHeader {
3622 service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3623 }
3624 .encode(&mut w)
3625 .unwrap();
3626 encode_ctx_unsigned(&mut w, 0, 99).unwrap();
3627 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3628 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
3629 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3630 encode_ctx_unsigned(&mut w, 1, 55).unwrap();
3631 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3632 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
3633 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
3634 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
3635 encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
3636 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
3637 Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
3638 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
3639 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
3640 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
3641 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3642 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3643 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3644 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
3645
3646 state
3647 .recv
3648 .lock()
3649 .await
3650 .push_back((with_npdu(w.as_written()), addr));
3651
3652 let notification: EventNotification = client
3653 .recv_event_notification(Duration::from_secs(1))
3654 .await
3655 .unwrap()
3656 .unwrap();
3657 assert!(!notification.confirmed);
3658 assert_eq!(notification.process_id, 99);
3659 assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
3660 assert_eq!(notification.ack_required, Some(true));
3661 assert_eq!(notification.from_state, Some(EventState::Offnormal));
3662 assert_eq!(notification.to_state, Some(EventState::Normal));
3663 assert_eq!(notification.notify_type, 0);
3664
3665 let sent = state.sent.lock().await;
3666 assert!(sent.is_empty());
3667 }
3668
3669 #[tokio::test]
3670 async fn recv_confirmed_event_notification_sends_simple_ack() {
3671 let (dl, state) = MockDataLink::new();
3672 let client = BacnetClient::with_datalink(dl);
3673 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
3674
3675 let mut apdu = [0u8; 256];
3676 let mut w = Writer::new(&mut apdu);
3677 ConfirmedRequestHeader {
3678 segmented: false,
3679 more_follows: false,
3680 segmented_response_accepted: false,
3681 max_segments: 0,
3682 max_apdu: 5,
3683 invoke_id: 11,
3684 sequence_number: None,
3685 proposed_window_size: None,
3686 service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
3687 }
3688 .encode(&mut w)
3689 .unwrap();
3690 encode_ctx_unsigned(&mut w, 0, 100).unwrap();
3691 encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3692 encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3693 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3694 encode_ctx_unsigned(&mut w, 1, 56).unwrap();
3695 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3696 encode_ctx_unsigned(&mut w, 4, 7).unwrap();
3697 encode_ctx_unsigned(&mut w, 5, 100).unwrap();
3698 encode_ctx_unsigned(&mut w, 6, 2).unwrap();
3699 encode_ctx_unsigned(&mut w, 8, 0).unwrap();
3700 encode_ctx_unsigned(&mut w, 10, 2).unwrap();
3701 encode_ctx_unsigned(&mut w, 11, 0).unwrap();
3702 Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
3703 Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3704 encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3705 Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3706 Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
3707
3708 state
3709 .recv
3710 .lock()
3711 .await
3712 .push_back((with_npdu(w.as_written()), addr));
3713
3714 let notification = client
3715 .recv_event_notification(Duration::from_secs(1))
3716 .await
3717 .unwrap()
3718 .unwrap();
3719 assert!(notification.confirmed);
3720
3721 let sent = state.sent.lock().await;
3722 assert_eq!(sent.len(), 1);
3723 let mut r = Reader::new(&sent[0].1);
3724 let _npdu = Npdu::decode(&mut r).unwrap();
3725 let ack = SimpleAck::decode(&mut r).unwrap();
3726 assert_eq!(ack.invoke_id, 11);
3727 assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
3728 }
3729
3730 #[tokio::test]
3731 async fn write_property_multiple_segments_large_request() {
3732 let (dl, state) = MockDataLink::new();
3733 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3734 let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
3735 let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
3736
3737 {
3738 let mut recv = state.recv.lock().await;
3739 for seq in 0u8..=254 {
3740 let mut apdu = [0u8; 16];
3741 let mut w = Writer::new(&mut apdu);
3742 SegmentAck {
3743 negative_ack: false,
3744 sent_by_server: true,
3745 invoke_id: 1,
3746 sequence_number: seq,
3747 actual_window_size: 1,
3748 }
3749 .encode(&mut w)
3750 .unwrap();
3751 recv.push_back((with_npdu(w.as_written()), addr));
3752 }
3753
3754 let mut apdu = [0u8; 16];
3755 let mut w = Writer::new(&mut apdu);
3756 SimpleAck {
3757 invoke_id: 1,
3758 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3759 }
3760 .encode(&mut w)
3761 .unwrap();
3762 recv.push_back((with_npdu(w.as_written()), addr));
3763 }
3764
3765 let writes: Vec<PropertyWriteSpec> = (0..180)
3766 .map(|_| PropertyWriteSpec {
3767 property_id: PropertyId::Description,
3768 array_index: None,
3769 value: DataValue::CharacterString(
3770 "rustbac segmented write test payload................................................................",
3771 ),
3772 priority: None,
3773 })
3774 .collect();
3775
3776 client
3777 .write_property_multiple(addr, object_id, &writes)
3778 .await
3779 .unwrap();
3780
3781 let sent = state.sent.lock().await;
3782 assert!(sent.len() > 1);
3783
3784 let mut seqs = Vec::new();
3785 let mut saw_more_follows = false;
3786 let mut saw_last = false;
3787 for (_, frame) in sent.iter() {
3788 let mut r = Reader::new(frame);
3789 let _npdu = Npdu::decode(&mut r).unwrap();
3790 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3791 assert!(hdr.segmented);
3792 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3793 if hdr.more_follows {
3794 saw_more_follows = true;
3795 } else {
3796 saw_last = true;
3797 }
3798 seqs.push(hdr.sequence_number.unwrap());
3799 }
3800
3801 assert!(saw_more_follows);
3802 assert!(saw_last);
3803 for (idx, seq) in seqs.iter().enumerate() {
3804 assert_eq!(*seq as usize, idx);
3805 }
3806 }
3807
3808 #[tokio::test]
3809 async fn write_property_multiple_uses_configured_segment_window() {
3810 let (dl, state) = MockDataLink::new();
3811 let client = BacnetClient::with_datalink(dl)
3812 .with_response_timeout(Duration::from_secs(1))
3813 .with_segmented_request_window_size(4);
3814 let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
3815 let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
3816
3817 {
3818 let mut recv = state.recv.lock().await;
3819 for seq in 0u8..=254 {
3820 let mut apdu = [0u8; 16];
3821 let mut w = Writer::new(&mut apdu);
3822 SegmentAck {
3823 negative_ack: false,
3824 sent_by_server: true,
3825 invoke_id: 1,
3826 sequence_number: seq,
3827 actual_window_size: 4,
3828 }
3829 .encode(&mut w)
3830 .unwrap();
3831 recv.push_back((with_npdu(w.as_written()), addr));
3832 }
3833
3834 let mut apdu = [0u8; 16];
3835 let mut w = Writer::new(&mut apdu);
3836 SimpleAck {
3837 invoke_id: 1,
3838 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3839 }
3840 .encode(&mut w)
3841 .unwrap();
3842 recv.push_back((with_npdu(w.as_written()), addr));
3843 }
3844
3845 let writes: Vec<PropertyWriteSpec> = (0..180)
3846 .map(|_| PropertyWriteSpec {
3847 property_id: PropertyId::Description,
3848 array_index: None,
3849 value: DataValue::CharacterString(
3850 "rustbac segmented write test payload................................................................",
3851 ),
3852 priority: None,
3853 })
3854 .collect();
3855
3856 client
3857 .write_property_multiple(addr, object_id, &writes)
3858 .await
3859 .unwrap();
3860
3861 let sent = state.sent.lock().await;
3862 assert!(sent.len() > 4);
3863 for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
3864 let mut r = Reader::new(frame);
3865 let _npdu = Npdu::decode(&mut r).unwrap();
3866 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3867 assert!(hdr.segmented);
3868 assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3869 assert_eq!(hdr.sequence_number, Some(idx as u8));
3870 assert_eq!(hdr.proposed_window_size, Some(4));
3871 }
3872 }
3873
3874 #[tokio::test]
3875 async fn write_property_multiple_adapts_window_to_peer_ack_window() {
3876 let (dl, state) = MockDataLink::new();
3877 let client = BacnetClient::with_datalink(dl)
3878 .with_response_timeout(Duration::from_secs(1))
3879 .with_segmented_request_window_size(4);
3880 let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
3881 let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
3882
3883 {
3884 let mut recv = state.recv.lock().await;
3885 for seq in 0u8..=254 {
3886 let mut apdu = [0u8; 16];
3887 let mut w = Writer::new(&mut apdu);
3888 SegmentAck {
3889 negative_ack: false,
3890 sent_by_server: true,
3891 invoke_id: 1,
3892 sequence_number: seq,
3893 actual_window_size: 2,
3894 }
3895 .encode(&mut w)
3896 .unwrap();
3897 recv.push_back((with_npdu(w.as_written()), addr));
3898 }
3899
3900 let mut apdu = [0u8; 16];
3901 let mut w = Writer::new(&mut apdu);
3902 SimpleAck {
3903 invoke_id: 1,
3904 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3905 }
3906 .encode(&mut w)
3907 .unwrap();
3908 recv.push_back((with_npdu(w.as_written()), addr));
3909 }
3910
3911 let writes: Vec<PropertyWriteSpec> = (0..180)
3912 .map(|_| PropertyWriteSpec {
3913 property_id: PropertyId::Description,
3914 array_index: None,
3915 value: DataValue::CharacterString(
3916 "rustbac segmented write test payload................................................................",
3917 ),
3918 priority: None,
3919 })
3920 .collect();
3921
3922 client
3923 .write_property_multiple(addr, object_id, &writes)
3924 .await
3925 .unwrap();
3926
3927 let sent = state.sent.lock().await;
3928 let mut saw_adapted_window = false;
3929 for (_, frame) in sent.iter() {
3930 let mut r = Reader::new(frame);
3931 let _npdu = Npdu::decode(&mut r).unwrap();
3932 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3933 if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
3934 saw_adapted_window = true;
3935 break;
3936 }
3937 }
3938 assert!(saw_adapted_window);
3939 }
3940
3941 #[tokio::test]
3942 async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
3943 let (dl, state) = MockDataLink::new();
3944 let client = BacnetClient::with_datalink(dl)
3945 .with_response_timeout(Duration::from_secs(1))
3946 .with_segmented_request_window_size(1)
3947 .with_segmented_request_retries(1);
3948 let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
3949 let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
3950
3951 {
3952 let mut recv = state.recv.lock().await;
3953
3954 let mut nack_apdu = [0u8; 16];
3955 let mut nack_w = Writer::new(&mut nack_apdu);
3956 SegmentAck {
3957 negative_ack: true,
3958 sent_by_server: true,
3959 invoke_id: 1,
3960 sequence_number: 0,
3961 actual_window_size: 1,
3962 }
3963 .encode(&mut nack_w)
3964 .unwrap();
3965 recv.push_back((with_npdu(nack_w.as_written()), addr));
3966
3967 for seq in 0u8..=254 {
3968 let mut apdu = [0u8; 16];
3969 let mut w = Writer::new(&mut apdu);
3970 SegmentAck {
3971 negative_ack: false,
3972 sent_by_server: true,
3973 invoke_id: 1,
3974 sequence_number: seq,
3975 actual_window_size: 1,
3976 }
3977 .encode(&mut w)
3978 .unwrap();
3979 recv.push_back((with_npdu(w.as_written()), addr));
3980 }
3981
3982 let mut apdu = [0u8; 16];
3983 let mut w = Writer::new(&mut apdu);
3984 SimpleAck {
3985 invoke_id: 1,
3986 service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3987 }
3988 .encode(&mut w)
3989 .unwrap();
3990 recv.push_back((with_npdu(w.as_written()), addr));
3991 }
3992
3993 let writes: Vec<PropertyWriteSpec> = (0..180)
3994 .map(|_| PropertyWriteSpec {
3995 property_id: PropertyId::Description,
3996 array_index: None,
3997 value: DataValue::CharacterString(
3998 "rustbac segmented write test payload................................................................",
3999 ),
4000 priority: None,
4001 })
4002 .collect();
4003
4004 client
4005 .write_property_multiple(addr, object_id, &writes)
4006 .await
4007 .unwrap();
4008
4009 let sent = state.sent.lock().await;
4010 let mut seq0_frames = 0usize;
4011 for (_, frame) in sent.iter() {
4012 let mut r = Reader::new(frame);
4013 let _npdu = Npdu::decode(&mut r).unwrap();
4014 let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4015 if hdr.sequence_number == Some(0) {
4016 seq0_frames += 1;
4017 }
4018 }
4019 assert!(seq0_frames >= 2);
4020 }
4021
4022 #[tokio::test]
4023 async fn read_property_ignores_invalid_frames_until_valid_response() {
4024 let (dl, state) = MockDataLink::new();
4025 let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4026 let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4027 let state_for_task = state.clone();
4028
4029 tokio::spawn(async move {
4030 tokio::time::sleep(Duration::from_millis(20)).await;
4031 let mut apdu = [0u8; 128];
4032 let mut w = Writer::new(&mut apdu);
4033 ComplexAckHeader {
4034 segmented: false,
4035 more_follows: false,
4036 invoke_id: 1,
4037 sequence_number: None,
4038 proposed_window_size: None,
4039 service_choice: SERVICE_READ_PROPERTY,
4040 }
4041 .encode(&mut w)
4042 .unwrap();
4043 encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4044 encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4045 Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4046 encode_app_real(&mut w, 77.0).unwrap();
4047 Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4048 state_for_task
4049 .recv
4050 .lock()
4051 .await
4052 .push_back((with_npdu(w.as_written()), addr));
4053 });
4054
4055 let value = client
4056 .read_property(
4057 addr,
4058 ObjectId::new(ObjectType::Device, 1),
4059 PropertyId::PresentValue,
4060 )
4061 .await
4062 .unwrap();
4063 assert!(matches!(
4064 value,
4065 ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
4066 ));
4067 }
4068
4069 #[tokio::test]
4070 async fn read_property_maps_reject() {
4071 let (dl, state) = MockDataLink::new();
4072 let client = BacnetClient::with_datalink(dl);
4073 let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
4074
4075 let mut apdu = [0u8; 8];
4076 let mut w = Writer::new(&mut apdu);
4077 w.write_u8((ApduType::Reject as u8) << 4).unwrap();
4078 w.write_u8(1).unwrap(); w.write_u8(2).unwrap(); state
4081 .recv
4082 .lock()
4083 .await
4084 .push_back((with_npdu(w.as_written()), addr));
4085
4086 let err = client
4087 .read_property(
4088 addr,
4089 ObjectId::new(ObjectType::Device, 1),
4090 PropertyId::ObjectName,
4091 )
4092 .await
4093 .unwrap_err();
4094 assert!(matches!(
4095 err,
4096 crate::ClientError::RemoteReject { reason: 2 }
4097 ));
4098 }
4099
4100 #[tokio::test]
4101 async fn read_property_maps_remote_error_details() {
4102 let (dl, state) = MockDataLink::new();
4103 let client = BacnetClient::with_datalink(dl);
4104 let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
4105
4106 let mut apdu = [0u8; 16];
4107 let mut w = Writer::new(&mut apdu);
4108 w.write_u8((ApduType::Error as u8) << 4).unwrap();
4109 w.write_u8(1).unwrap(); w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
4111 .unwrap();
4112 Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
4113 w.write_u8(2).unwrap(); Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
4115 w.write_u8(32).unwrap(); state
4118 .recv
4119 .lock()
4120 .await
4121 .push_back((with_npdu(w.as_written()), addr));
4122
4123 let err = client
4124 .read_property(
4125 addr,
4126 ObjectId::new(ObjectType::Device, 1),
4127 PropertyId::ObjectName,
4128 )
4129 .await
4130 .unwrap_err();
4131 assert!(matches!(
4132 err,
4133 crate::ClientError::RemoteServiceError {
4134 service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
4135 error_class_raw: Some(2),
4136 error_code_raw: Some(32),
4137 error_class: Some(rustbac_core::types::ErrorClass::Property),
4138 error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
4139 }
4140 ));
4141 }
4142
4143 #[tokio::test]
4144 async fn write_property_maps_abort() {
4145 let (dl, state) = MockDataLink::new();
4146 let client = BacnetClient::with_datalink(dl);
4147 let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
4148
4149 let mut apdu = [0u8; 8];
4150 let mut w = Writer::new(&mut apdu);
4151 w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); w.write_u8(1).unwrap(); w.write_u8(9).unwrap(); state
4155 .recv
4156 .lock()
4157 .await
4158 .push_back((with_npdu(w.as_written()), addr));
4159
4160 let req = rustbac_core::services::write_property::WritePropertyRequest {
4161 object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
4162 property_id: PropertyId::PresentValue,
4163 value: DataValue::Real(10.0),
4164 priority: Some(8),
4165 ..Default::default()
4166 };
4167 let err = client.write_property(addr, req).await.unwrap_err();
4168 assert!(matches!(
4169 err,
4170 crate::ClientError::RemoteAbort {
4171 reason: 9,
4172 server: true
4173 }
4174 ));
4175 }
4176
4177 #[tokio::test]
4178 async fn read_property_multiple_returns_owned_string() {
4179 let (dl, state) = MockDataLink::new();
4180 let client = BacnetClient::with_datalink(dl);
4181 let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
4182 let object_id = ObjectId::new(ObjectType::Device, 1);
4183
4184 let mut apdu_buf = [0u8; 256];
4185 let mut w = Writer::new(&mut apdu_buf);
4186 ComplexAckHeader {
4187 segmented: false,
4188 more_follows: false,
4189 invoke_id: 1,
4190 sequence_number: None,
4191 proposed_window_size: None,
4192 service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4193 }
4194 .encode(&mut w)
4195 .unwrap();
4196 encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4197 rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4198 .encode(&mut w)
4199 .unwrap();
4200 encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
4201 rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4202 .encode(&mut w)
4203 .unwrap();
4204 rustbac_core::services::value_codec::encode_application_data_value(
4205 &mut w,
4206 &DataValue::CharacterString("AHU-1"),
4207 )
4208 .unwrap();
4209 rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4210 .encode(&mut w)
4211 .unwrap();
4212 rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4213 .encode(&mut w)
4214 .unwrap();
4215
4216 state
4217 .recv
4218 .lock()
4219 .await
4220 .push_back((with_npdu(w.as_written()), addr));
4221
4222 let values = client
4223 .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
4224 .await
4225 .unwrap();
4226 assert_eq!(values.len(), 1);
4227 assert_eq!(values[0].0, PropertyId::ObjectName);
4228 assert!(matches!(
4229 &values[0].1,
4230 ClientDataValue::CharacterString(s) if s == "AHU-1"
4231 ));
4232 }
4233
4234 #[tokio::test]
4235 async fn new_sc_rejects_invalid_endpoint() {
4236 let err = BacnetClient::new_sc("not a url").await.unwrap_err();
4237 assert!(matches!(err, crate::ClientError::DataLink(_)));
4238 }
4239}