Skip to main content

rustbac_client/
client.rs

1use crate::{
2    AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3    ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4    DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5    EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9    AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10    SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{reader::Reader, writer::Writer};
13use rustbac_core::npdu::Npdu;
14use rustbac_core::services::acknowledge_alarm::{
15    AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
16};
17use rustbac_core::services::alarm_summary::{
18    AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
19    SERVICE_GET_ALARM_SUMMARY,
20};
21use rustbac_core::services::atomic_read_file::{
22    AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
23};
24use rustbac_core::services::atomic_write_file::{
25    AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
26};
27use rustbac_core::services::cov_notification::{
28    CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
29    SERVICE_UNCONFIRMED_COV_NOTIFICATION,
30};
31use rustbac_core::services::device_management::{
32    DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
33    ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
34};
35use rustbac_core::services::enrollment_summary::{
36    EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
37    GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
38};
39use rustbac_core::services::event_information::{
40    EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
41    SERVICE_GET_EVENT_INFORMATION,
42};
43use rustbac_core::services::event_notification::{
44    EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
45    SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
46};
47use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
48use rustbac_core::services::list_element::{
49    AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
50    SERVICE_REMOVE_LIST_ELEMENT,
51};
52use rustbac_core::services::object_management::{
53    CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
54    SERVICE_DELETE_OBJECT,
55};
56use rustbac_core::services::private_transfer::{
57    ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
58    SERVICE_CONFIRMED_PRIVATE_TRANSFER,
59};
60use rustbac_core::services::read_property::{
61    ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
62};
63use rustbac_core::services::read_property_multiple::{
64    PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
65    ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
66};
67use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
68use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
69use rustbac_core::services::subscribe_cov_property::{
70    SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
71};
72use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
73use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
74use rustbac_core::services::who_is::WhoIsRequest;
75use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
76use rustbac_core::services::write_property_multiple::{
77    PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
78    SERVICE_WRITE_PROPERTY_MULTIPLE,
79};
80use rustbac_core::types::{DataValue, Date, ErrorClass, ErrorCode, ObjectId, PropertyId, Time};
81use rustbac_core::EncodeError;
82use rustbac_datalink::bip::transport::{
83    BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
84};
85use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
86use std::collections::{HashMap, HashSet};
87use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
88use std::sync::RwLock;
89use std::time::Duration;
90use tokio::sync::Mutex;
91use tokio::task::JoinHandle;
92use tokio::time::{timeout, Instant};
93
/// Floor, in bytes, for the per-segment payload size chosen by `send_confirmed_request`.
///
/// Applied after the header allowance is subtracted from the peer's max-APDU, so a very
/// small (or zero) advertised max-APDU never produces degenerate segments.
const MIN_SEGMENT_DATA_LEN: usize = 32;
/// Upper bound, in bytes (1 MiB), on the payload that `collect_complex_ack_payload` will
/// accumulate before failing with `ClientError::ResponseTooLarge`.
const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1024 * 1024;
96
/// High-level async BACnet client.
///
/// `BacnetClient<D>` wraps any [`DataLink`] transport and exposes ergonomic methods for common
/// BACnet operations: reading and writing properties, device and object discovery, COV
/// subscriptions, alarm/event services, file I/O, and device management.
///
/// The invoke-id counter and the per-request I/O lock are stored behind `Mutex`es, so the
/// client is intentionally not `Clone` — wrap it in an `Arc` to share it across tasks.
///
/// # Construction
///
/// - UDP/IP: [`BacnetClient::new()`] (local broadcast) or [`BacnetClient::new_foreign()`]
///   (register as a foreign device with a BBMD).
/// - BACnet/SC (WebSocket): [`BacnetClient::new_sc()`].
/// - Custom transport: [`BacnetClient::with_datalink()`].
#[derive(Debug)]
pub struct BacnetClient<D: DataLink> {
    /// Transport used for every send and receive.
    datalink: D,
    /// Next invoke id to hand out; starts at 1 and skips 0 when it wraps
    /// (see `next_invoke_id`).
    invoke_id: Mutex<u8>,
    /// Guard held around datalink exchanges (e.g. the BBMD table helpers) so that they do
    /// not interleave with each other.
    request_io_lock: Mutex<()>,
    /// Per-request response timeout; 3 s unless overridden via `with_response_timeout`.
    response_timeout: Duration,
    /// Number of segments sent before waiting for a SegmentAck; default 16, never below 1.
    segmented_request_window_size: u8,
    /// How many times a segment window is re-sent after a timeout or negative ack; default 2.
    segmented_request_retries: u8,
    /// Maximum wait for a SegmentAck after sending a window; default 500 ms.
    segment_ack_timeout: Duration,
    /// Peer max-APDU sizes in bytes, populated from I-Am responses via `who_is`.
    capability_cache: std::sync::Arc<RwLock<HashMap<DataLinkAddress, usize>>>,
}
124
/// Handle for a background task that periodically re-registers this client as a BACnet
/// foreign device with the BBMD.
///
/// Dropping this value aborts the renewal task automatically. Call [`ForeignDeviceRenewal::stop`]
/// to abort it explicitly.
#[derive(Debug)]
pub struct ForeignDeviceRenewal {
    /// Join handle of the spawned renewal loop; aborted by `stop` and again by `Drop`.
    task: JoinHandle<()>,
}
134
135impl ForeignDeviceRenewal {
136    /// Abort the background renewal task immediately.
137    pub fn stop(self) {
138        self.task.abort();
139    }
140}
141
142impl Drop for ForeignDeviceRenewal {
143    fn drop(&mut self) {
144        self.task.abort();
145    }
146}
147
148impl BacnetClient<BacnetIpTransport> {
149    /// Create a UDP/IP BACnet client bound to an ephemeral port on all interfaces.
150    ///
151    /// Returns [`ClientError::DataLink`] if the socket cannot be bound.
152    pub async fn new() -> Result<Self, ClientError> {
153        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
154        let datalink = BacnetIpTransport::bind(bind_addr).await?;
155        Ok(Self {
156            datalink,
157            invoke_id: Mutex::new(1),
158            request_io_lock: Mutex::new(()),
159            response_timeout: Duration::from_secs(3),
160            segmented_request_window_size: 16,
161            segmented_request_retries: 2,
162            segment_ack_timeout: Duration::from_millis(500),
163            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
164        })
165    }
166
167    /// Create a UDP/IP BACnet client registered as a foreign device with the BBMD at
168    /// `bbmd_addr`.
169    ///
170    /// `ttl_seconds` is the time-to-live for the foreign-device registration. Returns
171    /// [`ClientError::DataLink`] if the socket cannot be bound or the initial registration
172    /// request fails.
173    pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
174        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
175        let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
176        datalink.register_foreign_device(ttl_seconds).await?;
177        Ok(Self {
178            datalink,
179            invoke_id: Mutex::new(1),
180            request_io_lock: Mutex::new(()),
181            response_timeout: Duration::from_secs(3),
182            segmented_request_window_size: 16,
183            segmented_request_retries: 2,
184            segment_ack_timeout: Duration::from_millis(500),
185            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
186        })
187    }
188
189    /// Re-register this client as a foreign device with the BBMD, using `ttl_seconds` as the new
190    /// time-to-live.
191    pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
192        let _io = self.request_io_lock.lock().await;
193        self.datalink.register_foreign_device(ttl_seconds).await?;
194        Ok(())
195    }
196
197    /// Read the Broadcast Distribution Table (BDT) from the BBMD.
198    pub async fn read_broadcast_distribution_table(
199        &self,
200    ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
201        let _io = self.request_io_lock.lock().await;
202        self.datalink
203            .read_broadcast_distribution_table()
204            .await
205            .map_err(ClientError::from)
206    }
207
208    /// Write (replace) the Broadcast Distribution Table on the BBMD with the given `entries`.
209    pub async fn write_broadcast_distribution_table(
210        &self,
211        entries: &[BroadcastDistributionEntry],
212    ) -> Result<(), ClientError> {
213        let _io = self.request_io_lock.lock().await;
214        self.datalink
215            .write_broadcast_distribution_table(entries)
216            .await?;
217        Ok(())
218    }
219
220    /// Read the Foreign Device Table (FDT) from the BBMD.
221    pub async fn read_foreign_device_table(
222        &self,
223    ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
224        let _io = self.request_io_lock.lock().await;
225        self.datalink
226            .read_foreign_device_table()
227            .await
228            .map_err(ClientError::from)
229    }
230
231    /// Delete the foreign device entry for `address` from the BBMD's Foreign Device Table.
232    pub async fn delete_foreign_device_table_entry(
233        &self,
234        address: SocketAddrV4,
235    ) -> Result<(), ClientError> {
236        let _io = self.request_io_lock.lock().await;
237        self.datalink
238            .delete_foreign_device_table_entry(address)
239            .await?;
240        Ok(())
241    }
242
243    /// Spawn a background task that re-registers this client as a foreign device at roughly
244    /// 75 % of `ttl_seconds` to keep the registration alive indefinitely.
245    ///
246    /// Returns a [`ForeignDeviceRenewal`] handle; dropping it (or calling `.stop()`) cancels
247    /// the task. Returns an error if `ttl_seconds` is 0.
248    pub fn start_foreign_device_renewal(
249        &self,
250        ttl_seconds: u16,
251    ) -> Result<ForeignDeviceRenewal, ClientError> {
252        if ttl_seconds == 0 {
253            return Err(EncodeError::InvalidLength.into());
254        }
255
256        let datalink = self.datalink.clone();
257        let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
258        let interval = Duration::from_secs(refresh_seconds.max(1));
259        let task = tokio::spawn(async move {
260            loop {
261                tokio::time::sleep(interval).await;
262                if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
263                    log::warn!("foreign device renewal send failed: {err}");
264                }
265            }
266        });
267        Ok(ForeignDeviceRenewal { task })
268    }
269}
270
271impl BacnetClient<BacnetScTransport> {
272    /// Create a BACnet/SC client by connecting to the WebSocket hub at `endpoint`
273    /// (e.g. `"wss://hub.example.com:47808/bacnet"`).
274    pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
275        let datalink = BacnetScTransport::connect(endpoint).await?;
276        Ok(Self::with_datalink(datalink))
277    }
278}
279
280impl<D: DataLink> BacnetClient<D> {
281    /// Create a client wrapping an already-constructed `datalink`.
282    ///
283    /// Use this when you need a custom or pre-configured [`DataLink`] implementation.
284    pub fn with_datalink(datalink: D) -> Self {
285        Self {
286            datalink,
287            invoke_id: Mutex::new(1),
288            request_io_lock: Mutex::new(()),
289            response_timeout: Duration::from_secs(3),
290            segmented_request_window_size: 16,
291            segmented_request_retries: 2,
292            segment_ack_timeout: Duration::from_millis(500),
293            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
294        }
295    }
296
297    /// Override the per-request response timeout (default: 3 s).
298    pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
299        self.response_timeout = timeout;
300        self
301    }
302
303    /// Override the segmented-request window size (number of segments sent before waiting
304    /// for an ACK). Clamped to a minimum of 1. Default: 16.
305    pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
306        self.segmented_request_window_size = window_size.max(1);
307        self
308    }
309
310    /// Override the number of times a segment window is retried after a negative ACK or
311    /// segment-ACK timeout before giving up. Default: 2.
312    pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
313        self.segmented_request_retries = retries;
314        self
315    }
316
317    /// Override the timeout used when waiting for a segment ACK from the remote device
318    /// during a segmented confirmed request. Clamped to a minimum of 1 ms. Default: 500 ms.
319    pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
320        self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
321        self
322    }
323
324    async fn next_invoke_id(&self) -> u8 {
325        let mut lock = self.invoke_id.lock().await;
326        let id = *lock;
327        *lock = lock.wrapping_add(1);
328        if *lock == 0 {
329            *lock = 1;
330        }
331        id
332    }
333
334    async fn send_segment_ack(
335        &self,
336        address: DataLinkAddress,
337        invoke_id: u8,
338        sequence_number: u8,
339        window_size: u8,
340    ) -> Result<(), ClientError> {
341        let mut tx = [0u8; 64];
342        let mut w = Writer::new(&mut tx);
343        Npdu::new(0).encode(&mut w)?;
344        SegmentAck {
345            negative_ack: false,
346            sent_by_server: false,
347            invoke_id,
348            sequence_number,
349            actual_window_size: window_size,
350        }
351        .encode(&mut w)?;
352        self.datalink.send(address, w.as_written()).await?;
353        Ok(())
354    }
355
356    async fn recv_ignoring_invalid_frame(
357        &self,
358        buf: &mut [u8],
359        deadline: Instant,
360    ) -> Result<(usize, DataLinkAddress), ClientError> {
361        loop {
362            let remaining = deadline.saturating_duration_since(Instant::now());
363            if remaining.is_zero() {
364                return Err(ClientError::Timeout);
365            }
366
367            match timeout(remaining, self.datalink.recv(buf)).await {
368                Err(_) => return Err(ClientError::Timeout),
369                Ok(Err(DataLinkError::InvalidFrame)) => continue,
370                Ok(Err(e)) => return Err(e.into()),
371                Ok(Ok(v)) => return Ok(v),
372            }
373        }
374    }
375
376    async fn send_simple_ack(
377        &self,
378        address: DataLinkAddress,
379        invoke_id: u8,
380        service_choice: u8,
381    ) -> Result<(), ClientError> {
382        let mut tx = [0u8; 64];
383        let mut w = Writer::new(&mut tx);
384        Npdu::new(0).encode(&mut w)?;
385        SimpleAck {
386            invoke_id,
387            service_choice,
388        }
389        .encode(&mut w)?;
390        self.datalink.send(address, w.as_written()).await?;
391        Ok(())
392    }
393
394    fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
395    where
396        F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
397    {
398        for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
399            let mut buf = vec![0u8; size];
400            let mut w = Writer::new(&mut buf);
401            match encode(&mut w) {
402                Ok(()) => {
403                    let written_len = w.as_written().len();
404                    buf.truncate(written_len);
405                    return Ok(buf);
406                }
407                Err(EncodeError::BufferTooSmall) => continue,
408                Err(e) => return Err(e.into()),
409            }
410        }
411        Err(ClientError::SegmentedRequestTooLarge)
412    }
413
414    const fn max_apdu_octets(max_apdu_code: u8) -> usize {
415        match max_apdu_code & 0x0f {
416            0 => 50,
417            1 => 128,
418            2 => 206,
419            3 => 480,
420            4 => 1024,
421            5 => 1476,
422            _ => 480,
423        }
424    }
425
426    async fn await_segment_ack(
427        &self,
428        address: DataLinkAddress,
429        invoke_id: u8,
430        service_choice: u8,
431        expected_sequence: u8,
432        deadline: Instant,
433    ) -> Result<SegmentAck, ClientError> {
434        loop {
435            let remaining = deadline.saturating_duration_since(Instant::now());
436            if remaining.is_zero() {
437                return Err(ClientError::Timeout);
438            }
439
440            let mut rx = [0u8; 1500];
441            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
442            let (n, src) = match recv {
443                Err(_) => return Err(ClientError::Timeout),
444                Ok(Err(DataLinkError::InvalidFrame)) => continue,
445                Ok(Err(e)) => return Err(e.into()),
446                Ok(Ok(v)) => v,
447            };
448            if src != address {
449                continue;
450            }
451
452            let Ok(apdu) = extract_apdu(&rx[..n]) else {
453                continue;
454            };
455            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
456            match ApduType::from_u8(first >> 4) {
457                Some(ApduType::SegmentAck) => {
458                    let mut r = Reader::new(apdu);
459                    let ack = SegmentAck::decode(&mut r)?;
460                    if ack.invoke_id != invoke_id || !ack.sent_by_server {
461                        continue;
462                    }
463                    if ack.negative_ack {
464                        return Err(ClientError::SegmentNegativeAck {
465                            sequence_number: ack.sequence_number,
466                        });
467                    }
468                    if ack.sequence_number == expected_sequence {
469                        return Ok(ack);
470                    }
471                }
472                Some(ApduType::Error) => {
473                    let mut r = Reader::new(apdu);
474                    let err = BacnetError::decode(&mut r)?;
475                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
476                        return Err(remote_service_error(err));
477                    }
478                }
479                Some(ApduType::Reject) => {
480                    let mut r = Reader::new(apdu);
481                    let rej = RejectPdu::decode(&mut r)?;
482                    if rej.invoke_id == invoke_id {
483                        return Err(ClientError::RemoteReject { reason: rej.reason });
484                    }
485                }
486                Some(ApduType::Abort) => {
487                    let mut r = Reader::new(apdu);
488                    let abort = AbortPdu::decode(&mut r)?;
489                    if abort.invoke_id == invoke_id {
490                        return Err(ClientError::RemoteAbort {
491                            reason: abort.reason,
492                            server: abort.server,
493                        });
494                    }
495                }
496                _ => continue,
497            }
498        }
499    }
500
501    async fn send_confirmed_request(
502        &self,
503        address: DataLinkAddress,
504        frame: &[u8],
505        deadline: Instant,
506    ) -> Result<(), ClientError> {
507        let mut pr = Reader::new(frame);
508        let _npdu = Npdu::decode(&mut pr)?;
509        let npdu_len = frame.len() - pr.remaining();
510        let npdu_bytes = &frame[..npdu_len];
511        let apdu = &frame[npdu_len..];
512
513        let mut ar = Reader::new(apdu);
514        let header = ConfirmedRequestHeader::decode(&mut ar)?;
515        let service_payload = ar.read_exact(ar.remaining())?;
516
517        // Use the peer's max-APDU if we learned it from a prior I-Am; fall back to
518        // the code declared in the request header (our own capability advertisement).
519        let peer_max_apdu = self
520            .capability_cache
521            .read()
522            .ok()
523            .and_then(|c| c.get(&address).copied())
524            .unwrap_or_else(|| Self::max_apdu_octets(header.max_apdu));
525        let segment_data_len = peer_max_apdu.saturating_sub(5).max(MIN_SEGMENT_DATA_LEN);
526        let segment_count = service_payload.len().div_ceil(segment_data_len);
527
528        if segment_count <= 1 {
529            self.datalink.send(address, frame).await?;
530            return Ok(());
531        }
532
533        if segment_count > usize::from(u8::MAX) + 1 {
534            return Err(ClientError::SegmentedRequestTooLarge);
535        }
536
537        let configured_window_size = self.segmented_request_window_size.max(1);
538        let mut window_size = configured_window_size;
539        let mut peer_window_ceiling = configured_window_size;
540        let mut batch_start = 0usize;
541        while batch_start < segment_count {
542            let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
543            let expected_sequence = (batch_end - 1) as u8;
544
545            let mut frames = Vec::with_capacity(batch_end - batch_start);
546            for segment_index in batch_start..batch_end {
547                let seq = segment_index as u8;
548                let more_follows = segment_index + 1 < segment_count;
549                let start = segment_index * segment_data_len;
550                let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
551                let segment = &service_payload[start..end];
552
553                let seg_header = ConfirmedRequestHeader {
554                    segmented: true,
555                    more_follows,
556                    segmented_response_accepted: header.segmented_response_accepted,
557                    max_segments: header.max_segments,
558                    max_apdu: header.max_apdu,
559                    invoke_id: header.invoke_id,
560                    sequence_number: Some(seq),
561                    proposed_window_size: Some(window_size),
562                    service_choice: header.service_choice,
563                };
564
565                let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
566                let written_len = {
567                    let mut w = Writer::new(&mut tx);
568                    w.write_all(npdu_bytes)?;
569                    seg_header.encode(&mut w)?;
570                    w.write_all(segment)?;
571                    w.as_written().len()
572                };
573                tx.truncate(written_len);
574                frames.push(tx);
575            }
576
577            let mut retries_remaining = self.segmented_request_retries;
578            loop {
579                for frame in &frames {
580                    self.datalink.send(address, frame).await?;
581                }
582
583                if batch_end == segment_count {
584                    break;
585                }
586
587                let remaining = deadline.saturating_duration_since(Instant::now());
588                if remaining.is_zero() {
589                    return Err(ClientError::Timeout);
590                }
591                let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
592                match self
593                    .await_segment_ack(
594                        address,
595                        header.invoke_id,
596                        header.service_choice,
597                        expected_sequence,
598                        ack_wait_deadline,
599                    )
600                    .await
601                {
602                    Ok(ack) => {
603                        peer_window_ceiling =
604                            peer_window_ceiling.min(ack.actual_window_size.max(1));
605                        window_size = window_size
606                            .saturating_add(1)
607                            .min(configured_window_size)
608                            .min(peer_window_ceiling)
609                            .max(1);
610                        break;
611                    }
612                    Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
613                        if retries_remaining > 0 =>
614                    {
615                        retries_remaining -= 1;
616                        window_size = window_size.saturating_div(2).max(1);
617                        continue;
618                    }
619                    Err(e) => return Err(e),
620                }
621            }
622
623            batch_start = batch_end;
624        }
625
626        Ok(())
627    }
628
    /// Assemble the full service payload of a ComplexAck, following segments if present.
    ///
    /// `first_header` / `first_payload` come from the first ComplexAck already received for
    /// `invoke_id` / `service_choice`. When that ack is not segmented, its payload is
    /// returned as-is. Otherwise each in-order segment from `address` is appended and
    /// acknowledged with a SegmentAck until a segment arrives with `more_follows` cleared.
    ///
    /// Frames from other addresses, unparsable frames, ComplexAcks for a different
    /// invoke id or service, and out-of-order segments are ignored. A repeat of the last
    /// accepted segment is re-acknowledged.
    ///
    /// # Errors
    ///
    /// - [`ClientError::ResponseTooLarge`] when the payload would exceed
    ///   `MAX_COMPLEX_ACK_REASSEMBLY_BYTES`.
    /// - [`ClientError::UnsupportedResponse`] when a segmented header lacks a sequence
    ///   number, a matching ComplexAck arrives unsegmented mid-stream, or an APDU is empty.
    /// - The remote Error / Reject / Abort when one arrives for this request.
    /// - [`ClientError::Timeout`] once `deadline` passes, plus decode and datalink errors.
    async fn collect_complex_ack_payload(
        &self,
        address: DataLinkAddress,
        invoke_id: u8,
        service_choice: u8,
        first_header: ComplexAckHeader,
        first_payload: &[u8],
        deadline: Instant,
    ) -> Result<Vec<u8>, ClientError> {
        let mut payload = first_payload.to_vec();
        if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
            return Err(ClientError::ResponseTooLarge {
                limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
            });
        }
        // Unsegmented response: the first payload is the whole answer.
        if !first_header.segmented {
            return Ok(payload);
        }

        let mut last_seq = first_header
            .sequence_number
            .ok_or(ClientError::UnsupportedResponse)?;
        // Echo the server's proposed window back; fall back to 1 when it is absent.
        let mut window_size = first_header.proposed_window_size.unwrap_or(1);
        // Acknowledge the first segment before waiting for the next one.
        self.send_segment_ack(address, invoke_id, last_seq, window_size)
            .await?;
        let mut more_follows = first_header.more_follows;

        while more_follows {
            let mut rx = [0u8; 1500];
            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
            if src != address {
                continue;
            }

            let Ok(apdu) = extract_apdu(&rx[..n]) else {
                continue;
            };
            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
            // The PDU type lives in the high nibble of the first APDU octet.
            match ApduType::from_u8(first >> 4) {
                Some(ApduType::ComplexAck) => {
                    let mut r = Reader::new(apdu);
                    let seg = ComplexAckHeader::decode(&mut r)?;
                    if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
                        continue;
                    }
                    if !seg.segmented {
                        return Err(ClientError::UnsupportedResponse);
                    }
                    let seq = seg
                        .sequence_number
                        .ok_or(ClientError::UnsupportedResponse)?;
                    if seq == last_seq {
                        // Duplicate segment: acknowledge again and continue waiting.
                        self.send_segment_ack(address, invoke_id, last_seq, window_size)
                            .await?;
                        continue;
                    }
                    // Anything other than the next sequence number (mod 256) is dropped
                    // without an acknowledgement.
                    if seq != last_seq.wrapping_add(1) {
                        continue;
                    }

                    let seg_payload = r.read_exact(r.remaining())?;
                    // Enforce the reassembly cap before growing the buffer.
                    if payload.len().saturating_add(seg_payload.len())
                        > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
                    {
                        return Err(ClientError::ResponseTooLarge {
                            limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
                        });
                    }
                    payload.extend_from_slice(seg_payload);

                    last_seq = seq;
                    more_follows = seg.more_follows;
                    // Keep the previous window size when the segment does not propose one.
                    window_size = seg.proposed_window_size.unwrap_or(window_size);
                    self.send_segment_ack(address, invoke_id, last_seq, window_size)
                        .await?;
                }
                Some(ApduType::Error) => {
                    let mut r = Reader::new(apdu);
                    let err = BacnetError::decode(&mut r)?;
                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
                        return Err(remote_service_error(err));
                    }
                }
                Some(ApduType::Reject) => {
                    let mut r = Reader::new(apdu);
                    let rej = RejectPdu::decode(&mut r)?;
                    if rej.invoke_id == invoke_id {
                        return Err(ClientError::RemoteReject { reason: rej.reason });
                    }
                }
                Some(ApduType::Abort) => {
                    let mut r = Reader::new(apdu);
                    let abort = AbortPdu::decode(&mut r)?;
                    if abort.invoke_id == invoke_id {
                        return Err(ClientError::RemoteAbort {
                            reason: abort.reason,
                            server: abort.server,
                        });
                    }
                }
                _ => continue,
            }
        }

        Ok(payload)
    }
