Skip to main content

rustbac_client/
client.rs

1use crate::{
2    AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3    ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4    DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5    EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9    AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10    SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{reader::Reader, writer::Writer};
13use rustbac_core::npdu::Npdu;
14use rustbac_core::services::acknowledge_alarm::{
15    AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
16};
17use rustbac_core::services::alarm_summary::{
18    AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
19    SERVICE_GET_ALARM_SUMMARY,
20};
21use rustbac_core::services::atomic_read_file::{
22    AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
23};
24use rustbac_core::services::atomic_write_file::{
25    AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
26};
27use rustbac_core::services::cov_notification::{
28    CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
29    SERVICE_UNCONFIRMED_COV_NOTIFICATION,
30};
31use rustbac_core::services::device_management::{
32    DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
33    ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
34};
35use rustbac_core::services::enrollment_summary::{
36    EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
37    GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
38};
39use rustbac_core::services::event_information::{
40    EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
41    SERVICE_GET_EVENT_INFORMATION,
42};
43use rustbac_core::services::event_notification::{
44    EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
45    SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
46};
47use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
48use rustbac_core::services::list_element::{
49    AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
50    SERVICE_REMOVE_LIST_ELEMENT,
51};
52use rustbac_core::services::object_management::{
53    CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
54    SERVICE_DELETE_OBJECT,
55};
56use rustbac_core::services::private_transfer::{
57    ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
58    SERVICE_CONFIRMED_PRIVATE_TRANSFER,
59};
60use rustbac_core::services::read_property::{
61    ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
62};
63use rustbac_core::services::read_property_multiple::{
64    PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
65    ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
66};
67use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
68use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
69use rustbac_core::services::subscribe_cov_property::{
70    SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
71};
72use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
73use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
74use rustbac_core::services::who_is::WhoIsRequest;
75use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
76use rustbac_core::services::write_property_multiple::{
77    PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
78    SERVICE_WRITE_PROPERTY_MULTIPLE,
79};
80use rustbac_core::types::{DataValue, Date, ErrorClass, ErrorCode, ObjectId, PropertyId, Time};
81use rustbac_core::EncodeError;
82use rustbac_datalink::bip::transport::{
83    BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
84};
85use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
86use std::collections::{HashMap, HashSet};
87use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
88use std::sync::RwLock;
89use std::time::Duration;
90use tokio::sync::Mutex;
91use tokio::task::JoinHandle;
92use tokio::time::{timeout, Instant};
93
/// Smallest per-segment payload size used when splitting a confirmed request, regardless of
/// how small the peer's max-APDU is.
const MIN_SEGMENT_DATA_LEN: usize = 32;
/// Upper bound (1 MiB) on the number of bytes accepted while reassembling a ComplexAck.
const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1 << 20;
96
/// High-level async BACnet client.
///
/// `BacnetClient<D>` wraps any [`DataLink`] transport and exposes ergonomic methods for common
/// BACnet operations: reading and writing properties, device and object discovery, COV
/// subscriptions, alarm/event services, file I/O, and device management.
///
/// The invoke-id counter and the per-request I/O lock are stored behind `Mutex`es, so the
/// client is intentionally not `Clone` — wrap it in an `Arc` to share it across tasks.
///
/// # Construction
///
/// - UDP/IP: [`BacnetClient::new()`] (local broadcast) or [`BacnetClient::new_foreign()`]
///   (register as a foreign device with a BBMD).
/// - BACnet/SC (WebSocket): [`BacnetClient::new_sc()`].
/// - Custom transport: [`BacnetClient::with_datalink()`].
#[derive(Debug)]
pub struct BacnetClient<D: DataLink> {
    /// Transport used for every send and receive.
    datalink: D,
    /// Next invoke id to hand out; starts at 1 and skips 0 when the counter wraps.
    invoke_id: Mutex<u8>,
    /// Lock taken around a request/response exchange (e.g. by the BBMD table helpers) so
    /// that exchanges do not interleave on the shared transport.
    request_io_lock: Mutex<()>,
    /// How long to wait for a response to a request (default: 3 s).
    response_timeout: Duration,
    /// Number of segments sent before waiting for a segment ACK (default: 16, minimum 1).
    segmented_request_window_size: u8,
    /// How many times a segment window is re-sent after a timeout or negative ACK (default: 2).
    segmented_request_retries: u8,
    /// How long to wait for each segment ACK (default: 500 ms).
    segment_ack_timeout: Duration,
    /// Peer max-APDU sizes in bytes, populated from I-Am responses via `who_is`.
    capability_cache: std::sync::Arc<RwLock<HashMap<DataLinkAddress, usize>>>,
}
124
125/// Handle for a background task that periodically re-registers this client as a BACnet
126/// foreign device with the BBMD.
127///
128/// Dropping this value aborts the renewal task automatically. Call [`ForeignDeviceRenewal::stop`]
129/// to abort it explicitly.
130#[derive(Debug)]
131pub struct ForeignDeviceRenewal {
132    task: JoinHandle<()>,
133}
134
135impl ForeignDeviceRenewal {
136    /// Abort the background renewal task immediately.
137    pub fn stop(self) {
138        self.task.abort();
139    }
140}
141
142impl Drop for ForeignDeviceRenewal {
143    fn drop(&mut self) {
144        self.task.abort();
145    }
146}
147
148impl BacnetClient<BacnetIpTransport> {
149    /// Create a UDP/IP BACnet client bound to an ephemeral port on all interfaces.
150    ///
151    /// Returns [`ClientError::DataLink`] if the socket cannot be bound.
152    pub async fn new() -> Result<Self, ClientError> {
153        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
154        let datalink = BacnetIpTransport::bind(bind_addr).await?;
155        Ok(Self {
156            datalink,
157            invoke_id: Mutex::new(1),
158            request_io_lock: Mutex::new(()),
159            response_timeout: Duration::from_secs(3),
160            segmented_request_window_size: 16,
161            segmented_request_retries: 2,
162            segment_ack_timeout: Duration::from_millis(500),
163            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
164        })
165    }
166
167    /// Create a UDP/IP BACnet client registered as a foreign device with the BBMD at
168    /// `bbmd_addr`.
169    ///
170    /// `ttl_seconds` is the time-to-live for the foreign-device registration. Returns
171    /// [`ClientError::DataLink`] if the socket cannot be bound or the initial registration
172    /// request fails.
173    pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
174        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
175        let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
176        datalink.register_foreign_device(ttl_seconds).await?;
177        Ok(Self {
178            datalink,
179            invoke_id: Mutex::new(1),
180            request_io_lock: Mutex::new(()),
181            response_timeout: Duration::from_secs(3),
182            segmented_request_window_size: 16,
183            segmented_request_retries: 2,
184            segment_ack_timeout: Duration::from_millis(500),
185            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
186        })
187    }
188
189    /// Re-register this client as a foreign device with the BBMD, using `ttl_seconds` as the new
190    /// time-to-live.
191    pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
192        let _io = self.request_io_lock.lock().await;
193        self.datalink.register_foreign_device(ttl_seconds).await?;
194        Ok(())
195    }
196
197    /// Read the Broadcast Distribution Table (BDT) from the BBMD.
198    pub async fn read_broadcast_distribution_table(
199        &self,
200    ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
201        let _io = self.request_io_lock.lock().await;
202        self.datalink
203            .read_broadcast_distribution_table()
204            .await
205            .map_err(ClientError::from)
206    }
207
208    /// Write (replace) the Broadcast Distribution Table on the BBMD with the given `entries`.
209    pub async fn write_broadcast_distribution_table(
210        &self,
211        entries: &[BroadcastDistributionEntry],
212    ) -> Result<(), ClientError> {
213        let _io = self.request_io_lock.lock().await;
214        self.datalink
215            .write_broadcast_distribution_table(entries)
216            .await?;
217        Ok(())
218    }
219
220    /// Read the Foreign Device Table (FDT) from the BBMD.
221    pub async fn read_foreign_device_table(
222        &self,
223    ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
224        let _io = self.request_io_lock.lock().await;
225        self.datalink
226            .read_foreign_device_table()
227            .await
228            .map_err(ClientError::from)
229    }
230
231    /// Delete the foreign device entry for `address` from the BBMD's Foreign Device Table.
232    pub async fn delete_foreign_device_table_entry(
233        &self,
234        address: SocketAddrV4,
235    ) -> Result<(), ClientError> {
236        let _io = self.request_io_lock.lock().await;
237        self.datalink
238            .delete_foreign_device_table_entry(address)
239            .await?;
240        Ok(())
241    }
242
243    /// Spawn a background task that re-registers this client as a foreign device at roughly
244    /// 75 % of `ttl_seconds` to keep the registration alive indefinitely.
245    ///
246    /// Returns a [`ForeignDeviceRenewal`] handle; dropping it (or calling `.stop()`) cancels
247    /// the task. Returns an error if `ttl_seconds` is 0.
248    pub fn start_foreign_device_renewal(
249        &self,
250        ttl_seconds: u16,
251    ) -> Result<ForeignDeviceRenewal, ClientError> {
252        if ttl_seconds == 0 {
253            return Err(EncodeError::InvalidLength.into());
254        }
255
256        let datalink = self.datalink.clone();
257        let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
258        let interval = Duration::from_secs(refresh_seconds.max(1));
259        let task = tokio::spawn(async move {
260            loop {
261                tokio::time::sleep(interval).await;
262                if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
263                    log::warn!("foreign device renewal send failed: {err}");
264                }
265            }
266        });
267        Ok(ForeignDeviceRenewal { task })
268    }
269}
270
271impl BacnetClient<BacnetScTransport> {
272    /// Create a BACnet/SC client by connecting to the WebSocket hub at `endpoint`
273    /// (e.g. `"wss://hub.example.com:47808/bacnet"`).
274    pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
275        let datalink = BacnetScTransport::connect(endpoint).await?;
276        Ok(Self::with_datalink(datalink))
277    }
278}
279
280impl<D: DataLink> BacnetClient<D> {
281    /// Create a client wrapping an already-constructed `datalink`.
282    ///
283    /// Use this when you need a custom or pre-configured [`DataLink`] implementation.
284    pub fn with_datalink(datalink: D) -> Self {
285        Self {
286            datalink,
287            invoke_id: Mutex::new(1),
288            request_io_lock: Mutex::new(()),
289            response_timeout: Duration::from_secs(3),
290            segmented_request_window_size: 16,
291            segmented_request_retries: 2,
292            segment_ack_timeout: Duration::from_millis(500),
293            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
294        }
295    }
296
297    /// Override the per-request response timeout (default: 3 s).
298    pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
299        self.response_timeout = timeout;
300        self
301    }
302
303    /// Override the segmented-request window size (number of segments sent before waiting
304    /// for an ACK). Clamped to a minimum of 1. Default: 16.
305    pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
306        self.segmented_request_window_size = window_size.max(1);
307        self
308    }
309
310    /// Override the number of times a segment window is retried after a negative ACK or
311    /// segment-ACK timeout before giving up. Default: 2.
312    pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
313        self.segmented_request_retries = retries;
314        self
315    }
316
317    /// Override the timeout used when waiting for a segment ACK from the remote device
318    /// during a segmented confirmed request. Clamped to a minimum of 1 ms. Default: 500 ms.
319    pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
320        self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
321        self
322    }
323
324    async fn next_invoke_id(&self) -> u8 {
325        let mut lock = self.invoke_id.lock().await;
326        let id = *lock;
327        *lock = lock.wrapping_add(1);
328        if *lock == 0 {
329            *lock = 1;
330        }
331        id
332    }
333
334    async fn send_segment_ack(
335        &self,
336        address: DataLinkAddress,
337        invoke_id: u8,
338        sequence_number: u8,
339        window_size: u8,
340    ) -> Result<(), ClientError> {
341        let mut tx = [0u8; 64];
342        let mut w = Writer::new(&mut tx);
343        Npdu::new(0).encode(&mut w)?;
344        SegmentAck {
345            negative_ack: false,
346            sent_by_server: false,
347            invoke_id,
348            sequence_number,
349            actual_window_size: window_size,
350        }
351        .encode(&mut w)?;
352        self.datalink.send(address, w.as_written()).await?;
353        Ok(())
354    }
355
356    async fn recv_ignoring_invalid_frame(
357        &self,
358        buf: &mut [u8],
359        deadline: Instant,
360    ) -> Result<(usize, DataLinkAddress), ClientError> {
361        loop {
362            let remaining = deadline.saturating_duration_since(Instant::now());
363            if remaining.is_zero() {
364                return Err(ClientError::Timeout);
365            }
366
367            match timeout(remaining, self.datalink.recv(buf)).await {
368                Err(_) => return Err(ClientError::Timeout),
369                Ok(Err(DataLinkError::InvalidFrame)) => continue,
370                Ok(Err(e)) => return Err(e.into()),
371                Ok(Ok(v)) => return Ok(v),
372            }
373        }
374    }
375
376    async fn send_simple_ack(
377        &self,
378        address: DataLinkAddress,
379        invoke_id: u8,
380        service_choice: u8,
381    ) -> Result<(), ClientError> {
382        let mut tx = [0u8; 64];
383        let mut w = Writer::new(&mut tx);
384        Npdu::new(0).encode(&mut w)?;
385        SimpleAck {
386            invoke_id,
387            service_choice,
388        }
389        .encode(&mut w)?;
390        self.datalink.send(address, w.as_written()).await?;
391        Ok(())
392    }
393
394    fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
395    where
396        F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
397    {
398        for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
399            let mut buf = vec![0u8; size];
400            let mut w = Writer::new(&mut buf);
401            match encode(&mut w) {
402                Ok(()) => {
403                    let written_len = w.as_written().len();
404                    buf.truncate(written_len);
405                    return Ok(buf);
406                }
407                Err(EncodeError::BufferTooSmall) => continue,
408                Err(e) => return Err(e.into()),
409            }
410        }
411        Err(ClientError::SegmentedRequestTooLarge)
412    }
413
414    const fn max_apdu_octets(max_apdu_code: u8) -> usize {
415        match max_apdu_code & 0x0f {
416            0 => 50,
417            1 => 128,
418            2 => 206,
419            3 => 480,
420            4 => 1024,
421            5 => 1476,
422            _ => 480,
423        }
424    }
425
426    async fn await_segment_ack(
427        &self,
428        address: DataLinkAddress,
429        invoke_id: u8,
430        service_choice: u8,
431        expected_sequence: u8,
432        deadline: Instant,
433    ) -> Result<SegmentAck, ClientError> {
434        loop {
435            let remaining = deadline.saturating_duration_since(Instant::now());
436            if remaining.is_zero() {
437                return Err(ClientError::Timeout);
438            }
439
440            let mut rx = [0u8; 1500];
441            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
442            let (n, src) = match recv {
443                Err(_) => return Err(ClientError::Timeout),
444                Ok(Err(DataLinkError::InvalidFrame)) => continue,
445                Ok(Err(e)) => return Err(e.into()),
446                Ok(Ok(v)) => v,
447            };
448            if src != address {
449                continue;
450            }
451
452            let Ok(apdu) = extract_apdu(&rx[..n]) else {
453                continue;
454            };
455            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
456            match ApduType::from_u8(first >> 4) {
457                Some(ApduType::SegmentAck) => {
458                    let mut r = Reader::new(apdu);
459                    let ack = SegmentAck::decode(&mut r)?;
460                    if ack.invoke_id != invoke_id || !ack.sent_by_server {
461                        continue;
462                    }
463                    if ack.negative_ack {
464                        return Err(ClientError::SegmentNegativeAck {
465                            sequence_number: ack.sequence_number,
466                        });
467                    }
468                    if ack.sequence_number == expected_sequence {
469                        return Ok(ack);
470                    }
471                }
472                Some(ApduType::Error) => {
473                    let mut r = Reader::new(apdu);
474                    let err = BacnetError::decode(&mut r)?;
475                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
476                        return Err(remote_service_error(err));
477                    }
478                }
479                Some(ApduType::Reject) => {
480                    let mut r = Reader::new(apdu);
481                    let rej = RejectPdu::decode(&mut r)?;
482                    if rej.invoke_id == invoke_id {
483                        return Err(ClientError::RemoteReject { reason: rej.reason });
484                    }
485                }
486                Some(ApduType::Abort) => {
487                    let mut r = Reader::new(apdu);
488                    let abort = AbortPdu::decode(&mut r)?;
489                    if abort.invoke_id == invoke_id {
490                        return Err(ClientError::RemoteAbort {
491                            reason: abort.reason,
492                            server: abort.server,
493                        });
494                    }
495                }
496                _ => continue,
497            }
498        }
499    }
500
501    async fn send_confirmed_request(
502        &self,
503        address: DataLinkAddress,
504        frame: &[u8],
505        deadline: Instant,
506    ) -> Result<(), ClientError> {
507        let mut pr = Reader::new(frame);
508        let _npdu = Npdu::decode(&mut pr)?;
509        let npdu_len = frame.len() - pr.remaining();
510        let npdu_bytes = &frame[..npdu_len];
511        let apdu = &frame[npdu_len..];
512
513        let mut ar = Reader::new(apdu);
514        let header = ConfirmedRequestHeader::decode(&mut ar)?;
515        let service_payload = ar.read_exact(ar.remaining())?;
516
517        // Use the peer's max-APDU if we learned it from a prior I-Am; fall back to
518        // the code declared in the request header (our own capability advertisement).
519        let peer_max_apdu = self
520            .capability_cache
521            .read()
522            .ok()
523            .and_then(|c| c.get(&address).copied())
524            .unwrap_or_else(|| Self::max_apdu_octets(header.max_apdu));
525        let segment_data_len = peer_max_apdu.saturating_sub(5).max(MIN_SEGMENT_DATA_LEN);
526        let segment_count = service_payload.len().div_ceil(segment_data_len);
527
528        if segment_count <= 1 {
529            self.datalink.send(address, frame).await?;
530            return Ok(());
531        }
532
533        if segment_count > usize::from(u8::MAX) + 1 {
534            return Err(ClientError::SegmentedRequestTooLarge);
535        }
536
537        let configured_window_size = self.segmented_request_window_size.max(1);
538        let mut window_size = configured_window_size;
539        let mut peer_window_ceiling = configured_window_size;
540        let mut batch_start = 0usize;
541        while batch_start < segment_count {
542            let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
543            let expected_sequence = (batch_end - 1) as u8;
544
545            let mut frames = Vec::with_capacity(batch_end - batch_start);
546            for segment_index in batch_start..batch_end {
547                let seq = segment_index as u8;
548                let more_follows = segment_index + 1 < segment_count;
549                let start = segment_index * segment_data_len;
550                let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
551                let segment = &service_payload[start..end];
552
553                let seg_header = ConfirmedRequestHeader {
554                    segmented: true,
555                    more_follows,
556                    segmented_response_accepted: header.segmented_response_accepted,
557                    max_segments: header.max_segments,
558                    max_apdu: header.max_apdu,
559                    invoke_id: header.invoke_id,
560                    sequence_number: Some(seq),
561                    proposed_window_size: Some(window_size),
562                    service_choice: header.service_choice,
563                };
564
565                let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
566                let written_len = {
567                    let mut w = Writer::new(&mut tx);
568                    w.write_all(npdu_bytes)?;
569                    seg_header.encode(&mut w)?;
570                    w.write_all(segment)?;
571                    w.as_written().len()
572                };
573                tx.truncate(written_len);
574                frames.push(tx);
575            }
576
577            let mut retries_remaining = self.segmented_request_retries;
578            loop {
579                for frame in &frames {
580                    self.datalink.send(address, frame).await?;
581                }
582
583                if batch_end == segment_count {
584                    break;
585                }
586
587                let remaining = deadline.saturating_duration_since(Instant::now());
588                if remaining.is_zero() {
589                    return Err(ClientError::Timeout);
590                }
591                let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
592                match self
593                    .await_segment_ack(
594                        address,
595                        header.invoke_id,
596                        header.service_choice,
597                        expected_sequence,
598                        ack_wait_deadline,
599                    )
600                    .await
601                {
602                    Ok(ack) => {
603                        peer_window_ceiling =
604                            peer_window_ceiling.min(ack.actual_window_size.max(1));
605                        window_size = window_size
606                            .saturating_add(1)
607                            .min(configured_window_size)
608                            .min(peer_window_ceiling)
609                            .max(1);
610                        break;
611                    }
612                    Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
613                        if retries_remaining > 0 =>
614                    {
615                        retries_remaining -= 1;
616                        window_size = window_size.saturating_div(2).max(1);
617                        continue;
618                    }
619                    Err(e) => return Err(e),
620                }
621            }
622
623            batch_start = batch_end;
624        }
625
626        Ok(())
627    }
628
629    async fn collect_complex_ack_payload(
630        &self,
631        address: DataLinkAddress,
632        invoke_id: u8,
633        service_choice: u8,
634        first_header: ComplexAckHeader,
635        first_payload: &[u8],
636        deadline: Instant,
637    ) -> Result<Vec<u8>, ClientError> {
638        let mut payload = first_payload.to_vec();
639        if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
640            return Err(ClientError::ResponseTooLarge {
641                limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
642            });
643        }
644        if !first_header.segmented {
645            return Ok(payload);
646        }
647
648        let mut last_seq = first_header
649            .sequence_number
650            .ok_or(ClientError::UnsupportedResponse)?;
651        let mut window_size = first_header.proposed_window_size.unwrap_or(1);
652        self.send_segment_ack(address, invoke_id, last_seq, window_size)
653            .await?;
654        let mut more_follows = first_header.more_follows;
655
656        while more_follows {
657            let mut rx = [0u8; 1500];
658            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
659            if src != address {
660                continue;
661            }
662
663            let Ok(apdu) = extract_apdu(&rx[..n]) else {
664                continue;
665            };
666            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
667            match ApduType::from_u8(first >> 4) {
668                Some(ApduType::ComplexAck) => {
669                    let mut r = Reader::new(apdu);
670                    let seg = ComplexAckHeader::decode(&mut r)?;
671                    if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
672                        continue;
673                    }
674                    if !seg.segmented {
675                        return Err(ClientError::UnsupportedResponse);
676                    }
677                    let seq = seg
678                        .sequence_number
679                        .ok_or(ClientError::UnsupportedResponse)?;
680                    if seq == last_seq {
681                        // Duplicate segment: acknowledge again and continue waiting.
682                        self.send_segment_ack(address, invoke_id, last_seq, window_size)
683                            .await?;
684                        continue;
685                    }
686                    if seq != last_seq.wrapping_add(1) {
687                        continue;
688                    }
689
690                    let seg_payload = r.read_exact(r.remaining())?;
691                    if payload.len().saturating_add(seg_payload.len())
692                        > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
693                    {
694                        return Err(ClientError::ResponseTooLarge {
695                            limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
696                        });
697                    }
698                    payload.extend_from_slice(seg_payload);
699
700                    last_seq = seq;
701                    more_follows = seg.more_follows;
702                    window_size = seg.proposed_window_size.unwrap_or(window_size);
703                    self.send_segment_ack(address, invoke_id, last_seq, window_size)
704                        .await?;
705                }
706                Some(ApduType::Error) => {
707                    let mut r = Reader::new(apdu);
708                    let err = BacnetError::decode(&mut r)?;
709                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
710                        return Err(remote_service_error(err));
711                    }
712                }
713                Some(ApduType::Reject) => {
714                    let mut r = Reader::new(apdu);
715                    let rej = RejectPdu::decode(&mut r)?;
716                    if rej.invoke_id == invoke_id {
717                        return Err(ClientError::RemoteReject { reason: rej.reason });
718                    }
719                }
720                Some(ApduType::Abort) => {
721                    let mut r = Reader::new(apdu);
722                    let abort = AbortPdu::decode(&mut r)?;
723                    if abort.invoke_id == invoke_id {
724                        return Err(ClientError::RemoteAbort {
725                            reason: abort.reason,
726                            server: abort.server,
727                        });
728                    }
729                }
730                _ => continue,
731            }
732        }
733
734        Ok(payload)
735    }
736
737    /// Broadcast a Who-Is request and collect I-Am replies for the duration of `wait`.
738    ///
739    /// `range` constrains the device-instance range as `(low, high)`; `None` performs a
740    /// global Who-Is. Duplicate devices (same object id) are deduplicated. Returns
741    /// [`ClientError::DataLink`] on send/receive failure.
742    pub async fn who_is(
743        &self,
744        range: Option<(u32, u32)>,
745        wait: Duration,
746    ) -> Result<Vec<DiscoveredDevice>, ClientError> {
747        // Unconfirmed broadcast — no request_io_lock needed; the recv loop
748        // filters on service_choice so it coexists with concurrent confirmed requests.
749        let req = match range {
750            Some((low, high)) => WhoIsRequest {
751                low_limit: Some(low),
752                high_limit: Some(high),
753            },
754            None => WhoIsRequest::global(),
755        };
756
757        let mut tx = [0u8; 128];
758        let mut w = Writer::new(&mut tx);
759        Npdu::new(0).encode(&mut w)?;
760        req.encode(&mut w)?;
761
762        self.datalink
763            .send(
764                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
765                w.as_written(),
766            )
767            .await?;
768
769        let mut devices = Vec::new();
770        let mut seen = HashSet::new();
771        let deadline = tokio::time::Instant::now() + wait;
772
773        while tokio::time::Instant::now() < deadline {
774            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
775            let mut rx = [0u8; 1500];
776            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
777            match recv {
778                Ok(Ok((n, src))) => {
779                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
780                        continue;
781                    };
782                    let mut r = Reader::new(apdu);
783                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
784                        continue;
785                    };
786                    if unconfirmed.service_choice != SERVICE_I_AM {
787                        continue;
788                    }
789                    let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
790                        continue;
791                    };
792                    if seen.insert(i_am.device_id) {
793                        devices.push(DiscoveredDevice {
794                            address: src,
795                            device_id: Some(i_am.device_id),
796                        });
797                        // Cache the peer's reported max-APDU so segmented
798                        // requests can be sized correctly without a separate read.
799                        if let Ok(mut cache) = self.capability_cache.write() {
800                            cache.insert(src, i_am.max_apdu as usize);
801                        }
802                    }
803                }
804                Ok(Err(DataLinkError::InvalidFrame)) => continue,
805                Ok(Err(e)) => return Err(e.into()),
806                Err(_) => break,
807            }
808        }
809
810        Ok(devices)
811    }
812
813    /// Broadcast a Who-Has request for a specific object id and collect I-Have replies for
814    /// `wait`.
815    ///
816    /// `range` constrains the responding device-instance range; `None` means any device.
817    pub async fn who_has_object_id(
818        &self,
819        range: Option<(u32, u32)>,
820        object_id: ObjectId,
821        wait: Duration,
822    ) -> Result<Vec<DiscoveredObject>, ClientError> {
823        let req = WhoHasRequest {
824            low_limit: range.map(|(low, _)| low),
825            high_limit: range.map(|(_, high)| high),
826            object: WhoHasObject::ObjectId(object_id),
827        };
828        self.who_has(req, wait).await
829    }
830
831    /// Broadcast a Who-Has request for an object by name and collect I-Have replies for `wait`.
832    ///
833    /// `range` constrains the responding device-instance range; `None` means any device.
834    pub async fn who_has_object_name(
835        &self,
836        range: Option<(u32, u32)>,
837        object_name: &str,
838        wait: Duration,
839    ) -> Result<Vec<DiscoveredObject>, ClientError> {
840        let req = WhoHasRequest {
841            low_limit: range.map(|(low, _)| low),
842            high_limit: range.map(|(_, high)| high),
843            object: WhoHasObject::ObjectName(object_name),
844        };
845        self.who_has(req, wait).await
846    }
847
848    async fn who_has(
849        &self,
850        request: WhoHasRequest<'_>,
851        wait: Duration,
852    ) -> Result<Vec<DiscoveredObject>, ClientError> {
853        // Unconfirmed broadcast — same rationale as who_is.
854        let tx = self.encode_with_growth(|w| {
855            Npdu::new(0).encode(w)?;
856            request.encode(w)
857        })?;
858        self.datalink
859            .send(
860                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
861                &tx,
862            )
863            .await?;
864
865        let mut objects = Vec::new();
866        let mut seen = HashSet::new();
867        let deadline = tokio::time::Instant::now() + wait;
868
869        while tokio::time::Instant::now() < deadline {
870            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
871            let mut rx = [0u8; 1500];
872            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
873            match recv {
874                Ok(Ok((n, src))) => {
875                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
876                        continue;
877                    };
878                    let mut r = Reader::new(apdu);
879                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
880                        continue;
881                    };
882                    if unconfirmed.service_choice != SERVICE_I_HAVE {
883                        continue;
884                    }
885                    let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
886                        continue;
887                    };
888                    if !seen.insert((src, i_have.object_id.raw())) {
889                        continue;
890                    }
891                    objects.push(DiscoveredObject {
892                        address: src,
893                        device_id: i_have.device_id,
894                        object_id: i_have.object_id,
895                        object_name: i_have.object_name.to_string(),
896                    });
897                }
898                Ok(Err(DataLinkError::InvalidFrame)) => continue,
899                Ok(Err(e)) => return Err(e.into()),
900                Err(_) => break,
901            }
902        }
903
904        Ok(objects)
905    }
906
907    /// Send a DeviceCommunicationControl request to a device.
908    ///
909    /// `time_duration_seconds` sets the duration for which the state applies; `None` means
910    /// indefinite. `enable_disable` selects the new communication state. `password` is only
911    /// required if the device is password-protected.
912    pub async fn device_communication_control(
913        &self,
914        address: DataLinkAddress,
915        time_duration_seconds: Option<u16>,
916        enable_disable: DeviceCommunicationState,
917        password: Option<&str>,
918    ) -> Result<(), ClientError> {
919        let invoke_id = self.next_invoke_id().await;
920        let request = DeviceCommunicationControlRequest {
921            time_duration_seconds,
922            enable_disable,
923            password,
924            invoke_id,
925        };
926        let tx = self.encode_with_growth(|w| {
927            Npdu::new(0).encode(w)?;
928            request.encode(w)
929        })?;
930        self.await_simple_ack_or_error(
931            address,
932            &tx,
933            invoke_id,
934            SERVICE_DEVICE_COMMUNICATION_CONTROL,
935            self.response_timeout,
936        )
937        .await
938    }
939
940    /// Send a ReinitializeDevice request to a device (e.g. cold-start, warm-start, or backup).
941    ///
942    /// `state` selects the reinitialization type. `password` is sent only if `Some`.
943    pub async fn reinitialize_device(
944        &self,
945        address: DataLinkAddress,
946        state: ReinitializeState,
947        password: Option<&str>,
948    ) -> Result<(), ClientError> {
949        let invoke_id = self.next_invoke_id().await;
950        let request = ReinitializeDeviceRequest {
951            state,
952            password,
953            invoke_id,
954        };
955        let tx = self.encode_with_growth(|w| {
956            Npdu::new(0).encode(w)?;
957            request.encode(w)
958        })?;
959        self.await_simple_ack_or_error(
960            address,
961            &tx,
962            invoke_id,
963            SERVICE_REINITIALIZE_DEVICE,
964            self.response_timeout,
965        )
966        .await
967    }
968
969    /// Send a TimeSynchronization (or UTCTimeSynchronization) request to a device.
970    ///
971    /// Set `utc` to `true` to send the UTC variant of the request.
972    /// This is a fire-and-forget unconfirmed service; no response is awaited.
973    pub async fn time_synchronize(
974        &self,
975        address: DataLinkAddress,
976        date: Date,
977        time: Time,
978        utc: bool,
979    ) -> Result<(), ClientError> {
980        let request = if utc {
981            TimeSynchronizationRequest::utc(date, time)
982        } else {
983            TimeSynchronizationRequest::local(date, time)
984        };
985        let tx = self.encode_with_growth(|w| {
986            Npdu::new(0).encode(w)?;
987            request.encode(w)
988        })?;
989        self.datalink.send(address, &tx).await?;
990        Ok(())
991    }
992
993    /// Create a new object of the given type on the device, letting the device choose the
994    /// instance number. Returns the [`ObjectId`] assigned by the device.
995    pub async fn create_object_by_type(
996        &self,
997        address: DataLinkAddress,
998        object_type: rustbac_core::types::ObjectType,
999    ) -> Result<ObjectId, ClientError> {
1000        self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
1001            .await
1002    }
1003
1004    /// Send a CreateObject request to the device and return the [`ObjectId`] of the newly
1005    /// created object. Use this variant when you need fine-grained control over the request
1006    /// (e.g. specifying initial property values).
1007    pub async fn create_object(
1008        &self,
1009        address: DataLinkAddress,
1010        mut request: CreateObjectRequest,
1011    ) -> Result<ObjectId, ClientError> {
1012        request.invoke_id = self.next_invoke_id().await;
1013        let invoke_id = request.invoke_id;
1014        let tx = self.encode_with_growth(|w| {
1015            Npdu::new(0).encode(w)?;
1016            request.encode(w)
1017        })?;
1018        let payload = self
1019            .await_complex_ack_payload_or_error(
1020                address,
1021                &tx,
1022                invoke_id,
1023                SERVICE_CREATE_OBJECT,
1024                self.response_timeout,
1025            )
1026            .await?;
1027        let mut pr = Reader::new(&payload);
1028        let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
1029        Ok(parsed.object_id)
1030    }
1031
1032    /// Send a DeleteObject request to remove `object_id` from the device.
1033    pub async fn delete_object(
1034        &self,
1035        address: DataLinkAddress,
1036        object_id: ObjectId,
1037    ) -> Result<(), ClientError> {
1038        let invoke_id = self.next_invoke_id().await;
1039        let request = DeleteObjectRequest {
1040            object_id,
1041            invoke_id,
1042        };
1043        let tx = self.encode_with_growth(|w| {
1044            Npdu::new(0).encode(w)?;
1045            request.encode(w)
1046        })?;
1047        self.await_simple_ack_or_error(
1048            address,
1049            &tx,
1050            invoke_id,
1051            SERVICE_DELETE_OBJECT,
1052            self.response_timeout,
1053        )
1054        .await
1055    }
1056
1057    /// Send an AddListElement request to append elements to a list property on the device.
1058    pub async fn add_list_element(
1059        &self,
1060        address: DataLinkAddress,
1061        mut request: AddListElementRequest<'_>,
1062    ) -> Result<(), ClientError> {
1063        request.invoke_id = self.next_invoke_id().await;
1064        let invoke_id = request.invoke_id;
1065        let tx = self.encode_with_growth(|w| {
1066            Npdu::new(0).encode(w)?;
1067            request.encode(w)
1068        })?;
1069        self.await_simple_ack_or_error(
1070            address,
1071            &tx,
1072            invoke_id,
1073            SERVICE_ADD_LIST_ELEMENT,
1074            self.response_timeout,
1075        )
1076        .await
1077    }
1078
1079    /// Send a RemoveListElement request to delete elements from a list property on the device.
1080    pub async fn remove_list_element(
1081        &self,
1082        address: DataLinkAddress,
1083        mut request: RemoveListElementRequest<'_>,
1084    ) -> Result<(), ClientError> {
1085        request.invoke_id = self.next_invoke_id().await;
1086        let invoke_id = request.invoke_id;
1087        let tx = self.encode_with_growth(|w| {
1088            Npdu::new(0).encode(w)?;
1089            request.encode(w)
1090        })?;
1091        self.await_simple_ack_or_error(
1092            address,
1093            &tx,
1094            invoke_id,
1095            SERVICE_REMOVE_LIST_ELEMENT,
1096            self.response_timeout,
1097        )
1098        .await
1099    }
1100
1101    async fn await_simple_ack_or_error(
1102        &self,
1103        address: DataLinkAddress,
1104        tx: &[u8],
1105        invoke_id: u8,
1106        service_choice: u8,
1107        timeout_window: Duration,
1108    ) -> Result<(), ClientError> {
1109        #[cfg(feature = "tracing")]
1110        tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1111        let _io_lock = self.request_io_lock.lock().await;
1112        let deadline = tokio::time::Instant::now() + timeout_window;
1113        self.send_confirmed_request(address, tx, deadline).await?;
1114        while tokio::time::Instant::now() < deadline {
1115            let mut rx = [0u8; 1500];
1116            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1117            if src != address {
1118                continue;
1119            }
1120            let apdu = extract_apdu(&rx[..n])?;
1121            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1122            match ApduType::from_u8(first >> 4) {
1123                Some(ApduType::SimpleAck) => {
1124                    let mut r = Reader::new(apdu);
1125                    let ack = SimpleAck::decode(&mut r)?;
1126                    if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1127                        return Ok(());
1128                    }
1129                }
1130                Some(ApduType::Error) => {
1131                    let mut r = Reader::new(apdu);
1132                    let err = BacnetError::decode(&mut r)?;
1133                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1134                        return Err(remote_service_error(err));
1135                    }
1136                }
1137                Some(ApduType::Reject) => {
1138                    let mut r = Reader::new(apdu);
1139                    let rej = RejectPdu::decode(&mut r)?;
1140                    if rej.invoke_id == invoke_id {
1141                        return Err(ClientError::RemoteReject { reason: rej.reason });
1142                    }
1143                }
1144                Some(ApduType::Abort) => {
1145                    let mut r = Reader::new(apdu);
1146                    let abort = AbortPdu::decode(&mut r)?;
1147                    if abort.invoke_id == invoke_id {
1148                        return Err(ClientError::RemoteAbort {
1149                            reason: abort.reason,
1150                            server: abort.server,
1151                        });
1152                    }
1153                }
1154                _ => continue,
1155            }
1156        }
1157        #[cfg(feature = "tracing")]
1158        tracing::warn!(invoke_id = invoke_id, "request timed out");
1159        Err(ClientError::Timeout)
1160    }
1161
1162    async fn await_complex_ack_payload_or_error(
1163        &self,
1164        address: DataLinkAddress,
1165        tx: &[u8],
1166        invoke_id: u8,
1167        service_choice: u8,
1168        timeout_window: Duration,
1169    ) -> Result<Vec<u8>, ClientError> {
1170        #[cfg(feature = "tracing")]
1171        tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1172        let _io_lock = self.request_io_lock.lock().await;
1173        let deadline = tokio::time::Instant::now() + timeout_window;
1174        self.send_confirmed_request(address, tx, deadline).await?;
1175        while tokio::time::Instant::now() < deadline {
1176            let mut rx = [0u8; 1500];
1177            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1178            if src != address {
1179                continue;
1180            }
1181
1182            let apdu = extract_apdu(&rx[..n])?;
1183            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1184            match ApduType::from_u8(first >> 4) {
1185                Some(ApduType::ComplexAck) => {
1186                    let mut r = Reader::new(apdu);
1187                    let ack = ComplexAckHeader::decode(&mut r)?;
1188                    if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1189                        continue;
1190                    }
1191                    return self
1192                        .collect_complex_ack_payload(
1193                            address,
1194                            invoke_id,
1195                            service_choice,
1196                            ack,
1197                            r.read_exact(r.remaining())?,
1198                            deadline,
1199                        )
1200                        .await;
1201                }
1202                Some(ApduType::Error) => {
1203                    let mut r = Reader::new(apdu);
1204                    let err = BacnetError::decode(&mut r)?;
1205                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1206                        return Err(remote_service_error(err));
1207                    }
1208                }
1209                Some(ApduType::Reject) => {
1210                    let mut r = Reader::new(apdu);
1211                    let rej = RejectPdu::decode(&mut r)?;
1212                    if rej.invoke_id == invoke_id {
1213                        return Err(ClientError::RemoteReject { reason: rej.reason });
1214                    }
1215                }
1216                Some(ApduType::Abort) => {
1217                    let mut r = Reader::new(apdu);
1218                    let abort = AbortPdu::decode(&mut r)?;
1219                    if abort.invoke_id == invoke_id {
1220                        return Err(ClientError::RemoteAbort {
1221                            reason: abort.reason,
1222                            server: abort.server,
1223                        });
1224                    }
1225                }
1226                _ => continue,
1227            }
1228        }
1229        #[cfg(feature = "tracing")]
1230        tracing::warn!(invoke_id = invoke_id, "request timed out");
1231        Err(ClientError::Timeout)
1232    }
1233
1234    /// Send a GetAlarmSummary request and return the list of active alarms on the device.
1235    pub async fn get_alarm_summary(
1236        &self,
1237        address: DataLinkAddress,
1238    ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1239        let invoke_id = self.next_invoke_id().await;
1240        let request = GetAlarmSummaryRequest { invoke_id };
1241        let tx = self.encode_with_growth(|w| {
1242            Npdu::new(0).encode(w)?;
1243            request.encode(w)
1244        })?;
1245        let payload = self
1246            .await_complex_ack_payload_or_error(
1247                address,
1248                &tx,
1249                invoke_id,
1250                SERVICE_GET_ALARM_SUMMARY,
1251                self.response_timeout,
1252            )
1253            .await?;
1254        let mut pr = Reader::new(&payload);
1255        let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1256        Ok(into_client_alarm_summary(parsed.summaries))
1257    }
1258
1259    /// Send a GetEnrollmentSummary request and return the list of event enrollments on the device.
1260    pub async fn get_enrollment_summary(
1261        &self,
1262        address: DataLinkAddress,
1263    ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1264        let invoke_id = self.next_invoke_id().await;
1265        let request = GetEnrollmentSummaryRequest { invoke_id };
1266        let tx = self.encode_with_growth(|w| {
1267            Npdu::new(0).encode(w)?;
1268            request.encode(w)
1269        })?;
1270        let payload = self
1271            .await_complex_ack_payload_or_error(
1272                address,
1273                &tx,
1274                invoke_id,
1275                SERVICE_GET_ENROLLMENT_SUMMARY,
1276                self.response_timeout,
1277            )
1278            .await?;
1279        let mut pr = Reader::new(&payload);
1280        let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1281        Ok(into_client_enrollment_summary(parsed.summaries))
1282    }
1283
1284    /// Send a GetEventInformation request and return active event summaries from the device.
1285    ///
1286    /// Pass `last_received_object_id` to page through results; the returned
1287    /// [`EventInformationResult::more_events`] indicates whether another call is needed.
1288    pub async fn get_event_information(
1289        &self,
1290        address: DataLinkAddress,
1291        last_received_object_id: Option<ObjectId>,
1292    ) -> Result<EventInformationResult, ClientError> {
1293        let invoke_id = self.next_invoke_id().await;
1294        let request = GetEventInformationRequest {
1295            last_received_object_id,
1296            invoke_id,
1297        };
1298        let tx = self.encode_with_growth(|w| {
1299            Npdu::new(0).encode(w)?;
1300            request.encode(w)
1301        })?;
1302        let payload = self
1303            .await_complex_ack_payload_or_error(
1304                address,
1305                &tx,
1306                invoke_id,
1307                SERVICE_GET_EVENT_INFORMATION,
1308                self.response_timeout,
1309            )
1310            .await?;
1311        let mut pr = Reader::new(&payload);
1312        let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1313        Ok(EventInformationResult {
1314            summaries: into_client_event_information(parsed.summaries),
1315            more_events: parsed.more_events,
1316        })
1317    }
1318
1319    /// Send an AcknowledgeAlarm request to the device.
1320    pub async fn acknowledge_alarm(
1321        &self,
1322        address: DataLinkAddress,
1323        mut request: AcknowledgeAlarmRequest<'_>,
1324    ) -> Result<(), ClientError> {
1325        request.invoke_id = self.next_invoke_id().await;
1326        let invoke_id = request.invoke_id;
1327        let tx = self.encode_with_growth(|w| {
1328            Npdu::new(0).encode(w)?;
1329            request.encode(w)
1330        })?;
1331        self.await_simple_ack_or_error(
1332            address,
1333            &tx,
1334            invoke_id,
1335            SERVICE_ACKNOWLEDGE_ALARM,
1336            self.response_timeout,
1337        )
1338        .await
1339    }
1340
1341    /// Read a contiguous byte range from a BACnet File object using stream access.
1342    ///
1343    /// `file_start_position` is the byte offset (may be negative for end-relative access).
1344    /// `requested_octet_count` is the number of bytes to read.
1345    pub async fn atomic_read_file_stream(
1346        &self,
1347        address: DataLinkAddress,
1348        file_object_id: ObjectId,
1349        file_start_position: i32,
1350        requested_octet_count: u32,
1351    ) -> Result<AtomicReadFileResult, ClientError> {
1352        let invoke_id = self.next_invoke_id().await;
1353        let request = AtomicReadFileRequest::stream(
1354            file_object_id,
1355            file_start_position,
1356            requested_octet_count,
1357            invoke_id,
1358        );
1359        self.atomic_read_file(address, request).await
1360    }
1361
1362    /// Read a range of records from a BACnet File object using record access.
1363    ///
1364    /// `file_start_record` is the first record index (may be negative for end-relative access).
1365    /// `requested_record_count` is the number of records to read.
1366    pub async fn atomic_read_file_record(
1367        &self,
1368        address: DataLinkAddress,
1369        file_object_id: ObjectId,
1370        file_start_record: i32,
1371        requested_record_count: u32,
1372    ) -> Result<AtomicReadFileResult, ClientError> {
1373        let invoke_id = self.next_invoke_id().await;
1374        let request = AtomicReadFileRequest::record(
1375            file_object_id,
1376            file_start_record,
1377            requested_record_count,
1378            invoke_id,
1379        );
1380        self.atomic_read_file(address, request).await
1381    }
1382
1383    async fn atomic_read_file(
1384        &self,
1385        address: DataLinkAddress,
1386        request: AtomicReadFileRequest,
1387    ) -> Result<AtomicReadFileResult, ClientError> {
1388        let invoke_id = request.invoke_id;
1389        let tx = self.encode_with_growth(|w| {
1390            Npdu::new(0).encode(w)?;
1391            request.encode(w)
1392        })?;
1393        let payload = self
1394            .await_complex_ack_payload_or_error(
1395                address,
1396                &tx,
1397                invoke_id,
1398                SERVICE_ATOMIC_READ_FILE,
1399                self.response_timeout,
1400            )
1401            .await?;
1402        let mut pr = Reader::new(&payload);
1403        let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1404        Ok(into_client_atomic_read_result(parsed))
1405    }
1406
1407    /// Write `file_data` to a BACnet File object starting at `file_start_position` using stream
1408    /// access. The returned result contains the actual start position used by the device.
1409    pub async fn atomic_write_file_stream(
1410        &self,
1411        address: DataLinkAddress,
1412        file_object_id: ObjectId,
1413        file_start_position: i32,
1414        file_data: &[u8],
1415    ) -> Result<AtomicWriteFileResult, ClientError> {
1416        let invoke_id = self.next_invoke_id().await;
1417        let request = AtomicWriteFileRequest::stream(
1418            file_object_id,
1419            file_start_position,
1420            file_data,
1421            invoke_id,
1422        );
1423        self.atomic_write_file(address, request).await
1424    }
1425
1426    /// Write records to a BACnet File object starting at `file_start_record` using record access.
1427    ///
1428    /// Each element of `file_record_data` is one record's raw bytes.
1429    pub async fn atomic_write_file_record(
1430        &self,
1431        address: DataLinkAddress,
1432        file_object_id: ObjectId,
1433        file_start_record: i32,
1434        file_record_data: &[&[u8]],
1435    ) -> Result<AtomicWriteFileResult, ClientError> {
1436        let invoke_id = self.next_invoke_id().await;
1437        let request = AtomicWriteFileRequest::record(
1438            file_object_id,
1439            file_start_record,
1440            file_record_data,
1441            invoke_id,
1442        );
1443        self.atomic_write_file(address, request).await
1444    }
1445
1446    async fn atomic_write_file(
1447        &self,
1448        address: DataLinkAddress,
1449        request: AtomicWriteFileRequest<'_>,
1450    ) -> Result<AtomicWriteFileResult, ClientError> {
1451        let invoke_id = request.invoke_id;
1452        let tx = self.encode_with_growth(|w| {
1453            Npdu::new(0).encode(w)?;
1454            request.encode(w)
1455        })?;
1456        let payload = self
1457            .await_complex_ack_payload_or_error(
1458                address,
1459                &tx,
1460                invoke_id,
1461                SERVICE_ATOMIC_WRITE_FILE,
1462                self.response_timeout,
1463            )
1464            .await?;
1465        let mut pr = Reader::new(&payload);
1466        let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1467        Ok(into_client_atomic_write_result(parsed))
1468    }
1469
1470    /// Send a SubscribeCOV request to start (or renew) a COV subscription on the device.
1471    ///
1472    /// Use [`cancel_cov_subscription`](Self::cancel_cov_subscription) to unsubscribe.
1473    pub async fn subscribe_cov(
1474        &self,
1475        address: DataLinkAddress,
1476        mut request: SubscribeCovRequest,
1477    ) -> Result<(), ClientError> {
1478        request.invoke_id = self.next_invoke_id().await;
1479        let invoke_id = request.invoke_id;
1480        let tx = self.encode_with_growth(|w| {
1481            Npdu::new(0).encode(w)?;
1482            request.encode(w)
1483        })?;
1484        self.await_simple_ack_or_error(
1485            address,
1486            &tx,
1487            invoke_id,
1488            SERVICE_SUBSCRIBE_COV,
1489            self.response_timeout,
1490        )
1491        .await
1492    }
1493
1494    /// Cancel an existing COV subscription identified by `subscriber_process_id` and
1495    /// `monitored_object_id`.
1496    pub async fn cancel_cov_subscription(
1497        &self,
1498        address: DataLinkAddress,
1499        subscriber_process_id: u32,
1500        monitored_object_id: ObjectId,
1501    ) -> Result<(), ClientError> {
1502        self.subscribe_cov(
1503            address,
1504            SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1505        )
1506        .await
1507    }
1508
1509    /// Send a SubscribeCOVProperty request to subscribe to changes of a specific property.
1510    ///
1511    /// Use [`cancel_cov_property_subscription`](Self::cancel_cov_property_subscription) to
1512    /// unsubscribe.
1513    pub async fn subscribe_cov_property(
1514        &self,
1515        address: DataLinkAddress,
1516        mut request: SubscribeCovPropertyRequest,
1517    ) -> Result<(), ClientError> {
1518        request.invoke_id = self.next_invoke_id().await;
1519        let invoke_id = request.invoke_id;
1520        let tx = self.encode_with_growth(|w| {
1521            Npdu::new(0).encode(w)?;
1522            request.encode(w)
1523        })?;
1524        self.await_simple_ack_or_error(
1525            address,
1526            &tx,
1527            invoke_id,
1528            SERVICE_SUBSCRIBE_COV_PROPERTY,
1529            self.response_timeout,
1530        )
1531        .await
1532    }
1533
1534    /// Cancel an existing COV property subscription.
1535    ///
1536    /// The combination of `subscriber_process_id`, `monitored_object_id`,
1537    /// `monitored_property_id`, and `monitored_property_array_index` must match the
1538    /// subscription to cancel.
1539    pub async fn cancel_cov_property_subscription(
1540        &self,
1541        address: DataLinkAddress,
1542        subscriber_process_id: u32,
1543        monitored_object_id: ObjectId,
1544        monitored_property_id: PropertyId,
1545        monitored_property_array_index: Option<u32>,
1546    ) -> Result<(), ClientError> {
1547        self.subscribe_cov_property(
1548            address,
1549            SubscribeCovPropertyRequest::cancel(
1550                subscriber_process_id,
1551                monitored_object_id,
1552                monitored_property_id,
1553                monitored_property_array_index,
1554                0,
1555            ),
1556        )
1557        .await
1558    }
1559
1560    /// Read a range of entries from a list/log property by absolute position.
1561    ///
1562    /// `reference_index` is the 1-based starting entry index. A positive `count` reads
1563    /// forward; negative reads backward from `reference_index`.
1564    pub async fn read_range_by_position(
1565        &self,
1566        address: DataLinkAddress,
1567        object_id: ObjectId,
1568        property_id: PropertyId,
1569        array_index: Option<u32>,
1570        reference_index: i32,
1571        count: i16,
1572    ) -> Result<ReadRangeResult, ClientError> {
1573        let invoke_id = self.next_invoke_id().await;
1574        let req = ReadRangeRequest::by_position(
1575            object_id,
1576            property_id,
1577            array_index,
1578            reference_index,
1579            count,
1580            invoke_id,
1581        );
1582        self.read_range_with_request(address, req).await
1583    }
1584
1585    /// Read a range of entries from a list/log property anchored at a sequence number.
1586    ///
1587    /// `reference_sequence` is the sequence number of the anchor entry. A positive `count`
1588    /// reads forward; negative reads backward.
1589    pub async fn read_range_by_sequence_number(
1590        &self,
1591        address: DataLinkAddress,
1592        object_id: ObjectId,
1593        property_id: PropertyId,
1594        array_index: Option<u32>,
1595        reference_sequence: u32,
1596        count: i16,
1597    ) -> Result<ReadRangeResult, ClientError> {
1598        let invoke_id = self.next_invoke_id().await;
1599        let req = ReadRangeRequest::by_sequence_number(
1600            object_id,
1601            property_id,
1602            array_index,
1603            reference_sequence,
1604            count,
1605            invoke_id,
1606        );
1607        self.read_range_with_request(address, req).await
1608    }
1609
1610    /// Read a range of entries from a list/log property anchored at a timestamp.
1611    ///
1612    /// `at` is the reference date and time. A positive `count` reads forward from that
1613    /// timestamp; negative reads backward.
1614    pub async fn read_range_by_time(
1615        &self,
1616        address: DataLinkAddress,
1617        object_id: ObjectId,
1618        property_id: PropertyId,
1619        array_index: Option<u32>,
1620        at: (Date, Time),
1621        count: i16,
1622    ) -> Result<ReadRangeResult, ClientError> {
1623        let (date, time) = at;
1624        let invoke_id = self.next_invoke_id().await;
1625        let req = ReadRangeRequest::by_time(
1626            object_id,
1627            property_id,
1628            array_index,
1629            date,
1630            time,
1631            count,
1632            invoke_id,
1633        );
1634        self.read_range_with_request(address, req).await
1635    }
1636
1637    async fn read_range_with_request(
1638        &self,
1639        address: DataLinkAddress,
1640        req: ReadRangeRequest,
1641    ) -> Result<ReadRangeResult, ClientError> {
1642        let invoke_id = req.invoke_id;
1643        let tx = self.encode_with_growth(|w| {
1644            Npdu::new(0).encode(w)?;
1645            req.encode(w)
1646        })?;
1647        let payload = self
1648            .await_complex_ack_payload_or_error(
1649                address,
1650                &tx,
1651                invoke_id,
1652                SERVICE_READ_RANGE,
1653                self.response_timeout,
1654            )
1655            .await?;
1656        let mut pr = Reader::new(&payload);
1657        let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1658        into_client_read_range(parsed)
1659    }
1660
1661    /// Wait up to `wait` for a single incoming COV notification (confirmed or unconfirmed).
1662    ///
1663    /// Returns `Ok(Some(_))` when a notification arrives, `Ok(None)` on timeout, and
1664    /// `Err` on transport failure. Confirmed notifications are automatically acknowledged.
1665    /// Segmented confirmed notifications return [`ClientError::UnsupportedResponse`].
1666    pub async fn recv_cov_notification(
1667        &self,
1668        wait: Duration,
1669    ) -> Result<Option<CovNotification>, ClientError> {
1670        let _io_lock = self.request_io_lock.lock().await;
1671        let deadline = tokio::time::Instant::now() + wait;
1672
1673        while tokio::time::Instant::now() < deadline {
1674            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1675            let mut rx = [0u8; 1500];
1676            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1677            let (n, source) = match recv {
1678                Ok(Ok(v)) => v,
1679                Ok(Err(e)) => return Err(e.into()),
1680                Err(_) => break,
1681            };
1682
1683            let apdu = extract_apdu(&rx[..n])?;
1684            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1685            match ApduType::from_u8(first >> 4) {
1686                Some(ApduType::UnconfirmedRequest) => {
1687                    let mut r = Reader::new(apdu);
1688                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1689                    if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1690                        continue;
1691                    }
1692                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1693                    return Ok(Some(into_client_cov_notification(source, false, cov)?));
1694                }
1695                Some(ApduType::ConfirmedRequest) => {
1696                    let mut r = Reader::new(apdu);
1697                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1698                    if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1699                        continue;
1700                    }
1701                    if header.segmented {
1702                        return Err(ClientError::UnsupportedResponse);
1703                    }
1704
1705                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1706                    self.send_simple_ack(
1707                        source,
1708                        header.invoke_id,
1709                        SERVICE_CONFIRMED_COV_NOTIFICATION,
1710                    )
1711                    .await?;
1712                    return Ok(Some(into_client_cov_notification(source, true, cov)?));
1713                }
1714                _ => continue,
1715            }
1716        }
1717
1718        Ok(None)
1719    }
1720
1721    /// Wait up to `wait` for a single incoming event notification (confirmed or unconfirmed).
1722    ///
1723    /// Returns `Ok(Some(_))` when a notification arrives, `Ok(None)` on timeout, and
1724    /// `Err` on transport failure. Confirmed notifications are automatically acknowledged.
1725    /// Segmented confirmed notifications return [`ClientError::UnsupportedResponse`].
1726    pub async fn recv_event_notification(
1727        &self,
1728        wait: Duration,
1729    ) -> Result<Option<EventNotification>, ClientError> {
1730        let _io_lock = self.request_io_lock.lock().await;
1731        let deadline = tokio::time::Instant::now() + wait;
1732
1733        while tokio::time::Instant::now() < deadline {
1734            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1735            let mut rx = [0u8; 1500];
1736            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1737            let (n, source) = match recv {
1738                Ok(Ok(v)) => v,
1739                Ok(Err(e)) => return Err(e.into()),
1740                Err(_) => break,
1741            };
1742
1743            let apdu = extract_apdu(&rx[..n])?;
1744            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1745            match ApduType::from_u8(first >> 4) {
1746                Some(ApduType::UnconfirmedRequest) => {
1747                    let mut r = Reader::new(apdu);
1748                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1749                    if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1750                        continue;
1751                    }
1752                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1753                    return Ok(Some(into_client_event_notification(
1754                        source,
1755                        false,
1756                        notification,
1757                    )));
1758                }
1759                Some(ApduType::ConfirmedRequest) => {
1760                    let mut r = Reader::new(apdu);
1761                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1762                    if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1763                        continue;
1764                    }
1765                    if header.segmented {
1766                        return Err(ClientError::UnsupportedResponse);
1767                    }
1768                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1769                    self.send_simple_ack(
1770                        source,
1771                        header.invoke_id,
1772                        SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1773                    )
1774                    .await?;
1775                    return Ok(Some(into_client_event_notification(
1776                        source,
1777                        true,
1778                        notification,
1779                    )));
1780                }
1781                _ => continue,
1782            }
1783        }
1784
1785        Ok(None)
1786    }
1787
1788    /// Send a ReadProperty request and return the property value as a [`ClientDataValue`].
1789    ///
1790    /// Use [`read_property_multiple`](Self::read_property_multiple) to fetch several
1791    /// properties in a single round-trip.
1792    pub async fn read_property(
1793        &self,
1794        address: DataLinkAddress,
1795        object_id: ObjectId,
1796        property_id: PropertyId,
1797    ) -> Result<ClientDataValue, ClientError> {
1798        let invoke_id = self.next_invoke_id().await;
1799        let req = ReadPropertyRequest {
1800            object_id,
1801            property_id,
1802            array_index: None,
1803            invoke_id,
1804        };
1805        let tx = self.encode_with_growth(|w| {
1806            Npdu::new(0).encode(w)?;
1807            req.encode(w)
1808        })?;
1809        let payload = self
1810            .await_complex_ack_payload_or_error(
1811                address,
1812                &tx,
1813                invoke_id,
1814                SERVICE_READ_PROPERTY,
1815                self.response_timeout,
1816            )
1817            .await?;
1818        let mut pr = Reader::new(&payload);
1819        let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
1820        into_client_value(parsed.value)
1821    }
1822
1823    /// Send a WriteProperty request to set a single property on the device.
1824    pub async fn write_property(
1825        &self,
1826        address: DataLinkAddress,
1827        mut request: WritePropertyRequest<'_>,
1828    ) -> Result<(), ClientError> {
1829        request.invoke_id = self.next_invoke_id().await;
1830        let invoke_id = request.invoke_id;
1831        let tx = self.encode_with_growth(|w| {
1832            Npdu::new(0).encode(w)?;
1833            request.encode(w)
1834        })?;
1835        self.await_simple_ack_or_error(
1836            address,
1837            &tx,
1838            invoke_id,
1839            SERVICE_WRITE_PROPERTY,
1840            self.response_timeout,
1841        )
1842        .await
1843    }
1844
1845    /// Send a ReadPropertyMultiple request to fetch several properties of one object in a
1846    /// single round-trip.
1847    ///
1848    /// Returns pairs of `(PropertyId, ClientDataValue)` in the order returned by the device.
1849    pub async fn read_property_multiple(
1850        &self,
1851        address: DataLinkAddress,
1852        object_id: ObjectId,
1853        property_ids: &[PropertyId],
1854    ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
1855        let refs: Vec<PropertyReference> = property_ids
1856            .iter()
1857            .copied()
1858            .map(|property_id| PropertyReference {
1859                property_id,
1860                array_index: None,
1861            })
1862            .collect();
1863        let specs = [ReadAccessSpecification {
1864            object_id,
1865            properties: &refs,
1866        }];
1867
1868        let invoke_id = self.next_invoke_id().await;
1869        let req = ReadPropertyMultipleRequest {
1870            specs: &specs,
1871            invoke_id,
1872        };
1873
1874        let tx = self.encode_with_growth(|w| {
1875            Npdu::new(0).encode(w)?;
1876            req.encode(w)
1877        })?;
1878        let payload = self
1879            .await_complex_ack_payload_or_error(
1880                address,
1881                &tx,
1882                invoke_id,
1883                SERVICE_READ_PROPERTY_MULTIPLE,
1884                self.response_timeout,
1885            )
1886            .await?;
1887        let mut pr = Reader::new(&payload);
1888        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
1889        let mut out = Vec::new();
1890        for access in parsed.results {
1891            if access.object_id != object_id {
1892                continue;
1893            }
1894            for item in access.results {
1895                out.push((item.property_id, into_client_value(item.value)?));
1896            }
1897        }
1898        Ok(out)
1899    }
1900
1901    /// Send a WritePropertyMultiple request to set several properties of one object in a
1902    /// single round-trip.
1903    pub async fn write_property_multiple(
1904        &self,
1905        address: DataLinkAddress,
1906        object_id: ObjectId,
1907        properties: &[PropertyWriteSpec<'_>],
1908    ) -> Result<(), ClientError> {
1909        let invoke_id = self.next_invoke_id().await;
1910        let specs = [WriteAccessSpecification {
1911            object_id,
1912            properties,
1913        }];
1914        let req = WritePropertyMultipleRequest {
1915            specs: &specs,
1916            invoke_id,
1917        };
1918
1919        let tx = self.encode_with_growth(|w| {
1920            Npdu::new(0).encode(w)?;
1921            req.encode(w)
1922        })?;
1923        self.await_simple_ack_or_error(
1924            address,
1925            &tx,
1926            invoke_id,
1927            SERVICE_WRITE_PROPERTY_MULTIPLE,
1928            self.response_timeout,
1929        )
1930        .await
1931    }
1932
1933    /// Send a ConfirmedPrivateTransfer request and return the ack.
1934    pub async fn private_transfer(
1935        &self,
1936        address: DataLinkAddress,
1937        vendor_id: u32,
1938        service_number: u32,
1939        service_parameters: Option<&[u8]>,
1940    ) -> Result<PrivateTransferAck, ClientError> {
1941        let invoke_id = self.next_invoke_id().await;
1942        let req = ConfirmedPrivateTransferRequest {
1943            vendor_id,
1944            service_number,
1945            service_parameters,
1946            invoke_id,
1947        };
1948
1949        let tx = self.encode_with_growth(|w| {
1950            Npdu::new(0).encode(w)?;
1951            req.encode(w)
1952        })?;
1953        let payload = self
1954            .await_complex_ack_payload_or_error(
1955                address,
1956                &tx,
1957                invoke_id,
1958                SERVICE_CONFIRMED_PRIVATE_TRANSFER,
1959                self.response_timeout,
1960            )
1961            .await?;
1962        let mut r = Reader::new(&payload);
1963        PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
1964    }
1965
1966    /// Read multiple `(object_id, property_id)` pairs in a single ReadPropertyMultiple round-trip.
1967    ///
1968    /// All pairs must target the same device at `address`. Returns a map from each requested
1969    /// `(ObjectId, PropertyId)` to its value. Properties not returned by the device are absent
1970    /// from the map (the device may skip unknown properties rather than erroring).
1971    pub async fn read_many(
1972        &self,
1973        address: DataLinkAddress,
1974        requests: &[(ObjectId, PropertyId)],
1975    ) -> Result<HashMap<(ObjectId, PropertyId), ClientDataValue>, ClientError> {
1976        // Group by object to build ReadAccessSpecifications
1977        let mut grouped: Vec<(ObjectId, Vec<PropertyReference>)> = Vec::new();
1978        for &(oid, pid) in requests {
1979            if let Some(entry) = grouped.iter_mut().find(|(o, _)| *o == oid) {
1980                entry.1.push(PropertyReference {
1981                    property_id: pid,
1982                    array_index: None,
1983                });
1984            } else {
1985                grouped.push((
1986                    oid,
1987                    vec![PropertyReference {
1988                        property_id: pid,
1989                        array_index: None,
1990                    }],
1991                ));
1992            }
1993        }
1994
1995        let specs: Vec<ReadAccessSpecification<'_>> = grouped
1996            .iter()
1997            .map(|(oid, props)| ReadAccessSpecification {
1998                object_id: *oid,
1999                properties: props,
2000            })
2001            .collect();
2002
2003        let invoke_id = self.next_invoke_id().await;
2004        let req = ReadPropertyMultipleRequest {
2005            specs: &specs,
2006            invoke_id,
2007        };
2008        let tx = self.encode_with_growth(|w| {
2009            Npdu::new(0).encode(w)?;
2010            req.encode(w)
2011        })?;
2012        let payload = self
2013            .await_complex_ack_payload_or_error(
2014                address,
2015                &tx,
2016                invoke_id,
2017                SERVICE_READ_PROPERTY_MULTIPLE,
2018                self.response_timeout,
2019            )
2020            .await?;
2021
2022        let mut pr = Reader::new(&payload);
2023        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2024        let mut out = HashMap::new();
2025        for access in parsed.results {
2026            for item in access.results {
2027                if let Ok(v) = into_client_value(item.value) {
2028                    out.insert((access.object_id, item.property_id), v);
2029                }
2030            }
2031        }
2032        Ok(out)
2033    }
2034
2035    /// Write multiple properties across one or more objects in a single WritePropertyMultiple
2036    /// round-trip.
2037    ///
2038    /// `writes` is a slice of `(object_id, property_id, value, priority)` tuples.
2039    /// `priority` may be `None` for the relinquish-default case. All writes target the same
2040    /// device at `address`. Takes [`ClientDataValue`] (the owned form) for ergonomic use.
2041    pub async fn write_many(
2042        &self,
2043        address: DataLinkAddress,
2044        writes: &[(ObjectId, PropertyId, ClientDataValue, Option<u8>)],
2045    ) -> Result<(), ClientError> {
2046        use rustbac_core::types::{BitString, DataValue as DV};
2047
2048        fn cv_to_dv(v: &ClientDataValue) -> DV<'_> {
2049            match v {
2050                ClientDataValue::Null => DV::Null,
2051                ClientDataValue::Boolean(b) => DV::Boolean(*b),
2052                ClientDataValue::Unsigned(n) => DV::Unsigned(*n),
2053                ClientDataValue::Signed(n) => DV::Signed(*n),
2054                ClientDataValue::Real(f) => DV::Real(*f),
2055                ClientDataValue::Double(f) => DV::Double(*f),
2056                ClientDataValue::OctetString(b) => DV::OctetString(b),
2057                ClientDataValue::CharacterString(s) => DV::CharacterString(s),
2058                ClientDataValue::BitString { unused_bits, data } => DV::BitString(BitString {
2059                    unused_bits: *unused_bits,
2060                    data,
2061                }),
2062                ClientDataValue::Enumerated(n) => DV::Enumerated(*n),
2063                ClientDataValue::Date(d) => DV::Date(*d),
2064                ClientDataValue::Time(t) => DV::Time(*t),
2065                ClientDataValue::ObjectId(o) => DV::ObjectId(*o),
2066                ClientDataValue::Constructed { tag_num, values } => DV::Constructed {
2067                    tag_num: *tag_num,
2068                    values: values.iter().map(cv_to_dv).collect(),
2069                },
2070            }
2071        }
2072
2073        // Pre-convert values so DataValue borrows from the input slice lifetime.
2074        let converted: Vec<(ObjectId, PropertyId, DV<'_>, Option<u8>)> = writes
2075            .iter()
2076            .map(|(oid, pid, val, prio)| (*oid, *pid, cv_to_dv(val), *prio))
2077            .collect();
2078
2079        // Group by object to build WriteAccessSpecifications.
2080        let mut grouped: Vec<(ObjectId, Vec<PropertyWriteSpec<'_>>)> = Vec::new();
2081        for (oid, pid, val, prio) in &converted {
2082            let spec = PropertyWriteSpec {
2083                property_id: *pid,
2084                array_index: None,
2085                value: val.clone(),
2086                priority: *prio,
2087            };
2088            if let Some(entry) = grouped.iter_mut().find(|(o, _)| o == oid) {
2089                entry.1.push(spec);
2090            } else {
2091                grouped.push((*oid, vec![spec]));
2092            }
2093        }
2094
2095        let specs: Vec<WriteAccessSpecification<'_>> = grouped
2096            .iter()
2097            .map(|(oid, props)| WriteAccessSpecification {
2098                object_id: *oid,
2099                properties: props,
2100            })
2101            .collect();
2102
2103        let invoke_id = self.next_invoke_id().await;
2104        let req = WritePropertyMultipleRequest {
2105            specs: &specs,
2106            invoke_id,
2107        };
2108        let tx = self.encode_with_growth(|w| {
2109            Npdu::new(0).encode(w)?;
2110            req.encode(w)
2111        })?;
2112        self.await_simple_ack_or_error(
2113            address,
2114            &tx,
2115            invoke_id,
2116            SERVICE_WRITE_PROPERTY_MULTIPLE,
2117            self.response_timeout,
2118        )
2119        .await
2120    }
2121}
2122
2123fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
2124    let mut r = Reader::new(payload);
2125    let _npdu = Npdu::decode(&mut r)?;
2126    r.read_exact(r.remaining()).map_err(ClientError::from)
2127}
2128
2129fn remote_service_error(err: BacnetError) -> ClientError {
2130    ClientError::RemoteServiceError {
2131        service_choice: err.service_choice,
2132        error_class_raw: err.error_class,
2133        error_code_raw: err.error_code,
2134        error_class: err.error_class.and_then(ErrorClass::from_u32),
2135        error_code: err.error_code.and_then(ErrorCode::from_u32),
2136    }
2137}
2138
2139fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
2140    Ok(match value {
2141        DataValue::Null => ClientDataValue::Null,
2142        DataValue::Boolean(v) => ClientDataValue::Boolean(v),
2143        DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
2144        DataValue::Signed(v) => ClientDataValue::Signed(v),
2145        DataValue::Real(v) => ClientDataValue::Real(v),
2146        DataValue::Double(v) => ClientDataValue::Double(v),
2147        DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
2148        DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
2149        DataValue::BitString(v) => ClientDataValue::BitString {
2150            unused_bits: v.unused_bits,
2151            data: v.data.to_vec(),
2152        },
2153        DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
2154        DataValue::Date(v) => ClientDataValue::Date(v),
2155        DataValue::Time(v) => ClientDataValue::Time(v),
2156        DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
2157        DataValue::Constructed { tag_num, values } => {
2158            let mut children = Vec::with_capacity(values.len());
2159            for child in values {
2160                children.push(into_client_value(child)?);
2161            }
2162            ClientDataValue::Constructed {
2163                tag_num,
2164                values: children,
2165            }
2166        }
2167    })
2168}
2169
2170fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
2171    value
2172        .into_iter()
2173        .map(|item| AlarmSummaryItem {
2174            object_id: item.object_id,
2175            alarm_state_raw: item.alarm_state,
2176            alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2177                item.alarm_state,
2178            ),
2179            acknowledged_transitions: ClientBitString {
2180                unused_bits: item.acknowledged_transitions.unused_bits,
2181                data: item.acknowledged_transitions.data.to_vec(),
2182            },
2183        })
2184        .collect()
2185}
2186
2187fn into_client_enrollment_summary(
2188    value: Vec<CoreEnrollmentSummaryItem>,
2189) -> Vec<EnrollmentSummaryItem> {
2190    value
2191        .into_iter()
2192        .map(|item| EnrollmentSummaryItem {
2193            object_id: item.object_id,
2194            event_type: item.event_type,
2195            event_state_raw: item.event_state,
2196            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2197                item.event_state,
2198            ),
2199            priority: item.priority,
2200            notification_class: item.notification_class,
2201        })
2202        .collect()
2203}
2204
2205fn into_client_event_information(
2206    value: Vec<CoreEventSummaryItem<'_>>,
2207) -> Vec<EventInformationItem> {
2208    value
2209        .into_iter()
2210        .map(|item| EventInformationItem {
2211            object_id: item.object_id,
2212            event_state_raw: item.event_state,
2213            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2214                item.event_state,
2215            ),
2216            acknowledged_transitions: ClientBitString {
2217                unused_bits: item.acknowledged_transitions.unused_bits,
2218                data: item.acknowledged_transitions.data.to_vec(),
2219            },
2220            notify_type: item.notify_type,
2221            event_enable: ClientBitString {
2222                unused_bits: item.event_enable.unused_bits,
2223                data: item.event_enable.data.to_vec(),
2224            },
2225            event_priorities: item.event_priorities,
2226        })
2227        .collect()
2228}
2229
2230fn into_client_cov_notification(
2231    source: DataLinkAddress,
2232    confirmed: bool,
2233    value: CovNotificationRequest<'_>,
2234) -> Result<CovNotification, ClientError> {
2235    let mut values = Vec::with_capacity(value.values.len());
2236    for property in value.values {
2237        values.push(CovPropertyValue {
2238            property_id: property.property_id,
2239            array_index: property.array_index,
2240            value: into_client_value(property.value)?,
2241            priority: property.priority,
2242        });
2243    }
2244
2245    Ok(CovNotification {
2246        source,
2247        confirmed,
2248        subscriber_process_id: value.subscriber_process_id,
2249        initiating_device_id: value.initiating_device_id,
2250        monitored_object_id: value.monitored_object_id,
2251        time_remaining_seconds: value.time_remaining_seconds,
2252        values,
2253    })
2254}
2255
2256fn into_client_event_notification(
2257    source: DataLinkAddress,
2258    confirmed: bool,
2259    value: EventNotificationRequest<'_>,
2260) -> EventNotification {
2261    EventNotification {
2262        source,
2263        confirmed,
2264        process_id: value.process_id,
2265        initiating_device_id: value.initiating_device_id,
2266        event_object_id: value.event_object_id,
2267        timestamp: value.timestamp,
2268        notification_class: value.notification_class,
2269        priority: value.priority,
2270        event_type: value.event_type,
2271        message_text: value.message_text.map(str::to_string),
2272        notify_type: value.notify_type,
2273        ack_required: value.ack_required,
2274        from_state_raw: value.from_state,
2275        from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2276            value.from_state,
2277        ),
2278        to_state_raw: value.to_state,
2279        to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
2280    }
2281}
2282
2283fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
2284    let mut items = Vec::with_capacity(value.items.len());
2285    for item in value.items {
2286        items.push(into_client_value(item)?);
2287    }
2288    Ok(ReadRangeResult {
2289        object_id: value.object_id,
2290        property_id: value.property_id,
2291        array_index: value.array_index,
2292        result_flags: ClientBitString {
2293            unused_bits: value.result_flags.unused_bits,
2294            data: value.result_flags.data.to_vec(),
2295        },
2296        item_count: value.item_count,
2297        items,
2298    })
2299}
2300
2301fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
2302    match value.access_method {
2303        AtomicReadFileAckAccess::Stream {
2304            file_start_position,
2305            file_data,
2306        } => AtomicReadFileResult::Stream {
2307            end_of_file: value.end_of_file,
2308            file_start_position,
2309            file_data: file_data.to_vec(),
2310        },
2311        AtomicReadFileAckAccess::Record {
2312            file_start_record,
2313            returned_record_count,
2314            file_record_data,
2315        } => AtomicReadFileResult::Record {
2316            end_of_file: value.end_of_file,
2317            file_start_record,
2318            returned_record_count,
2319            file_record_data: file_record_data
2320                .into_iter()
2321                .map(|record| record.to_vec())
2322                .collect(),
2323        },
2324    }
2325}
2326
2327fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
2328    match value {
2329        AtomicWriteFileAck::Stream {
2330            file_start_position,
2331        } => AtomicWriteFileResult::Stream {
2332            file_start_position,
2333        },
2334        AtomicWriteFileAck::Record { file_start_record } => {
2335            AtomicWriteFileResult::Record { file_start_record }
2336        }
2337    }
2338}
2339
2340#[cfg(test)]
2341mod tests {
2342    use super::BacnetClient;
2343    use crate::{
2344        AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
2345        EnrollmentSummaryItem, EventInformationItem, EventNotification,
2346    };
2347    use rustbac_core::apdu::{
2348        ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
2349        UnconfirmedRequestHeader,
2350    };
2351    use rustbac_core::encoding::{
2352        primitives::{
2353            decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
2354            encode_ctx_object_id, encode_ctx_unsigned,
2355        },
2356        reader::Reader,
2357        tag::{AppTag, Tag},
2358        writer::Writer,
2359    };
2360    use rustbac_core::npdu::Npdu;
2361    use rustbac_core::services::acknowledge_alarm::{
2362        AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
2363    };
2364    use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
2365    use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
2366    use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
2367    use rustbac_core::services::cov_notification::{
2368        SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
2369    };
2370    use rustbac_core::services::device_management::{
2371        DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
2372        SERVICE_REINITIALIZE_DEVICE,
2373    };
2374    use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
2375    use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
2376    use rustbac_core::services::event_notification::{
2377        SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
2378    };
2379    use rustbac_core::services::list_element::{
2380        AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
2381        SERVICE_REMOVE_LIST_ELEMENT,
2382    };
2383    use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
2384    use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
2385    use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
2386    use rustbac_core::services::read_range::SERVICE_READ_RANGE;
2387    use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
2388    use rustbac_core::services::subscribe_cov_property::{
2389        SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
2390    };
2391    use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
2392    use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
2393    use rustbac_core::services::write_property_multiple::{
2394        PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
2395    };
2396    use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
2397    use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
2398    use std::collections::VecDeque;
2399    use std::sync::Arc;
2400    use std::time::Duration;
2401    use tokio::sync::Mutex;
2402
2403    #[derive(Debug, Default)]
2404    struct MockState {
2405        sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
2406        recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
2407    }
2408
2409    #[derive(Debug, Clone)]
2410    struct MockDataLink {
2411        state: Arc<MockState>,
2412    }
2413
2414    impl MockDataLink {
2415        fn new() -> (Self, Arc<MockState>) {
2416            let state = Arc::new(MockState::default());
2417            (
2418                Self {
2419                    state: state.clone(),
2420                },
2421                state,
2422            )
2423        }
2424    }
2425
2426    impl DataLink for MockDataLink {
2427        async fn send(
2428            &self,
2429            address: DataLinkAddress,
2430            payload: &[u8],
2431        ) -> Result<(), DataLinkError> {
2432            self.state
2433                .sent
2434                .lock()
2435                .await
2436                .push((address, payload.to_vec()));
2437            Ok(())
2438        }
2439
2440        async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
2441            let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
2442                return Err(DataLinkError::InvalidFrame);
2443            };
2444            if payload.len() > buf.len() {
2445                return Err(DataLinkError::FrameTooLarge);
2446            }
2447            buf[..payload.len()].copy_from_slice(&payload);
2448            Ok((payload.len(), addr))
2449        }
2450    }
2451
2452    fn with_npdu(apdu: &[u8]) -> Vec<u8> {
2453        let mut out = [0u8; 512];
2454        let mut w = Writer::new(&mut out);
2455        Npdu::new(0).encode(&mut w).unwrap();
2456        w.write_all(apdu).unwrap();
2457        w.as_written().to_vec()
2458    }
2459
2460    fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2461        let mut apdu_buf = [0u8; 256];
2462        let mut w = Writer::new(&mut apdu_buf);
2463        ComplexAckHeader {
2464            segmented: false,
2465            more_follows: false,
2466            invoke_id,
2467            sequence_number: None,
2468            proposed_window_size: None,
2469            service_choice: SERVICE_READ_RANGE,
2470        }
2471        .encode(&mut w)
2472        .unwrap();
2473        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2474        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
2475        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
2476        w.write_u8(5).unwrap();
2477        w.write_u8(0b1110_0000).unwrap();
2478        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
2479        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
2480        encode_app_real(&mut w, 42.0).unwrap();
2481        encode_app_real(&mut w, 43.0).unwrap();
2482        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
2483        w.as_written().to_vec()
2484    }
2485
2486    fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
2487        let mut apdu_buf = [0u8; 256];
2488        let mut w = Writer::new(&mut apdu_buf);
2489        ComplexAckHeader {
2490            segmented: false,
2491            more_follows: false,
2492            invoke_id,
2493            sequence_number: None,
2494            proposed_window_size: None,
2495            service_choice: SERVICE_ATOMIC_READ_FILE,
2496        }
2497        .encode(&mut w)
2498        .unwrap();
2499        Tag::Application {
2500            tag: AppTag::Boolean,
2501            len: if eof { 1 } else { 0 },
2502        }
2503        .encode(&mut w)
2504        .unwrap();
2505        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2506        Tag::Application {
2507            tag: AppTag::SignedInt,
2508            len: 1,
2509        }
2510        .encode(&mut w)
2511        .unwrap();
2512        w.write_u8(0).unwrap();
2513        Tag::Application {
2514            tag: AppTag::OctetString,
2515            len: data.len() as u32,
2516        }
2517        .encode(&mut w)
2518        .unwrap();
2519        w.write_all(data).unwrap();
2520        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2521        w.as_written().to_vec()
2522    }
2523
2524    fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
2525        let mut apdu_buf = [0u8; 256];
2526        let mut w = Writer::new(&mut apdu_buf);
2527        ComplexAckHeader {
2528            segmented: false,
2529            more_follows: false,
2530            invoke_id,
2531            sequence_number: None,
2532            proposed_window_size: None,
2533            service_choice: SERVICE_ATOMIC_READ_FILE,
2534        }
2535        .encode(&mut w)
2536        .unwrap();
2537        Tag::Application {
2538            tag: AppTag::Boolean,
2539            len: 0,
2540        }
2541        .encode(&mut w)
2542        .unwrap();
2543        Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
2544        Tag::Application {
2545            tag: AppTag::SignedInt,
2546            len: 1,
2547        }
2548        .encode(&mut w)
2549        .unwrap();
2550        w.write_u8(7).unwrap();
2551        Tag::Application {
2552            tag: AppTag::UnsignedInt,
2553            len: 1,
2554        }
2555        .encode(&mut w)
2556        .unwrap();
2557        w.write_u8(2).unwrap();
2558        Tag::Application {
2559            tag: AppTag::OctetString,
2560            len: 2,
2561        }
2562        .encode(&mut w)
2563        .unwrap();
2564        w.write_all(&[0x01, 0x02]).unwrap();
2565        Tag::Application {
2566            tag: AppTag::OctetString,
2567            len: 3,
2568        }
2569        .encode(&mut w)
2570        .unwrap();
2571        w.write_all(&[0x03, 0x04, 0x05]).unwrap();
2572        Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
2573        w.as_written().to_vec()
2574    }
2575
2576    fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
2577        let mut apdu_buf = [0u8; 64];
2578        let mut w = Writer::new(&mut apdu_buf);
2579        ComplexAckHeader {
2580            segmented: false,
2581            more_follows: false,
2582            invoke_id,
2583            sequence_number: None,
2584            proposed_window_size: None,
2585            service_choice: SERVICE_ATOMIC_WRITE_FILE,
2586        }
2587        .encode(&mut w)
2588        .unwrap();
2589        Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
2590        w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
2591        w.as_written().to_vec()
2592    }
2593
2594    fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
2595        let mut apdu_buf = [0u8; 64];
2596        let mut w = Writer::new(&mut apdu_buf);
2597        ComplexAckHeader {
2598            segmented: false,
2599            more_follows: false,
2600            invoke_id,
2601            sequence_number: None,
2602            proposed_window_size: None,
2603            service_choice: SERVICE_ATOMIC_WRITE_FILE,
2604        }
2605        .encode(&mut w)
2606        .unwrap();
2607        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2608        w.write_u8(start_record as u8).unwrap();
2609        w.as_written().to_vec()
2610    }
2611
2612    fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2613        let mut apdu_buf = [0u8; 64];
2614        let mut w = Writer::new(&mut apdu_buf);
2615        ComplexAckHeader {
2616            segmented: false,
2617            more_follows: false,
2618            invoke_id,
2619            sequence_number: None,
2620            proposed_window_size: None,
2621            service_choice: SERVICE_CREATE_OBJECT,
2622        }
2623        .encode(&mut w)
2624        .unwrap();
2625        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2626        w.as_written().to_vec()
2627    }
2628
2629    fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2630        let mut apdu_buf = [0u8; 128];
2631        let mut w = Writer::new(&mut apdu_buf);
2632        ComplexAckHeader {
2633            segmented: false,
2634            more_follows: false,
2635            invoke_id,
2636            sequence_number: None,
2637            proposed_window_size: None,
2638            service_choice: SERVICE_GET_ALARM_SUMMARY,
2639        }
2640        .encode(&mut w)
2641        .unwrap();
2642        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2643        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2644        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2645        w.write_u8(5).unwrap();
2646        w.write_u8(0b1110_0000).unwrap();
2647
2648        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
2649        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2650        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2651        w.write_u8(5).unwrap();
2652        w.write_u8(0b1100_0000).unwrap();
2653        w.as_written().to_vec()
2654    }
2655
2656    fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2657        let mut apdu_buf = [0u8; 160];
2658        let mut w = Writer::new(&mut apdu_buf);
2659        ComplexAckHeader {
2660            segmented: false,
2661            more_follows: false,
2662            invoke_id,
2663            sequence_number: None,
2664            proposed_window_size: None,
2665            service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
2666        }
2667        .encode(&mut w)
2668        .unwrap();
2669        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2670        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2671        encode_ctx_unsigned(&mut w, 2, 2).unwrap();
2672        encode_ctx_unsigned(&mut w, 3, 200).unwrap();
2673        encode_ctx_unsigned(&mut w, 4, 10).unwrap();
2674
2675        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
2676        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2677        encode_ctx_unsigned(&mut w, 2, 0).unwrap();
2678        encode_ctx_unsigned(&mut w, 3, 20).unwrap();
2679        encode_ctx_unsigned(&mut w, 4, 11).unwrap();
2680        w.as_written().to_vec()
2681    }
2682
2683    fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
2684        let mut apdu_buf = [0u8; 256];
2685        let mut w = Writer::new(&mut apdu_buf);
2686        ComplexAckHeader {
2687            segmented: false,
2688            more_follows: false,
2689            invoke_id,
2690            sequence_number: None,
2691            proposed_window_size: None,
2692            service_choice: SERVICE_GET_EVENT_INFORMATION,
2693        }
2694        .encode(&mut w)
2695        .unwrap();
2696        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2697        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2698        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2699        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2700        w.write_u8(5).unwrap();
2701        w.write_u8(0b1110_0000).unwrap();
2702        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
2703        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2704        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2705        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2706        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
2707        encode_ctx_unsigned(&mut w, 4, 0).unwrap();
2708        Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
2709        w.write_u8(5).unwrap();
2710        w.write_u8(0b1100_0000).unwrap();
2711        Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
2712        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
2713        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2714        encode_ctx_unsigned(&mut w, 2, 3).unwrap();
2715        Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
2716        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2717        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2718        w.write_u8(0).unwrap();
2719        w.as_written().to_vec()
2720    }
2721
2722    #[tokio::test]
2723    async fn who_has_object_name_collects_i_have() {
2724        let (dl, state) = MockDataLink::new();
2725        let client = BacnetClient::with_datalink(dl);
2726        let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
2727
2728        let mut apdu = [0u8; 128];
2729        let mut w = Writer::new(&mut apdu);
2730        UnconfirmedRequestHeader {
2731            service_choice: SERVICE_I_HAVE,
2732        }
2733        .encode(&mut w)
2734        .unwrap();
2735        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
2736        encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2737        encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
2738
2739        state
2740            .recv
2741            .lock()
2742            .await
2743            .push_back((with_npdu(w.as_written()), addr));
2744
2745        let results = client
2746            .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
2747            .await
2748            .unwrap();
2749        assert_eq!(results.len(), 1);
2750        assert_eq!(results[0].address, addr);
2751        assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
2752        assert_eq!(
2753            results[0].object_id,
2754            ObjectId::new(ObjectType::AnalogInput, 7)
2755        );
2756        assert_eq!(results[0].object_name, "Zone Temp");
2757
2758        let sent = state.sent.lock().await;
2759        assert_eq!(sent.len(), 1);
2760        let mut r = Reader::new(&sent[0].1);
2761        let _npdu = Npdu::decode(&mut r).unwrap();
2762        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2763        assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
2764    }
2765
2766    #[tokio::test]
2767    async fn device_communication_control_handles_simple_ack() {
2768        let (dl, state) = MockDataLink::new();
2769        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2770        let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
2771
2772        let mut apdu = [0u8; 32];
2773        let mut w = Writer::new(&mut apdu);
2774        SimpleAck {
2775            invoke_id: 1,
2776            service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
2777        }
2778        .encode(&mut w)
2779        .unwrap();
2780        state
2781            .recv
2782            .lock()
2783            .await
2784            .push_back((with_npdu(w.as_written()), addr));
2785
2786        client
2787            .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
2788            .await
2789            .unwrap();
2790
2791        let sent = state.sent.lock().await;
2792        assert_eq!(sent.len(), 1);
2793        let mut r = Reader::new(&sent[0].1);
2794        let _npdu = Npdu::decode(&mut r).unwrap();
2795        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2796        assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
2797    }
2798
2799    #[tokio::test]
2800    async fn reinitialize_device_handles_simple_ack() {
2801        let (dl, state) = MockDataLink::new();
2802        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2803        let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
2804
2805        let mut apdu = [0u8; 32];
2806        let mut w = Writer::new(&mut apdu);
2807        SimpleAck {
2808            invoke_id: 1,
2809            service_choice: SERVICE_REINITIALIZE_DEVICE,
2810        }
2811        .encode(&mut w)
2812        .unwrap();
2813        state
2814            .recv
2815            .lock()
2816            .await
2817            .push_back((with_npdu(w.as_written()), addr));
2818
2819        client
2820            .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
2821            .await
2822            .unwrap();
2823
2824        let sent = state.sent.lock().await;
2825        assert_eq!(sent.len(), 1);
2826        let mut r = Reader::new(&sent[0].1);
2827        let _npdu = Npdu::decode(&mut r).unwrap();
2828        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2829        assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
2830    }
2831
2832    #[tokio::test]
2833    async fn time_synchronize_sends_unconfirmed_request() {
2834        let (dl, state) = MockDataLink::new();
2835        let client = BacnetClient::with_datalink(dl);
2836        let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
2837
2838        client
2839            .time_synchronize(
2840                addr,
2841                Date {
2842                    year_since_1900: 126,
2843                    month: 2,
2844                    day: 7,
2845                    weekday: 6,
2846                },
2847                Time {
2848                    hour: 10,
2849                    minute: 11,
2850                    second: 12,
2851                    hundredths: 13,
2852                },
2853                false,
2854            )
2855            .await
2856            .unwrap();
2857
2858        let sent = state.sent.lock().await;
2859        assert_eq!(sent.len(), 1);
2860        let mut r = Reader::new(&sent[0].1);
2861        let _npdu = Npdu::decode(&mut r).unwrap();
2862        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2863        assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
2864    }
2865
2866    #[tokio::test]
2867    async fn get_alarm_summary_decodes_complex_ack() {
2868        let (dl, state) = MockDataLink::new();
2869        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2870        let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
2871
2872        state
2873            .recv
2874            .lock()
2875            .await
2876            .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
2877
2878        let summaries = client.get_alarm_summary(addr).await.unwrap();
2879        assert_eq!(summaries.len(), 2);
2880        assert_eq!(
2881            summaries[0],
2882            AlarmSummaryItem {
2883                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2884                alarm_state_raw: 1,
2885                alarm_state: Some(EventState::Fault),
2886                acknowledged_transitions: crate::ClientBitString {
2887                    unused_bits: 5,
2888                    data: vec![0b1110_0000],
2889                },
2890            }
2891        );
2892        assert_eq!(
2893            summaries[1],
2894            AlarmSummaryItem {
2895                object_id: ObjectId::new(ObjectType::BinaryInput, 2),
2896                alarm_state_raw: 0,
2897                alarm_state: Some(EventState::Normal),
2898                acknowledged_transitions: crate::ClientBitString {
2899                    unused_bits: 5,
2900                    data: vec![0b1100_0000],
2901                },
2902            }
2903        );
2904
2905        let sent = state.sent.lock().await;
2906        assert_eq!(sent.len(), 1);
2907        let mut r = Reader::new(&sent[0].1);
2908        let _npdu = Npdu::decode(&mut r).unwrap();
2909        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2910        assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
2911    }
2912
2913    #[tokio::test]
2914    async fn get_enrollment_summary_decodes_complex_ack() {
2915        let (dl, state) = MockDataLink::new();
2916        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2917        let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
2918
2919        state
2920            .recv
2921            .lock()
2922            .await
2923            .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
2924
2925        let summaries = client.get_enrollment_summary(addr).await.unwrap();
2926        assert_eq!(summaries.len(), 2);
2927        assert_eq!(
2928            summaries[0],
2929            EnrollmentSummaryItem {
2930                object_id: ObjectId::new(ObjectType::AnalogInput, 7),
2931                event_type: 1,
2932                event_state_raw: 2,
2933                event_state: Some(EventState::Offnormal),
2934                priority: 200,
2935                notification_class: 10,
2936            }
2937        );
2938        assert_eq!(
2939            summaries[1],
2940            EnrollmentSummaryItem {
2941                object_id: ObjectId::new(ObjectType::BinaryInput, 8),
2942                event_type: 0,
2943                event_state_raw: 0,
2944                event_state: Some(EventState::Normal),
2945                priority: 20,
2946                notification_class: 11,
2947            }
2948        );
2949
2950        let sent = state.sent.lock().await;
2951        assert_eq!(sent.len(), 1);
2952        let mut r = Reader::new(&sent[0].1);
2953        let _npdu = Npdu::decode(&mut r).unwrap();
2954        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2955        assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
2956    }
2957
2958    #[tokio::test]
2959    async fn get_event_information_decodes_complex_ack() {
2960        let (dl, state) = MockDataLink::new();
2961        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2962        let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
2963
2964        state
2965            .recv
2966            .lock()
2967            .await
2968            .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
2969
2970        let result = client.get_event_information(addr, None).await.unwrap();
2971        assert!(!result.more_events);
2972        assert_eq!(result.summaries.len(), 1);
2973        assert_eq!(
2974            result.summaries[0],
2975            EventInformationItem {
2976                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2977                event_state_raw: 2,
2978                event_state: Some(EventState::Offnormal),
2979                acknowledged_transitions: crate::ClientBitString {
2980                    unused_bits: 5,
2981                    data: vec![0b1110_0000],
2982                },
2983                notify_type: 0,
2984                event_enable: crate::ClientBitString {
2985                    unused_bits: 5,
2986                    data: vec![0b1100_0000],
2987                },
2988                event_priorities: [1, 2, 3],
2989            }
2990        );
2991    }
2992
2993    #[tokio::test]
2994    async fn acknowledge_alarm_handles_simple_ack() {
2995        let (dl, state) = MockDataLink::new();
2996        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2997        let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
2998
2999        let mut apdu = [0u8; 32];
3000        let mut w = Writer::new(&mut apdu);
3001        SimpleAck {
3002            invoke_id: 1,
3003            service_choice: SERVICE_ACKNOWLEDGE_ALARM,
3004        }
3005        .encode(&mut w)
3006        .unwrap();
3007        state
3008            .recv
3009            .lock()
3010            .await
3011            .push_back((with_npdu(w.as_written()), addr));
3012
3013        client
3014            .acknowledge_alarm(
3015                addr,
3016                AcknowledgeAlarmRequest {
3017                    acknowledging_process_id: 10,
3018                    event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
3019                    event_state_acknowledged: EventState::Offnormal,
3020                    event_time_stamp: TimeStamp::SequenceNumber(42),
3021                    acknowledgment_source: "operator",
3022                    time_of_acknowledgment: TimeStamp::DateTime {
3023                        date: Date {
3024                            year_since_1900: 126,
3025                            month: 2,
3026                            day: 7,
3027                            weekday: 6,
3028                        },
3029                        time: Time {
3030                            hour: 10,
3031                            minute: 11,
3032                            second: 12,
3033                            hundredths: 13,
3034                        },
3035                    },
3036                    invoke_id: 0,
3037                },
3038            )
3039            .await
3040            .unwrap();
3041
3042        let sent = state.sent.lock().await;
3043        assert_eq!(sent.len(), 1);
3044        let mut r = Reader::new(&sent[0].1);
3045        let _npdu = Npdu::decode(&mut r).unwrap();
3046        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3047        assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
3048    }
3049
3050    #[tokio::test]
3051    async fn create_object_by_type_decodes_complex_ack() {
3052        let (dl, state) = MockDataLink::new();
3053        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3054        let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
3055        let created = ObjectId::new(ObjectType::AnalogValue, 42);
3056
3057        state
3058            .recv
3059            .lock()
3060            .await
3061            .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
3062
3063        let result = client
3064            .create_object_by_type(addr, ObjectType::AnalogValue)
3065            .await
3066            .unwrap();
3067        assert_eq!(result, created);
3068
3069        let sent = state.sent.lock().await;
3070        let mut r = Reader::new(&sent[0].1);
3071        let _npdu = Npdu::decode(&mut r).unwrap();
3072        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3073        assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
3074    }
3075
3076    #[tokio::test]
3077    async fn delete_object_handles_simple_ack() {
3078        let (dl, state) = MockDataLink::new();
3079        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3080        let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
3081
3082        let mut apdu = [0u8; 32];
3083        let mut w = Writer::new(&mut apdu);
3084        SimpleAck {
3085            invoke_id: 1,
3086            service_choice: SERVICE_DELETE_OBJECT,
3087        }
3088        .encode(&mut w)
3089        .unwrap();
3090        state
3091            .recv
3092            .lock()
3093            .await
3094            .push_back((with_npdu(w.as_written()), addr));
3095
3096        client
3097            .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
3098            .await
3099            .unwrap();
3100    }
3101
3102    #[tokio::test]
3103    async fn add_list_element_handles_simple_ack() {
3104        let (dl, state) = MockDataLink::new();
3105        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3106        let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
3107
3108        let mut apdu = [0u8; 32];
3109        let mut w = Writer::new(&mut apdu);
3110        SimpleAck {
3111            invoke_id: 1,
3112            service_choice: SERVICE_ADD_LIST_ELEMENT,
3113        }
3114        .encode(&mut w)
3115        .unwrap();
3116        state
3117            .recv
3118            .lock()
3119            .await
3120            .push_back((with_npdu(w.as_written()), addr));
3121
3122        let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
3123        client
3124            .add_list_element(
3125                addr,
3126                AddListElementRequest {
3127                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
3128                    property_id: PropertyId::Proprietary(512),
3129                    array_index: None,
3130                    elements: &values,
3131                    invoke_id: 0,
3132                },
3133            )
3134            .await
3135            .unwrap();
3136    }
3137
3138    #[tokio::test]
3139    async fn remove_list_element_handles_simple_ack() {
3140        let (dl, state) = MockDataLink::new();
3141        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3142        let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
3143
3144        let mut apdu = [0u8; 32];
3145        let mut w = Writer::new(&mut apdu);
3146        SimpleAck {
3147            invoke_id: 1,
3148            service_choice: SERVICE_REMOVE_LIST_ELEMENT,
3149        }
3150        .encode(&mut w)
3151        .unwrap();
3152        state
3153            .recv
3154            .lock()
3155            .await
3156            .push_back((with_npdu(w.as_written()), addr));
3157
3158        let values = [DataValue::Unsigned(1)];
3159        client
3160            .remove_list_element(
3161                addr,
3162                RemoveListElementRequest {
3163                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
3164                    property_id: PropertyId::Proprietary(513),
3165                    array_index: None,
3166                    elements: &values,
3167                    invoke_id: 0,
3168                },
3169            )
3170            .await
3171            .unwrap();
3172    }
3173
3174    #[tokio::test]
3175    async fn atomic_read_file_stream_decodes_complex_ack() {
3176        let (dl, state) = MockDataLink::new();
3177        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3178        let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
3179        let file_object = ObjectId::new(ObjectType::File, 2);
3180
3181        state.recv.lock().await.push_back((
3182            with_npdu(&atomic_read_file_stream_ack_apdu(
3183                1,
3184                true,
3185                &[0xAA, 0xBB, 0xCC],
3186            )),
3187            addr,
3188        ));
3189
3190        let result = client
3191            .atomic_read_file_stream(addr, file_object, 0, 3)
3192            .await
3193            .unwrap();
3194
3195        assert_eq!(
3196            result,
3197            AtomicReadFileResult::Stream {
3198                end_of_file: true,
3199                file_start_position: 0,
3200                file_data: vec![0xAA, 0xBB, 0xCC],
3201            }
3202        );
3203
3204        let sent = state.sent.lock().await;
3205        assert_eq!(sent.len(), 1);
3206        let mut r = Reader::new(&sent[0].1);
3207        let _npdu = Npdu::decode(&mut r).unwrap();
3208        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3209        assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
3210    }
3211
3212    #[tokio::test]
3213    async fn atomic_read_file_record_decodes_complex_ack() {
3214        let (dl, state) = MockDataLink::new();
3215        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3216        let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
3217        let file_object = ObjectId::new(ObjectType::File, 5);
3218
3219        state
3220            .recv
3221            .lock()
3222            .await
3223            .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
3224
3225        let result = client
3226            .atomic_read_file_record(addr, file_object, 7, 2)
3227            .await
3228            .unwrap();
3229
3230        assert_eq!(
3231            result,
3232            AtomicReadFileResult::Record {
3233                end_of_file: false,
3234                file_start_record: 7,
3235                returned_record_count: 2,
3236                file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
3237            }
3238        );
3239    }
3240
3241    #[tokio::test]
3242    async fn atomic_write_file_stream_decodes_complex_ack() {
3243        let (dl, state) = MockDataLink::new();
3244        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3245        let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
3246        let file_object = ObjectId::new(ObjectType::File, 3);
3247
3248        state
3249            .recv
3250            .lock()
3251            .await
3252            .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
3253
3254        let result = client
3255            .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
3256            .await
3257            .unwrap();
3258
3259        assert_eq!(
3260            result,
3261            AtomicWriteFileResult::Stream {
3262                file_start_position: 128
3263            }
3264        );
3265    }
3266
3267    #[tokio::test]
3268    async fn atomic_write_file_record_decodes_complex_ack() {
3269        let (dl, state) = MockDataLink::new();
3270        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3271        let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
3272        let file_object = ObjectId::new(ObjectType::File, 9);
3273        let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
3274
3275        state
3276            .recv
3277            .lock()
3278            .await
3279            .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
3280
3281        let result = client
3282            .atomic_write_file_record(addr, file_object, 7, &records)
3283            .await
3284            .unwrap();
3285
3286        assert_eq!(
3287            result,
3288            AtomicWriteFileResult::Record {
3289                file_start_record: 7
3290            }
3291        );
3292    }
3293
3294    #[tokio::test]
3295    async fn read_properties_decodes_complex_ack() {
3296        let (dl, state) = MockDataLink::new();
3297        let client = BacnetClient::with_datalink(dl);
3298        let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
3299        let object_id = ObjectId::new(ObjectType::Device, 1);
3300
3301        let mut apdu_buf = [0u8; 256];
3302        let mut w = Writer::new(&mut apdu_buf);
3303        ComplexAckHeader {
3304            segmented: false,
3305            more_follows: false,
3306            invoke_id: 1,
3307            sequence_number: None,
3308            proposed_window_size: None,
3309            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3310        }
3311        .encode(&mut w)
3312        .unwrap();
3313        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
3314        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3315            .encode(&mut w)
3316            .unwrap();
3317        encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
3318        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3319            .encode(&mut w)
3320            .unwrap();
3321        encode_app_real(&mut w, 55.5).unwrap();
3322        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3323            .encode(&mut w)
3324            .unwrap();
3325        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3326            .encode(&mut w)
3327            .unwrap();
3328
3329        state
3330            .recv
3331            .lock()
3332            .await
3333            .push_back((with_npdu(w.as_written()), addr));
3334
3335        let values = client
3336            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3337            .await
3338            .unwrap();
3339        assert_eq!(values.len(), 1);
3340        assert_eq!(values[0].0, PropertyId::PresentValue);
3341        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
3342
3343        let sent = state.sent.lock().await;
3344        assert_eq!(sent.len(), 1);
3345        let mut r = Reader::new(&sent[0].1);
3346        let _npdu = Npdu::decode(&mut r).unwrap();
3347        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3348        assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
3349    }
3350
3351    #[tokio::test]
3352    async fn read_property_multiple_reassembles_segmented_complex_ack() {
3353        let (dl, state) = MockDataLink::new();
3354        let client = BacnetClient::with_datalink(dl);
3355        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3356        let object_id = ObjectId::new(ObjectType::Device, 1);
3357
3358        let mut payload_buf = [0u8; 256];
3359        let mut pw = Writer::new(&mut payload_buf);
3360        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3361        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3362            .encode(&mut pw)
3363            .unwrap();
3364        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3365        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3366            .encode(&mut pw)
3367            .unwrap();
3368        encode_app_real(&mut pw, 66.0).unwrap();
3369        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3370            .encode(&mut pw)
3371            .unwrap();
3372        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3373            .encode(&mut pw)
3374            .unwrap();
3375        let payload = pw.as_written();
3376        let split = payload.len() / 2;
3377
3378        let mut apdu1 = [0u8; 256];
3379        let mut w1 = Writer::new(&mut apdu1);
3380        ComplexAckHeader {
3381            segmented: true,
3382            more_follows: true,
3383            invoke_id: 1,
3384            sequence_number: Some(0),
3385            proposed_window_size: Some(1),
3386            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3387        }
3388        .encode(&mut w1)
3389        .unwrap();
3390        w1.write_all(&payload[..split]).unwrap();
3391
3392        let mut apdu2 = [0u8; 256];
3393        let mut w2 = Writer::new(&mut apdu2);
3394        ComplexAckHeader {
3395            segmented: true,
3396            more_follows: false,
3397            invoke_id: 1,
3398            sequence_number: Some(1),
3399            proposed_window_size: Some(1),
3400            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3401        }
3402        .encode(&mut w2)
3403        .unwrap();
3404        w2.write_all(&payload[split..]).unwrap();
3405
3406        state
3407            .recv
3408            .lock()
3409            .await
3410            .push_back((with_npdu(w1.as_written()), addr));
3411        state
3412            .recv
3413            .lock()
3414            .await
3415            .push_back((with_npdu(w2.as_written()), addr));
3416
3417        let values = client
3418            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3419            .await
3420            .unwrap();
3421        assert_eq!(values.len(), 1);
3422        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3423
3424        let sent = state.sent.lock().await;
3425        assert!(sent.len() >= 3);
3426
3427        let mut saw_segment_ack = 0usize;
3428        for (_, frame) in sent.iter().skip(1) {
3429            let mut r = Reader::new(frame);
3430            let _npdu = Npdu::decode(&mut r).unwrap();
3431            let apdu = r.read_exact(r.remaining()).unwrap();
3432            if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
3433                let mut sr = Reader::new(apdu);
3434                let sack = SegmentAck::decode(&mut sr).unwrap();
3435                assert_eq!(sack.invoke_id, 1);
3436                saw_segment_ack += 1;
3437            }
3438        }
3439        assert!(saw_segment_ack >= 1);
3440    }
3441
3442    #[tokio::test]
3443    async fn read_property_multiple_tolerates_duplicate_segment() {
3444        let (dl, state) = MockDataLink::new();
3445        let client = BacnetClient::with_datalink(dl);
3446        let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
3447        let object_id = ObjectId::new(ObjectType::Device, 1);
3448
3449        let mut payload_buf = [0u8; 256];
3450        let mut pw = Writer::new(&mut payload_buf);
3451        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3452        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3453            .encode(&mut pw)
3454            .unwrap();
3455        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3456        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3457            .encode(&mut pw)
3458            .unwrap();
3459        encode_app_real(&mut pw, 66.0).unwrap();
3460        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3461            .encode(&mut pw)
3462            .unwrap();
3463        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3464            .encode(&mut pw)
3465            .unwrap();
3466        let payload = pw.as_written();
3467        let split = payload.len() / 2;
3468
3469        let mut apdu1 = [0u8; 256];
3470        let mut w1 = Writer::new(&mut apdu1);
3471        ComplexAckHeader {
3472            segmented: true,
3473            more_follows: true,
3474            invoke_id: 1,
3475            sequence_number: Some(0),
3476            proposed_window_size: Some(1),
3477            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3478        }
3479        .encode(&mut w1)
3480        .unwrap();
3481        w1.write_all(&payload[..split]).unwrap();
3482
3483        let mut dup = [0u8; 256];
3484        let mut wd = Writer::new(&mut dup);
3485        ComplexAckHeader {
3486            segmented: true,
3487            more_follows: true,
3488            invoke_id: 1,
3489            sequence_number: Some(0),
3490            proposed_window_size: Some(1),
3491            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3492        }
3493        .encode(&mut wd)
3494        .unwrap();
3495        wd.write_all(&payload[..split]).unwrap();
3496
3497        let mut apdu2 = [0u8; 256];
3498        let mut w2 = Writer::new(&mut apdu2);
3499        ComplexAckHeader {
3500            segmented: true,
3501            more_follows: false,
3502            invoke_id: 1,
3503            sequence_number: Some(1),
3504            proposed_window_size: Some(1),
3505            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3506        }
3507        .encode(&mut w2)
3508        .unwrap();
3509        w2.write_all(&payload[split..]).unwrap();
3510
3511        {
3512            let mut recv = state.recv.lock().await;
3513            recv.push_back((with_npdu(w1.as_written()), addr));
3514            recv.push_back((with_npdu(wd.as_written()), addr));
3515            recv.push_back((with_npdu(w2.as_written()), addr));
3516        }
3517
3518        let values = client
3519            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3520            .await
3521            .unwrap();
3522        assert_eq!(values.len(), 1);
3523        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3524    }
3525
3526    #[tokio::test]
3527    async fn write_properties_handles_simple_ack() {
3528        let (dl, state) = MockDataLink::new();
3529        let client = BacnetClient::with_datalink(dl);
3530        let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
3531        let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
3532
3533        let mut apdu_buf = [0u8; 32];
3534        let mut w = Writer::new(&mut apdu_buf);
3535        SimpleAck {
3536            invoke_id: 1,
3537            service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3538        }
3539        .encode(&mut w)
3540        .unwrap();
3541        state
3542            .recv
3543            .lock()
3544            .await
3545            .push_back((with_npdu(w.as_written()), addr));
3546
3547        let writes = [PropertyWriteSpec {
3548            property_id: PropertyId::PresentValue,
3549            array_index: None,
3550            value: DataValue::Real(12.5),
3551            priority: Some(8),
3552        }];
3553        client
3554            .write_property_multiple(addr, object_id, &writes)
3555            .await
3556            .unwrap();
3557
3558        let sent = state.sent.lock().await;
3559        assert_eq!(sent.len(), 1);
3560        let mut r = Reader::new(&sent[0].1);
3561        let _npdu = Npdu::decode(&mut r).unwrap();
3562        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3563        assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3564    }
3565
3566    #[tokio::test]
3567    async fn subscribe_cov_handles_simple_ack() {
3568        let (dl, state) = MockDataLink::new();
3569        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3570        let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
3571
3572        let mut apdu_buf = [0u8; 32];
3573        let mut w = Writer::new(&mut apdu_buf);
3574        SimpleAck {
3575            invoke_id: 1,
3576            service_choice: SERVICE_SUBSCRIBE_COV,
3577        }
3578        .encode(&mut w)
3579        .unwrap();
3580        state
3581            .recv
3582            .lock()
3583            .await
3584            .push_back((with_npdu(w.as_written()), addr));
3585
3586        client
3587            .subscribe_cov(
3588                addr,
3589                SubscribeCovRequest {
3590                    subscriber_process_id: 10,
3591                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3592                    issue_confirmed_notifications: Some(false),
3593                    lifetime_seconds: Some(300),
3594                    invoke_id: 0,
3595                },
3596            )
3597            .await
3598            .unwrap();
3599
3600        let sent = state.sent.lock().await;
3601        assert_eq!(sent.len(), 1);
3602        let mut r = Reader::new(&sent[0].1);
3603        let _npdu = Npdu::decode(&mut r).unwrap();
3604        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3605        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
3606    }
3607
3608    #[tokio::test]
3609    async fn subscribe_cov_property_handles_simple_ack() {
3610        let (dl, state) = MockDataLink::new();
3611        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3612        let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
3613
3614        let mut apdu_buf = [0u8; 32];
3615        let mut w = Writer::new(&mut apdu_buf);
3616        SimpleAck {
3617            invoke_id: 1,
3618            service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
3619        }
3620        .encode(&mut w)
3621        .unwrap();
3622        state
3623            .recv
3624            .lock()
3625            .await
3626            .push_back((with_npdu(w.as_written()), addr));
3627
3628        client
3629            .subscribe_cov_property(
3630                addr,
3631                SubscribeCovPropertyRequest {
3632                    subscriber_process_id: 22,
3633                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3634                    issue_confirmed_notifications: Some(true),
3635                    lifetime_seconds: Some(120),
3636                    monitored_property_id: PropertyId::PresentValue,
3637                    monitored_property_array_index: None,
3638                    cov_increment: Some(0.1),
3639                    invoke_id: 0,
3640                },
3641            )
3642            .await
3643            .unwrap();
3644
3645        let sent = state.sent.lock().await;
3646        assert_eq!(sent.len(), 1);
3647        let mut r = Reader::new(&sent[0].1);
3648        let _npdu = Npdu::decode(&mut r).unwrap();
3649        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3650        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
3651    }
3652
3653    #[tokio::test]
3654    async fn read_range_by_position_decodes_complex_ack() {
3655        let (dl, state) = MockDataLink::new();
3656        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3657        let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
3658        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3659
3660        let mut apdu_buf = [0u8; 256];
3661        let mut w = Writer::new(&mut apdu_buf);
3662        ComplexAckHeader {
3663            segmented: false,
3664            more_follows: false,
3665            invoke_id: 1,
3666            sequence_number: None,
3667            proposed_window_size: None,
3668            service_choice: SERVICE_READ_RANGE,
3669        }
3670        .encode(&mut w)
3671        .unwrap();
3672        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3673        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3674        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3675        w.write_u8(5).unwrap();
3676        w.write_u8(0b1110_0000).unwrap();
3677        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3678        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3679        encode_app_real(&mut w, 42.0).unwrap();
3680        encode_app_real(&mut w, 43.0).unwrap();
3681        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3682
3683        state
3684            .recv
3685            .lock()
3686            .await
3687            .push_back((with_npdu(w.as_written()), addr));
3688
3689        let result = client
3690            .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
3691            .await
3692            .unwrap();
3693        assert_eq!(result.object_id, object_id);
3694        assert_eq!(result.item_count, 2);
3695        assert_eq!(result.items.len(), 2);
3696        assert!(matches!(
3697            result.items[0],
3698            ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
3699        ));
3700    }
3701
3702    #[tokio::test]
3703    async fn read_range_by_sequence_number_encodes_range_selector() {
3704        let (dl, state) = MockDataLink::new();
3705        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3706        let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
3707        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3708
3709        state
3710            .recv
3711            .lock()
3712            .await
3713            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3714
3715        let _ = client
3716            .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
3717            .await
3718            .unwrap();
3719
3720        let sent = state.sent.lock().await;
3721        let mut r = Reader::new(&sent[0].1);
3722        let _npdu = Npdu::decode(&mut r).unwrap();
3723        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3724        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3725        match Tag::decode(&mut r).unwrap() {
3726            Tag::Context { tag_num: 0, len: 4 } => {
3727                let _ = r.read_exact(4).unwrap();
3728            }
3729            other => panic!("unexpected object id tag: {other:?}"),
3730        }
3731        match Tag::decode(&mut r).unwrap() {
3732            Tag::Context { tag_num: 1, len } => {
3733                let _ = decode_unsigned(&mut r, len as usize).unwrap();
3734            }
3735            other => panic!("unexpected property tag: {other:?}"),
3736        }
3737        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
3738        match Tag::decode(&mut r).unwrap() {
3739            Tag::Application {
3740                tag: AppTag::UnsignedInt,
3741                len,
3742            } => {
3743                assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
3744            }
3745            other => panic!("unexpected ref seq tag: {other:?}"),
3746        }
3747        match Tag::decode(&mut r).unwrap() {
3748            Tag::Application {
3749                tag: AppTag::SignedInt,
3750                len,
3751            } => {
3752                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3753            }
3754            other => panic!("unexpected count tag: {other:?}"),
3755        }
3756        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
3757    }
3758
3759    #[tokio::test]
3760    async fn read_range_by_time_encodes_range_selector() {
3761        let (dl, state) = MockDataLink::new();
3762        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3763        let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
3764        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3765        let date = Date {
3766            year_since_1900: 126,
3767            month: 2,
3768            day: 7,
3769            weekday: 6,
3770        };
3771        let time = Time {
3772            hour: 10,
3773            minute: 11,
3774            second: 12,
3775            hundredths: 13,
3776        };
3777
3778        state
3779            .recv
3780            .lock()
3781            .await
3782            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3783
3784        let _ = client
3785            .read_range_by_time(
3786                addr,
3787                object_id,
3788                PropertyId::PresentValue,
3789                None,
3790                (date, time),
3791                2,
3792            )
3793            .await
3794            .unwrap();
3795
3796        let sent = state.sent.lock().await;
3797        let mut r = Reader::new(&sent[0].1);
3798        let _npdu = Npdu::decode(&mut r).unwrap();
3799        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3800        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3801        match Tag::decode(&mut r).unwrap() {
3802            Tag::Context { tag_num: 0, len: 4 } => {
3803                let _ = r.read_exact(4).unwrap();
3804            }
3805            other => panic!("unexpected object id tag: {other:?}"),
3806        }
3807        match Tag::decode(&mut r).unwrap() {
3808            Tag::Context { tag_num: 1, len } => {
3809                let _ = decode_unsigned(&mut r, len as usize).unwrap();
3810            }
3811            other => panic!("unexpected property tag: {other:?}"),
3812        }
3813        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
3814        match Tag::decode(&mut r).unwrap() {
3815            Tag::Application {
3816                tag: AppTag::Date,
3817                len: 4,
3818            } => {
3819                let raw = r.read_exact(4).unwrap();
3820                assert_eq!(
3821                    raw,
3822                    &[date.year_since_1900, date.month, date.day, date.weekday]
3823                );
3824            }
3825            other => panic!("unexpected date tag: {other:?}"),
3826        }
3827        match Tag::decode(&mut r).unwrap() {
3828            Tag::Application {
3829                tag: AppTag::Time,
3830                len: 4,
3831            } => {
3832                let raw = r.read_exact(4).unwrap();
3833                assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
3834            }
3835            other => panic!("unexpected time tag: {other:?}"),
3836        }
3837        match Tag::decode(&mut r).unwrap() {
3838            Tag::Application {
3839                tag: AppTag::SignedInt,
3840                len,
3841            } => {
3842                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3843            }
3844            other => panic!("unexpected count tag: {other:?}"),
3845        }
3846        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
3847    }
3848
3849    #[tokio::test]
3850    async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
3851        let (dl, state) = MockDataLink::new();
3852        let client = BacnetClient::with_datalink(dl);
3853        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3854
3855        let mut apdu = [0u8; 256];
3856        let mut w = Writer::new(&mut apdu);
3857        UnconfirmedRequestHeader {
3858            service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3859        }
3860        .encode(&mut w)
3861        .unwrap();
3862        encode_ctx_unsigned(&mut w, 0, 17).unwrap();
3863        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3864        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3865        encode_ctx_unsigned(&mut w, 3, 60).unwrap();
3866        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3867        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3868        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3869        encode_app_real(&mut w, 73.25).unwrap();
3870        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3871        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3872
3873        state
3874            .recv
3875            .lock()
3876            .await
3877            .push_back((with_npdu(w.as_written()), addr));
3878
3879        let notification = client
3880            .recv_cov_notification(Duration::from_secs(1))
3881            .await
3882            .unwrap()
3883            .unwrap();
3884        assert!(!notification.confirmed);
3885        assert_eq!(notification.subscriber_process_id, 17);
3886        assert_eq!(notification.values.len(), 1);
3887        assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
3888        assert!(matches!(
3889            notification.values[0].value,
3890            ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
3891        ));
3892
3893        let sent = state.sent.lock().await;
3894        assert!(sent.is_empty());
3895    }
3896
3897    #[tokio::test]
3898    async fn recv_confirmed_cov_notification_sends_simple_ack() {
3899        let (dl, state) = MockDataLink::new();
3900        let client = BacnetClient::with_datalink(dl);
3901        let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
3902
3903        let mut apdu = [0u8; 256];
3904        let mut w = Writer::new(&mut apdu);
3905        ConfirmedRequestHeader {
3906            segmented: false,
3907            more_follows: false,
3908            segmented_response_accepted: false,
3909            max_segments: 0,
3910            max_apdu: 5,
3911            invoke_id: 9,
3912            sequence_number: None,
3913            proposed_window_size: None,
3914            service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
3915        }
3916        .encode(&mut w)
3917        .unwrap();
3918        encode_ctx_unsigned(&mut w, 0, 18).unwrap();
3919        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3920        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
3921        encode_ctx_unsigned(&mut w, 3, 120).unwrap();
3922        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3923        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3924        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3925        encode_app_real(&mut w, 55.0).unwrap();
3926        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3927        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3928
3929        state
3930            .recv
3931            .lock()
3932            .await
3933            .push_back((with_npdu(w.as_written()), addr));
3934
3935        let notification = client
3936            .recv_cov_notification(Duration::from_secs(1))
3937            .await
3938            .unwrap()
3939            .unwrap();
3940        assert!(notification.confirmed);
3941        assert_eq!(notification.values.len(), 1);
3942
3943        let sent = state.sent.lock().await;
3944        assert_eq!(sent.len(), 1);
3945        let mut r = Reader::new(&sent[0].1);
3946        let _npdu = Npdu::decode(&mut r).unwrap();
3947        let ack = SimpleAck::decode(&mut r).unwrap();
3948        assert_eq!(ack.invoke_id, 9);
3949        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
3950    }
3951
3952    #[tokio::test]
3953    async fn recv_unconfirmed_event_notification_returns_decoded_value() {
3954        let (dl, state) = MockDataLink::new();
3955        let client = BacnetClient::with_datalink(dl);
3956        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
3957
3958        let mut apdu = [0u8; 256];
3959        let mut w = Writer::new(&mut apdu);
3960        UnconfirmedRequestHeader {
3961            service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3962        }
3963        .encode(&mut w)
3964        .unwrap();
3965        encode_ctx_unsigned(&mut w, 0, 99).unwrap();
3966        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3967        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
3968        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3969        encode_ctx_unsigned(&mut w, 1, 55).unwrap();
3970        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3971        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
3972        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
3973        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
3974        encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
3975        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
3976        Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
3977        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
3978        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
3979        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
3980        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3981        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3982        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3983        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
3984
3985        state
3986            .recv
3987            .lock()
3988            .await
3989            .push_back((with_npdu(w.as_written()), addr));
3990
3991        let notification: EventNotification = client
3992            .recv_event_notification(Duration::from_secs(1))
3993            .await
3994            .unwrap()
3995            .unwrap();
3996        assert!(!notification.confirmed);
3997        assert_eq!(notification.process_id, 99);
3998        assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
3999        assert_eq!(notification.ack_required, Some(true));
4000        assert_eq!(notification.from_state, Some(EventState::Offnormal));
4001        assert_eq!(notification.to_state, Some(EventState::Normal));
4002        assert_eq!(notification.notify_type, 0);
4003
4004        let sent = state.sent.lock().await;
4005        assert!(sent.is_empty());
4006    }
4007
4008    #[tokio::test]
4009    async fn recv_confirmed_event_notification_sends_simple_ack() {
4010        let (dl, state) = MockDataLink::new();
4011        let client = BacnetClient::with_datalink(dl);
4012        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
4013
4014        let mut apdu = [0u8; 256];
4015        let mut w = Writer::new(&mut apdu);
4016        ConfirmedRequestHeader {
4017            segmented: false,
4018            more_follows: false,
4019            segmented_response_accepted: false,
4020            max_segments: 0,
4021            max_apdu: 5,
4022            invoke_id: 11,
4023            sequence_number: None,
4024            proposed_window_size: None,
4025            service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
4026        }
4027        .encode(&mut w)
4028        .unwrap();
4029        encode_ctx_unsigned(&mut w, 0, 100).unwrap();
4030        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4031        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
4032        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4033        encode_ctx_unsigned(&mut w, 1, 56).unwrap();
4034        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4035        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
4036        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
4037        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
4038        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
4039        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
4040        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
4041        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
4042        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
4043        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
4044        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
4045        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
4046
4047        state
4048            .recv
4049            .lock()
4050            .await
4051            .push_back((with_npdu(w.as_written()), addr));
4052
4053        let notification = client
4054            .recv_event_notification(Duration::from_secs(1))
4055            .await
4056            .unwrap()
4057            .unwrap();
4058        assert!(notification.confirmed);
4059
4060        let sent = state.sent.lock().await;
4061        assert_eq!(sent.len(), 1);
4062        let mut r = Reader::new(&sent[0].1);
4063        let _npdu = Npdu::decode(&mut r).unwrap();
4064        let ack = SimpleAck::decode(&mut r).unwrap();
4065        assert_eq!(ack.invoke_id, 11);
4066        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
4067    }
4068
4069    #[tokio::test]
4070    async fn write_property_multiple_segments_large_request() {
4071        let (dl, state) = MockDataLink::new();
4072        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4073        let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
4074        let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
4075
4076        {
4077            let mut recv = state.recv.lock().await;
4078            for seq in 0u8..=254 {
4079                let mut apdu = [0u8; 16];
4080                let mut w = Writer::new(&mut apdu);
4081                SegmentAck {
4082                    negative_ack: false,
4083                    sent_by_server: true,
4084                    invoke_id: 1,
4085                    sequence_number: seq,
4086                    actual_window_size: 1,
4087                }
4088                .encode(&mut w)
4089                .unwrap();
4090                recv.push_back((with_npdu(w.as_written()), addr));
4091            }
4092
4093            let mut apdu = [0u8; 16];
4094            let mut w = Writer::new(&mut apdu);
4095            SimpleAck {
4096                invoke_id: 1,
4097                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4098            }
4099            .encode(&mut w)
4100            .unwrap();
4101            recv.push_back((with_npdu(w.as_written()), addr));
4102        }
4103
4104        let writes: Vec<PropertyWriteSpec> = (0..180)
4105            .map(|_| PropertyWriteSpec {
4106                property_id: PropertyId::Description,
4107                array_index: None,
4108                value: DataValue::CharacterString(
4109                    "rustbac segmented write test payload................................................................",
4110                ),
4111                priority: None,
4112            })
4113            .collect();
4114
4115        client
4116            .write_property_multiple(addr, object_id, &writes)
4117            .await
4118            .unwrap();
4119
4120        let sent = state.sent.lock().await;
4121        assert!(sent.len() > 1);
4122
4123        let mut seqs = Vec::new();
4124        let mut saw_more_follows = false;
4125        let mut saw_last = false;
4126        for (_, frame) in sent.iter() {
4127            let mut r = Reader::new(frame);
4128            let _npdu = Npdu::decode(&mut r).unwrap();
4129            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4130            assert!(hdr.segmented);
4131            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4132            if hdr.more_follows {
4133                saw_more_follows = true;
4134            } else {
4135                saw_last = true;
4136            }
4137            seqs.push(hdr.sequence_number.unwrap());
4138        }
4139
4140        assert!(saw_more_follows);
4141        assert!(saw_last);
4142        for (idx, seq) in seqs.iter().enumerate() {
4143            assert_eq!(*seq as usize, idx);
4144        }
4145    }
4146
4147    #[tokio::test]
4148    async fn write_property_multiple_uses_configured_segment_window() {
4149        let (dl, state) = MockDataLink::new();
4150        let client = BacnetClient::with_datalink(dl)
4151            .with_response_timeout(Duration::from_secs(1))
4152            .with_segmented_request_window_size(4);
4153        let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
4154        let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
4155
4156        {
4157            let mut recv = state.recv.lock().await;
4158            for seq in 0u8..=254 {
4159                let mut apdu = [0u8; 16];
4160                let mut w = Writer::new(&mut apdu);
4161                SegmentAck {
4162                    negative_ack: false,
4163                    sent_by_server: true,
4164                    invoke_id: 1,
4165                    sequence_number: seq,
4166                    actual_window_size: 4,
4167                }
4168                .encode(&mut w)
4169                .unwrap();
4170                recv.push_back((with_npdu(w.as_written()), addr));
4171            }
4172
4173            let mut apdu = [0u8; 16];
4174            let mut w = Writer::new(&mut apdu);
4175            SimpleAck {
4176                invoke_id: 1,
4177                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4178            }
4179            .encode(&mut w)
4180            .unwrap();
4181            recv.push_back((with_npdu(w.as_written()), addr));
4182        }
4183
4184        let writes: Vec<PropertyWriteSpec> = (0..180)
4185            .map(|_| PropertyWriteSpec {
4186                property_id: PropertyId::Description,
4187                array_index: None,
4188                value: DataValue::CharacterString(
4189                    "rustbac segmented write test payload................................................................",
4190                ),
4191                priority: None,
4192            })
4193            .collect();
4194
4195        client
4196            .write_property_multiple(addr, object_id, &writes)
4197            .await
4198            .unwrap();
4199
4200        let sent = state.sent.lock().await;
4201        assert!(sent.len() > 4);
4202        for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
4203            let mut r = Reader::new(frame);
4204            let _npdu = Npdu::decode(&mut r).unwrap();
4205            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4206            assert!(hdr.segmented);
4207            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4208            assert_eq!(hdr.sequence_number, Some(idx as u8));
4209            assert_eq!(hdr.proposed_window_size, Some(4));
4210        }
4211    }
4212
4213    #[tokio::test]
4214    async fn write_property_multiple_adapts_window_to_peer_ack_window() {
4215        let (dl, state) = MockDataLink::new();
4216        let client = BacnetClient::with_datalink(dl)
4217            .with_response_timeout(Duration::from_secs(1))
4218            .with_segmented_request_window_size(4);
4219        let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
4220        let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
4221
4222        {
4223            let mut recv = state.recv.lock().await;
4224            for seq in 0u8..=254 {
4225                let mut apdu = [0u8; 16];
4226                let mut w = Writer::new(&mut apdu);
4227                SegmentAck {
4228                    negative_ack: false,
4229                    sent_by_server: true,
4230                    invoke_id: 1,
4231                    sequence_number: seq,
4232                    actual_window_size: 2,
4233                }
4234                .encode(&mut w)
4235                .unwrap();
4236                recv.push_back((with_npdu(w.as_written()), addr));
4237            }
4238
4239            let mut apdu = [0u8; 16];
4240            let mut w = Writer::new(&mut apdu);
4241            SimpleAck {
4242                invoke_id: 1,
4243                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4244            }
4245            .encode(&mut w)
4246            .unwrap();
4247            recv.push_back((with_npdu(w.as_written()), addr));
4248        }
4249
4250        let writes: Vec<PropertyWriteSpec> = (0..180)
4251            .map(|_| PropertyWriteSpec {
4252                property_id: PropertyId::Description,
4253                array_index: None,
4254                value: DataValue::CharacterString(
4255                    "rustbac segmented write test payload................................................................",
4256                ),
4257                priority: None,
4258            })
4259            .collect();
4260
4261        client
4262            .write_property_multiple(addr, object_id, &writes)
4263            .await
4264            .unwrap();
4265
4266        let sent = state.sent.lock().await;
4267        let mut saw_adapted_window = false;
4268        for (_, frame) in sent.iter() {
4269            let mut r = Reader::new(frame);
4270            let _npdu = Npdu::decode(&mut r).unwrap();
4271            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4272            if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
4273                saw_adapted_window = true;
4274                break;
4275            }
4276        }
4277        assert!(saw_adapted_window);
4278    }
4279
4280    #[tokio::test]
4281    async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
4282        let (dl, state) = MockDataLink::new();
4283        let client = BacnetClient::with_datalink(dl)
4284            .with_response_timeout(Duration::from_secs(1))
4285            .with_segmented_request_window_size(1)
4286            .with_segmented_request_retries(1);
4287        let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
4288        let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
4289
4290        {
4291            let mut recv = state.recv.lock().await;
4292
4293            let mut nack_apdu = [0u8; 16];
4294            let mut nack_w = Writer::new(&mut nack_apdu);
4295            SegmentAck {
4296                negative_ack: true,
4297                sent_by_server: true,
4298                invoke_id: 1,
4299                sequence_number: 0,
4300                actual_window_size: 1,
4301            }
4302            .encode(&mut nack_w)
4303            .unwrap();
4304            recv.push_back((with_npdu(nack_w.as_written()), addr));
4305
4306            for seq in 0u8..=254 {
4307                let mut apdu = [0u8; 16];
4308                let mut w = Writer::new(&mut apdu);
4309                SegmentAck {
4310                    negative_ack: false,
4311                    sent_by_server: true,
4312                    invoke_id: 1,
4313                    sequence_number: seq,
4314                    actual_window_size: 1,
4315                }
4316                .encode(&mut w)
4317                .unwrap();
4318                recv.push_back((with_npdu(w.as_written()), addr));
4319            }
4320
4321            let mut apdu = [0u8; 16];
4322            let mut w = Writer::new(&mut apdu);
4323            SimpleAck {
4324                invoke_id: 1,
4325                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4326            }
4327            .encode(&mut w)
4328            .unwrap();
4329            recv.push_back((with_npdu(w.as_written()), addr));
4330        }
4331
4332        let writes: Vec<PropertyWriteSpec> = (0..180)
4333            .map(|_| PropertyWriteSpec {
4334                property_id: PropertyId::Description,
4335                array_index: None,
4336                value: DataValue::CharacterString(
4337                    "rustbac segmented write test payload................................................................",
4338                ),
4339                priority: None,
4340            })
4341            .collect();
4342
4343        client
4344            .write_property_multiple(addr, object_id, &writes)
4345            .await
4346            .unwrap();
4347
4348        let sent = state.sent.lock().await;
4349        let mut seq0_frames = 0usize;
4350        for (_, frame) in sent.iter() {
4351            let mut r = Reader::new(frame);
4352            let _npdu = Npdu::decode(&mut r).unwrap();
4353            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4354            if hdr.sequence_number == Some(0) {
4355                seq0_frames += 1;
4356            }
4357        }
4358        assert!(seq0_frames >= 2);
4359    }
4360
4361    #[tokio::test]
4362    async fn read_property_ignores_invalid_frames_until_valid_response() {
4363        let (dl, state) = MockDataLink::new();
4364        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4365        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4366        let state_for_task = state.clone();
4367
4368        tokio::spawn(async move {
4369            tokio::time::sleep(Duration::from_millis(20)).await;
4370            let mut apdu = [0u8; 128];
4371            let mut w = Writer::new(&mut apdu);
4372            ComplexAckHeader {
4373                segmented: false,
4374                more_follows: false,
4375                invoke_id: 1,
4376                sequence_number: None,
4377                proposed_window_size: None,
4378                service_choice: SERVICE_READ_PROPERTY,
4379            }
4380            .encode(&mut w)
4381            .unwrap();
4382            encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4383            encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4384            Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4385            encode_app_real(&mut w, 77.0).unwrap();
4386            Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4387            state_for_task
4388                .recv
4389                .lock()
4390                .await
4391                .push_back((with_npdu(w.as_written()), addr));
4392        });
4393
4394        let value = client
4395            .read_property(
4396                addr,
4397                ObjectId::new(ObjectType::Device, 1),
4398                PropertyId::PresentValue,
4399            )
4400            .await
4401            .unwrap();
4402        assert!(matches!(
4403            value,
4404            ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
4405        ));
4406    }
4407
4408    #[tokio::test]
4409    async fn read_property_maps_reject() {
4410        let (dl, state) = MockDataLink::new();
4411        let client = BacnetClient::with_datalink(dl);
4412        let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
4413
4414        let mut apdu = [0u8; 8];
4415        let mut w = Writer::new(&mut apdu);
4416        w.write_u8((ApduType::Reject as u8) << 4).unwrap();
4417        w.write_u8(1).unwrap(); // invoke id
4418        w.write_u8(2).unwrap(); // reason
4419        state
4420            .recv
4421            .lock()
4422            .await
4423            .push_back((with_npdu(w.as_written()), addr));
4424
4425        let err = client
4426            .read_property(
4427                addr,
4428                ObjectId::new(ObjectType::Device, 1),
4429                PropertyId::ObjectName,
4430            )
4431            .await
4432            .unwrap_err();
4433        assert!(matches!(
4434            err,
4435            crate::ClientError::RemoteReject { reason: 2 }
4436        ));
4437    }
4438
4439    #[tokio::test]
4440    async fn read_property_maps_remote_error_details() {
4441        let (dl, state) = MockDataLink::new();
4442        let client = BacnetClient::with_datalink(dl);
4443        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
4444
4445        let mut apdu = [0u8; 16];
4446        let mut w = Writer::new(&mut apdu);
4447        w.write_u8((ApduType::Error as u8) << 4).unwrap();
4448        w.write_u8(1).unwrap(); // invoke id
4449        w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
4450            .unwrap();
4451        Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
4452        w.write_u8(2).unwrap(); // property class
4453        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
4454        w.write_u8(32).unwrap(); // unknownProperty
4455
4456        state
4457            .recv
4458            .lock()
4459            .await
4460            .push_back((with_npdu(w.as_written()), addr));
4461
4462        let err = client
4463            .read_property(
4464                addr,
4465                ObjectId::new(ObjectType::Device, 1),
4466                PropertyId::ObjectName,
4467            )
4468            .await
4469            .unwrap_err();
4470        assert!(matches!(
4471            err,
4472            crate::ClientError::RemoteServiceError {
4473                service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
4474                error_class_raw: Some(2),
4475                error_code_raw: Some(32),
4476                error_class: Some(rustbac_core::types::ErrorClass::Property),
4477                error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
4478            }
4479        ));
4480    }
4481
4482    #[tokio::test]
4483    async fn write_property_maps_abort() {
4484        let (dl, state) = MockDataLink::new();
4485        let client = BacnetClient::with_datalink(dl);
4486        let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
4487
4488        let mut apdu = [0u8; 8];
4489        let mut w = Writer::new(&mut apdu);
4490        w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); // server abort
4491        w.write_u8(1).unwrap(); // invoke id
4492        w.write_u8(9).unwrap(); // reason
4493        state
4494            .recv
4495            .lock()
4496            .await
4497            .push_back((with_npdu(w.as_written()), addr));
4498
4499        let req = rustbac_core::services::write_property::WritePropertyRequest {
4500            object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
4501            property_id: PropertyId::PresentValue,
4502            value: DataValue::Real(10.0),
4503            priority: Some(8),
4504            ..Default::default()
4505        };
4506        let err = client.write_property(addr, req).await.unwrap_err();
4507        assert!(matches!(
4508            err,
4509            crate::ClientError::RemoteAbort {
4510                reason: 9,
4511                server: true
4512            }
4513        ));
4514    }
4515
4516    #[tokio::test]
4517    async fn read_property_multiple_returns_owned_string() {
4518        let (dl, state) = MockDataLink::new();
4519        let client = BacnetClient::with_datalink(dl);
4520        let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
4521        let object_id = ObjectId::new(ObjectType::Device, 1);
4522
4523        let mut apdu_buf = [0u8; 256];
4524        let mut w = Writer::new(&mut apdu_buf);
4525        ComplexAckHeader {
4526            segmented: false,
4527            more_follows: false,
4528            invoke_id: 1,
4529            sequence_number: None,
4530            proposed_window_size: None,
4531            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4532        }
4533        .encode(&mut w)
4534        .unwrap();
4535        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4536        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4537            .encode(&mut w)
4538            .unwrap();
4539        encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
4540        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4541            .encode(&mut w)
4542            .unwrap();
4543        rustbac_core::services::value_codec::encode_application_data_value(
4544            &mut w,
4545            &DataValue::CharacterString("AHU-1"),
4546        )
4547        .unwrap();
4548        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4549            .encode(&mut w)
4550            .unwrap();
4551        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4552            .encode(&mut w)
4553            .unwrap();
4554
4555        state
4556            .recv
4557            .lock()
4558            .await
4559            .push_back((with_npdu(w.as_written()), addr));
4560
4561        let values = client
4562            .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
4563            .await
4564            .unwrap();
4565        assert_eq!(values.len(), 1);
4566        assert_eq!(values[0].0, PropertyId::ObjectName);
4567        assert!(matches!(
4568            &values[0].1,
4569            ClientDataValue::CharacterString(s) if s == "AHU-1"
4570        ));
4571    }
4572
4573    #[tokio::test]
4574    async fn new_sc_rejects_invalid_endpoint() {
4575        let err = BacnetClient::new_sc("not a url").await.unwrap_err();
4576        assert!(matches!(err, crate::ClientError::DataLink(_)));
4577    }
4578}