736
737    /// Broadcast a Who-Is request and collect I-Am replies for the duration of `wait`.
738    ///
739    /// `range` constrains the device-instance range as `(low, high)`; `None` performs a
740    /// global Who-Is. Duplicate devices (same object id) are deduplicated. Returns
741    /// [`ClientError::DataLink`] on send/receive failure.
742    pub async fn who_is(
743        &self,
744        range: Option<(u32, u32)>,
745        wait: Duration,
746    ) -> Result<Vec<DiscoveredDevice>, ClientError> {
747        // Unconfirmed broadcast — no request_io_lock needed; the recv loop
748        // filters on service_choice so it coexists with concurrent confirmed requests.
749        let req = match range {
750            Some((low, high)) => WhoIsRequest {
751                low_limit: Some(low),
752                high_limit: Some(high),
753            },
754            None => WhoIsRequest::global(),
755        };
756
757        let mut tx = [0u8; 128];
758        let mut w = Writer::new(&mut tx);
759        Npdu::new(0).encode(&mut w)?;
760        req.encode(&mut w)?;
761
762        self.datalink
763            .send(
764                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
765                w.as_written(),
766            )
767            .await?;
768
769        let mut devices = Vec::new();
770        let mut seen = HashSet::new();
771        let deadline = tokio::time::Instant::now() + wait;
772
773        while tokio::time::Instant::now() < deadline {
774            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
775            let mut rx = [0u8; 1500];
776            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
777            match recv {
778                Ok(Ok((n, src))) => {
779                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
780                        continue;
781                    };
782                    let mut r = Reader::new(apdu);
783                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
784                        continue;
785                    };
786                    if unconfirmed.service_choice != SERVICE_I_AM {
787                        continue;
788                    }
789                    let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
790                        continue;
791                    };
792                    if seen.insert(i_am.device_id) {
793                        devices.push(DiscoveredDevice {
794                            address: src,
795                            device_id: Some(i_am.device_id),
796                        });
797                        // Cache the peer's reported max-APDU so segmented
798                        // requests can be sized correctly without a separate read.
799                        if let Ok(mut cache) = self.capability_cache.write() {
800                            cache.insert(src, i_am.max_apdu as usize);
801                        }
802                    }
803                }
804                Ok(Err(DataLinkError::InvalidFrame)) => continue,
805                Ok(Err(e)) => return Err(e.into()),
806                Err(_) => break,
807            }
808        }
809
810        Ok(devices)
811    }
812
813    /// Broadcast a Who-Has request for a specific object id and collect I-Have replies for
814    /// `wait`.
815    ///
816    /// `range` constrains the responding device-instance range; `None` means any device.
817    pub async fn who_has_object_id(
818        &self,
819        range: Option<(u32, u32)>,
820        object_id: ObjectId,
821        wait: Duration,
822    ) -> Result<Vec<DiscoveredObject>, ClientError> {
823        let req = WhoHasRequest {
824            low_limit: range.map(|(low, _)| low),
825            high_limit: range.map(|(_, high)| high),
826            object: WhoHasObject::ObjectId(object_id),
827        };
828        self.who_has(req, wait).await
829    }
830
831    /// Broadcast a Who-Has request for an object by name and collect I-Have replies for `wait`.
832    ///
833    /// `range` constrains the responding device-instance range; `None` means any device.
834    pub async fn who_has_object_name(
835        &self,
836        range: Option<(u32, u32)>,
837        object_name: &str,
838        wait: Duration,
839    ) -> Result<Vec<DiscoveredObject>, ClientError> {
840        let req = WhoHasRequest {
841            low_limit: range.map(|(low, _)| low),
842            high_limit: range.map(|(_, high)| high),
843            object: WhoHasObject::ObjectName(object_name),
844        };
845        self.who_has(req, wait).await
846    }
847
848    async fn who_has(
849        &self,
850        request: WhoHasRequest<'_>,
851        wait: Duration,
852    ) -> Result<Vec<DiscoveredObject>, ClientError> {
853        // Unconfirmed broadcast — same rationale as who_is.
854        let tx = self.encode_with_growth(|w| {
855            Npdu::new(0).encode(w)?;
856            request.encode(w)
857        })?;
858        self.datalink
859            .send(
860                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
861                &tx,
862            )
863            .await?;
864
865        let mut objects = Vec::new();
866        let mut seen = HashSet::new();
867        let deadline = tokio::time::Instant::now() + wait;
868
869        while tokio::time::Instant::now() < deadline {
870            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
871            let mut rx = [0u8; 1500];
872            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
873            match recv {
874                Ok(Ok((n, src))) => {
875                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
876                        continue;
877                    };
878                    let mut r = Reader::new(apdu);
879                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
880                        continue;
881                    };
882                    if unconfirmed.service_choice != SERVICE_I_HAVE {
883                        continue;
884                    }
885                    let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
886                        continue;
887                    };
888                    if !seen.insert((src, i_have.object_id.raw())) {
889                        continue;
890                    }
891                    objects.push(DiscoveredObject {
892                        address: src,
893                        device_id: i_have.device_id,
894                        object_id: i_have.object_id,
895                        object_name: i_have.object_name.to_string(),
896                    });
897                }
898                Ok(Err(DataLinkError::InvalidFrame)) => continue,
899                Ok(Err(e)) => return Err(e.into()),
900                Err(_) => break,
901            }
902        }
903
904        Ok(objects)
905    }
906
907    /// Send a DeviceCommunicationControl request to a device.
908    ///
909    /// `time_duration_seconds` sets the duration for which the state applies; `None` means
910    /// indefinite. `enable_disable` selects the new communication state. `password` is only
911    /// required if the device is password-protected.
912    pub async fn device_communication_control(
913        &self,
914        address: DataLinkAddress,
915        time_duration_seconds: Option<u16>,
916        enable_disable: DeviceCommunicationState,
917        password: Option<&str>,
918    ) -> Result<(), ClientError> {
919        let invoke_id = self.next_invoke_id().await;
920        let request = DeviceCommunicationControlRequest {
921            time_duration_seconds,
922            enable_disable,
923            password,
924            invoke_id,
925        };
926        let tx = self.encode_with_growth(|w| {
927            Npdu::new(0).encode(w)?;
928            request.encode(w)
929        })?;
930        self.await_simple_ack_or_error(
931            address,
932            &tx,
933            invoke_id,
934            SERVICE_DEVICE_COMMUNICATION_CONTROL,
935            self.response_timeout,
936        )
937        .await
938    }
939
940    /// Send a ReinitializeDevice request to a device (e.g. cold-start, warm-start, or backup).
941    ///
942    /// `state` selects the reinitialization type. `password` is sent only if `Some`.
943    pub async fn reinitialize_device(
944        &self,
945        address: DataLinkAddress,
946        state: ReinitializeState,
947        password: Option<&str>,
948    ) -> Result<(), ClientError> {
949        let invoke_id = self.next_invoke_id().await;
950        let request = ReinitializeDeviceRequest {
951            state,
952            password,
953            invoke_id,
954        };
955        let tx = self.encode_with_growth(|w| {
956            Npdu::new(0).encode(w)?;
957            request.encode(w)
958        })?;
959        self.await_simple_ack_or_error(
960            address,
961            &tx,
962            invoke_id,
963            SERVICE_REINITIALIZE_DEVICE,
964            self.response_timeout,
965        )
966        .await
967    }
968
969    /// Send a TimeSynchronization (or UTCTimeSynchronization) request to a device.
970    ///
971    /// Set `utc` to `true` to send the UTC variant of the request.
972    /// This is a fire-and-forget unconfirmed service; no response is awaited.
973    pub async fn time_synchronize(
974        &self,
975        address: DataLinkAddress,
976        date: Date,
977        time: Time,
978        utc: bool,
979    ) -> Result<(), ClientError> {
980        let request = if utc {
981            TimeSynchronizationRequest::utc(date, time)
982        } else {
983            TimeSynchronizationRequest::local(date, time)
984        };
985        let tx = self.encode_with_growth(|w| {
986            Npdu::new(0).encode(w)?;
987            request.encode(w)
988        })?;
989        self.datalink.send(address, &tx).await?;
990        Ok(())
991    }
992
993    /// Create a new object of the given type on the device, letting the device choose the
994    /// instance number. Returns the [`ObjectId`] assigned by the device.
995    pub async fn create_object_by_type(
996        &self,
997        address: DataLinkAddress,
998        object_type: rustbac_core::types::ObjectType,
999    ) -> Result<ObjectId, ClientError> {
1000        self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
1001            .await
1002    }
1003
1004    /// Send a CreateObject request to the device and return the [`ObjectId`] of the newly
1005    /// created object. Use this variant when you need fine-grained control over the request
1006    /// (e.g. specifying initial property values).
1007    pub async fn create_object(
1008        &self,
1009        address: DataLinkAddress,
1010        mut request: CreateObjectRequest,
1011    ) -> Result<ObjectId, ClientError> {
1012        request.invoke_id = self.next_invoke_id().await;
1013        let invoke_id = request.invoke_id;
1014        let tx = self.encode_with_growth(|w| {
1015            Npdu::new(0).encode(w)?;
1016            request.encode(w)
1017        })?;
1018        let payload = self
1019            .await_complex_ack_payload_or_error(
1020                address,
1021                &tx,
1022                invoke_id,
1023                SERVICE_CREATE_OBJECT,
1024                self.response_timeout,
1025            )
1026            .await?;
1027        let mut pr = Reader::new(&payload);
1028        let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
1029        Ok(parsed.object_id)
1030    }
1031
1032    /// Send a DeleteObject request to remove `object_id` from the device.
1033    pub async fn delete_object(
1034        &self,
1035        address: DataLinkAddress,
1036        object_id: ObjectId,
1037    ) -> Result<(), ClientError> {
1038        let invoke_id = self.next_invoke_id().await;
1039        let request = DeleteObjectRequest {
1040            object_id,
1041            invoke_id,
1042        };
1043        let tx = self.encode_with_growth(|w| {
1044            Npdu::new(0).encode(w)?;
1045            request.encode(w)
1046        })?;
1047        self.await_simple_ack_or_error(
1048            address,
1049            &tx,
1050            invoke_id,
1051            SERVICE_DELETE_OBJECT,
1052            self.response_timeout,
1053        )
1054        .await
1055    }
1056
1057    /// Send an AddListElement request to append elements to a list property on the device.
1058    pub async fn add_list_element(
1059        &self,
1060        address: DataLinkAddress,
1061        mut request: AddListElementRequest<'_>,
1062    ) -> Result<(), ClientError> {
1063        request.invoke_id = self.next_invoke_id().await;
1064        let invoke_id = request.invoke_id;
1065        let tx = self.encode_with_growth(|w| {
1066            Npdu::new(0).encode(w)?;
1067            request.encode(w)
1068        })?;
1069        self.await_simple_ack_or_error(
1070            address,
1071            &tx,
1072            invoke_id,
1073            SERVICE_ADD_LIST_ELEMENT,
1074            self.response_timeout,
1075        )
1076        .await
1077    }
1078
1079    /// Send a RemoveListElement request to delete elements from a list property on the device.
1080    pub async fn remove_list_element(
1081        &self,
1082        address: DataLinkAddress,
1083        mut request: RemoveListElementRequest<'_>,
1084    ) -> Result<(), ClientError> {
1085        request.invoke_id = self.next_invoke_id().await;
1086        let invoke_id = request.invoke_id;
1087        let tx = self.encode_with_growth(|w| {
1088            Npdu::new(0).encode(w)?;
1089            request.encode(w)
1090        })?;
1091        self.await_simple_ack_or_error(
1092            address,
1093            &tx,
1094            invoke_id,
1095            SERVICE_REMOVE_LIST_ELEMENT,
1096            self.response_timeout,
1097        )
1098        .await
1099    }
1100
1101    async fn await_simple_ack_or_error(
1102        &self,
1103        address: DataLinkAddress,
1104        tx: &[u8],
1105        invoke_id: u8,
1106        service_choice: u8,
1107        timeout_window: Duration,
1108    ) -> Result<(), ClientError> {
1109        #[cfg(feature = "tracing")]
1110        tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1111        let _io_lock = self.request_io_lock.lock().await;
1112        let deadline = tokio::time::Instant::now() + timeout_window;
1113        self.send_confirmed_request(address, tx, deadline).await?;
1114        while tokio::time::Instant::now() < deadline {
1115            let mut rx = [0u8; 1500];
1116            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1117            if src != address {
1118                continue;
1119            }
1120            let apdu = extract_apdu(&rx[..n])?;
1121            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1122            match ApduType::from_u8(first >> 4) {
1123                Some(ApduType::SimpleAck) => {
1124                    let mut r = Reader::new(apdu);
1125                    let ack = SimpleAck::decode(&mut r)?;
1126                    if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1127                        return Ok(());
1128                    }
1129                }
1130                Some(ApduType::Error) => {
1131                    let mut r = Reader::new(apdu);
1132                    let err = BacnetError::decode(&mut r)?;
1133                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1134                        return Err(remote_service_error(err));
1135                    }
1136                }
1137                Some(ApduType::Reject) => {
1138                    let mut r = Reader::new(apdu);
1139                    let rej = RejectPdu::decode(&mut r)?;
1140                    if rej.invoke_id == invoke_id {
1141                        return Err(ClientError::RemoteReject { reason: rej.reason });
1142                    }
1143                }
1144                Some(ApduType::Abort) => {
1145                    let mut r = Reader::new(apdu);
1146                    let abort = AbortPdu::decode(&mut r)?;
1147                    if abort.invoke_id == invoke_id {
1148                        return Err(ClientError::RemoteAbort {
1149                            reason: abort.reason,
1150                            server: abort.server,
1151                        });
1152                    }
1153                }
1154                _ => continue,
1155            }
1156        }
1157        #[cfg(feature = "tracing")]
1158        tracing::warn!(invoke_id = invoke_id, "request timed out");
1159        Err(ClientError::Timeout)
1160    }
1161
1162    async fn await_complex_ack_payload_or_error(
1163        &self,
1164        address: DataLinkAddress,
1165        tx: &[u8],
1166        invoke_id: u8,
1167        service_choice: u8,
1168        timeout_window: Duration,
1169    ) -> Result<Vec<u8>, ClientError> {
1170        #[cfg(feature = "tracing")]
1171        tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1172        let _io_lock = self.request_io_lock.lock().await;
1173        let deadline = tokio::time::Instant::now() + timeout_window;
1174        self.send_confirmed_request(address, tx, deadline).await?;
1175        while tokio::time::Instant::now() < deadline {
1176            let mut rx = [0u8; 1500];
1177            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1178            if src != address {
1179                continue;
1180            }
1181
1182            let apdu = extract_apdu(&rx[..n])?;
1183            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1184            match ApduType::from_u8(first >> 4) {
1185                Some(ApduType::ComplexAck) => {
1186                    let mut r = Reader::new(apdu);
1187                    let ack = ComplexAckHeader::decode(&mut r)?;
1188                    if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1189                        continue;
1190                    }
1191                    return self
1192                        .collect_complex_ack_payload(
1193                            address,
1194                            invoke_id,
1195                            service_choice,
1196                            ack,
1197                            r.read_exact(r.remaining())?,
1198                            deadline,
1199                        )
1200                        .await;
1201                }
1202                Some(ApduType::Error) => {
1203                    let mut r = Reader::new(apdu);
1204                    let err = BacnetError::decode(&mut r)?;
1205                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1206                        return Err(remote_service_error(err));
1207                    }
1208                }
1209                Some(ApduType::Reject) => {
1210                    let mut r = Reader::new(apdu);
1211                    let rej = RejectPdu::decode(&mut r)?;
1212                    if rej.invoke_id == invoke_id {
1213                        return Err(ClientError::RemoteReject { reason: rej.reason });
1214                    }
1215                }
1216                Some(ApduType::Abort) => {
1217                    let mut r = Reader::new(apdu);
1218                    let abort = AbortPdu::decode(&mut r)?;
1219                    if abort.invoke_id == invoke_id {
1220                        return Err(ClientError::RemoteAbort {
1221                            reason: abort.reason,
1222                            server: abort.server,
1223                        });
1224                    }
1225                }
1226                _ => continue,
1227            }
1228        }
1229        #[cfg(feature = "tracing")]
1230        tracing::warn!(invoke_id = invoke_id, "request timed out");
1231        Err(ClientError::Timeout)
1232    }
1233
1234    /// Send a GetAlarmSummary request and return the list of active alarms on the device.
1235    pub async fn get_alarm_summary(
1236        &self,
1237        address: DataLinkAddress,
1238    ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1239        let invoke_id = self.next_invoke_id().await;
1240        let request = GetAlarmSummaryRequest { invoke_id };
1241        let tx = self.encode_with_growth(|w| {
1242            Npdu::new(0).encode(w)?;
1243            request.encode(w)
1244        })?;
1245        let payload = self
1246            .await_complex_ack_payload_or_error(
1247                address,
1248                &tx,
1249                invoke_id,
1250                SERVICE_GET_ALARM_SUMMARY,
1251                self.response_timeout,
1252            )
1253            .await?;
1254        let mut pr = Reader::new(&payload);
1255        let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1256        Ok(into_client_alarm_summary(parsed.summaries))
1257    }
1258
1259    /// Send a GetEnrollmentSummary request and return the list of event enrollments on the device.
1260    pub async fn get_enrollment_summary(
1261        &self,
1262        address: DataLinkAddress,
1263    ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1264        let invoke_id = self.next_invoke_id().await;
1265        let request = GetEnrollmentSummaryRequest { invoke_id };
1266        let tx = self.encode_with_growth(|w| {
1267            Npdu::new(0).encode(w)?;
1268            request.encode(w)
1269        })?;
1270        let payload = self
1271            .await_complex_ack_payload_or_error(
1272                address,
1273                &tx,
1274                invoke_id,
1275                SERVICE_GET_ENROLLMENT_SUMMARY,
1276                self.response_timeout,
1277            )
1278            .await?;
1279        let mut pr = Reader::new(&payload);
1280        let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1281        Ok(into_client_enrollment_summary(parsed.summaries))
1282    }
1283
1284    /// Send a GetEventInformation request and return active event summaries from the device.
1285    ///
1286    /// Pass `last_received_object_id` to page through results; the returned
1287    /// [`EventInformationResult::more_events`] indicates whether another call is needed.
1288    pub async fn get_event_information(
1289        &self,
1290        address: DataLinkAddress,
1291        last_received_object_id: Option<ObjectId>,
1292    ) -> Result<EventInformationResult, ClientError> {
1293        let invoke_id = self.next_invoke_id().await;
1294        let request = GetEventInformationRequest {
1295            last_received_object_id,
1296            invoke_id,
1297        };
1298        let tx = self.encode_with_growth(|w| {
1299            Npdu::new(0).encode(w)?;
1300            request.encode(w)
1301        })?;
1302        let payload = self
1303            .await_complex_ack_payload_or_error(
1304                address,
1305                &tx,
1306                invoke_id,
1307                SERVICE_GET_EVENT_INFORMATION,
1308                self.response_timeout,
1309            )
1310            .await?;
1311        let mut pr = Reader::new(&payload);
1312        let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1313        Ok(EventInformationResult {
1314            summaries: into_client_event_information(parsed.summaries),
1315            more_events: parsed.more_events,
1316        })
1317    }
1318
1319    /// Send an AcknowledgeAlarm request to the device.
1320    pub async fn acknowledge_alarm(
1321        &self,
1322        address: DataLinkAddress,
1323        mut request: AcknowledgeAlarmRequest<'_>,
1324    ) -> Result<(), ClientError> {
1325        request.invoke_id = self.next_invoke_id().await;
1326        let invoke_id = request.invoke_id;
1327        let tx = self.encode_with_growth(|w| {
1328            Npdu::new(0).encode(w)?;
1329            request.encode(w)
1330        })?;
1331        self.await_simple_ack_or_error(
1332            address,
1333            &tx,
1334            invoke_id,
1335            SERVICE_ACKNOWLEDGE_ALARM,
1336            self.response_timeout,
1337        )
1338        .await
1339    }
1340
1341    /// Read a contiguous byte range from a BACnet File object using stream access.
1342    ///
1343    /// `file_start_position` is the byte offset (may be negative for end-relative access).
1344    /// `requested_octet_count` is the number of bytes to read.
1345    pub async fn atomic_read_file_stream(
1346        &self,
1347        address: DataLinkAddress,
1348        file_object_id: ObjectId,
1349        file_start_position: i32,
1350        requested_octet_count: u32,
1351    ) -> Result<AtomicReadFileResult, ClientError> {
1352        let invoke_id = self.next_invoke_id().await;
1353        let request = AtomicReadFileRequest::stream(
1354            file_object_id,
1355            file_start_position,
1356            requested_octet_count,
1357            invoke_id,
1358        );
1359        self.atomic_read_file(address, request).await
1360    }
1361
1362    /// Read a range of records from a BACnet File object using record access.
1363    ///
1364    /// `file_start_record` is the first record index (may be negative for end-relative access).
1365    /// `requested_record_count` is the number of records to read.
1366    pub async fn atomic_read_file_record(
1367        &self,
1368        address: DataLinkAddress,
1369        file_object_id: ObjectId,
1370        file_start_record: i32,
1371        requested_record_count: u32,
1372    ) -> Result<AtomicReadFileResult, ClientError> {
1373        let invoke_id = self.next_invoke_id().await;
1374        let request = AtomicReadFileRequest::record(
1375            file_object_id,
1376            file_start_record,
1377            requested_record_count,
1378            invoke_id,
1379        );
1380        self.atomic_read_file(address, request).await
1381    }
1382
1383    async fn atomic_read_file(
1384        &self,
1385        address: DataLinkAddress,
1386        request: AtomicReadFileRequest,
1387    ) -> Result<AtomicReadFileResult, ClientError> {
1388        let invoke_id = request.invoke_id;
1389        let tx = self.encode_with_growth(|w| {
1390            Npdu::new(0).encode(w)?;
1391            request.encode(w)
1392        })?;
1393        let payload = self
1394            .await_complex_ack_payload_or_error(
1395                address,
1396                &tx,
1397                invoke_id,
1398                SERVICE_ATOMIC_READ_FILE,
1399                self.response_timeout,
1400            )
1401            .await?;
1402        let mut pr = Reader::new(&payload);
1403        let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1404        Ok(into_client_atomic_read_result(parsed))
1405    }
1406
1407    /// Write `file_data` to a BACnet File object starting at `file_start_position` using stream
1408    /// access. The returned result contains the actual start position used by the device.
1409    pub async fn atomic_write_file_stream(
1410        &self,
1411        address: DataLinkAddress,
1412        file_object_id: ObjectId,
1413        file_start_position: i32,
1414        file_data: &[u8],
1415    ) -> Result<AtomicWriteFileResult, ClientError> {
1416        let invoke_id = self.next_invoke_id().await;
1417        let request = AtomicWriteFileRequest::stream(
1418            file_object_id,
1419            file_start_position,
1420            file_data,
1421            invoke_id,
1422        );
1423        self.atomic_write_file(address, request).await
1424    }
1425
1426    /// Write records to a BACnet File object starting at `file_start_record` using record access.
1427    ///
1428    /// Each element of `file_record_data` is one record's raw bytes.
1429    pub async fn atomic_write_file_record(
1430        &self,
1431        address: DataLinkAddress,
1432        file_object_id: ObjectId,
1433        file_start_record: i32,
1434        file_record_data: &[&[u8]],
1435    ) -> Result<AtomicWriteFileResult, ClientError> {
1436        let invoke_id = self.next_invoke_id().await;
1437        let request = AtomicWriteFileRequest::record(
1438            file_object_id,
1439            file_start_record,
1440            file_record_data,
1441            invoke_id,
1442        );
1443        self.atomic_write_file(address, request).await
1444    }
1445
1446    async fn atomic_write_file(
1447        &self,
1448        address: DataLinkAddress,
1449        request: AtomicWriteFileRequest<'_>,
1450    ) -> Result<AtomicWriteFileResult, ClientError> {
1451        let invoke_id = request.invoke_id;
1452        let tx = self.encode_with_growth(|w| {
1453            Npdu::new(0).encode(w)?;
1454            request.encode(w)
1455        })?;
1456        let payload = self
1457            .await_complex_ack_payload_or_error(
1458                address,
1459                &tx,
1460                invoke_id,
1461                SERVICE_ATOMIC_WRITE_FILE,
1462                self.response_timeout,
1463            )
1464            .await?;
1465        let mut pr = Reader::new(&payload);
1466        let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1467        Ok(into_client_atomic_write_result(parsed))
1468    }
1469
1470    /// Send a SubscribeCOV request to start (or renew) a COV subscription on the device.
1471    ///
1472    /// Use [`cancel_cov_subscription`](Self::cancel_cov_subscription) to unsubscribe.
1473    pub async fn subscribe_cov(
1474        &self,
1475        address: DataLinkAddress,
1476        mut request: SubscribeCovRequest,
1477    ) -> Result<(), ClientError> {
1478        request.invoke_id = self.next_invoke_id().await;
1479        let invoke_id = request.invoke_id;
1480        let tx = self.encode_with_growth(|w| {
1481            Npdu::new(0).encode(w)?;
1482            request.encode(w)
1483        })?;
1484        self.await_simple_ack_or_error(
1485            address,
1486            &tx,
1487            invoke_id,
1488            SERVICE_SUBSCRIBE_COV,
1489            self.response_timeout,
1490        )
1491        .await
1492    }
1493
1494    /// Cancel an existing COV subscription identified by `subscriber_process_id` and
1495    /// `monitored_object_id`.
1496    pub async fn cancel_cov_subscription(
1497        &self,
1498        address: DataLinkAddress,
1499        subscriber_process_id: u32,
1500        monitored_object_id: ObjectId,
1501    ) -> Result<(), ClientError> {
1502        self.subscribe_cov(
1503            address,
1504            SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1505        )
1506        .await
1507    }
1508
1509    /// Send a SubscribeCOVProperty request to subscribe to changes of a specific property.
1510    ///
1511    /// Use [`cancel_cov_property_subscription`](Self::cancel_cov_property_subscription) to
1512    /// unsubscribe.
1513    pub async fn subscribe_cov_property(
1514        &self,
1515        address: DataLinkAddress,
1516        mut request: SubscribeCovPropertyRequest,
1517    ) -> Result<(), ClientError> {
1518        request.invoke_id = self.next_invoke_id().await;
1519        let invoke_id = request.invoke_id;
1520        let tx = self.encode_with_growth(|w| {
1521            Npdu::new(0).encode(w)?;
1522            request.encode(w)
1523        })?;
1524        self.await_simple_ack_or_error(
1525            address,
1526            &tx,
1527            invoke_id,
1528            SERVICE_SUBSCRIBE_COV_PROPERTY,
1529            self.response_timeout,
1530        )
1531        .await
1532    }
1533
1534    /// Cancel an existing COV property subscription.
1535    ///
1536    /// The combination of `subscriber_process_id`, `monitored_object_id`,
1537    /// `monitored_property_id`, and `monitored_property_array_index` must match the
1538    /// subscription to cancel.
1539    pub async fn cancel_cov_property_subscription(
1540        &self,
1541        address: DataLinkAddress,
1542        subscriber_process_id: u32,
1543        monitored_object_id: ObjectId,
1544        monitored_property_id: PropertyId,
1545        monitored_property_array_index: Option<u32>,
1546    ) -> Result<(), ClientError> {
1547        self.subscribe_cov_property(
1548            address,
1549            SubscribeCovPropertyRequest::cancel(
1550                subscriber_process_id,
1551                monitored_object_id,
1552                monitored_property_id,
1553                monitored_property_array_index,
1554                0,
1555            ),
1556        )
1557        .await
1558    }
1559
1560    /// Read a range of entries from a list/log property by absolute position.
1561    ///
1562    /// `reference_index` is the 1-based starting entry index. A positive `count` reads
1563    /// forward; negative reads backward from `reference_index`.
1564    pub async fn read_range_by_position(
1565        &self,
1566        address: DataLinkAddress,
1567        object_id: ObjectId,
1568        property_id: PropertyId,
1569        array_index: Option<u32>,
1570        reference_index: i32,
1571        count: i16,
1572    ) -> Result<ReadRangeResult, ClientError> {
1573        let invoke_id = self.next_invoke_id().await;
1574        let req = ReadRangeRequest::by_position(
1575            object_id,
1576            property_id,
1577            array_index,
1578            reference_index,
1579            count,
1580            invoke_id,
1581        );
1582        self.read_range_with_request(address, req).await
1583    }
1584
1585    /// Read a range of entries from a list/log property anchored at a sequence number.
1586    ///
1587    /// `reference_sequence` is the sequence number of the anchor entry. A positive `count`
1588    /// reads forward; negative reads backward.
1589    pub async fn read_range_by_sequence_number(
1590        &self,
1591        address: DataLinkAddress,
1592        object_id: ObjectId,
1593        property_id: PropertyId,
1594        array_index: Option<u32>,
1595        reference_sequence: u32,
1596        count: i16,
1597    ) -> Result<ReadRangeResult, ClientError> {
1598        let invoke_id = self.next_invoke_id().await;
1599        let req = ReadRangeRequest::by_sequence_number(
1600            object_id,
1601            property_id,
1602            array_index,
1603            reference_sequence,
1604            count,
1605            invoke_id,
1606        );
1607        self.read_range_with_request(address, req).await
1608    }
1609
1610    /// Read a range of entries from a list/log property anchored at a timestamp.
1611    ///
1612    /// `at` is the reference date and time. A positive `count` reads forward from that
1613    /// timestamp; negative reads backward.
1614    pub async fn read_range_by_time(
1615        &self,
1616        address: DataLinkAddress,
1617        object_id: ObjectId,
1618        property_id: PropertyId,
1619        array_index: Option<u32>,
1620        at: (Date, Time),
1621        count: i16,
1622    ) -> Result<ReadRangeResult, ClientError> {
1623        let (date, time) = at;
1624        let invoke_id = self.next_invoke_id().await;
1625        let req = ReadRangeRequest::by_time(
1626            object_id,
1627            property_id,
1628            array_index,
1629            date,
1630            time,
1631            count,
1632            invoke_id,
1633        );
1634        self.read_range_with_request(address, req).await
1635    }
1636
1637    async fn read_range_with_request(
1638        &self,
1639        address: DataLinkAddress,
1640        req: ReadRangeRequest,
1641    ) -> Result<ReadRangeResult, ClientError> {
1642        let invoke_id = req.invoke_id;
1643        let tx = self.encode_with_growth(|w| {
1644            Npdu::new(0).encode(w)?;
1645            req.encode(w)
1646        })?;
1647        let payload = self
1648            .await_complex_ack_payload_or_error(
1649                address,
1650                &tx,
1651                invoke_id,
1652                SERVICE_READ_RANGE,
1653                self.response_timeout,
1654            )
1655            .await?;
1656        let mut pr = Reader::new(&payload);
1657        let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1658        into_client_read_range(parsed)
1659    }
1660
1661    /// Wait up to `wait` for a single incoming COV notification (confirmed or unconfirmed).
1662    ///
1663    /// Returns `Ok(Some(_))` when a notification arrives, `Ok(None)` on timeout, and
1664    /// `Err` on transport failure. Confirmed notifications are automatically acknowledged.
1665    /// Segmented confirmed notifications return [`ClientError::UnsupportedResponse`].
1666    pub async fn recv_cov_notification(
1667        &self,
1668        wait: Duration,
1669    ) -> Result<Option<CovNotification>, ClientError> {
1670        let _io_lock = self.request_io_lock.lock().await;
1671        let deadline = tokio::time::Instant::now() + wait;
1672
1673        while tokio::time::Instant::now() < deadline {
1674            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1675            let mut rx = [0u8; 1500];
1676            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1677            let (n, source) = match recv {
1678                Ok(Ok(v)) => v,
1679                Ok(Err(e)) => return Err(e.into()),
1680                Err(_) => break,
1681            };
1682
1683            let apdu = extract_apdu(&rx[..n])?;
1684            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1685            match ApduType::from_u8(first >> 4) {
1686                Some(ApduType::UnconfirmedRequest) => {
1687                    let mut r = Reader::new(apdu);
1688                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1689                    if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1690                        continue;
1691                    }
1692                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1693                    return Ok(Some(into_client_cov_notification(source, false, cov)?));
1694                }
1695                Some(ApduType::ConfirmedRequest) => {
1696                    let mut r = Reader::new(apdu);
1697                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1698                    if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1699                        continue;
1700                    }
1701                    if header.segmented {
1702                        return Err(ClientError::UnsupportedResponse);
1703                    }
1704
1705                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1706                    self.send_simple_ack(
1707                        source,
1708                        header.invoke_id,
1709                        SERVICE_CONFIRMED_COV_NOTIFICATION,
1710                    )
1711                    .await?;
1712                    return Ok(Some(into_client_cov_notification(source, true, cov)?));
1713                }
1714                _ => continue,
1715            }
1716        }
1717
1718        Ok(None)
1719    }
1720
1721    /// Wait up to `wait` for a single incoming event notification (confirmed or unconfirmed).
1722    ///
1723    /// Returns `Ok(Some(_))` when a notification arrives, `Ok(None)` on timeout, and
1724    /// `Err` on transport failure. Confirmed notifications are automatically acknowledged.
1725    /// Segmented confirmed notifications return [`ClientError::UnsupportedResponse`].
1726    pub async fn recv_event_notification(
1727        &self,
1728        wait: Duration,
1729    ) -> Result<Option<EventNotification>, ClientError> {
1730        let _io_lock = self.request_io_lock.lock().await;
1731        let deadline = tokio::time::Instant::now() + wait;
1732
1733        while tokio::time::Instant::now() < deadline {
1734            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1735            let mut rx = [0u8; 1500];
1736            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1737            let (n, source) = match recv {
1738                Ok(Ok(v)) => v,
1739                Ok(Err(e)) => return Err(e.into()),
1740                Err(_) => break,
1741            };
1742
1743            let apdu = extract_apdu(&rx[..n])?;
1744            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1745            match ApduType::from_u8(first >> 4) {
1746                Some(ApduType::UnconfirmedRequest) => {
1747                    let mut r = Reader::new(apdu);
1748                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1749                    if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1750                        continue;
1751                    }
1752                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1753                    return Ok(Some(into_client_event_notification(
1754                        source,
1755                        false,
1756                        notification,
1757                    )));
1758                }
1759                Some(ApduType::ConfirmedRequest) => {
1760                    let mut r = Reader::new(apdu);
1761                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1762                    if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1763                        continue;
1764                    }
1765                    if header.segmented {
1766                        return Err(ClientError::UnsupportedResponse);
1767                    }
1768                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1769                    self.send_simple_ack(
1770                        source,
1771                        header.invoke_id,
1772                        SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1773                    )
1774                    .await?;
1775                    return Ok(Some(into_client_event_notification(
1776                        source,
1777                        true,
1778                        notification,
1779                    )));
1780                }
1781                _ => continue,
1782            }
1783        }
1784
1785        Ok(None)
1786    }
1787
1788    /// Send a ReadProperty request and return the property value as a [`ClientDataValue`].
1789    ///
1790    /// Use [`read_property_multiple`](Self::read_property_multiple) to fetch several
1791    /// properties in a single round-trip.
1792    pub async fn read_property(
1793        &self,
1794        address: DataLinkAddress,
1795        object_id: ObjectId,
1796        property_id: PropertyId,
1797    ) -> Result<ClientDataValue, ClientError> {
1798        let invoke_id = self.next_invoke_id().await;
1799        let req = ReadPropertyRequest {
1800            object_id,
1801            property_id,
1802            array_index: None,
1803            invoke_id,
1804        };
1805        let tx = self.encode_with_growth(|w| {
1806            Npdu::new(0).encode(w)?;
1807            req.encode(w)
1808        })?;
1809        let payload = self
1810            .await_complex_ack_payload_or_error(
1811                address,
1812                &tx,
1813                invoke_id,
1814                SERVICE_READ_PROPERTY,
1815                self.response_timeout,
1816            )
1817            .await?;
1818        let mut pr = Reader::new(&payload);
1819        let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
1820        into_client_value(parsed.value)
1821    }
1822
1823    /// Send a WriteProperty request to set a single property on the device.
1824    pub async fn write_property(
1825        &self,
1826        address: DataLinkAddress,
1827        mut request: WritePropertyRequest<'_>,
1828    ) -> Result<(), ClientError> {
1829        request.invoke_id = self.next_invoke_id().await;
1830        let invoke_id = request.invoke_id;
1831        let tx = self.encode_with_growth(|w| {
1832            Npdu::new(0).encode(w)?;
1833            request.encode(w)
1834        })?;
1835        self.await_simple_ack_or_error(
1836            address,
1837            &tx,
1838            invoke_id,
1839            SERVICE_WRITE_PROPERTY,
1840            self.response_timeout,
1841        )
1842        .await
1843    }
1844
1845    /// Send a ReadPropertyMultiple request to fetch several properties of one object in a
1846    /// single round-trip.
1847    ///
1848    /// Returns pairs of `(PropertyId, ClientDataValue)` in the order returned by the device.
1849    pub async fn read_property_multiple(
1850        &self,
1851        address: DataLinkAddress,
1852        object_id: ObjectId,
1853        property_ids: &[PropertyId],
1854    ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
1855        let refs: Vec<PropertyReference> = property_ids
1856            .iter()
1857            .copied()
1858            .map(|property_id| PropertyReference {
1859                property_id,
1860                array_index: None,
1861            })
1862            .collect();
1863        let specs = [ReadAccessSpecification {
1864            object_id,
1865            properties: &refs,
1866        }];
1867
1868        let invoke_id = self.next_invoke_id().await;
1869        let req = ReadPropertyMultipleRequest {
1870            specs: &specs,
1871            invoke_id,
1872        };
1873
1874        let tx = self.encode_with_growth(|w| {
1875            Npdu::new(0).encode(w)?;
1876            req.encode(w)
1877        })?;
1878        let payload = self
1879            .await_complex_ack_payload_or_error(
1880                address,
1881                &tx,
1882                invoke_id,
1883                SERVICE_READ_PROPERTY_MULTIPLE,
1884                self.response_timeout,
1885            )
1886            .await?;
1887        let mut pr = Reader::new(&payload);
1888        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
1889        let mut out = Vec::new();
1890        for access in parsed.results {
1891            if access.object_id != object_id {
1892                continue;
1893            }
1894            for item in access.results {
1895                out.push((item.property_id, into_client_value(item.value)?));
1896            }
1897        }
1898        Ok(out)
1899    }
1900
1901    /// Send a WritePropertyMultiple request to set several properties of one object in a
1902    /// single round-trip.
1903    pub async fn write_property_multiple(
1904        &self,
1905        address: DataLinkAddress,
1906        object_id: ObjectId,
1907        properties: &[PropertyWriteSpec<'_>],
1908    ) -> Result<(), ClientError> {
1909        let invoke_id = self.next_invoke_id().await;
1910        let specs = [WriteAccessSpecification {
1911            object_id,
1912            properties,
1913        }];
1914        let req = WritePropertyMultipleRequest {
1915            specs: &specs,
1916            invoke_id,
1917        };
1918
1919        let tx = self.encode_with_growth(|w| {
1920            Npdu::new(0).encode(w)?;
1921            req.encode(w)
1922        })?;
1923        self.await_simple_ack_or_error(
1924            address,
1925            &tx,
1926            invoke_id,
1927            SERVICE_WRITE_PROPERTY_MULTIPLE,
1928            self.response_timeout,
1929        )
1930        .await
1931    }
1932
1933    /// Send a ConfirmedPrivateTransfer request and return the ack.
1934    pub async fn private_transfer(
1935        &self,
1936        address: DataLinkAddress,
1937        vendor_id: u32,
1938        service_number: u32,
1939        service_parameters: Option<&[u8]>,
1940    ) -> Result<PrivateTransferAck, ClientError> {
1941        let invoke_id = self.next_invoke_id().await;
1942        let req = ConfirmedPrivateTransferRequest {
1943            vendor_id,
1944            service_number,
1945            service_parameters,
1946            invoke_id,
1947        };
1948
1949        let tx = self.encode_with_growth(|w| {
1950            Npdu::new(0).encode(w)?;
1951            req.encode(w)
1952        })?;
1953        let payload = self
1954            .await_complex_ack_payload_or_error(
1955                address,
1956                &tx,
1957                invoke_id,
1958                SERVICE_CONFIRMED_PRIVATE_TRANSFER,
1959                self.response_timeout,
1960            )
1961            .await?;
1962        let mut r = Reader::new(&payload);
1963        PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
1964    }
1965
1966    /// Read multiple `(object_id, property_id)` pairs in a single ReadPropertyMultiple round-trip.
1967    ///
1968    /// All pairs must target the same device at `address`. Returns a map from each requested
1969    /// `(ObjectId, PropertyId)` to its value. Properties not returned by the device are absent
1970    /// from the map (the device may skip unknown properties rather than erroring).
1971    pub async fn read_many(
1972        &self,
1973        address: DataLinkAddress,
1974        requests: &[(ObjectId, PropertyId)],
1975    ) -> Result<HashMap<(ObjectId, PropertyId), ClientDataValue>, ClientError> {
1976        // Group by object to build ReadAccessSpecifications
1977        let mut grouped: Vec<(ObjectId, Vec<PropertyReference>)> = Vec::new();
1978        for &(oid, pid) in requests {
1979            if let Some(entry) = grouped.iter_mut().find(|(o, _)| *o == oid) {
1980                entry.1.push(PropertyReference { property_id: pid, array_index: None });
1981            } else {
1982                grouped.push((oid, vec![PropertyReference { property_id: pid, array_index: None }]));
1983            }
1984        }
1985
1986        let specs: Vec<ReadAccessSpecification<'_>> = grouped
1987            .iter()
1988            .map(|(oid, props)| ReadAccessSpecification { object_id: *oid, properties: props })
1989            .collect();
1990
1991        let invoke_id = self.next_invoke_id().await;
1992        let req = ReadPropertyMultipleRequest { specs: &specs, invoke_id };
1993        let tx = self.encode_with_growth(|w| {
1994            Npdu::new(0).encode(w)?;
1995            req.encode(w)
1996        })?;
1997        let payload = self
1998            .await_complex_ack_payload_or_error(
1999                address, &tx, invoke_id, SERVICE_READ_PROPERTY_MULTIPLE, self.response_timeout,
2000            )
2001            .await?;
2002
2003        let mut pr = Reader::new(&payload);
2004        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2005        let mut out = HashMap::new();
2006        for access in parsed.results {
2007            for item in access.results {
2008                if let Ok(v) = into_client_value(item.value) {
2009                    out.insert((access.object_id, item.property_id), v);
2010                }
2011            }
2012        }
2013        Ok(out)
2014    }
2015
2016    /// Write multiple properties across one or more objects in a single WritePropertyMultiple
2017    /// round-trip.
2018    ///
2019    /// `writes` is a slice of `(object_id, property_id, value, priority)` tuples.
2020    /// `priority` may be `None` for the relinquish-default case. All writes target the same
2021    /// device at `address`. Takes [`ClientDataValue`] (the owned form) for ergonomic use.
2022    pub async fn write_many(
2023        &self,
2024        address: DataLinkAddress,
2025        writes: &[(ObjectId, PropertyId, ClientDataValue, Option<u8>)],
2026    ) -> Result<(), ClientError> {
2027        use rustbac_core::types::{BitString, DataValue as DV};
2028
2029        fn cv_to_dv(v: &ClientDataValue) -> DV<'_> {
2030            match v {
2031                ClientDataValue::Null => DV::Null,
2032                ClientDataValue::Boolean(b) => DV::Boolean(*b),
2033                ClientDataValue::Unsigned(n) => DV::Unsigned(*n),
2034                ClientDataValue::Signed(n) => DV::Signed(*n),
2035                ClientDataValue::Real(f) => DV::Real(*f),
2036                ClientDataValue::Double(f) => DV::Double(*f),
2037                ClientDataValue::OctetString(b) => DV::OctetString(b),
2038                ClientDataValue::CharacterString(s) => DV::CharacterString(s),
2039                ClientDataValue::BitString { unused_bits, data } => {
2040                    DV::BitString(BitString { unused_bits: *unused_bits, data })
2041                }
2042                ClientDataValue::Enumerated(n) => DV::Enumerated(*n),
2043                ClientDataValue::Date(d) => DV::Date(*d),
2044                ClientDataValue::Time(t) => DV::Time(*t),
2045                ClientDataValue::ObjectId(o) => DV::ObjectId(*o),
2046                ClientDataValue::Constructed { tag_num, values } => DV::Constructed {
2047                    tag_num: *tag_num,
2048                    values: values.iter().map(cv_to_dv).collect(),
2049                },
2050            }
2051        }
2052
2053        // Pre-convert values so DataValue borrows from the input slice lifetime.
2054        let converted: Vec<(ObjectId, PropertyId, DV<'_>, Option<u8>)> = writes
2055            .iter()
2056            .map(|(oid, pid, val, prio)| (*oid, *pid, cv_to_dv(val), *prio))
2057            .collect();
2058
2059        // Group by object to build WriteAccessSpecifications.
2060        let mut grouped: Vec<(ObjectId, Vec<PropertyWriteSpec<'_>>)> = Vec::new();
2061        for (oid, pid, val, prio) in &converted {
2062            let spec = PropertyWriteSpec {
2063                property_id: *pid,
2064                array_index: None,
2065                value: val.clone(),
2066                priority: *prio,
2067            };
2068            if let Some(entry) = grouped.iter_mut().find(|(o, _)| o == oid) {
2069                entry.1.push(spec);
2070            } else {
2071                grouped.push((*oid, vec![spec]));
2072            }
2073        }
2074
2075        let specs: Vec<WriteAccessSpecification<'_>> = grouped
2076            .iter()
2077            .map(|(oid, props)| WriteAccessSpecification { object_id: *oid, properties: props })
2078            .collect();
2079
2080        let invoke_id = self.next_invoke_id().await;
2081        let req = WritePropertyMultipleRequest { specs: &specs, invoke_id };
2082        let tx = self.encode_with_growth(|w| {
2083            Npdu::new(0).encode(w)?;
2084            req.encode(w)
2085        })?;
2086        self.await_simple_ack_or_error(
2087            address, &tx, invoke_id, SERVICE_WRITE_PROPERTY_MULTIPLE, self.response_timeout,
2088        )
2089        .await
2090    }
2091}
2092
2093fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
2094    let mut r = Reader::new(payload);
2095    let _npdu = Npdu::decode(&mut r)?;
2096    r.read_exact(r.remaining()).map_err(ClientError::from)
2097}
2098
2099fn remote_service_error(err: BacnetError) -> ClientError {
2100    ClientError::RemoteServiceError {
2101        service_choice: err.service_choice,
2102        error_class_raw: err.error_class,
2103        error_code_raw: err.error_code,
2104        error_class: err.error_class.and_then(ErrorClass::from_u32),
2105        error_code: err.error_code.and_then(ErrorCode::from_u32),
2106    }
2107}
2108
2109fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
2110    Ok(match value {
2111        DataValue::Null => ClientDataValue::Null,
2112        DataValue::Boolean(v) => ClientDataValue::Boolean(v),
2113        DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
2114        DataValue::Signed(v) => ClientDataValue::Signed(v),
2115        DataValue::Real(v) => ClientDataValue::Real(v),
2116        DataValue::Double(v) => ClientDataValue::Double(v),
2117        DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
2118        DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
2119        DataValue::BitString(v) => ClientDataValue::BitString {
2120            unused_bits: v.unused_bits,
2121            data: v.data.to_vec(),
2122        },
2123        DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
2124        DataValue::Date(v) => ClientDataValue::Date(v),
2125        DataValue::Time(v) => ClientDataValue::Time(v),
2126        DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
2127        DataValue::Constructed { tag_num, values } => {
2128            let mut children = Vec::with_capacity(values.len());
2129            for child in values {
2130                children.push(into_client_value(child)?);
2131            }
2132            ClientDataValue::Constructed {
2133                tag_num,
2134                values: children,
2135            }
2136        }
2137    })
2138}
2139
2140fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
2141    value
2142        .into_iter()
2143        .map(|item| AlarmSummaryItem {
2144            object_id: item.object_id,
2145            alarm_state_raw: item.alarm_state,
2146            alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2147                item.alarm_state,
2148            ),
2149            acknowledged_transitions: ClientBitString {
2150                unused_bits: item.acknowledged_transitions.unused_bits,
2151                data: item.acknowledged_transitions.data.to_vec(),
2152            },
2153        })
2154        .collect()
2155}
2156
2157fn into_client_enrollment_summary(
2158    value: Vec<CoreEnrollmentSummaryItem>,
2159) -> Vec<EnrollmentSummaryItem> {
2160    value
2161        .into_iter()
2162        .map(|item| EnrollmentSummaryItem {
2163            object_id: item.object_id,
2164            event_type: item.event_type,
2165            event_state_raw: item.event_state,
2166            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2167                item.event_state,
2168            ),
2169            priority: item.priority,
2170            notification_class: item.notification_class,
2171        })
2172        .collect()
2173}
2174
2175fn into_client_event_information(
2176    value: Vec<CoreEventSummaryItem<'_>>,
2177) -> Vec<EventInformationItem> {
2178    value
2179        .into_iter()
2180        .map(|item| EventInformationItem {
2181            object_id: item.object_id,
2182            event_state_raw: item.event_state,
2183            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2184                item.event_state,
2185            ),
2186            acknowledged_transitions: ClientBitString {
2187                unused_bits: item.acknowledged_transitions.unused_bits,
2188                data: item.acknowledged_transitions.data.to_vec(),
2189            },
2190            notify_type: item.notify_type,
2191            event_enable: ClientBitString {
2192                unused_bits: item.event_enable.unused_bits,
2193                data: item.event_enable.data.to_vec(),
2194            },
2195            event_priorities: item.event_priorities,
2196        })
2197        .collect()
2198}
2199
2200fn into_client_cov_notification(
2201    source: DataLinkAddress,
2202    confirmed: bool,
2203    value: CovNotificationRequest<'_>,
2204) -> Result<CovNotification, ClientError> {
2205    let mut values = Vec::with_capacity(value.values.len());
2206    for property in value.values {
2207        values.push(CovPropertyValue {
2208            property_id: property.property_id,
2209            array_index: property.array_index,
2210            value: into_client_value(property.value)?,
2211            priority: property.priority,
2212        });
2213    }
2214
2215    Ok(CovNotification {
2216        source,
2217        confirmed,
2218        subscriber_process_id: value.subscriber_process_id,
2219        initiating_device_id: value.initiating_device_id,
2220        monitored_object_id: value.monitored_object_id,
2221        time_remaining_seconds: value.time_remaining_seconds,
2222        values,
2223    })
2224}
2225
2226fn into_client_event_notification(
2227    source: DataLinkAddress,
2228    confirmed: bool,
2229    value: EventNotificationRequest<'_>,
2230) -> EventNotification {
2231    EventNotification {
2232        source,
2233        confirmed,
2234        process_id: value.process_id,
2235        initiating_device_id: value.initiating_device_id,
2236        event_object_id: value.event_object_id,
2237        timestamp: value.timestamp,
2238        notification_class: value.notification_class,
2239        priority: value.priority,
2240        event_type: value.event_type,
2241        message_text: value.message_text.map(str::to_string),
2242        notify_type: value.notify_type,
2243        ack_required: value.ack_required,
2244        from_state_raw: value.from_state,
2245        from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
2246            value.from_state,
2247        ),
2248        to_state_raw: value.to_state,
2249        to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
2250    }
2251}
2252
2253fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
2254    let mut items = Vec::with_capacity(value.items.len());
2255    for item in value.items {
2256        items.push(into_client_value(item)?);
2257    }
2258    Ok(ReadRangeResult {
2259        object_id: value.object_id,
2260        property_id: value.property_id,
2261        array_index: value.array_index,
2262        result_flags: ClientBitString {
2263            unused_bits: value.result_flags.unused_bits,
2264            data: value.result_flags.data.to_vec(),
2265        },
2266        item_count: value.item_count,
2267        items,
2268    })
2269}
2270
2271fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
2272    match value.access_method {
2273        AtomicReadFileAckAccess::Stream {
2274            file_start_position,
2275            file_data,
2276        } => AtomicReadFileResult::Stream {
2277            end_of_file: value.end_of_file,
2278            file_start_position,
2279            file_data: file_data.to_vec(),
2280        },
2281        AtomicReadFileAckAccess::Record {
2282            file_start_record,
2283            returned_record_count,
2284            file_record_data,
2285        } => AtomicReadFileResult::Record {
2286            end_of_file: value.end_of_file,
2287            file_start_record,
2288            returned_record_count,
2289            file_record_data: file_record_data
2290                .into_iter()
2291                .map(|record| record.to_vec())
2292                .collect(),
2293        },
2294    }
2295}
2296
2297fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
2298    match value {
2299        AtomicWriteFileAck::Stream {
2300            file_start_position,
2301        } => AtomicWriteFileResult::Stream {
2302            file_start_position,
2303        },
2304        AtomicWriteFileAck::Record { file_start_record } => {
2305            AtomicWriteFileResult::Record { file_start_record }
2306        }
2307    }
2308}
2309
2310#[cfg(test)]
2311mod tests {
2312    use super::BacnetClient;
2313    use crate::{
2314        AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
2315        EnrollmentSummaryItem, EventInformationItem, EventNotification,
2316    };
2317    use rustbac_core::apdu::{
2318        ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
2319        UnconfirmedRequestHeader,
2320    };
2321    use rustbac_core::encoding::{
2322        primitives::{
2323            decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
2324            encode_ctx_object_id, encode_ctx_unsigned,
2325        },
2326        reader::Reader,
2327        tag::{AppTag, Tag},
2328        writer::Writer,
2329    };
2330    use rustbac_core::npdu::Npdu;
2331    use rustbac_core::services::acknowledge_alarm::{
2332        AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
2333    };
2334    use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
2335    use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
2336    use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
2337    use rustbac_core::services::cov_notification::{
2338        SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
2339    };
2340    use rustbac_core::services::device_management::{
2341        DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
2342        SERVICE_REINITIALIZE_DEVICE,
2343    };
2344    use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
2345    use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
2346    use rustbac_core::services::event_notification::{
2347        SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
2348    };
2349    use rustbac_core::services::list_element::{
2350        AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
2351        SERVICE_REMOVE_LIST_ELEMENT,
2352    };
2353    use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
2354    use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
2355    use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
2356    use rustbac_core::services::read_range::SERVICE_READ_RANGE;
2357    use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
2358    use rustbac_core::services::subscribe_cov_property::{
2359        SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
2360    };
2361    use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
2362    use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
2363    use rustbac_core::services::write_property_multiple::{
2364        PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
2365    };
2366    use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
2367    use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
2368    use std::collections::VecDeque;
2369    use std::sync::Arc;
2370    use std::time::Duration;
2371    use tokio::sync::Mutex;
2372
2373    #[derive(Debug, Default)]
2374    struct MockState {
2375        sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
2376        recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
2377    }
2378
2379    #[derive(Debug, Clone)]
2380    struct MockDataLink {
2381        state: Arc<MockState>,
2382    }
2383
2384    impl MockDataLink {
2385        fn new() -> (Self, Arc<MockState>) {
2386            let state = Arc::new(MockState::default());
2387            (
2388                Self {
2389                    state: state.clone(),
2390                },
2391                state,
2392            )
2393        }
2394    }
2395
2396    impl DataLink for MockDataLink {
2397        async fn send(
2398            &self,
2399            address: DataLinkAddress,
2400            payload: &[u8],
2401        ) -> Result<(), DataLinkError> {
2402            self.state
2403                .sent
2404                .lock()
2405                .await
2406                .push((address, payload.to_vec()));
2407            Ok(())
2408        }
2409
2410        async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
2411            let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
2412                return Err(DataLinkError::InvalidFrame);
2413            };
2414            if payload.len() > buf.len() {
2415                return Err(DataLinkError::FrameTooLarge);
2416            }
2417            buf[..payload.len()].copy_from_slice(&payload);
2418            Ok((payload.len(), addr))
2419        }
2420    }
2421
2422    fn with_npdu(apdu: &[u8]) -> Vec<u8> {
2423        let mut out = [0u8; 512];
2424        let mut w = Writer::new(&mut out);
2425        Npdu::new(0).encode(&mut w).unwrap();
2426        w.write_all(apdu).unwrap();
2427        w.as_written().to_vec()
2428    }
2429
2430    fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2431        let mut apdu_buf = [0u8; 256];
2432        let mut w = Writer::new(&mut apdu_buf);
2433        ComplexAckHeader {
2434            segmented: false,
2435            more_follows: false,
2436            invoke_id,
2437            sequence_number: None,
2438            proposed_window_size: None,
2439            service_choice: SERVICE_READ_RANGE,
2440        }
2441        .encode(&mut w)
2442        .unwrap();
2443        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2444        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
2445        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
2446        w.write_u8(5).unwrap();
2447        w.write_u8(0b1110_0000).unwrap();
2448        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
2449        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
2450        encode_app_real(&mut w, 42.0).unwrap();
2451        encode_app_real(&mut w, 43.0).unwrap();
2452        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
2453        w.as_written().to_vec()
2454    }
2455
2456    fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
2457        let mut apdu_buf = [0u8; 256];
2458        let mut w = Writer::new(&mut apdu_buf);
2459        ComplexAckHeader {
2460            segmented: false,
2461            more_follows: false,
2462            invoke_id,
2463            sequence_number: None,
2464            proposed_window_size: None,
2465            service_choice: SERVICE_ATOMIC_READ_FILE,
2466        }
2467        .encode(&mut w)
2468        .unwrap();
2469        Tag::Application {
2470            tag: AppTag::Boolean,
2471            len: if eof { 1 } else { 0 },
2472        }
2473        .encode(&mut w)
2474        .unwrap();
2475        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2476        Tag::Application {
2477            tag: AppTag::SignedInt,
2478            len: 1,
2479        }
2480        .encode(&mut w)
2481        .unwrap();
2482        w.write_u8(0).unwrap();
2483        Tag::Application {
2484            tag: AppTag::OctetString,
2485            len: data.len() as u32,
2486        }
2487        .encode(&mut w)
2488        .unwrap();
2489        w.write_all(data).unwrap();
2490        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2491        w.as_written().to_vec()
2492    }
2493
2494    fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
2495        let mut apdu_buf = [0u8; 256];
2496        let mut w = Writer::new(&mut apdu_buf);
2497        ComplexAckHeader {
2498            segmented: false,
2499            more_follows: false,
2500            invoke_id,
2501            sequence_number: None,
2502            proposed_window_size: None,
2503            service_choice: SERVICE_ATOMIC_READ_FILE,
2504        }
2505        .encode(&mut w)
2506        .unwrap();
2507        Tag::Application {
2508            tag: AppTag::Boolean,
2509            len: 0,
2510        }
2511        .encode(&mut w)
2512        .unwrap();
2513        Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
2514        Tag::Application {
2515            tag: AppTag::SignedInt,
2516            len: 1,
2517        }
2518        .encode(&mut w)
2519        .unwrap();
2520        w.write_u8(7).unwrap();
2521        Tag::Application {
2522            tag: AppTag::UnsignedInt,
2523            len: 1,
2524        }
2525        .encode(&mut w)
2526        .unwrap();
2527        w.write_u8(2).unwrap();
2528        Tag::Application {
2529            tag: AppTag::OctetString,
2530            len: 2,
2531        }
2532        .encode(&mut w)
2533        .unwrap();
2534        w.write_all(&[0x01, 0x02]).unwrap();
2535        Tag::Application {
2536            tag: AppTag::OctetString,
2537            len: 3,
2538        }
2539        .encode(&mut w)
2540        .unwrap();
2541        w.write_all(&[0x03, 0x04, 0x05]).unwrap();
2542        Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
2543        w.as_written().to_vec()
2544    }
2545
2546    fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
2547        let mut apdu_buf = [0u8; 64];
2548        let mut w = Writer::new(&mut apdu_buf);
2549        ComplexAckHeader {
2550            segmented: false,
2551            more_follows: false,
2552            invoke_id,
2553            sequence_number: None,
2554            proposed_window_size: None,
2555            service_choice: SERVICE_ATOMIC_WRITE_FILE,
2556        }
2557        .encode(&mut w)
2558        .unwrap();
2559        Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
2560        w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
2561        w.as_written().to_vec()
2562    }
2563
2564    fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
2565        let mut apdu_buf = [0u8; 64];
2566        let mut w = Writer::new(&mut apdu_buf);
2567        ComplexAckHeader {
2568            segmented: false,
2569            more_follows: false,
2570            invoke_id,
2571            sequence_number: None,
2572            proposed_window_size: None,
2573            service_choice: SERVICE_ATOMIC_WRITE_FILE,
2574        }
2575        .encode(&mut w)
2576        .unwrap();
2577        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2578        w.write_u8(start_record as u8).unwrap();
2579        w.as_written().to_vec()
2580    }
2581
2582    fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2583        let mut apdu_buf = [0u8; 64];
2584        let mut w = Writer::new(&mut apdu_buf);
2585        ComplexAckHeader {
2586            segmented: false,
2587            more_follows: false,
2588            invoke_id,
2589            sequence_number: None,
2590            proposed_window_size: None,
2591            service_choice: SERVICE_CREATE_OBJECT,
2592        }
2593        .encode(&mut w)
2594        .unwrap();
2595        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2596        w.as_written().to_vec()
2597    }
2598
2599    fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2600        let mut apdu_buf = [0u8; 128];
2601        let mut w = Writer::new(&mut apdu_buf);
2602        ComplexAckHeader {
2603            segmented: false,
2604            more_follows: false,
2605            invoke_id,
2606            sequence_number: None,
2607            proposed_window_size: None,
2608            service_choice: SERVICE_GET_ALARM_SUMMARY,
2609        }
2610        .encode(&mut w)
2611        .unwrap();
2612        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2613        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2614        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2615        w.write_u8(5).unwrap();
2616        w.write_u8(0b1110_0000).unwrap();
2617
2618        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
2619        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2620        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2621        w.write_u8(5).unwrap();
2622        w.write_u8(0b1100_0000).unwrap();
2623        w.as_written().to_vec()
2624    }
2625
2626    fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2627        let mut apdu_buf = [0u8; 160];
2628        let mut w = Writer::new(&mut apdu_buf);
2629        ComplexAckHeader {
2630            segmented: false,
2631            more_follows: false,
2632            invoke_id,
2633            sequence_number: None,
2634            proposed_window_size: None,
2635            service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
2636        }
2637        .encode(&mut w)
2638        .unwrap();
2639        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2640        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2641        encode_ctx_unsigned(&mut w, 2, 2).unwrap();
2642        encode_ctx_unsigned(&mut w, 3, 200).unwrap();
2643        encode_ctx_unsigned(&mut w, 4, 10).unwrap();
2644
2645        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
2646        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2647        encode_ctx_unsigned(&mut w, 2, 0).unwrap();
2648        encode_ctx_unsigned(&mut w, 3, 20).unwrap();
2649        encode_ctx_unsigned(&mut w, 4, 11).unwrap();
2650        w.as_written().to_vec()
2651    }
2652
2653    fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
2654        let mut apdu_buf = [0u8; 256];
2655        let mut w = Writer::new(&mut apdu_buf);
2656        ComplexAckHeader {
2657            segmented: false,
2658            more_follows: false,
2659            invoke_id,
2660            sequence_number: None,
2661            proposed_window_size: None,
2662            service_choice: SERVICE_GET_EVENT_INFORMATION,
2663        }
2664        .encode(&mut w)
2665        .unwrap();
2666        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2667        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2668        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2669        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2670        w.write_u8(5).unwrap();
2671        w.write_u8(0b1110_0000).unwrap();
2672        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
2673        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2674        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2675        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2676        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
2677        encode_ctx_unsigned(&mut w, 4, 0).unwrap();
2678        Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
2679        w.write_u8(5).unwrap();
2680        w.write_u8(0b1100_0000).unwrap();
2681        Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
2682        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
2683        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2684        encode_ctx_unsigned(&mut w, 2, 3).unwrap();
2685        Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
2686        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2687        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2688        w.write_u8(0).unwrap();
2689        w.as_written().to_vec()
2690    }
2691
2692    #[tokio::test]
2693    async fn who_has_object_name_collects_i_have() {
2694        let (dl, state) = MockDataLink::new();
2695        let client = BacnetClient::with_datalink(dl);
2696        let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
2697
2698        let mut apdu = [0u8; 128];
2699        let mut w = Writer::new(&mut apdu);
2700        UnconfirmedRequestHeader {
2701            service_choice: SERVICE_I_HAVE,
2702        }
2703        .encode(&mut w)
2704        .unwrap();
2705        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
2706        encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2707        encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
2708
2709        state
2710            .recv
2711            .lock()
2712            .await
2713            .push_back((with_npdu(w.as_written()), addr));
2714
2715        let results = client
2716            .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
2717            .await
2718            .unwrap();
2719        assert_eq!(results.len(), 1);
2720        assert_eq!(results[0].address, addr);
2721        assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
2722        assert_eq!(
2723            results[0].object_id,
2724            ObjectId::new(ObjectType::AnalogInput, 7)
2725        );
2726        assert_eq!(results[0].object_name, "Zone Temp");
2727
2728        let sent = state.sent.lock().await;
2729        assert_eq!(sent.len(), 1);
2730        let mut r = Reader::new(&sent[0].1);
2731        let _npdu = Npdu::decode(&mut r).unwrap();
2732        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2733        assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
2734    }
2735
2736    #[tokio::test]
2737    async fn device_communication_control_handles_simple_ack() {
2738        let (dl, state) = MockDataLink::new();
2739        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2740        let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
2741
2742        let mut apdu = [0u8; 32];
2743        let mut w = Writer::new(&mut apdu);
2744        SimpleAck {
2745            invoke_id: 1,
2746            service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
2747        }
2748        .encode(&mut w)
2749        .unwrap();
2750        state
2751            .recv
2752            .lock()
2753            .await
2754            .push_back((with_npdu(w.as_written()), addr));
2755
2756        client
2757            .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
2758            .await
2759            .unwrap();
2760
2761        let sent = state.sent.lock().await;
2762        assert_eq!(sent.len(), 1);
2763        let mut r = Reader::new(&sent[0].1);
2764        let _npdu = Npdu::decode(&mut r).unwrap();
2765        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2766        assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
2767    }
2768
2769    #[tokio::test]
2770    async fn reinitialize_device_handles_simple_ack() {
2771        let (dl, state) = MockDataLink::new();
2772        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2773        let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
2774
2775        let mut apdu = [0u8; 32];
2776        let mut w = Writer::new(&mut apdu);
2777        SimpleAck {
2778            invoke_id: 1,
2779            service_choice: SERVICE_REINITIALIZE_DEVICE,
2780        }
2781        .encode(&mut w)
2782        .unwrap();
2783        state
2784            .recv
2785            .lock()
2786            .await
2787            .push_back((with_npdu(w.as_written()), addr));
2788
2789        client
2790            .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
2791            .await
2792            .unwrap();
2793
2794        let sent = state.sent.lock().await;
2795        assert_eq!(sent.len(), 1);
2796        let mut r = Reader::new(&sent[0].1);
2797        let _npdu = Npdu::decode(&mut r).unwrap();
2798        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2799        assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
2800    }
2801
2802    #[tokio::test]
2803    async fn time_synchronize_sends_unconfirmed_request() {
2804        let (dl, state) = MockDataLink::new();
2805        let client = BacnetClient::with_datalink(dl);
2806        let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
2807
2808        client
2809            .time_synchronize(
2810                addr,
2811                Date {
2812                    year_since_1900: 126,
2813                    month: 2,
2814                    day: 7,
2815                    weekday: 6,
2816                },
2817                Time {
2818                    hour: 10,
2819                    minute: 11,
2820                    second: 12,
2821                    hundredths: 13,
2822                },
2823                false,
2824            )
2825            .await
2826            .unwrap();
2827
2828        let sent = state.sent.lock().await;
2829        assert_eq!(sent.len(), 1);
2830        let mut r = Reader::new(&sent[0].1);
2831        let _npdu = Npdu::decode(&mut r).unwrap();
2832        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2833        assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
2834    }
2835
2836    #[tokio::test]
2837    async fn get_alarm_summary_decodes_complex_ack() {
2838        let (dl, state) = MockDataLink::new();
2839        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2840        let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
2841
2842        state
2843            .recv
2844            .lock()
2845            .await
2846            .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
2847
2848        let summaries = client.get_alarm_summary(addr).await.unwrap();
2849        assert_eq!(summaries.len(), 2);
2850        assert_eq!(
2851            summaries[0],
2852            AlarmSummaryItem {
2853                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2854                alarm_state_raw: 1,
2855                alarm_state: Some(EventState::Fault),
2856                acknowledged_transitions: crate::ClientBitString {
2857                    unused_bits: 5,
2858                    data: vec![0b1110_0000],
2859                },
2860            }
2861        );
2862        assert_eq!(
2863            summaries[1],
2864            AlarmSummaryItem {
2865                object_id: ObjectId::new(ObjectType::BinaryInput, 2),
2866                alarm_state_raw: 0,
2867                alarm_state: Some(EventState::Normal),
2868                acknowledged_transitions: crate::ClientBitString {
2869                    unused_bits: 5,
2870                    data: vec![0b1100_0000],
2871                },
2872            }
2873        );
2874
2875        let sent = state.sent.lock().await;
2876        assert_eq!(sent.len(), 1);
2877        let mut r = Reader::new(&sent[0].1);
2878        let _npdu = Npdu::decode(&mut r).unwrap();
2879        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2880        assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
2881    }
2882
2883    #[tokio::test]
2884    async fn get_enrollment_summary_decodes_complex_ack() {
2885        let (dl, state) = MockDataLink::new();
2886        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2887        let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
2888
2889        state
2890            .recv
2891            .lock()
2892            .await
2893            .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
2894
2895        let summaries = client.get_enrollment_summary(addr).await.unwrap();
2896        assert_eq!(summaries.len(), 2);
2897        assert_eq!(
2898            summaries[0],
2899            EnrollmentSummaryItem {
2900                object_id: ObjectId::new(ObjectType::AnalogInput, 7),
2901                event_type: 1,
2902                event_state_raw: 2,
2903                event_state: Some(EventState::Offnormal),
2904                priority: 200,
2905                notification_class: 10,
2906            }
2907        );
2908        assert_eq!(
2909            summaries[1],
2910            EnrollmentSummaryItem {
2911                object_id: ObjectId::new(ObjectType::BinaryInput, 8),
2912                event_type: 0,
2913                event_state_raw: 0,
2914                event_state: Some(EventState::Normal),
2915                priority: 20,
2916                notification_class: 11,
2917            }
2918        );
2919
2920        let sent = state.sent.lock().await;
2921        assert_eq!(sent.len(), 1);
2922        let mut r = Reader::new(&sent[0].1);
2923        let _npdu = Npdu::decode(&mut r).unwrap();
2924        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2925        assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
2926    }
2927
2928    #[tokio::test]
2929    async fn get_event_information_decodes_complex_ack() {
2930        let (dl, state) = MockDataLink::new();
2931        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2932        let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
2933
2934        state
2935            .recv
2936            .lock()
2937            .await
2938            .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
2939
2940        let result = client.get_event_information(addr, None).await.unwrap();
2941        assert!(!result.more_events);
2942        assert_eq!(result.summaries.len(), 1);
2943        assert_eq!(
2944            result.summaries[0],
2945            EventInformationItem {
2946                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2947                event_state_raw: 2,
2948                event_state: Some(EventState::Offnormal),
2949                acknowledged_transitions: crate::ClientBitString {
2950                    unused_bits: 5,
2951                    data: vec![0b1110_0000],
2952                },
2953                notify_type: 0,
2954                event_enable: crate::ClientBitString {
2955                    unused_bits: 5,
2956                    data: vec![0b1100_0000],
2957                },
2958                event_priorities: [1, 2, 3],
2959            }
2960        );
2961    }
2962
2963    #[tokio::test]
2964    async fn acknowledge_alarm_handles_simple_ack() {
2965        let (dl, state) = MockDataLink::new();
2966        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2967        let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
2968
2969        let mut apdu = [0u8; 32];
2970        let mut w = Writer::new(&mut apdu);
2971        SimpleAck {
2972            invoke_id: 1,
2973            service_choice: SERVICE_ACKNOWLEDGE_ALARM,
2974        }
2975        .encode(&mut w)
2976        .unwrap();
2977        state
2978            .recv
2979            .lock()
2980            .await
2981            .push_back((with_npdu(w.as_written()), addr));
2982
2983        client
2984            .acknowledge_alarm(
2985                addr,
2986                AcknowledgeAlarmRequest {
2987                    acknowledging_process_id: 10,
2988                    event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2989                    event_state_acknowledged: EventState::Offnormal,
2990                    event_time_stamp: TimeStamp::SequenceNumber(42),
2991                    acknowledgment_source: "operator",
2992                    time_of_acknowledgment: TimeStamp::DateTime {
2993                        date: Date {
2994                            year_since_1900: 126,
2995                            month: 2,
2996                            day: 7,
2997                            weekday: 6,
2998                        },
2999                        time: Time {
3000                            hour: 10,
3001                            minute: 11,
3002                            second: 12,
3003                            hundredths: 13,
3004                        },
3005                    },
3006                    invoke_id: 0,
3007                },
3008            )
3009            .await
3010            .unwrap();
3011
3012        let sent = state.sent.lock().await;
3013        assert_eq!(sent.len(), 1);
3014        let mut r = Reader::new(&sent[0].1);
3015        let _npdu = Npdu::decode(&mut r).unwrap();
3016        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3017        assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
3018    }
3019
3020    #[tokio::test]
3021    async fn create_object_by_type_decodes_complex_ack() {
3022        let (dl, state) = MockDataLink::new();
3023        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3024        let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
3025        let created = ObjectId::new(ObjectType::AnalogValue, 42);
3026
3027        state
3028            .recv
3029            .lock()
3030            .await
3031            .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
3032
3033        let result = client
3034            .create_object_by_type(addr, ObjectType::AnalogValue)
3035            .await
3036            .unwrap();
3037        assert_eq!(result, created);
3038
3039        let sent = state.sent.lock().await;
3040        let mut r = Reader::new(&sent[0].1);
3041        let _npdu = Npdu::decode(&mut r).unwrap();
3042        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3043        assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
3044    }
3045
3046    #[tokio::test]
3047    async fn delete_object_handles_simple_ack() {
3048        let (dl, state) = MockDataLink::new();
3049        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3050        let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
3051
3052        let mut apdu = [0u8; 32];
3053        let mut w = Writer::new(&mut apdu);
3054        SimpleAck {
3055            invoke_id: 1,
3056            service_choice: SERVICE_DELETE_OBJECT,
3057        }
3058        .encode(&mut w)
3059        .unwrap();
3060        state
3061            .recv
3062            .lock()
3063            .await
3064            .push_back((with_npdu(w.as_written()), addr));
3065
3066        client
3067            .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
3068            .await
3069            .unwrap();
3070    }
3071
3072    #[tokio::test]
3073    async fn add_list_element_handles_simple_ack() {
3074        let (dl, state) = MockDataLink::new();
3075        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3076        let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
3077
3078        let mut apdu = [0u8; 32];
3079        let mut w = Writer::new(&mut apdu);
3080        SimpleAck {
3081            invoke_id: 1,
3082            service_choice: SERVICE_ADD_LIST_ELEMENT,
3083        }
3084        .encode(&mut w)
3085        .unwrap();
3086        state
3087            .recv
3088            .lock()
3089            .await
3090            .push_back((with_npdu(w.as_written()), addr));
3091
3092        let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
3093        client
3094            .add_list_element(
3095                addr,
3096                AddListElementRequest {
3097                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
3098                    property_id: PropertyId::Proprietary(512),
3099                    array_index: None,
3100                    elements: &values,
3101                    invoke_id: 0,
3102                },
3103            )
3104            .await
3105            .unwrap();
3106    }
3107
3108    #[tokio::test]
3109    async fn remove_list_element_handles_simple_ack() {
3110        let (dl, state) = MockDataLink::new();
3111        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3112        let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
3113
3114        let mut apdu = [0u8; 32];
3115        let mut w = Writer::new(&mut apdu);
3116        SimpleAck {
3117            invoke_id: 1,
3118            service_choice: SERVICE_REMOVE_LIST_ELEMENT,
3119        }
3120        .encode(&mut w)
3121        .unwrap();
3122        state
3123            .recv
3124            .lock()
3125            .await
3126            .push_back((with_npdu(w.as_written()), addr));
3127
3128        let values = [DataValue::Unsigned(1)];
3129        client
3130            .remove_list_element(
3131                addr,
3132                RemoveListElementRequest {
3133                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
3134                    property_id: PropertyId::Proprietary(513),
3135                    array_index: None,
3136                    elements: &values,
3137                    invoke_id: 0,
3138                },
3139            )
3140            .await
3141            .unwrap();
3142    }
3143
3144    #[tokio::test]
3145    async fn atomic_read_file_stream_decodes_complex_ack() {
3146        let (dl, state) = MockDataLink::new();
3147        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3148        let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
3149        let file_object = ObjectId::new(ObjectType::File, 2);
3150
3151        state.recv.lock().await.push_back((
3152            with_npdu(&atomic_read_file_stream_ack_apdu(
3153                1,
3154                true,
3155                &[0xAA, 0xBB, 0xCC],
3156            )),
3157            addr,
3158        ));
3159
3160        let result = client
3161            .atomic_read_file_stream(addr, file_object, 0, 3)
3162            .await
3163            .unwrap();
3164
3165        assert_eq!(
3166            result,
3167            AtomicReadFileResult::Stream {
3168                end_of_file: true,
3169                file_start_position: 0,
3170                file_data: vec![0xAA, 0xBB, 0xCC],
3171            }
3172        );
3173
3174        let sent = state.sent.lock().await;
3175        assert_eq!(sent.len(), 1);
3176        let mut r = Reader::new(&sent[0].1);
3177        let _npdu = Npdu::decode(&mut r).unwrap();
3178        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3179        assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
3180    }
3181
3182    #[tokio::test]
3183    async fn atomic_read_file_record_decodes_complex_ack() {
3184        let (dl, state) = MockDataLink::new();
3185        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3186        let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
3187        let file_object = ObjectId::new(ObjectType::File, 5);
3188
3189        state
3190            .recv
3191            .lock()
3192            .await
3193            .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
3194
3195        let result = client
3196            .atomic_read_file_record(addr, file_object, 7, 2)
3197            .await
3198            .unwrap();
3199
3200        assert_eq!(
3201            result,
3202            AtomicReadFileResult::Record {
3203                end_of_file: false,
3204                file_start_record: 7,
3205                returned_record_count: 2,
3206                file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
3207            }
3208        );
3209    }
3210
3211    #[tokio::test]
3212    async fn atomic_write_file_stream_decodes_complex_ack() {
3213        let (dl, state) = MockDataLink::new();
3214        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3215        let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
3216        let file_object = ObjectId::new(ObjectType::File, 3);
3217
3218        state
3219            .recv
3220            .lock()
3221            .await
3222            .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
3223
3224        let result = client
3225            .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
3226            .await
3227            .unwrap();
3228
3229        assert_eq!(
3230            result,
3231            AtomicWriteFileResult::Stream {
3232                file_start_position: 128
3233            }
3234        );
3235    }
3236
3237    #[tokio::test]
3238    async fn atomic_write_file_record_decodes_complex_ack() {
3239        let (dl, state) = MockDataLink::new();
3240        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3241        let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
3242        let file_object = ObjectId::new(ObjectType::File, 9);
3243        let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
3244
3245        state
3246            .recv
3247            .lock()
3248            .await
3249            .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
3250
3251        let result = client
3252            .atomic_write_file_record(addr, file_object, 7, &records)
3253            .await
3254            .unwrap();
3255
3256        assert_eq!(
3257            result,
3258            AtomicWriteFileResult::Record {
3259                file_start_record: 7
3260            }
3261        );
3262    }
3263
3264    #[tokio::test]
3265    async fn read_properties_decodes_complex_ack() {
3266        let (dl, state) = MockDataLink::new();
3267        let client = BacnetClient::with_datalink(dl);
3268        let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
3269        let object_id = ObjectId::new(ObjectType::Device, 1);
3270
3271        let mut apdu_buf = [0u8; 256];
3272        let mut w = Writer::new(&mut apdu_buf);
3273        ComplexAckHeader {
3274            segmented: false,
3275            more_follows: false,
3276            invoke_id: 1,
3277            sequence_number: None,
3278            proposed_window_size: None,
3279            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3280        }
3281        .encode(&mut w)
3282        .unwrap();
3283        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
3284        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3285            .encode(&mut w)
3286            .unwrap();
3287        encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
3288        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3289            .encode(&mut w)
3290            .unwrap();
3291        encode_app_real(&mut w, 55.5).unwrap();
3292        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3293            .encode(&mut w)
3294            .unwrap();
3295        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3296            .encode(&mut w)
3297            .unwrap();
3298
3299        state
3300            .recv
3301            .lock()
3302            .await
3303            .push_back((with_npdu(w.as_written()), addr));
3304
3305        let values = client
3306            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3307            .await
3308            .unwrap();
3309        assert_eq!(values.len(), 1);
3310        assert_eq!(values[0].0, PropertyId::PresentValue);
3311        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
3312
3313        let sent = state.sent.lock().await;
3314        assert_eq!(sent.len(), 1);
3315        let mut r = Reader::new(&sent[0].1);
3316        let _npdu = Npdu::decode(&mut r).unwrap();
3317        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3318        assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
3319    }
3320
3321    #[tokio::test]
3322    async fn read_property_multiple_reassembles_segmented_complex_ack() {
3323        let (dl, state) = MockDataLink::new();
3324        let client = BacnetClient::with_datalink(dl);
3325        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3326        let object_id = ObjectId::new(ObjectType::Device, 1);
3327
3328        let mut payload_buf = [0u8; 256];
3329        let mut pw = Writer::new(&mut payload_buf);
3330        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3331        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3332            .encode(&mut pw)
3333            .unwrap();
3334        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3335        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3336            .encode(&mut pw)
3337            .unwrap();
3338        encode_app_real(&mut pw, 66.0).unwrap();
3339        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3340            .encode(&mut pw)
3341            .unwrap();
3342        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3343            .encode(&mut pw)
3344            .unwrap();
3345        let payload = pw.as_written();
3346        let split = payload.len() / 2;
3347
3348        let mut apdu1 = [0u8; 256];
3349        let mut w1 = Writer::new(&mut apdu1);
3350        ComplexAckHeader {
3351            segmented: true,
3352            more_follows: true,
3353            invoke_id: 1,
3354            sequence_number: Some(0),
3355            proposed_window_size: Some(1),
3356            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3357        }
3358        .encode(&mut w1)
3359        .unwrap();
3360        w1.write_all(&payload[..split]).unwrap();
3361
3362        let mut apdu2 = [0u8; 256];
3363        let mut w2 = Writer::new(&mut apdu2);
3364        ComplexAckHeader {
3365            segmented: true,
3366            more_follows: false,
3367            invoke_id: 1,
3368            sequence_number: Some(1),
3369            proposed_window_size: Some(1),
3370            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3371        }
3372        .encode(&mut w2)
3373        .unwrap();
3374        w2.write_all(&payload[split..]).unwrap();
3375
3376        state
3377            .recv
3378            .lock()
3379            .await
3380            .push_back((with_npdu(w1.as_written()), addr));
3381        state
3382            .recv
3383            .lock()
3384            .await
3385            .push_back((with_npdu(w2.as_written()), addr));
3386
3387        let values = client
3388            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3389            .await
3390            .unwrap();
3391        assert_eq!(values.len(), 1);
3392        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3393
3394        let sent = state.sent.lock().await;
3395        assert!(sent.len() >= 3);
3396
3397        let mut saw_segment_ack = 0usize;
3398        for (_, frame) in sent.iter().skip(1) {
3399            let mut r = Reader::new(frame);
3400            let _npdu = Npdu::decode(&mut r).unwrap();
3401            let apdu = r.read_exact(r.remaining()).unwrap();
3402            if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
3403                let mut sr = Reader::new(apdu);
3404                let sack = SegmentAck::decode(&mut sr).unwrap();
3405                assert_eq!(sack.invoke_id, 1);
3406                saw_segment_ack += 1;
3407            }
3408        }
3409        assert!(saw_segment_ack >= 1);
3410    }
3411
3412    #[tokio::test]
3413    async fn read_property_multiple_tolerates_duplicate_segment() {
3414        let (dl, state) = MockDataLink::new();
3415        let client = BacnetClient::with_datalink(dl);
3416        let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
3417        let object_id = ObjectId::new(ObjectType::Device, 1);
3418
3419        let mut payload_buf = [0u8; 256];
3420        let mut pw = Writer::new(&mut payload_buf);
3421        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3422        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3423            .encode(&mut pw)
3424            .unwrap();
3425        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3426        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3427            .encode(&mut pw)
3428            .unwrap();
3429        encode_app_real(&mut pw, 66.0).unwrap();
3430        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3431            .encode(&mut pw)
3432            .unwrap();
3433        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3434            .encode(&mut pw)
3435            .unwrap();
3436        let payload = pw.as_written();
3437        let split = payload.len() / 2;
3438
3439        let mut apdu1 = [0u8; 256];
3440        let mut w1 = Writer::new(&mut apdu1);
3441        ComplexAckHeader {
3442            segmented: true,
3443            more_follows: true,
3444            invoke_id: 1,
3445            sequence_number: Some(0),
3446            proposed_window_size: Some(1),
3447            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3448        }
3449        .encode(&mut w1)
3450        .unwrap();
3451        w1.write_all(&payload[..split]).unwrap();
3452
3453        let mut dup = [0u8; 256];
3454        let mut wd = Writer::new(&mut dup);
3455        ComplexAckHeader {
3456            segmented: true,
3457            more_follows: true,
3458            invoke_id: 1,
3459            sequence_number: Some(0),
3460            proposed_window_size: Some(1),
3461            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3462        }
3463        .encode(&mut wd)
3464        .unwrap();
3465        wd.write_all(&payload[..split]).unwrap();
3466
3467        let mut apdu2 = [0u8; 256];
3468        let mut w2 = Writer::new(&mut apdu2);
3469        ComplexAckHeader {
3470            segmented: true,
3471            more_follows: false,
3472            invoke_id: 1,
3473            sequence_number: Some(1),
3474            proposed_window_size: Some(1),
3475            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3476        }
3477        .encode(&mut w2)
3478        .unwrap();
3479        w2.write_all(&payload[split..]).unwrap();
3480
3481        {
3482            let mut recv = state.recv.lock().await;
3483            recv.push_back((with_npdu(w1.as_written()), addr));
3484            recv.push_back((with_npdu(wd.as_written()), addr));
3485            recv.push_back((with_npdu(w2.as_written()), addr));
3486        }
3487
3488        let values = client
3489            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3490            .await
3491            .unwrap();
3492        assert_eq!(values.len(), 1);
3493        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3494    }
3495
3496    #[tokio::test]
3497    async fn write_properties_handles_simple_ack() {
3498        let (dl, state) = MockDataLink::new();
3499        let client = BacnetClient::with_datalink(dl);
3500        let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
3501        let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
3502
3503        let mut apdu_buf = [0u8; 32];
3504        let mut w = Writer::new(&mut apdu_buf);
3505        SimpleAck {
3506            invoke_id: 1,
3507            service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3508        }
3509        .encode(&mut w)
3510        .unwrap();
3511        state
3512            .recv
3513            .lock()
3514            .await
3515            .push_back((with_npdu(w.as_written()), addr));
3516
3517        let writes = [PropertyWriteSpec {
3518            property_id: PropertyId::PresentValue,
3519            array_index: None,
3520            value: DataValue::Real(12.5),
3521            priority: Some(8),
3522        }];
3523        client
3524            .write_property_multiple(addr, object_id, &writes)
3525            .await
3526            .unwrap();
3527
3528        let sent = state.sent.lock().await;
3529        assert_eq!(sent.len(), 1);
3530        let mut r = Reader::new(&sent[0].1);
3531        let _npdu = Npdu::decode(&mut r).unwrap();
3532        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3533        assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3534    }
3535
3536    #[tokio::test]
3537    async fn subscribe_cov_handles_simple_ack() {
3538        let (dl, state) = MockDataLink::new();
3539        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3540        let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
3541
3542        let mut apdu_buf = [0u8; 32];
3543        let mut w = Writer::new(&mut apdu_buf);
3544        SimpleAck {
3545            invoke_id: 1,
3546            service_choice: SERVICE_SUBSCRIBE_COV,
3547        }
3548        .encode(&mut w)
3549        .unwrap();
3550        state
3551            .recv
3552            .lock()
3553            .await
3554            .push_back((with_npdu(w.as_written()), addr));
3555
3556        client
3557            .subscribe_cov(
3558                addr,
3559                SubscribeCovRequest {
3560                    subscriber_process_id: 10,
3561                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3562                    issue_confirmed_notifications: Some(false),
3563                    lifetime_seconds: Some(300),
3564                    invoke_id: 0,
3565                },
3566            )
3567            .await
3568            .unwrap();
3569
3570        let sent = state.sent.lock().await;
3571        assert_eq!(sent.len(), 1);
3572        let mut r = Reader::new(&sent[0].1);
3573        let _npdu = Npdu::decode(&mut r).unwrap();
3574        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3575        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
3576    }
3577
3578    #[tokio::test]
3579    async fn subscribe_cov_property_handles_simple_ack() {
3580        let (dl, state) = MockDataLink::new();
3581        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3582        let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
3583
3584        let mut apdu_buf = [0u8; 32];
3585        let mut w = Writer::new(&mut apdu_buf);
3586        SimpleAck {
3587            invoke_id: 1,
3588            service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
3589        }
3590        .encode(&mut w)
3591        .unwrap();
3592        state
3593            .recv
3594            .lock()
3595            .await
3596            .push_back((with_npdu(w.as_written()), addr));
3597
3598        client
3599            .subscribe_cov_property(
3600                addr,
3601                SubscribeCovPropertyRequest {
3602                    subscriber_process_id: 22,
3603                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3604                    issue_confirmed_notifications: Some(true),
3605                    lifetime_seconds: Some(120),
3606                    monitored_property_id: PropertyId::PresentValue,
3607                    monitored_property_array_index: None,
3608                    cov_increment: Some(0.1),
3609                    invoke_id: 0,
3610                },
3611            )
3612            .await
3613            .unwrap();
3614
3615        let sent = state.sent.lock().await;
3616        assert_eq!(sent.len(), 1);
3617        let mut r = Reader::new(&sent[0].1);
3618        let _npdu = Npdu::decode(&mut r).unwrap();
3619        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3620        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
3621    }
3622
3623    #[tokio::test]
3624    async fn read_range_by_position_decodes_complex_ack() {
3625        let (dl, state) = MockDataLink::new();
3626        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3627        let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
3628        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3629
3630        let mut apdu_buf = [0u8; 256];
3631        let mut w = Writer::new(&mut apdu_buf);
3632        ComplexAckHeader {
3633            segmented: false,
3634            more_follows: false,
3635            invoke_id: 1,
3636            sequence_number: None,
3637            proposed_window_size: None,
3638            service_choice: SERVICE_READ_RANGE,
3639        }
3640        .encode(&mut w)
3641        .unwrap();
3642        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3643        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3644        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3645        w.write_u8(5).unwrap();
3646        w.write_u8(0b1110_0000).unwrap();
3647        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3648        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3649        encode_app_real(&mut w, 42.0).unwrap();
3650        encode_app_real(&mut w, 43.0).unwrap();
3651        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3652
3653        state
3654            .recv
3655            .lock()
3656            .await
3657            .push_back((with_npdu(w.as_written()), addr));
3658
3659        let result = client
3660            .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
3661            .await
3662            .unwrap();
3663        assert_eq!(result.object_id, object_id);
3664        assert_eq!(result.item_count, 2);
3665        assert_eq!(result.items.len(), 2);
3666        assert!(matches!(
3667            result.items[0],
3668            ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
3669        ));
3670    }
3671
3672    #[tokio::test]
3673    async fn read_range_by_sequence_number_encodes_range_selector() {
3674        let (dl, state) = MockDataLink::new();
3675        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3676        let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
3677        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3678
3679        state
3680            .recv
3681            .lock()
3682            .await
3683            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3684
3685        let _ = client
3686            .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
3687            .await
3688            .unwrap();
3689
3690        let sent = state.sent.lock().await;
3691        let mut r = Reader::new(&sent[0].1);
3692        let _npdu = Npdu::decode(&mut r).unwrap();
3693        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3694        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3695        match Tag::decode(&mut r).unwrap() {
3696            Tag::Context { tag_num: 0, len: 4 } => {
3697                let _ = r.read_exact(4).unwrap();
3698            }
3699            other => panic!("unexpected object id tag: {other:?}"),
3700        }
3701        match Tag::decode(&mut r).unwrap() {
3702            Tag::Context { tag_num: 1, len } => {
3703                let _ = decode_unsigned(&mut r, len as usize).unwrap();
3704            }
3705            other => panic!("unexpected property tag: {other:?}"),
3706        }
3707        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
3708        match Tag::decode(&mut r).unwrap() {
3709            Tag::Application {
3710                tag: AppTag::UnsignedInt,
3711                len,
3712            } => {
3713                assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
3714            }
3715            other => panic!("unexpected ref seq tag: {other:?}"),
3716        }
3717        match Tag::decode(&mut r).unwrap() {
3718            Tag::Application {
3719                tag: AppTag::SignedInt,
3720                len,
3721            } => {
3722                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3723            }
3724            other => panic!("unexpected count tag: {other:?}"),
3725        }
3726        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
3727    }
3728
3729    #[tokio::test]
3730    async fn read_range_by_time_encodes_range_selector() {
3731        let (dl, state) = MockDataLink::new();
3732        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3733        let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
3734        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3735        let date = Date {
3736            year_since_1900: 126,
3737            month: 2,
3738            day: 7,
3739            weekday: 6,
3740        };
3741        let time = Time {
3742            hour: 10,
3743            minute: 11,
3744            second: 12,
3745            hundredths: 13,
3746        };
3747
3748        state
3749            .recv
3750            .lock()
3751            .await
3752            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3753
3754        let _ = client
3755            .read_range_by_time(
3756                addr,
3757                object_id,
3758                PropertyId::PresentValue,
3759                None,
3760                (date, time),
3761                2,
3762            )
3763            .await
3764            .unwrap();
3765
3766        let sent = state.sent.lock().await;
3767        let mut r = Reader::new(&sent[0].1);
3768        let _npdu = Npdu::decode(&mut r).unwrap();
3769        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3770        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3771        match Tag::decode(&mut r).unwrap() {
3772            Tag::Context { tag_num: 0, len: 4 } => {
3773                let _ = r.read_exact(4).unwrap();
3774            }
3775            other => panic!("unexpected object id tag: {other:?}"),
3776        }
3777        match Tag::decode(&mut r).unwrap() {
3778            Tag::Context { tag_num: 1, len } => {
3779                let _ = decode_unsigned(&mut r, len as usize).unwrap();
3780            }
3781            other => panic!("unexpected property tag: {other:?}"),
3782        }
3783        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
3784        match Tag::decode(&mut r).unwrap() {
3785            Tag::Application {
3786                tag: AppTag::Date,
3787                len: 4,
3788            } => {
3789                let raw = r.read_exact(4).unwrap();
3790                assert_eq!(
3791                    raw,
3792                    &[date.year_since_1900, date.month, date.day, date.weekday]
3793                );
3794            }
3795            other => panic!("unexpected date tag: {other:?}"),
3796        }
3797        match Tag::decode(&mut r).unwrap() {
3798            Tag::Application {
3799                tag: AppTag::Time,
3800                len: 4,
3801            } => {
3802                let raw = r.read_exact(4).unwrap();
3803                assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
3804            }
3805            other => panic!("unexpected time tag: {other:?}"),
3806        }
3807        match Tag::decode(&mut r).unwrap() {
3808            Tag::Application {
3809                tag: AppTag::SignedInt,
3810                len,
3811            } => {
3812                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3813            }
3814            other => panic!("unexpected count tag: {other:?}"),
3815        }
3816        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
3817    }
3818
3819    #[tokio::test]
3820    async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
3821        let (dl, state) = MockDataLink::new();
3822        let client = BacnetClient::with_datalink(dl);
3823        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3824
3825        let mut apdu = [0u8; 256];
3826        let mut w = Writer::new(&mut apdu);
3827        UnconfirmedRequestHeader {
3828            service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3829        }
3830        .encode(&mut w)
3831        .unwrap();
3832        encode_ctx_unsigned(&mut w, 0, 17).unwrap();
3833        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3834        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3835        encode_ctx_unsigned(&mut w, 3, 60).unwrap();
3836        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3837        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3838        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3839        encode_app_real(&mut w, 73.25).unwrap();
3840        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3841        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3842
3843        state
3844            .recv
3845            .lock()
3846            .await
3847            .push_back((with_npdu(w.as_written()), addr));
3848
3849        let notification = client
3850            .recv_cov_notification(Duration::from_secs(1))
3851            .await
3852            .unwrap()
3853            .unwrap();
3854        assert!(!notification.confirmed);
3855        assert_eq!(notification.subscriber_process_id, 17);
3856        assert_eq!(notification.values.len(), 1);
3857        assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
3858        assert!(matches!(
3859            notification.values[0].value,
3860            ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
3861        ));
3862
3863        let sent = state.sent.lock().await;
3864        assert!(sent.is_empty());
3865    }
3866
3867    #[tokio::test]
3868    async fn recv_confirmed_cov_notification_sends_simple_ack() {
3869        let (dl, state) = MockDataLink::new();
3870        let client = BacnetClient::with_datalink(dl);
3871        let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
3872
3873        let mut apdu = [0u8; 256];
3874        let mut w = Writer::new(&mut apdu);
3875        ConfirmedRequestHeader {
3876            segmented: false,
3877            more_follows: false,
3878            segmented_response_accepted: false,
3879            max_segments: 0,
3880            max_apdu: 5,
3881            invoke_id: 9,
3882            sequence_number: None,
3883            proposed_window_size: None,
3884            service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
3885        }
3886        .encode(&mut w)
3887        .unwrap();
3888        encode_ctx_unsigned(&mut w, 0, 18).unwrap();
3889        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3890        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
3891        encode_ctx_unsigned(&mut w, 3, 120).unwrap();
3892        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3893        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3894        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3895        encode_app_real(&mut w, 55.0).unwrap();
3896        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3897        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3898
3899        state
3900            .recv
3901            .lock()
3902            .await
3903            .push_back((with_npdu(w.as_written()), addr));
3904
3905        let notification = client
3906            .recv_cov_notification(Duration::from_secs(1))
3907            .await
3908            .unwrap()
3909            .unwrap();
3910        assert!(notification.confirmed);
3911        assert_eq!(notification.values.len(), 1);
3912
3913        let sent = state.sent.lock().await;
3914        assert_eq!(sent.len(), 1);
3915        let mut r = Reader::new(&sent[0].1);
3916        let _npdu = Npdu::decode(&mut r).unwrap();
3917        let ack = SimpleAck::decode(&mut r).unwrap();
3918        assert_eq!(ack.invoke_id, 9);
3919        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
3920    }
3921
3922    #[tokio::test]
3923    async fn recv_unconfirmed_event_notification_returns_decoded_value() {
3924        let (dl, state) = MockDataLink::new();
3925        let client = BacnetClient::with_datalink(dl);
3926        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
3927
3928        let mut apdu = [0u8; 256];
3929        let mut w = Writer::new(&mut apdu);
3930        UnconfirmedRequestHeader {
3931            service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3932        }
3933        .encode(&mut w)
3934        .unwrap();
3935        encode_ctx_unsigned(&mut w, 0, 99).unwrap();
3936        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3937        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
3938        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3939        encode_ctx_unsigned(&mut w, 1, 55).unwrap();
3940        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3941        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
3942        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
3943        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
3944        encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
3945        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
3946        Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
3947        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
3948        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
3949        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
3950        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3951        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3952        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3953        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
3954
3955        state
3956            .recv
3957            .lock()
3958            .await
3959            .push_back((with_npdu(w.as_written()), addr));
3960
3961        let notification: EventNotification = client
3962            .recv_event_notification(Duration::from_secs(1))
3963            .await
3964            .unwrap()
3965            .unwrap();
3966        assert!(!notification.confirmed);
3967        assert_eq!(notification.process_id, 99);
3968        assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
3969        assert_eq!(notification.ack_required, Some(true));
3970        assert_eq!(notification.from_state, Some(EventState::Offnormal));
3971        assert_eq!(notification.to_state, Some(EventState::Normal));
3972        assert_eq!(notification.notify_type, 0);
3973
3974        let sent = state.sent.lock().await;
3975        assert!(sent.is_empty());
3976    }
3977
3978    #[tokio::test]
3979    async fn recv_confirmed_event_notification_sends_simple_ack() {
3980        let (dl, state) = MockDataLink::new();
3981        let client = BacnetClient::with_datalink(dl);
3982        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
3983
3984        let mut apdu = [0u8; 256];
3985        let mut w = Writer::new(&mut apdu);
3986        ConfirmedRequestHeader {
3987            segmented: false,
3988            more_follows: false,
3989            segmented_response_accepted: false,
3990            max_segments: 0,
3991            max_apdu: 5,
3992            invoke_id: 11,
3993            sequence_number: None,
3994            proposed_window_size: None,
3995            service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
3996        }
3997        .encode(&mut w)
3998        .unwrap();
3999        encode_ctx_unsigned(&mut w, 0, 100).unwrap();
4000        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4001        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
4002        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4003        encode_ctx_unsigned(&mut w, 1, 56).unwrap();
4004        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4005        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
4006        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
4007        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
4008        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
4009        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
4010        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
4011        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
4012        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
4013        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
4014        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
4015        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
4016
4017        state
4018            .recv
4019            .lock()
4020            .await
4021            .push_back((with_npdu(w.as_written()), addr));
4022
4023        let notification = client
4024            .recv_event_notification(Duration::from_secs(1))
4025            .await
4026            .unwrap()
4027            .unwrap();
4028        assert!(notification.confirmed);
4029
4030        let sent = state.sent.lock().await;
4031        assert_eq!(sent.len(), 1);
4032        let mut r = Reader::new(&sent[0].1);
4033        let _npdu = Npdu::decode(&mut r).unwrap();
4034        let ack = SimpleAck::decode(&mut r).unwrap();
4035        assert_eq!(ack.invoke_id, 11);
4036        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
4037    }
4038
4039    #[tokio::test]
4040    async fn write_property_multiple_segments_large_request() {
4041        let (dl, state) = MockDataLink::new();
4042        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4043        let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
4044        let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
4045
4046        {
4047            let mut recv = state.recv.lock().await;
4048            for seq in 0u8..=254 {
4049                let mut apdu = [0u8; 16];
4050                let mut w = Writer::new(&mut apdu);
4051                SegmentAck {
4052                    negative_ack: false,
4053                    sent_by_server: true,
4054                    invoke_id: 1,
4055                    sequence_number: seq,
4056                    actual_window_size: 1,
4057                }
4058                .encode(&mut w)
4059                .unwrap();
4060                recv.push_back((with_npdu(w.as_written()), addr));
4061            }
4062
4063            let mut apdu = [0u8; 16];
4064            let mut w = Writer::new(&mut apdu);
4065            SimpleAck {
4066                invoke_id: 1,
4067                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4068            }
4069            .encode(&mut w)
4070            .unwrap();
4071            recv.push_back((with_npdu(w.as_written()), addr));
4072        }
4073
4074        let writes: Vec<PropertyWriteSpec> = (0..180)
4075            .map(|_| PropertyWriteSpec {
4076                property_id: PropertyId::Description,
4077                array_index: None,
4078                value: DataValue::CharacterString(
4079                    "rustbac segmented write test payload................................................................",
4080                ),
4081                priority: None,
4082            })
4083            .collect();
4084
4085        client
4086            .write_property_multiple(addr, object_id, &writes)
4087            .await
4088            .unwrap();
4089
4090        let sent = state.sent.lock().await;
4091        assert!(sent.len() > 1);
4092
4093        let mut seqs = Vec::new();
4094        let mut saw_more_follows = false;
4095        let mut saw_last = false;
4096        for (_, frame) in sent.iter() {
4097            let mut r = Reader::new(frame);
4098            let _npdu = Npdu::decode(&mut r).unwrap();
4099            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4100            assert!(hdr.segmented);
4101            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4102            if hdr.more_follows {
4103                saw_more_follows = true;
4104            } else {
4105                saw_last = true;
4106            }
4107            seqs.push(hdr.sequence_number.unwrap());
4108        }
4109
4110        assert!(saw_more_follows);
4111        assert!(saw_last);
4112        for (idx, seq) in seqs.iter().enumerate() {
4113            assert_eq!(*seq as usize, idx);
4114        }
4115    }
4116
4117    #[tokio::test]
4118    async fn write_property_multiple_uses_configured_segment_window() {
4119        let (dl, state) = MockDataLink::new();
4120        let client = BacnetClient::with_datalink(dl)
4121            .with_response_timeout(Duration::from_secs(1))
4122            .with_segmented_request_window_size(4);
4123        let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
4124        let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
4125
4126        {
4127            let mut recv = state.recv.lock().await;
4128            for seq in 0u8..=254 {
4129                let mut apdu = [0u8; 16];
4130                let mut w = Writer::new(&mut apdu);
4131                SegmentAck {
4132                    negative_ack: false,
4133                    sent_by_server: true,
4134                    invoke_id: 1,
4135                    sequence_number: seq,
4136                    actual_window_size: 4,
4137                }
4138                .encode(&mut w)
4139                .unwrap();
4140                recv.push_back((with_npdu(w.as_written()), addr));
4141            }
4142
4143            let mut apdu = [0u8; 16];
4144            let mut w = Writer::new(&mut apdu);
4145            SimpleAck {
4146                invoke_id: 1,
4147                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4148            }
4149            .encode(&mut w)
4150            .unwrap();
4151            recv.push_back((with_npdu(w.as_written()), addr));
4152        }
4153
4154        let writes: Vec<PropertyWriteSpec> = (0..180)
4155            .map(|_| PropertyWriteSpec {
4156                property_id: PropertyId::Description,
4157                array_index: None,
4158                value: DataValue::CharacterString(
4159                    "rustbac segmented write test payload................................................................",
4160                ),
4161                priority: None,
4162            })
4163            .collect();
4164
4165        client
4166            .write_property_multiple(addr, object_id, &writes)
4167            .await
4168            .unwrap();
4169
4170        let sent = state.sent.lock().await;
4171        assert!(sent.len() > 4);
4172        for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
4173            let mut r = Reader::new(frame);
4174            let _npdu = Npdu::decode(&mut r).unwrap();
4175            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4176            assert!(hdr.segmented);
4177            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4178            assert_eq!(hdr.sequence_number, Some(idx as u8));
4179            assert_eq!(hdr.proposed_window_size, Some(4));
4180        }
4181    }
4182
4183    #[tokio::test]
4184    async fn write_property_multiple_adapts_window_to_peer_ack_window() {
4185        let (dl, state) = MockDataLink::new();
4186        let client = BacnetClient::with_datalink(dl)
4187            .with_response_timeout(Duration::from_secs(1))
4188            .with_segmented_request_window_size(4);
4189        let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
4190        let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
4191
4192        {
4193            let mut recv = state.recv.lock().await;
4194            for seq in 0u8..=254 {
4195                let mut apdu = [0u8; 16];
4196                let mut w = Writer::new(&mut apdu);
4197                SegmentAck {
4198                    negative_ack: false,
4199                    sent_by_server: true,
4200                    invoke_id: 1,
4201                    sequence_number: seq,
4202                    actual_window_size: 2,
4203                }
4204                .encode(&mut w)
4205                .unwrap();
4206                recv.push_back((with_npdu(w.as_written()), addr));
4207            }
4208
4209            let mut apdu = [0u8; 16];
4210            let mut w = Writer::new(&mut apdu);
4211            SimpleAck {
4212                invoke_id: 1,
4213                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4214            }
4215            .encode(&mut w)
4216            .unwrap();
4217            recv.push_back((with_npdu(w.as_written()), addr));
4218        }
4219
4220        let writes: Vec<PropertyWriteSpec> = (0..180)
4221            .map(|_| PropertyWriteSpec {
4222                property_id: PropertyId::Description,
4223                array_index: None,
4224                value: DataValue::CharacterString(
4225                    "rustbac segmented write test payload................................................................",
4226                ),
4227                priority: None,
4228            })
4229            .collect();
4230
4231        client
4232            .write_property_multiple(addr, object_id, &writes)
4233            .await
4234            .unwrap();
4235
4236        let sent = state.sent.lock().await;
4237        let mut saw_adapted_window = false;
4238        for (_, frame) in sent.iter() {
4239            let mut r = Reader::new(frame);
4240            let _npdu = Npdu::decode(&mut r).unwrap();
4241            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4242            if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
4243                saw_adapted_window = true;
4244                break;
4245            }
4246        }
4247        assert!(saw_adapted_window);
4248    }
4249
4250    #[tokio::test]
4251    async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
4252        let (dl, state) = MockDataLink::new();
4253        let client = BacnetClient::with_datalink(dl)
4254            .with_response_timeout(Duration::from_secs(1))
4255            .with_segmented_request_window_size(1)
4256            .with_segmented_request_retries(1);
4257        let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
4258        let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
4259
4260        {
4261            let mut recv = state.recv.lock().await;
4262
4263            let mut nack_apdu = [0u8; 16];
4264            let mut nack_w = Writer::new(&mut nack_apdu);
4265            SegmentAck {
4266                negative_ack: true,
4267                sent_by_server: true,
4268                invoke_id: 1,
4269                sequence_number: 0,
4270                actual_window_size: 1,
4271            }
4272            .encode(&mut nack_w)
4273            .unwrap();
4274            recv.push_back((with_npdu(nack_w.as_written()), addr));
4275
4276            for seq in 0u8..=254 {
4277                let mut apdu = [0u8; 16];
4278                let mut w = Writer::new(&mut apdu);
4279                SegmentAck {
4280                    negative_ack: false,
4281                    sent_by_server: true,
4282                    invoke_id: 1,
4283                    sequence_number: seq,
4284                    actual_window_size: 1,
4285                }
4286                .encode(&mut w)
4287                .unwrap();
4288                recv.push_back((with_npdu(w.as_written()), addr));
4289            }
4290
4291            let mut apdu = [0u8; 16];
4292            let mut w = Writer::new(&mut apdu);
4293            SimpleAck {
4294                invoke_id: 1,
4295                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4296            }
4297            .encode(&mut w)
4298            .unwrap();
4299            recv.push_back((with_npdu(w.as_written()), addr));
4300        }
4301
4302        let writes: Vec<PropertyWriteSpec> = (0..180)
4303            .map(|_| PropertyWriteSpec {
4304                property_id: PropertyId::Description,
4305                array_index: None,
4306                value: DataValue::CharacterString(
4307                    "rustbac segmented write test payload................................................................",
4308                ),
4309                priority: None,
4310            })
4311            .collect();
4312
4313        client
4314            .write_property_multiple(addr, object_id, &writes)
4315            .await
4316            .unwrap();
4317
4318        let sent = state.sent.lock().await;
4319        let mut seq0_frames = 0usize;
4320        for (_, frame) in sent.iter() {
4321            let mut r = Reader::new(frame);
4322            let _npdu = Npdu::decode(&mut r).unwrap();
4323            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4324            if hdr.sequence_number == Some(0) {
4325                seq0_frames += 1;
4326            }
4327        }
4328        assert!(seq0_frames >= 2);
4329    }
4330
4331    #[tokio::test]
4332    async fn read_property_ignores_invalid_frames_until_valid_response() {
4333        let (dl, state) = MockDataLink::new();
4334        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4335        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4336        let state_for_task = state.clone();
4337
4338        tokio::spawn(async move {
4339            tokio::time::sleep(Duration::from_millis(20)).await;
4340            let mut apdu = [0u8; 128];
4341            let mut w = Writer::new(&mut apdu);
4342            ComplexAckHeader {
4343                segmented: false,
4344                more_follows: false,
4345                invoke_id: 1,
4346                sequence_number: None,
4347                proposed_window_size: None,
4348                service_choice: SERVICE_READ_PROPERTY,
4349            }
4350            .encode(&mut w)
4351            .unwrap();
4352            encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4353            encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4354            Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4355            encode_app_real(&mut w, 77.0).unwrap();
4356            Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4357            state_for_task
4358                .recv
4359                .lock()
4360                .await
4361                .push_back((with_npdu(w.as_written()), addr));
4362        });
4363
4364        let value = client
4365            .read_property(
4366                addr,
4367                ObjectId::new(ObjectType::Device, 1),
4368                PropertyId::PresentValue,
4369            )
4370            .await
4371            .unwrap();
4372        assert!(matches!(
4373            value,
4374            ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
4375        ));
4376    }
4377
4378    #[tokio::test]
4379    async fn read_property_maps_reject() {
4380        let (dl, state) = MockDataLink::new();
4381        let client = BacnetClient::with_datalink(dl);
4382        let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
4383
4384        let mut apdu = [0u8; 8];
4385        let mut w = Writer::new(&mut apdu);
4386        w.write_u8((ApduType::Reject as u8) << 4).unwrap();
4387        w.write_u8(1).unwrap(); // invoke id
4388        w.write_u8(2).unwrap(); // reason
4389        state
4390            .recv
4391            .lock()
4392            .await
4393            .push_back((with_npdu(w.as_written()), addr));
4394
4395        let err = client
4396            .read_property(
4397                addr,
4398                ObjectId::new(ObjectType::Device, 1),
4399                PropertyId::ObjectName,
4400            )
4401            .await
4402            .unwrap_err();
4403        assert!(matches!(
4404            err,
4405            crate::ClientError::RemoteReject { reason: 2 }
4406        ));
4407    }
4408
4409    #[tokio::test]
4410    async fn read_property_maps_remote_error_details() {
4411        let (dl, state) = MockDataLink::new();
4412        let client = BacnetClient::with_datalink(dl);
4413        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
4414
4415        let mut apdu = [0u8; 16];
4416        let mut w = Writer::new(&mut apdu);
4417        w.write_u8((ApduType::Error as u8) << 4).unwrap();
4418        w.write_u8(1).unwrap(); // invoke id
4419        w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
4420            .unwrap();
4421        Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
4422        w.write_u8(2).unwrap(); // property class
4423        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
4424        w.write_u8(32).unwrap(); // unknownProperty
4425
4426        state
4427            .recv
4428            .lock()
4429            .await
4430            .push_back((with_npdu(w.as_written()), addr));
4431
4432        let err = client
4433            .read_property(
4434                addr,
4435                ObjectId::new(ObjectType::Device, 1),
4436                PropertyId::ObjectName,
4437            )
4438            .await
4439            .unwrap_err();
4440        assert!(matches!(
4441            err,
4442            crate::ClientError::RemoteServiceError {
4443                service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
4444                error_class_raw: Some(2),
4445                error_code_raw: Some(32),
4446                error_class: Some(rustbac_core::types::ErrorClass::Property),
4447                error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
4448            }
4449        ));
4450    }
4451
4452    #[tokio::test]
4453    async fn write_property_maps_abort() {
4454        let (dl, state) = MockDataLink::new();
4455        let client = BacnetClient::with_datalink(dl);
4456        let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
4457
4458        let mut apdu = [0u8; 8];
4459        let mut w = Writer::new(&mut apdu);
4460        w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); // server abort
4461        w.write_u8(1).unwrap(); // invoke id
4462        w.write_u8(9).unwrap(); // reason
4463        state
4464            .recv
4465            .lock()
4466            .await
4467            .push_back((with_npdu(w.as_written()), addr));
4468
4469        let req = rustbac_core::services::write_property::WritePropertyRequest {
4470            object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
4471            property_id: PropertyId::PresentValue,
4472            value: DataValue::Real(10.0),
4473            priority: Some(8),
4474            ..Default::default()
4475        };
4476        let err = client.write_property(addr, req).await.unwrap_err();
4477        assert!(matches!(
4478            err,
4479            crate::ClientError::RemoteAbort {
4480                reason: 9,
4481                server: true
4482            }
4483        ));
4484    }
4485
4486    #[tokio::test]
4487    async fn read_property_multiple_returns_owned_string() {
4488        let (dl, state) = MockDataLink::new();
4489        let client = BacnetClient::with_datalink(dl);
4490        let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
4491        let object_id = ObjectId::new(ObjectType::Device, 1);
4492
4493        let mut apdu_buf = [0u8; 256];
4494        let mut w = Writer::new(&mut apdu_buf);
4495        ComplexAckHeader {
4496            segmented: false,
4497            more_follows: false,
4498            invoke_id: 1,
4499            sequence_number: None,
4500            proposed_window_size: None,
4501            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4502        }
4503        .encode(&mut w)
4504        .unwrap();
4505        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4506        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4507            .encode(&mut w)
4508            .unwrap();
4509        encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
4510        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4511            .encode(&mut w)
4512            .unwrap();
4513        rustbac_core::services::value_codec::encode_application_data_value(
4514            &mut w,
4515            &DataValue::CharacterString("AHU-1"),
4516        )
4517        .unwrap();
4518        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4519            .encode(&mut w)
4520            .unwrap();
4521        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4522            .encode(&mut w)
4523            .unwrap();
4524
4525        state
4526            .recv
4527            .lock()
4528            .await
4529            .push_back((with_npdu(w.as_written()), addr));
4530
4531        let values = client
4532            .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
4533            .await
4534            .unwrap();
4535        assert_eq!(values.len(), 1);
4536        assert_eq!(values[0].0, PropertyId::ObjectName);
4537        assert!(matches!(
4538            &values[0].1,
4539            ClientDataValue::CharacterString(s) if s == "AHU-1"
4540        ));
4541    }
4542
4543    #[tokio::test]
4544    async fn new_sc_rejects_invalid_endpoint() {
4545        let err = BacnetClient::new_sc("not a url").await.unwrap_err();
4546        assert!(matches!(err, crate::ClientError::DataLink(_)));
4547    }
4548}