Skip to main content

rustbac_client/
client.rs

1use crate::{
2    AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3    ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4    DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5    EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9    AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10    SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{
13    primitives::{decode_unsigned, encode_ctx_unsigned},
14    reader::Reader,
15    tag::Tag,
16    writer::Writer,
17};
18use rustbac_core::npdu::Npdu;
19use rustbac_core::services::acknowledge_alarm::{
20    AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
21};
22use rustbac_core::services::alarm_summary::{
23    AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
24    SERVICE_GET_ALARM_SUMMARY,
25};
26use rustbac_core::services::atomic_read_file::{
27    AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
28};
29use rustbac_core::services::atomic_write_file::{
30    AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
31};
32use rustbac_core::services::cov_notification::{
33    CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
34    SERVICE_UNCONFIRMED_COV_NOTIFICATION,
35};
36use rustbac_core::services::device_management::{
37    DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
38    ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
39};
40use rustbac_core::services::enrollment_summary::{
41    EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
42    GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
43};
44use rustbac_core::services::event_information::{
45    EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
46    SERVICE_GET_EVENT_INFORMATION,
47};
48use rustbac_core::services::event_notification::{
49    EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
50    SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
51};
52use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
53use rustbac_core::services::list_element::{
54    AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
55    SERVICE_REMOVE_LIST_ELEMENT,
56};
57use rustbac_core::services::object_management::{
58    CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
59    SERVICE_DELETE_OBJECT,
60};
61use rustbac_core::services::private_transfer::{
62    ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
63    SERVICE_CONFIRMED_PRIVATE_TRANSFER,
64};
65use rustbac_core::services::read_property::{
66    ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
67};
68use rustbac_core::services::read_property_multiple::{
69    PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
70    ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
71};
72use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
73use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
74use rustbac_core::services::subscribe_cov_property::{
75    SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
76};
77use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
78use rustbac_core::services::value_codec::encode_application_data_value;
79use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
80use rustbac_core::services::who_is::WhoIsRequest;
81use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
82use rustbac_core::services::write_property_multiple::{
83    PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
84    SERVICE_WRITE_PROPERTY_MULTIPLE,
85};
86use rustbac_core::types::{
87    DataValue, Date, ErrorClass, ErrorCode, ObjectId, ObjectType, PropertyId, Time,
88};
89use rustbac_core::EncodeError;
90use rustbac_datalink::bip::transport::{
91    BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
92};
93use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
94use std::collections::{HashMap, HashSet};
95use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
96use std::sync::RwLock;
97use std::time::Duration;
98use tokio::sync::Mutex;
99use tokio::task::JoinHandle;
100use tokio::time::{timeout, Instant};
101
102const MIN_SEGMENT_DATA_LEN: usize = 32;
103const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1024 * 1024;
104
105/// High-level async BACnet client.
106///
107/// `BacnetClient<D>` wraps any [`DataLink`] transport and exposes ergonomic methods for common
108/// BACnet operations: reading and writing properties, device and object discovery, COV
109/// subscriptions, alarm/event services, file I/O, and device management.
110///
111/// The invoke-id counter and the per-request I/O lock are stored behind `Mutex`es, so the
112/// client is intentionally not `Clone` — wrap it in an `Arc` to share it across tasks.
113///
114/// # Construction
115///
116/// - UDP/IP: [`BacnetClient::new()`] (local broadcast) or [`BacnetClient::new_foreign()`]
117///   (register as a foreign device with a BBMD).
118/// - BACnet/SC (WebSocket): [`BacnetClient::new_sc()`].
119/// - Custom transport: [`BacnetClient::with_datalink()`].
120pub struct BacnetClient<D: DataLink> {
121    datalink: D,
122    invoke_id: Mutex<u8>,
123    request_io_lock: Mutex<()>,
124    response_timeout: Duration,
125    segmented_request_window_size: u8,
126    segmented_request_retries: u8,
127    segment_ack_timeout: Duration,
128    /// Peer max-APDU sizes in bytes, populated from I-Am responses via `who_is`.
129    capability_cache: std::sync::Arc<RwLock<HashMap<DataLinkAddress, usize>>>,
130    /// Optional server handler for inline request dispatch.
131    server_handler: Option<std::sync::Arc<dyn crate::server::ServiceHandler>>,
132    /// Device instance number used for I-Am responses when serving inline.
133    server_device_id: u32,
134    /// Vendor ID used for I-Am responses when serving inline.
135    #[allow(unused)]
136    server_vendor_id: u16,
137}
138
139impl<D: DataLink + std::fmt::Debug> std::fmt::Debug for BacnetClient<D> {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("BacnetClient")
142            .field("datalink", &self.datalink)
143            .field("invoke_id", &self.invoke_id)
144            .field("response_timeout", &self.response_timeout)
145            .field(
146                "segmented_request_window_size",
147                &self.segmented_request_window_size,
148            )
149            .field("segmented_request_retries", &self.segmented_request_retries)
150            .field("segment_ack_timeout", &self.segment_ack_timeout)
151            .field(
152                "server_handler",
153                &self.server_handler.as_ref().map(|_| "..."),
154            )
155            .field("server_device_id", &self.server_device_id)
156            .field("server_vendor_id", &self.server_vendor_id)
157            .finish()
158    }
159}
160
161/// Handle for a background task that periodically re-registers this client as a BACnet
162/// foreign device with the BBMD.
163///
164/// Dropping this value aborts the renewal task automatically. Call [`ForeignDeviceRenewal::stop`]
165/// to abort it explicitly.
166#[derive(Debug)]
167pub struct ForeignDeviceRenewal {
168    task: JoinHandle<()>,
169}
170
171impl ForeignDeviceRenewal {
172    /// Abort the background renewal task immediately.
173    pub fn stop(self) {
174        self.task.abort();
175    }
176}
177
178impl Drop for ForeignDeviceRenewal {
179    fn drop(&mut self) {
180        self.task.abort();
181    }
182}
183
184impl BacnetClient<BacnetIpTransport> {
185    /// Create a UDP/IP BACnet client bound to an ephemeral port on all interfaces.
186    ///
187    /// Returns [`ClientError::DataLink`] if the socket cannot be bound.
188    pub async fn new() -> Result<Self, ClientError> {
189        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
190        let datalink = BacnetIpTransport::bind(bind_addr).await?;
191        Ok(Self {
192            datalink,
193            invoke_id: Mutex::new(1),
194            request_io_lock: Mutex::new(()),
195            response_timeout: Duration::from_secs(3),
196            segmented_request_window_size: 16,
197            segmented_request_retries: 2,
198            segment_ack_timeout: Duration::from_millis(500),
199            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
200            server_handler: None,
201            server_device_id: 0,
202            server_vendor_id: 0,
203        })
204    }
205
206    /// Create a UDP/IP BACnet client registered as a foreign device with the BBMD at
207    /// `bbmd_addr`.
208    ///
209    /// `ttl_seconds` is the time-to-live for the foreign-device registration. Returns
210    /// [`ClientError::DataLink`] if the socket cannot be bound or the initial registration
211    /// request fails.
212    pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
213        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
214        let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
215        datalink.register_foreign_device(ttl_seconds).await?;
216        Ok(Self {
217            datalink,
218            invoke_id: Mutex::new(1),
219            request_io_lock: Mutex::new(()),
220            response_timeout: Duration::from_secs(3),
221            segmented_request_window_size: 16,
222            segmented_request_retries: 2,
223            segment_ack_timeout: Duration::from_millis(500),
224            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
225            server_handler: None,
226            server_device_id: 0,
227            server_vendor_id: 0,
228        })
229    }
230
231    /// Re-register this client as a foreign device with the BBMD, using `ttl_seconds` as the new
232    /// time-to-live.
233    pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
234        let _io = self.request_io_lock.lock().await;
235        self.datalink.register_foreign_device(ttl_seconds).await?;
236        Ok(())
237    }
238
239    /// Read the Broadcast Distribution Table (BDT) from the BBMD.
240    pub async fn read_broadcast_distribution_table(
241        &self,
242    ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
243        let _io = self.request_io_lock.lock().await;
244        self.datalink
245            .read_broadcast_distribution_table()
246            .await
247            .map_err(ClientError::from)
248    }
249
250    /// Write (replace) the Broadcast Distribution Table on the BBMD with the given `entries`.
251    pub async fn write_broadcast_distribution_table(
252        &self,
253        entries: &[BroadcastDistributionEntry],
254    ) -> Result<(), ClientError> {
255        let _io = self.request_io_lock.lock().await;
256        self.datalink
257            .write_broadcast_distribution_table(entries)
258            .await?;
259        Ok(())
260    }
261
262    /// Read the Foreign Device Table (FDT) from the BBMD.
263    pub async fn read_foreign_device_table(
264        &self,
265    ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
266        let _io = self.request_io_lock.lock().await;
267        self.datalink
268            .read_foreign_device_table()
269            .await
270            .map_err(ClientError::from)
271    }
272
273    /// Delete the foreign device entry for `address` from the BBMD's Foreign Device Table.
274    pub async fn delete_foreign_device_table_entry(
275        &self,
276        address: SocketAddrV4,
277    ) -> Result<(), ClientError> {
278        let _io = self.request_io_lock.lock().await;
279        self.datalink
280            .delete_foreign_device_table_entry(address)
281            .await?;
282        Ok(())
283    }
284
285    /// Spawn a background task that re-registers this client as a foreign device at roughly
286    /// 75 % of `ttl_seconds` to keep the registration alive indefinitely.
287    ///
288    /// Returns a [`ForeignDeviceRenewal`] handle; dropping it (or calling `.stop()`) cancels
289    /// the task. Returns an error if `ttl_seconds` is 0.
290    pub fn start_foreign_device_renewal(
291        &self,
292        ttl_seconds: u16,
293    ) -> Result<ForeignDeviceRenewal, ClientError> {
294        if ttl_seconds == 0 {
295            return Err(EncodeError::InvalidLength.into());
296        }
297
298        let datalink = self.datalink.clone();
299        let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
300        let interval = Duration::from_secs(refresh_seconds.max(1));
301        let task = tokio::spawn(async move {
302            loop {
303                tokio::time::sleep(interval).await;
304                if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
305                    log::warn!("foreign device renewal send failed: {err}");
306                }
307            }
308        });
309        Ok(ForeignDeviceRenewal { task })
310    }
311}
312
313impl BacnetClient<rustbac_datalink::bip6::transport::BacnetIp6Transport> {
314    /// Create a BACnet/IPv6 client using UDP multicast.
315    ///
316    /// # Arguments
317    /// * `multicast` — The IPv6 multicast group (default: `FF05::BAC0`).
318    /// * `if_index` — The network interface index for multicast membership.
319    pub async fn new_ipv6(
320        multicast: std::net::Ipv6Addr,
321        if_index: u32,
322    ) -> Result<Self, ClientError> {
323        use rustbac_datalink::bip6::transport::BacnetIp6Transport;
324        let bind_addr = std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, 47808, 0, 0);
325        let datalink = BacnetIp6Transport::bind(bind_addr, multicast, if_index).await?;
326        Ok(Self::with_datalink(datalink))
327    }
328}
329
330impl BacnetClient<BacnetScTransport> {
331    /// Create a BACnet/SC client by connecting to the WebSocket hub at `endpoint`
332    /// (e.g. `"wss://hub.example.com:47808/bacnet"`).
333    pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
334        let datalink = BacnetScTransport::connect(endpoint).await?;
335        Ok(Self::with_datalink(datalink))
336    }
337}
338
339impl<D: DataLink> BacnetClient<D> {
340    /// Create a client wrapping an already-constructed `datalink`.
341    ///
342    /// Use this when you need a custom or pre-configured [`DataLink`] implementation.
343    pub fn with_datalink(datalink: D) -> Self {
344        Self {
345            datalink,
346            invoke_id: Mutex::new(1),
347            request_io_lock: Mutex::new(()),
348            response_timeout: Duration::from_secs(3),
349            segmented_request_window_size: 16,
350            segmented_request_retries: 2,
351            segment_ack_timeout: Duration::from_millis(500),
352            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
353            server_handler: None,
354            server_device_id: 0,
355            server_vendor_id: 0,
356        }
357    }
358
359    /// Override the per-request response timeout (default: 3 s).
360    pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
361        self.response_timeout = timeout;
362        self
363    }
364
365    /// Override the segmented-request window size (number of segments sent before waiting
366    /// for an ACK). Clamped to a minimum of 1. Default: 16.
367    pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
368        self.segmented_request_window_size = window_size.max(1);
369        self
370    }
371
372    /// Override the number of times a segment window is retried after a negative ACK or
373    /// segment-ACK timeout before giving up. Default: 2.
374    pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
375        self.segmented_request_retries = retries;
376        self
377    }
378
379    /// Override the timeout used when waiting for a segment ACK from the remote device
380    /// during a segmented confirmed request. Clamped to a minimum of 1 ms. Default: 500 ms.
381    pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
382        self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
383        self
384    }
385
386    /// Attach a [`ServiceHandler`](crate::server::ServiceHandler) so that incoming service
387    /// requests (e.g. ReadProperty, WriteProperty, Who-Is) are dispatched inline while the
388    /// client waits for responses.  This avoids the need for a separate
389    /// [`BacnetServer`](crate::server::BacnetServer) sharing the same transport.
390    ///
391    /// `device_id` and `vendor_id` are used for I-Am responses.
392    pub fn with_server_handler(
393        mut self,
394        handler: std::sync::Arc<dyn crate::server::ServiceHandler>,
395        device_id: u32,
396        vendor_id: u16,
397    ) -> Self {
398        self.server_handler = Some(handler);
399        self.server_device_id = device_id;
400        self.server_vendor_id = vendor_id;
401        self
402    }
403
404    /// Non-blocking poll that receives and dispatches a single incoming server request.
405    ///
406    /// Call this periodically when the client is idle (not waiting for a response) to service
407    /// requests such as ReadProperty or Who-Is from other BACnet devices.
408    ///
409    /// Returns `Ok(())` regardless of whether a request was received or dispatched.
410    /// Returns `Err(ClientError::Timeout)` if no server handler has been configured.
411    pub async fn poll_server(&self) -> Result<(), ClientError> {
412        let handler = self.server_handler.as_ref().ok_or(ClientError::Timeout)?;
413        let _io_lock = self.request_io_lock.lock().await;
414        let mut buf = [0u8; 1500];
415        match tokio::time::timeout(Duration::from_millis(50), self.datalink.recv(&mut buf)).await {
416            Ok(Ok((n, src))) => {
417                let _ = dispatch_incoming_request(
418                    &self.datalink,
419                    handler.as_ref(),
420                    self.server_device_id,
421                    self.server_vendor_id,
422                    &buf[..n],
423                    src,
424                )
425                .await;
426                Ok(())
427            }
428            _ => Ok(()),
429        }
430    }
431
432    async fn next_invoke_id(&self) -> u8 {
433        let mut lock = self.invoke_id.lock().await;
434        let id = *lock;
435        *lock = lock.wrapping_add(1);
436        if *lock == 0 {
437            *lock = 1;
438        }
439        id
440    }
441
442    async fn send_segment_ack(
443        &self,
444        address: DataLinkAddress,
445        invoke_id: u8,
446        sequence_number: u8,
447        window_size: u8,
448    ) -> Result<(), ClientError> {
449        let mut tx = [0u8; 64];
450        let mut w = Writer::new(&mut tx);
451        Npdu::new(0).encode(&mut w)?;
452        SegmentAck {
453            negative_ack: false,
454            sent_by_server: false,
455            invoke_id,
456            sequence_number,
457            actual_window_size: window_size,
458        }
459        .encode(&mut w)?;
460        self.datalink.send(address, w.as_written()).await?;
461        Ok(())
462    }
463
464    async fn recv_ignoring_invalid_frame(
465        &self,
466        buf: &mut [u8],
467        deadline: Instant,
468    ) -> Result<(usize, DataLinkAddress), ClientError> {
469        loop {
470            let remaining = deadline.saturating_duration_since(Instant::now());
471            if remaining.is_zero() {
472                return Err(ClientError::Timeout);
473            }
474
475            match timeout(remaining, self.datalink.recv(buf)).await {
476                Err(_) => return Err(ClientError::Timeout),
477                Ok(Err(DataLinkError::InvalidFrame)) => continue,
478                Ok(Err(e)) => return Err(e.into()),
479                Ok(Ok(v)) => return Ok(v),
480            }
481        }
482    }
483
484    async fn send_simple_ack(
485        &self,
486        address: DataLinkAddress,
487        invoke_id: u8,
488        service_choice: u8,
489    ) -> Result<(), ClientError> {
490        let mut tx = [0u8; 64];
491        let mut w = Writer::new(&mut tx);
492        Npdu::new(0).encode(&mut w)?;
493        SimpleAck {
494            invoke_id,
495            service_choice,
496        }
497        .encode(&mut w)?;
498        self.datalink.send(address, w.as_written()).await?;
499        Ok(())
500    }
501
502    fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
503    where
504        F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
505    {
506        for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
507            let mut buf = vec![0u8; size];
508            let mut w = Writer::new(&mut buf);
509            match encode(&mut w) {
510                Ok(()) => {
511                    let written_len = w.as_written().len();
512                    buf.truncate(written_len);
513                    return Ok(buf);
514                }
515                Err(EncodeError::BufferTooSmall) => continue,
516                Err(e) => return Err(e.into()),
517            }
518        }
519        Err(ClientError::SegmentedRequestTooLarge)
520    }
521
522    const fn max_apdu_octets(max_apdu_code: u8) -> usize {
523        match max_apdu_code & 0x0f {
524            0 => 50,
525            1 => 128,
526            2 => 206,
527            3 => 480,
528            4 => 1024,
529            5 => 1476,
530            _ => 480,
531        }
532    }
533
534    async fn await_segment_ack(
535        &self,
536        address: DataLinkAddress,
537        invoke_id: u8,
538        service_choice: u8,
539        expected_sequence: u8,
540        deadline: Instant,
541    ) -> Result<SegmentAck, ClientError> {
542        loop {
543            let remaining = deadline.saturating_duration_since(Instant::now());
544            if remaining.is_zero() {
545                return Err(ClientError::Timeout);
546            }
547
548            let mut rx = [0u8; 1500];
549            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
550            let (n, src) = match recv {
551                Err(_) => return Err(ClientError::Timeout),
552                Ok(Err(DataLinkError::InvalidFrame)) => continue,
553                Ok(Err(e)) => return Err(e.into()),
554                Ok(Ok(v)) => v,
555            };
556            if src != address {
557                continue;
558            }
559
560            let Ok(apdu) = extract_apdu(&rx[..n]) else {
561                continue;
562            };
563            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
564            match ApduType::from_u8(first >> 4) {
565                Some(ApduType::SegmentAck) => {
566                    let mut r = Reader::new(apdu);
567                    let ack = SegmentAck::decode(&mut r)?;
568                    if ack.invoke_id != invoke_id || !ack.sent_by_server {
569                        continue;
570                    }
571                    if ack.negative_ack {
572                        return Err(ClientError::SegmentNegativeAck {
573                            sequence_number: ack.sequence_number,
574                        });
575                    }
576                    if ack.sequence_number == expected_sequence {
577                        return Ok(ack);
578                    }
579                }
580                Some(ApduType::Error) => {
581                    let mut r = Reader::new(apdu);
582                    let err = BacnetError::decode(&mut r)?;
583                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
584                        return Err(remote_service_error(err));
585                    }
586                }
587                Some(ApduType::Reject) => {
588                    let mut r = Reader::new(apdu);
589                    let rej = RejectPdu::decode(&mut r)?;
590                    if rej.invoke_id == invoke_id {
591                        return Err(ClientError::RemoteReject { reason: rej.reason });
592                    }
593                }
594                Some(ApduType::Abort) => {
595                    let mut r = Reader::new(apdu);
596                    let abort = AbortPdu::decode(&mut r)?;
597                    if abort.invoke_id == invoke_id {
598                        return Err(ClientError::RemoteAbort {
599                            reason: abort.reason,
600                            server: abort.server,
601                        });
602                    }
603                }
604                _ => continue,
605            }
606        }
607    }
608
609    async fn send_confirmed_request(
610        &self,
611        address: DataLinkAddress,
612        frame: &[u8],
613        deadline: Instant,
614    ) -> Result<(), ClientError> {
615        let mut pr = Reader::new(frame);
616        let _npdu = Npdu::decode(&mut pr)?;
617        let npdu_len = frame.len() - pr.remaining();
618        let npdu_bytes = &frame[..npdu_len];
619        let apdu = &frame[npdu_len..];
620
621        let mut ar = Reader::new(apdu);
622        let header = ConfirmedRequestHeader::decode(&mut ar)?;
623        let service_payload = ar.read_exact(ar.remaining())?;
624
625        // Use the peer's max-APDU if we learned it from a prior I-Am; fall back to
626        // the code declared in the request header (our own capability advertisement).
627        let peer_max_apdu = self
628            .capability_cache
629            .read()
630            .ok()
631            .and_then(|c| c.get(&address).copied())
632            .unwrap_or_else(|| Self::max_apdu_octets(header.max_apdu));
633        let segment_data_len = peer_max_apdu.saturating_sub(5).max(MIN_SEGMENT_DATA_LEN);
634        let segment_count = service_payload.len().div_ceil(segment_data_len);
635
636        if segment_count <= 1 {
637            self.datalink.send(address, frame).await?;
638            return Ok(());
639        }
640
641        if segment_count > usize::from(u8::MAX) + 1 {
642            return Err(ClientError::SegmentedRequestTooLarge);
643        }
644
645        let configured_window_size = self.segmented_request_window_size.max(1);
646        let mut window_size = configured_window_size;
647        let mut peer_window_ceiling = configured_window_size;
648        let mut batch_start = 0usize;
649        while batch_start < segment_count {
650            let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
651            let expected_sequence = (batch_end - 1) as u8;
652
653            let mut frames = Vec::with_capacity(batch_end - batch_start);
654            for segment_index in batch_start..batch_end {
655                let seq = segment_index as u8;
656                let more_follows = segment_index + 1 < segment_count;
657                let start = segment_index * segment_data_len;
658                let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
659                let segment = &service_payload[start..end];
660
661                let seg_header = ConfirmedRequestHeader {
662                    segmented: true,
663                    more_follows,
664                    segmented_response_accepted: header.segmented_response_accepted,
665                    max_segments: header.max_segments,
666                    max_apdu: header.max_apdu,
667                    invoke_id: header.invoke_id,
668                    sequence_number: Some(seq),
669                    proposed_window_size: Some(window_size),
670                    service_choice: header.service_choice,
671                };
672
673                let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
674                let written_len = {
675                    let mut w = Writer::new(&mut tx);
676                    w.write_all(npdu_bytes)?;
677                    seg_header.encode(&mut w)?;
678                    w.write_all(segment)?;
679                    w.as_written().len()
680                };
681                tx.truncate(written_len);
682                frames.push(tx);
683            }
684
685            let mut retries_remaining = self.segmented_request_retries;
686            loop {
687                for frame in &frames {
688                    self.datalink.send(address, frame).await?;
689                }
690
691                if batch_end == segment_count {
692                    break;
693                }
694
695                let remaining = deadline.saturating_duration_since(Instant::now());
696                if remaining.is_zero() {
697                    return Err(ClientError::Timeout);
698                }
699                let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
700                match self
701                    .await_segment_ack(
702                        address,
703                        header.invoke_id,
704                        header.service_choice,
705                        expected_sequence,
706                        ack_wait_deadline,
707                    )
708                    .await
709                {
710                    Ok(ack) => {
711                        peer_window_ceiling =
712                            peer_window_ceiling.min(ack.actual_window_size.max(1));
713                        window_size = window_size
714                            .saturating_add(1)
715                            .min(configured_window_size)
716                            .min(peer_window_ceiling)
717                            .max(1);
718                        break;
719                    }
720                    Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
721                        if retries_remaining > 0 =>
722                    {
723                        retries_remaining -= 1;
724                        window_size = window_size.saturating_div(2).max(1);
725                        continue;
726                    }
727                    Err(e) => return Err(e),
728                }
729            }
730
731            batch_start = batch_end;
732        }
733
734        Ok(())
735    }
736
737    async fn collect_complex_ack_payload(
738        &self,
739        address: DataLinkAddress,
740        invoke_id: u8,
741        service_choice: u8,
742        first_header: ComplexAckHeader,
743        first_payload: &[u8],
744        deadline: Instant,
745    ) -> Result<Vec<u8>, ClientError> {
746        let mut payload = first_payload.to_vec();
747        if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
748            return Err(ClientError::ResponseTooLarge {
749                limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
750            });
751        }
752        if !first_header.segmented {
753            return Ok(payload);
754        }
755
756        let mut last_seq = first_header
757            .sequence_number
758            .ok_or(ClientError::UnsupportedResponse)?;
759        let mut window_size = first_header.proposed_window_size.unwrap_or(1);
760        self.send_segment_ack(address, invoke_id, last_seq, window_size)
761            .await?;
762        let mut more_follows = first_header.more_follows;
763
764        while more_follows {
765            let mut rx = [0u8; 1500];
766            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
767            if src != address {
768                // Try to dispatch as an incoming server request
769                if let Some(ref handler) = self.server_handler {
770                    let _ = dispatch_incoming_request(
771                        &self.datalink,
772                        handler.as_ref(),
773                        self.server_device_id,
774                        self.server_vendor_id,
775                        &rx[..n],
776                        src,
777                    )
778                    .await;
779                }
780                continue;
781            }
782
783            let Ok(apdu) = extract_apdu(&rx[..n]) else {
784                continue;
785            };
786            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
787            match ApduType::from_u8(first >> 4) {
788                Some(ApduType::ComplexAck) => {
789                    let mut r = Reader::new(apdu);
790                    let seg = ComplexAckHeader::decode(&mut r)?;
791                    if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
792                        continue;
793                    }
794                    if !seg.segmented {
795                        return Err(ClientError::UnsupportedResponse);
796                    }
797                    let seq = seg
798                        .sequence_number
799                        .ok_or(ClientError::UnsupportedResponse)?;
800                    if seq == last_seq {
801                        // Duplicate segment: acknowledge again and continue waiting.
802                        self.send_segment_ack(address, invoke_id, last_seq, window_size)
803                            .await?;
804                        continue;
805                    }
806                    if seq != last_seq.wrapping_add(1) {
807                        continue;
808                    }
809
810                    let seg_payload = r.read_exact(r.remaining())?;
811                    if payload.len().saturating_add(seg_payload.len())
812                        > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
813                    {
814                        return Err(ClientError::ResponseTooLarge {
815                            limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
816                        });
817                    }
818                    payload.extend_from_slice(seg_payload);
819
820                    last_seq = seq;
821                    more_follows = seg.more_follows;
822                    window_size = seg.proposed_window_size.unwrap_or(window_size);
823                    self.send_segment_ack(address, invoke_id, last_seq, window_size)
824                        .await?;
825                }
826                Some(ApduType::Error) => {
827                    let mut r = Reader::new(apdu);
828                    let err = BacnetError::decode(&mut r)?;
829                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
830                        return Err(remote_service_error(err));
831                    }
832                }
833                Some(ApduType::Reject) => {
834                    let mut r = Reader::new(apdu);
835                    let rej = RejectPdu::decode(&mut r)?;
836                    if rej.invoke_id == invoke_id {
837                        return Err(ClientError::RemoteReject { reason: rej.reason });
838                    }
839                }
840                Some(ApduType::Abort) => {
841                    let mut r = Reader::new(apdu);
842                    let abort = AbortPdu::decode(&mut r)?;
843                    if abort.invoke_id == invoke_id {
844                        return Err(ClientError::RemoteAbort {
845                            reason: abort.reason,
846                            server: abort.server,
847                        });
848                    }
849                }
850                _ => {
851                    // Try to dispatch as an incoming server request
852                    if let Some(ref handler) = self.server_handler {
853                        let _ = dispatch_incoming_request(
854                            &self.datalink,
855                            handler.as_ref(),
856                            self.server_device_id,
857                            self.server_vendor_id,
858                            &rx[..n],
859                            src,
860                        )
861                        .await;
862                    }
863                    continue;
864                }
865            }
866        }
867
868        Ok(payload)
869    }
870
871    /// Broadcast a Who-Is request and collect I-Am replies for the duration of `wait`.
872    ///
873    /// `range` constrains the device-instance range as `(low, high)`; `None` performs a
874    /// global Who-Is. Duplicate devices (same object id) are deduplicated. Returns
875    /// [`ClientError::DataLink`] on send/receive failure.
876    pub async fn who_is(
877        &self,
878        range: Option<(u32, u32)>,
879        wait: Duration,
880    ) -> Result<Vec<DiscoveredDevice>, ClientError> {
881        // Unconfirmed broadcast — no request_io_lock needed; the recv loop
882        // filters on service_choice so it coexists with concurrent confirmed requests.
883        let req = match range {
884            Some((low, high)) => WhoIsRequest {
885                low_limit: Some(low),
886                high_limit: Some(high),
887            },
888            None => WhoIsRequest::global(),
889        };
890
891        let mut tx = [0u8; 128];
892        let mut w = Writer::new(&mut tx);
893        Npdu::new(0).encode(&mut w)?;
894        req.encode(&mut w)?;
895
896        self.datalink
897            .send(
898                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
899                w.as_written(),
900            )
901            .await?;
902
903        let mut devices = Vec::new();
904        let mut seen = HashSet::new();
905        let deadline = tokio::time::Instant::now() + wait;
906
907        while tokio::time::Instant::now() < deadline {
908            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
909            let mut rx = [0u8; 1500];
910            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
911            match recv {
912                Ok(Ok((n, src))) => {
913                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
914                        continue;
915                    };
916                    let mut r = Reader::new(apdu);
917                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
918                        continue;
919                    };
920                    if unconfirmed.service_choice != SERVICE_I_AM {
921                        continue;
922                    }
923                    let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
924                        continue;
925                    };
926                    if seen.insert(i_am.device_id) {
927                        devices.push(DiscoveredDevice {
928                            address: src,
929                            device_id: Some(i_am.device_id),
930                        });
931                        // Cache the peer's reported max-APDU so segmented
932                        // requests can be sized correctly without a separate read.
933                        if let Ok(mut cache) = self.capability_cache.write() {
934                            cache.insert(src, i_am.max_apdu as usize);
935                        }
936                    }
937                }
938                Ok(Err(DataLinkError::InvalidFrame)) => continue,
939                Ok(Err(e)) => return Err(e.into()),
940                Err(_) => break,
941            }
942        }
943
944        Ok(devices)
945    }
946
947    /// Broadcast a Who-Has request for a specific object id and collect I-Have replies for
948    /// `wait`.
949    ///
950    /// `range` constrains the responding device-instance range; `None` means any device.
951    pub async fn who_has_object_id(
952        &self,
953        range: Option<(u32, u32)>,
954        object_id: ObjectId,
955        wait: Duration,
956    ) -> Result<Vec<DiscoveredObject>, ClientError> {
957        let req = WhoHasRequest {
958            low_limit: range.map(|(low, _)| low),
959            high_limit: range.map(|(_, high)| high),
960            object: WhoHasObject::ObjectId(object_id),
961        };
962        self.who_has(req, wait).await
963    }
964
965    /// Broadcast a Who-Has request for an object by name and collect I-Have replies for `wait`.
966    ///
967    /// `range` constrains the responding device-instance range; `None` means any device.
968    pub async fn who_has_object_name(
969        &self,
970        range: Option<(u32, u32)>,
971        object_name: &str,
972        wait: Duration,
973    ) -> Result<Vec<DiscoveredObject>, ClientError> {
974        let req = WhoHasRequest {
975            low_limit: range.map(|(low, _)| low),
976            high_limit: range.map(|(_, high)| high),
977            object: WhoHasObject::ObjectName(object_name),
978        };
979        self.who_has(req, wait).await
980    }
981
982    async fn who_has(
983        &self,
984        request: WhoHasRequest<'_>,
985        wait: Duration,
986    ) -> Result<Vec<DiscoveredObject>, ClientError> {
987        // Unconfirmed broadcast — same rationale as who_is.
988        let tx = self.encode_with_growth(|w| {
989            Npdu::new(0).encode(w)?;
990            request.encode(w)
991        })?;
992        self.datalink
993            .send(
994                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
995                &tx,
996            )
997            .await?;
998
999        let mut objects = Vec::new();
1000        let mut seen = HashSet::new();
1001        let deadline = tokio::time::Instant::now() + wait;
1002
1003        while tokio::time::Instant::now() < deadline {
1004            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1005            let mut rx = [0u8; 1500];
1006            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1007            match recv {
1008                Ok(Ok((n, src))) => {
1009                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
1010                        continue;
1011                    };
1012                    let mut r = Reader::new(apdu);
1013                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
1014                        continue;
1015                    };
1016                    if unconfirmed.service_choice != SERVICE_I_HAVE {
1017                        continue;
1018                    }
1019                    let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
1020                        continue;
1021                    };
1022                    if !seen.insert((src, i_have.object_id.raw())) {
1023                        continue;
1024                    }
1025                    objects.push(DiscoveredObject {
1026                        address: src,
1027                        device_id: i_have.device_id,
1028                        object_id: i_have.object_id,
1029                        object_name: i_have.object_name.to_string(),
1030                    });
1031                }
1032                Ok(Err(DataLinkError::InvalidFrame)) => continue,
1033                Ok(Err(e)) => return Err(e.into()),
1034                Err(_) => break,
1035            }
1036        }
1037
1038        Ok(objects)
1039    }
1040
1041    /// Send a DeviceCommunicationControl request to a device.
1042    ///
1043    /// `time_duration_seconds` sets the duration for which the state applies; `None` means
1044    /// indefinite. `enable_disable` selects the new communication state. `password` is only
1045    /// required if the device is password-protected.
1046    pub async fn device_communication_control(
1047        &self,
1048        address: DataLinkAddress,
1049        time_duration_seconds: Option<u16>,
1050        enable_disable: DeviceCommunicationState,
1051        password: Option<&str>,
1052    ) -> Result<(), ClientError> {
1053        let invoke_id = self.next_invoke_id().await;
1054        let request = DeviceCommunicationControlRequest {
1055            time_duration_seconds,
1056            enable_disable,
1057            password,
1058            invoke_id,
1059        };
1060        let tx = self.encode_with_growth(|w| {
1061            Npdu::new(0).encode(w)?;
1062            request.encode(w)
1063        })?;
1064        self.await_simple_ack_or_error(
1065            address,
1066            &tx,
1067            invoke_id,
1068            SERVICE_DEVICE_COMMUNICATION_CONTROL,
1069            self.response_timeout,
1070        )
1071        .await
1072    }
1073
1074    /// Send a ReinitializeDevice request to a device (e.g. cold-start, warm-start, or backup).
1075    ///
1076    /// `state` selects the reinitialization type. `password` is sent only if `Some`.
1077    pub async fn reinitialize_device(
1078        &self,
1079        address: DataLinkAddress,
1080        state: ReinitializeState,
1081        password: Option<&str>,
1082    ) -> Result<(), ClientError> {
1083        let invoke_id = self.next_invoke_id().await;
1084        let request = ReinitializeDeviceRequest {
1085            state,
1086            password,
1087            invoke_id,
1088        };
1089        let tx = self.encode_with_growth(|w| {
1090            Npdu::new(0).encode(w)?;
1091            request.encode(w)
1092        })?;
1093        self.await_simple_ack_or_error(
1094            address,
1095            &tx,
1096            invoke_id,
1097            SERVICE_REINITIALIZE_DEVICE,
1098            self.response_timeout,
1099        )
1100        .await
1101    }
1102
1103    /// Send a TimeSynchronization (or UTCTimeSynchronization) request to a device.
1104    ///
1105    /// Set `utc` to `true` to send the UTC variant of the request.
1106    /// This is a fire-and-forget unconfirmed service; no response is awaited.
1107    pub async fn time_synchronize(
1108        &self,
1109        address: DataLinkAddress,
1110        date: Date,
1111        time: Time,
1112        utc: bool,
1113    ) -> Result<(), ClientError> {
1114        let request = if utc {
1115            TimeSynchronizationRequest::utc(date, time)
1116        } else {
1117            TimeSynchronizationRequest::local(date, time)
1118        };
1119        let tx = self.encode_with_growth(|w| {
1120            Npdu::new(0).encode(w)?;
1121            request.encode(w)
1122        })?;
1123        self.datalink.send(address, &tx).await?;
1124        Ok(())
1125    }
1126
1127    /// Create a new object of the given type on the device, letting the device choose the
1128    /// instance number. Returns the [`ObjectId`] assigned by the device.
1129    pub async fn create_object_by_type(
1130        &self,
1131        address: DataLinkAddress,
1132        object_type: rustbac_core::types::ObjectType,
1133    ) -> Result<ObjectId, ClientError> {
1134        self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
1135            .await
1136    }
1137
1138    /// Send a CreateObject request to the device and return the [`ObjectId`] of the newly
1139    /// created object. Use this variant when you need fine-grained control over the request
1140    /// (e.g. specifying initial property values).
1141    pub async fn create_object(
1142        &self,
1143        address: DataLinkAddress,
1144        mut request: CreateObjectRequest,
1145    ) -> Result<ObjectId, ClientError> {
1146        request.invoke_id = self.next_invoke_id().await;
1147        let invoke_id = request.invoke_id;
1148        let tx = self.encode_with_growth(|w| {
1149            Npdu::new(0).encode(w)?;
1150            request.encode(w)
1151        })?;
1152        let payload = self
1153            .await_complex_ack_payload_or_error(
1154                address,
1155                &tx,
1156                invoke_id,
1157                SERVICE_CREATE_OBJECT,
1158                self.response_timeout,
1159            )
1160            .await?;
1161        let mut pr = Reader::new(&payload);
1162        let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
1163        Ok(parsed.object_id)
1164    }
1165
1166    /// Send a DeleteObject request to remove `object_id` from the device.
1167    pub async fn delete_object(
1168        &self,
1169        address: DataLinkAddress,
1170        object_id: ObjectId,
1171    ) -> Result<(), ClientError> {
1172        let invoke_id = self.next_invoke_id().await;
1173        let request = DeleteObjectRequest {
1174            object_id,
1175            invoke_id,
1176        };
1177        let tx = self.encode_with_growth(|w| {
1178            Npdu::new(0).encode(w)?;
1179            request.encode(w)
1180        })?;
1181        self.await_simple_ack_or_error(
1182            address,
1183            &tx,
1184            invoke_id,
1185            SERVICE_DELETE_OBJECT,
1186            self.response_timeout,
1187        )
1188        .await
1189    }
1190
1191    /// Send an AddListElement request to append elements to a list property on the device.
1192    pub async fn add_list_element(
1193        &self,
1194        address: DataLinkAddress,
1195        mut request: AddListElementRequest<'_>,
1196    ) -> Result<(), ClientError> {
1197        request.invoke_id = self.next_invoke_id().await;
1198        let invoke_id = request.invoke_id;
1199        let tx = self.encode_with_growth(|w| {
1200            Npdu::new(0).encode(w)?;
1201            request.encode(w)
1202        })?;
1203        self.await_simple_ack_or_error(
1204            address,
1205            &tx,
1206            invoke_id,
1207            SERVICE_ADD_LIST_ELEMENT,
1208            self.response_timeout,
1209        )
1210        .await
1211    }
1212
1213    /// Send a RemoveListElement request to delete elements from a list property on the device.
1214    pub async fn remove_list_element(
1215        &self,
1216        address: DataLinkAddress,
1217        mut request: RemoveListElementRequest<'_>,
1218    ) -> Result<(), ClientError> {
1219        request.invoke_id = self.next_invoke_id().await;
1220        let invoke_id = request.invoke_id;
1221        let tx = self.encode_with_growth(|w| {
1222            Npdu::new(0).encode(w)?;
1223            request.encode(w)
1224        })?;
1225        self.await_simple_ack_or_error(
1226            address,
1227            &tx,
1228            invoke_id,
1229            SERVICE_REMOVE_LIST_ELEMENT,
1230            self.response_timeout,
1231        )
1232        .await
1233    }
1234
1235    async fn await_simple_ack_or_error(
1236        &self,
1237        address: DataLinkAddress,
1238        tx: &[u8],
1239        invoke_id: u8,
1240        service_choice: u8,
1241        timeout_window: Duration,
1242    ) -> Result<(), ClientError> {
1243        #[cfg(feature = "tracing")]
1244        tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1245        let _io_lock = self.request_io_lock.lock().await;
1246        let deadline = tokio::time::Instant::now() + timeout_window;
1247        self.send_confirmed_request(address, tx, deadline).await?;
1248        while tokio::time::Instant::now() < deadline {
1249            let mut rx = [0u8; 1500];
1250            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1251            if src != address {
1252                // Try to dispatch as an incoming server request
1253                if let Some(ref handler) = self.server_handler {
1254                    let _ = dispatch_incoming_request(
1255                        &self.datalink,
1256                        handler.as_ref(),
1257                        self.server_device_id,
1258                        self.server_vendor_id,
1259                        &rx[..n],
1260                        src,
1261                    )
1262                    .await;
1263                }
1264                continue;
1265            }
1266            let apdu = extract_apdu(&rx[..n])?;
1267            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1268            match ApduType::from_u8(first >> 4) {
1269                Some(ApduType::SimpleAck) => {
1270                    let mut r = Reader::new(apdu);
1271                    let ack = SimpleAck::decode(&mut r)?;
1272                    if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1273                        return Ok(());
1274                    }
1275                }
1276                Some(ApduType::Error) => {
1277                    let mut r = Reader::new(apdu);
1278                    let err = BacnetError::decode(&mut r)?;
1279                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1280                        return Err(remote_service_error(err));
1281                    }
1282                }
1283                Some(ApduType::Reject) => {
1284                    let mut r = Reader::new(apdu);
1285                    let rej = RejectPdu::decode(&mut r)?;
1286                    if rej.invoke_id == invoke_id {
1287                        return Err(ClientError::RemoteReject { reason: rej.reason });
1288                    }
1289                }
1290                Some(ApduType::Abort) => {
1291                    let mut r = Reader::new(apdu);
1292                    let abort = AbortPdu::decode(&mut r)?;
1293                    if abort.invoke_id == invoke_id {
1294                        return Err(ClientError::RemoteAbort {
1295                            reason: abort.reason,
1296                            server: abort.server,
1297                        });
1298                    }
1299                }
1300                _ => {
1301                    // Try to dispatch as an incoming server request
1302                    if let Some(ref handler) = self.server_handler {
1303                        let _ = dispatch_incoming_request(
1304                            &self.datalink,
1305                            handler.as_ref(),
1306                            self.server_device_id,
1307                            self.server_vendor_id,
1308                            &rx[..n],
1309                            src,
1310                        )
1311                        .await;
1312                    }
1313                    continue;
1314                }
1315            }
1316        }
1317        #[cfg(feature = "tracing")]
1318        tracing::warn!(invoke_id = invoke_id, "request timed out");
1319        Err(ClientError::Timeout)
1320    }
1321
1322    async fn await_complex_ack_payload_or_error(
1323        &self,
1324        address: DataLinkAddress,
1325        tx: &[u8],
1326        invoke_id: u8,
1327        service_choice: u8,
1328        timeout_window: Duration,
1329    ) -> Result<Vec<u8>, ClientError> {
1330        #[cfg(feature = "tracing")]
1331        tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1332        let _io_lock = self.request_io_lock.lock().await;
1333        let deadline = tokio::time::Instant::now() + timeout_window;
1334        self.send_confirmed_request(address, tx, deadline).await?;
1335        while tokio::time::Instant::now() < deadline {
1336            let mut rx = [0u8; 1500];
1337            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1338            if src != address {
1339                // Try to dispatch as an incoming server request
1340                if let Some(ref handler) = self.server_handler {
1341                    let _ = dispatch_incoming_request(
1342                        &self.datalink,
1343                        handler.as_ref(),
1344                        self.server_device_id,
1345                        self.server_vendor_id,
1346                        &rx[..n],
1347                        src,
1348                    )
1349                    .await;
1350                }
1351                continue;
1352            }
1353
1354            let apdu = extract_apdu(&rx[..n])?;
1355            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1356            match ApduType::from_u8(first >> 4) {
1357                Some(ApduType::ComplexAck) => {
1358                    let mut r = Reader::new(apdu);
1359                    let ack = ComplexAckHeader::decode(&mut r)?;
1360                    if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1361                        continue;
1362                    }
1363                    return self
1364                        .collect_complex_ack_payload(
1365                            address,
1366                            invoke_id,
1367                            service_choice,
1368                            ack,
1369                            r.read_exact(r.remaining())?,
1370                            deadline,
1371                        )
1372                        .await;
1373                }
1374                Some(ApduType::Error) => {
1375                    let mut r = Reader::new(apdu);
1376                    let err = BacnetError::decode(&mut r)?;
1377                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1378                        return Err(remote_service_error(err));
1379                    }
1380                }
1381                Some(ApduType::Reject) => {
1382                    let mut r = Reader::new(apdu);
1383                    let rej = RejectPdu::decode(&mut r)?;
1384                    if rej.invoke_id == invoke_id {
1385                        return Err(ClientError::RemoteReject { reason: rej.reason });
1386                    }
1387                }
1388                Some(ApduType::Abort) => {
1389                    let mut r = Reader::new(apdu);
1390                    let abort = AbortPdu::decode(&mut r)?;
1391                    if abort.invoke_id == invoke_id {
1392                        return Err(ClientError::RemoteAbort {
1393                            reason: abort.reason,
1394                            server: abort.server,
1395                        });
1396                    }
1397                }
1398                _ => {
1399                    // Try to dispatch as an incoming server request
1400                    if let Some(ref handler) = self.server_handler {
1401                        let _ = dispatch_incoming_request(
1402                            &self.datalink,
1403                            handler.as_ref(),
1404                            self.server_device_id,
1405                            self.server_vendor_id,
1406                            &rx[..n],
1407                            src,
1408                        )
1409                        .await;
1410                    }
1411                    continue;
1412                }
1413            }
1414        }
1415        #[cfg(feature = "tracing")]
1416        tracing::warn!(invoke_id = invoke_id, "request timed out");
1417        Err(ClientError::Timeout)
1418    }
1419
1420    /// Send a GetAlarmSummary request and return the list of active alarms on the device.
1421    pub async fn get_alarm_summary(
1422        &self,
1423        address: DataLinkAddress,
1424    ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1425        let invoke_id = self.next_invoke_id().await;
1426        let request = GetAlarmSummaryRequest { invoke_id };
1427        let tx = self.encode_with_growth(|w| {
1428            Npdu::new(0).encode(w)?;
1429            request.encode(w)
1430        })?;
1431        let payload = self
1432            .await_complex_ack_payload_or_error(
1433                address,
1434                &tx,
1435                invoke_id,
1436                SERVICE_GET_ALARM_SUMMARY,
1437                self.response_timeout,
1438            )
1439            .await?;
1440        let mut pr = Reader::new(&payload);
1441        let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1442        Ok(into_client_alarm_summary(parsed.summaries))
1443    }
1444
1445    /// Send a GetEnrollmentSummary request and return the list of event enrollments on the device.
1446    pub async fn get_enrollment_summary(
1447        &self,
1448        address: DataLinkAddress,
1449    ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1450        let invoke_id = self.next_invoke_id().await;
1451        let request = GetEnrollmentSummaryRequest { invoke_id };
1452        let tx = self.encode_with_growth(|w| {
1453            Npdu::new(0).encode(w)?;
1454            request.encode(w)
1455        })?;
1456        let payload = self
1457            .await_complex_ack_payload_or_error(
1458                address,
1459                &tx,
1460                invoke_id,
1461                SERVICE_GET_ENROLLMENT_SUMMARY,
1462                self.response_timeout,
1463            )
1464            .await?;
1465        let mut pr = Reader::new(&payload);
1466        let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1467        Ok(into_client_enrollment_summary(parsed.summaries))
1468    }
1469
1470    /// Send a GetEventInformation request and return active event summaries from the device.
1471    ///
1472    /// Pass `last_received_object_id` to page through results; the returned
1473    /// [`EventInformationResult::more_events`] indicates whether another call is needed.
1474    pub async fn get_event_information(
1475        &self,
1476        address: DataLinkAddress,
1477        last_received_object_id: Option<ObjectId>,
1478    ) -> Result<EventInformationResult, ClientError> {
1479        let invoke_id = self.next_invoke_id().await;
1480        let request = GetEventInformationRequest {
1481            last_received_object_id,
1482            invoke_id,
1483        };
1484        let tx = self.encode_with_growth(|w| {
1485            Npdu::new(0).encode(w)?;
1486            request.encode(w)
1487        })?;
1488        let payload = self
1489            .await_complex_ack_payload_or_error(
1490                address,
1491                &tx,
1492                invoke_id,
1493                SERVICE_GET_EVENT_INFORMATION,
1494                self.response_timeout,
1495            )
1496            .await?;
1497        let mut pr = Reader::new(&payload);
1498        let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1499        Ok(EventInformationResult {
1500            summaries: into_client_event_information(parsed.summaries),
1501            more_events: parsed.more_events,
1502        })
1503    }
1504
1505    /// Send an AcknowledgeAlarm request to the device.
1506    pub async fn acknowledge_alarm(
1507        &self,
1508        address: DataLinkAddress,
1509        mut request: AcknowledgeAlarmRequest<'_>,
1510    ) -> Result<(), ClientError> {
1511        request.invoke_id = self.next_invoke_id().await;
1512        let invoke_id = request.invoke_id;
1513        let tx = self.encode_with_growth(|w| {
1514            Npdu::new(0).encode(w)?;
1515            request.encode(w)
1516        })?;
1517        self.await_simple_ack_or_error(
1518            address,
1519            &tx,
1520            invoke_id,
1521            SERVICE_ACKNOWLEDGE_ALARM,
1522            self.response_timeout,
1523        )
1524        .await
1525    }
1526
1527    /// Read a contiguous byte range from a BACnet File object using stream access.
1528    ///
1529    /// `file_start_position` is the byte offset (may be negative for end-relative access).
1530    /// `requested_octet_count` is the number of bytes to read.
1531    pub async fn atomic_read_file_stream(
1532        &self,
1533        address: DataLinkAddress,
1534        file_object_id: ObjectId,
1535        file_start_position: i32,
1536        requested_octet_count: u32,
1537    ) -> Result<AtomicReadFileResult, ClientError> {
1538        let invoke_id = self.next_invoke_id().await;
1539        let request = AtomicReadFileRequest::stream(
1540            file_object_id,
1541            file_start_position,
1542            requested_octet_count,
1543            invoke_id,
1544        );
1545        self.atomic_read_file(address, request).await
1546    }
1547
1548    /// Read a range of records from a BACnet File object using record access.
1549    ///
1550    /// `file_start_record` is the first record index (may be negative for end-relative access).
1551    /// `requested_record_count` is the number of records to read.
1552    pub async fn atomic_read_file_record(
1553        &self,
1554        address: DataLinkAddress,
1555        file_object_id: ObjectId,
1556        file_start_record: i32,
1557        requested_record_count: u32,
1558    ) -> Result<AtomicReadFileResult, ClientError> {
1559        let invoke_id = self.next_invoke_id().await;
1560        let request = AtomicReadFileRequest::record(
1561            file_object_id,
1562            file_start_record,
1563            requested_record_count,
1564            invoke_id,
1565        );
1566        self.atomic_read_file(address, request).await
1567    }
1568
1569    async fn atomic_read_file(
1570        &self,
1571        address: DataLinkAddress,
1572        request: AtomicReadFileRequest,
1573    ) -> Result<AtomicReadFileResult, ClientError> {
1574        let invoke_id = request.invoke_id;
1575        let tx = self.encode_with_growth(|w| {
1576            Npdu::new(0).encode(w)?;
1577            request.encode(w)
1578        })?;
1579        let payload = self
1580            .await_complex_ack_payload_or_error(
1581                address,
1582                &tx,
1583                invoke_id,
1584                SERVICE_ATOMIC_READ_FILE,
1585                self.response_timeout,
1586            )
1587            .await?;
1588        let mut pr = Reader::new(&payload);
1589        let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1590        Ok(into_client_atomic_read_result(parsed))
1591    }
1592
1593    /// Write `file_data` to a BACnet File object starting at `file_start_position` using stream
1594    /// access. The returned result contains the actual start position used by the device.
1595    pub async fn atomic_write_file_stream(
1596        &self,
1597        address: DataLinkAddress,
1598        file_object_id: ObjectId,
1599        file_start_position: i32,
1600        file_data: &[u8],
1601    ) -> Result<AtomicWriteFileResult, ClientError> {
1602        let invoke_id = self.next_invoke_id().await;
1603        let request = AtomicWriteFileRequest::stream(
1604            file_object_id,
1605            file_start_position,
1606            file_data,
1607            invoke_id,
1608        );
1609        self.atomic_write_file(address, request).await
1610    }
1611
1612    /// Write records to a BACnet File object starting at `file_start_record` using record access.
1613    ///
1614    /// Each element of `file_record_data` is one record's raw bytes.
1615    pub async fn atomic_write_file_record(
1616        &self,
1617        address: DataLinkAddress,
1618        file_object_id: ObjectId,
1619        file_start_record: i32,
1620        file_record_data: &[&[u8]],
1621    ) -> Result<AtomicWriteFileResult, ClientError> {
1622        let invoke_id = self.next_invoke_id().await;
1623        let request = AtomicWriteFileRequest::record(
1624            file_object_id,
1625            file_start_record,
1626            file_record_data,
1627            invoke_id,
1628        );
1629        self.atomic_write_file(address, request).await
1630    }
1631
1632    async fn atomic_write_file(
1633        &self,
1634        address: DataLinkAddress,
1635        request: AtomicWriteFileRequest<'_>,
1636    ) -> Result<AtomicWriteFileResult, ClientError> {
1637        let invoke_id = request.invoke_id;
1638        let tx = self.encode_with_growth(|w| {
1639            Npdu::new(0).encode(w)?;
1640            request.encode(w)
1641        })?;
1642        let payload = self
1643            .await_complex_ack_payload_or_error(
1644                address,
1645                &tx,
1646                invoke_id,
1647                SERVICE_ATOMIC_WRITE_FILE,
1648                self.response_timeout,
1649            )
1650            .await?;
1651        let mut pr = Reader::new(&payload);
1652        let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1653        Ok(into_client_atomic_write_result(parsed))
1654    }
1655
1656    /// Send a SubscribeCOV request to start (or renew) a COV subscription on the device.
1657    ///
1658    /// Use [`cancel_cov_subscription`](Self::cancel_cov_subscription) to unsubscribe.
1659    pub async fn subscribe_cov(
1660        &self,
1661        address: DataLinkAddress,
1662        mut request: SubscribeCovRequest,
1663    ) -> Result<(), ClientError> {
1664        request.invoke_id = self.next_invoke_id().await;
1665        let invoke_id = request.invoke_id;
1666        let tx = self.encode_with_growth(|w| {
1667            Npdu::new(0).encode(w)?;
1668            request.encode(w)
1669        })?;
1670        self.await_simple_ack_or_error(
1671            address,
1672            &tx,
1673            invoke_id,
1674            SERVICE_SUBSCRIBE_COV,
1675            self.response_timeout,
1676        )
1677        .await
1678    }
1679
1680    /// Cancel an existing COV subscription identified by `subscriber_process_id` and
1681    /// `monitored_object_id`.
1682    pub async fn cancel_cov_subscription(
1683        &self,
1684        address: DataLinkAddress,
1685        subscriber_process_id: u32,
1686        monitored_object_id: ObjectId,
1687    ) -> Result<(), ClientError> {
1688        self.subscribe_cov(
1689            address,
1690            SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1691        )
1692        .await
1693    }
1694
1695    /// Send a SubscribeCOVProperty request to subscribe to changes of a specific property.
1696    ///
1697    /// Use [`cancel_cov_property_subscription`](Self::cancel_cov_property_subscription) to
1698    /// unsubscribe.
1699    pub async fn subscribe_cov_property(
1700        &self,
1701        address: DataLinkAddress,
1702        mut request: SubscribeCovPropertyRequest,
1703    ) -> Result<(), ClientError> {
1704        request.invoke_id = self.next_invoke_id().await;
1705        let invoke_id = request.invoke_id;
1706        let tx = self.encode_with_growth(|w| {
1707            Npdu::new(0).encode(w)?;
1708            request.encode(w)
1709        })?;
1710        self.await_simple_ack_or_error(
1711            address,
1712            &tx,
1713            invoke_id,
1714            SERVICE_SUBSCRIBE_COV_PROPERTY,
1715            self.response_timeout,
1716        )
1717        .await
1718    }
1719
1720    /// Cancel an existing COV property subscription.
1721    ///
1722    /// The combination of `subscriber_process_id`, `monitored_object_id`,
1723    /// `monitored_property_id`, and `monitored_property_array_index` must match the
1724    /// subscription to cancel.
1725    pub async fn cancel_cov_property_subscription(
1726        &self,
1727        address: DataLinkAddress,
1728        subscriber_process_id: u32,
1729        monitored_object_id: ObjectId,
1730        monitored_property_id: PropertyId,
1731        monitored_property_array_index: Option<u32>,
1732    ) -> Result<(), ClientError> {
1733        self.subscribe_cov_property(
1734            address,
1735            SubscribeCovPropertyRequest::cancel(
1736                subscriber_process_id,
1737                monitored_object_id,
1738                monitored_property_id,
1739                monitored_property_array_index,
1740                0,
1741            ),
1742        )
1743        .await
1744    }
1745
1746    /// Read a range of entries from a list/log property by absolute position.
1747    ///
1748    /// `reference_index` is the 1-based starting entry index. A positive `count` reads
1749    /// forward; negative reads backward from `reference_index`.
1750    pub async fn read_range_by_position(
1751        &self,
1752        address: DataLinkAddress,
1753        object_id: ObjectId,
1754        property_id: PropertyId,
1755        array_index: Option<u32>,
1756        reference_index: i32,
1757        count: i16,
1758    ) -> Result<ReadRangeResult, ClientError> {
1759        let invoke_id = self.next_invoke_id().await;
1760        let req = ReadRangeRequest::by_position(
1761            object_id,
1762            property_id,
1763            array_index,
1764            reference_index,
1765            count,
1766            invoke_id,
1767        );
1768        self.read_range_with_request(address, req).await
1769    }
1770
1771    /// Read a range of entries from a list/log property anchored at a sequence number.
1772    ///
1773    /// `reference_sequence` is the sequence number of the anchor entry. A positive `count`
1774    /// reads forward; negative reads backward.
1775    pub async fn read_range_by_sequence_number(
1776        &self,
1777        address: DataLinkAddress,
1778        object_id: ObjectId,
1779        property_id: PropertyId,
1780        array_index: Option<u32>,
1781        reference_sequence: u32,
1782        count: i16,
1783    ) -> Result<ReadRangeResult, ClientError> {
1784        let invoke_id = self.next_invoke_id().await;
1785        let req = ReadRangeRequest::by_sequence_number(
1786            object_id,
1787            property_id,
1788            array_index,
1789            reference_sequence,
1790            count,
1791            invoke_id,
1792        );
1793        self.read_range_with_request(address, req).await
1794    }
1795
1796    /// Read a range of entries from a list/log property anchored at a timestamp.
1797    ///
1798    /// `at` is the reference date and time. A positive `count` reads forward from that
1799    /// timestamp; negative reads backward.
1800    pub async fn read_range_by_time(
1801        &self,
1802        address: DataLinkAddress,
1803        object_id: ObjectId,
1804        property_id: PropertyId,
1805        array_index: Option<u32>,
1806        at: (Date, Time),
1807        count: i16,
1808    ) -> Result<ReadRangeResult, ClientError> {
1809        let (date, time) = at;
1810        let invoke_id = self.next_invoke_id().await;
1811        let req = ReadRangeRequest::by_time(
1812            object_id,
1813            property_id,
1814            array_index,
1815            date,
1816            time,
1817            count,
1818            invoke_id,
1819        );
1820        self.read_range_with_request(address, req).await
1821    }
1822
1823    async fn read_range_with_request(
1824        &self,
1825        address: DataLinkAddress,
1826        req: ReadRangeRequest,
1827    ) -> Result<ReadRangeResult, ClientError> {
1828        let invoke_id = req.invoke_id;
1829        let tx = self.encode_with_growth(|w| {
1830            Npdu::new(0).encode(w)?;
1831            req.encode(w)
1832        })?;
1833        let payload = self
1834            .await_complex_ack_payload_or_error(
1835                address,
1836                &tx,
1837                invoke_id,
1838                SERVICE_READ_RANGE,
1839                self.response_timeout,
1840            )
1841            .await?;
1842        let mut pr = Reader::new(&payload);
1843        let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1844        into_client_read_range(parsed)
1845    }
1846
1847    /// Wait up to `wait` for a single incoming COV notification (confirmed or unconfirmed).
1848    ///
1849    /// Returns `Ok(Some(_))` when a notification arrives, `Ok(None)` on timeout, and
1850    /// `Err` on transport failure. Confirmed notifications are automatically acknowledged.
1851    /// Segmented confirmed notifications return [`ClientError::UnsupportedResponse`].
1852    pub async fn recv_cov_notification(
1853        &self,
1854        wait: Duration,
1855    ) -> Result<Option<CovNotification>, ClientError> {
1856        let _io_lock = self.request_io_lock.lock().await;
1857        let deadline = tokio::time::Instant::now() + wait;
1858
1859        while tokio::time::Instant::now() < deadline {
1860            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1861            let mut rx = [0u8; 1500];
1862            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1863            let (n, source) = match recv {
1864                Ok(Ok(v)) => v,
1865                Ok(Err(e)) => return Err(e.into()),
1866                Err(_) => break,
1867            };
1868
1869            let apdu = extract_apdu(&rx[..n])?;
1870            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1871            match ApduType::from_u8(first >> 4) {
1872                Some(ApduType::UnconfirmedRequest) => {
1873                    let mut r = Reader::new(apdu);
1874                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1875                    if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1876                        continue;
1877                    }
1878                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1879                    return Ok(Some(into_client_cov_notification(source, false, cov)?));
1880                }
1881                Some(ApduType::ConfirmedRequest) => {
1882                    let mut r = Reader::new(apdu);
1883                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1884                    if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1885                        continue;
1886                    }
1887                    if header.segmented {
1888                        return Err(ClientError::UnsupportedResponse);
1889                    }
1890
1891                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1892                    self.send_simple_ack(
1893                        source,
1894                        header.invoke_id,
1895                        SERVICE_CONFIRMED_COV_NOTIFICATION,
1896                    )
1897                    .await?;
1898                    return Ok(Some(into_client_cov_notification(source, true, cov)?));
1899                }
1900                _ => continue,
1901            }
1902        }
1903
1904        Ok(None)
1905    }
1906
1907    /// Wait up to `wait` for a single incoming event notification (confirmed or unconfirmed).
1908    ///
1909    /// Returns `Ok(Some(_))` when a notification arrives, `Ok(None)` on timeout, and
1910    /// `Err` on transport failure. Confirmed notifications are automatically acknowledged.
1911    /// Segmented confirmed notifications return [`ClientError::UnsupportedResponse`].
1912    pub async fn recv_event_notification(
1913        &self,
1914        wait: Duration,
1915    ) -> Result<Option<EventNotification>, ClientError> {
1916        let _io_lock = self.request_io_lock.lock().await;
1917        let deadline = tokio::time::Instant::now() + wait;
1918
1919        while tokio::time::Instant::now() < deadline {
1920            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1921            let mut rx = [0u8; 1500];
1922            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1923            let (n, source) = match recv {
1924                Ok(Ok(v)) => v,
1925                Ok(Err(e)) => return Err(e.into()),
1926                Err(_) => break,
1927            };
1928
1929            let apdu = extract_apdu(&rx[..n])?;
1930            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1931            match ApduType::from_u8(first >> 4) {
1932                Some(ApduType::UnconfirmedRequest) => {
1933                    let mut r = Reader::new(apdu);
1934                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1935                    if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1936                        continue;
1937                    }
1938                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1939                    return Ok(Some(into_client_event_notification(
1940                        source,
1941                        false,
1942                        notification,
1943                    )));
1944                }
1945                Some(ApduType::ConfirmedRequest) => {
1946                    let mut r = Reader::new(apdu);
1947                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1948                    if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1949                        continue;
1950                    }
1951                    if header.segmented {
1952                        return Err(ClientError::UnsupportedResponse);
1953                    }
1954                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1955                    self.send_simple_ack(
1956                        source,
1957                        header.invoke_id,
1958                        SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1959                    )
1960                    .await?;
1961                    return Ok(Some(into_client_event_notification(
1962                        source,
1963                        true,
1964                        notification,
1965                    )));
1966                }
1967                _ => continue,
1968            }
1969        }
1970
1971        Ok(None)
1972    }
1973
1974    /// Send a ReadProperty request and return the property value as a [`ClientDataValue`].
1975    ///
1976    /// Use [`read_property_multiple`](Self::read_property_multiple) to fetch several
1977    /// properties in a single round-trip.
1978    pub async fn read_property(
1979        &self,
1980        address: DataLinkAddress,
1981        object_id: ObjectId,
1982        property_id: PropertyId,
1983    ) -> Result<ClientDataValue, ClientError> {
1984        let invoke_id = self.next_invoke_id().await;
1985        let req = ReadPropertyRequest {
1986            object_id,
1987            property_id,
1988            array_index: None,
1989            invoke_id,
1990        };
1991        let tx = self.encode_with_growth(|w| {
1992            Npdu::new(0).encode(w)?;
1993            req.encode(w)
1994        })?;
1995        let payload = self
1996            .await_complex_ack_payload_or_error(
1997                address,
1998                &tx,
1999                invoke_id,
2000                SERVICE_READ_PROPERTY,
2001                self.response_timeout,
2002            )
2003            .await?;
2004        let mut pr = Reader::new(&payload);
2005        let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
2006        into_client_value(parsed.value)
2007    }
2008
2009    /// Send a WriteProperty request to set a single property on the device.
2010    pub async fn write_property(
2011        &self,
2012        address: DataLinkAddress,
2013        mut request: WritePropertyRequest<'_>,
2014    ) -> Result<(), ClientError> {
2015        request.invoke_id = self.next_invoke_id().await;
2016        let invoke_id = request.invoke_id;
2017        let tx = self.encode_with_growth(|w| {
2018            Npdu::new(0).encode(w)?;
2019            request.encode(w)
2020        })?;
2021        self.await_simple_ack_or_error(
2022            address,
2023            &tx,
2024            invoke_id,
2025            SERVICE_WRITE_PROPERTY,
2026            self.response_timeout,
2027        )
2028        .await
2029    }
2030
2031    /// Send a ReadPropertyMultiple request to fetch several properties of one object in a
2032    /// single round-trip.
2033    ///
2034    /// Returns pairs of `(PropertyId, ClientDataValue)` in the order returned by the device.
2035    pub async fn read_property_multiple(
2036        &self,
2037        address: DataLinkAddress,
2038        object_id: ObjectId,
2039        property_ids: &[PropertyId],
2040    ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
2041        let refs: Vec<PropertyReference> = property_ids
2042            .iter()
2043            .copied()
2044            .map(|property_id| PropertyReference {
2045                property_id,
2046                array_index: None,
2047            })
2048            .collect();
2049        let specs = [ReadAccessSpecification {
2050            object_id,
2051            properties: &refs,
2052        }];
2053
2054        let invoke_id = self.next_invoke_id().await;
2055        let req = ReadPropertyMultipleRequest {
2056            specs: &specs,
2057            invoke_id,
2058        };
2059
2060        let tx = self.encode_with_growth(|w| {
2061            Npdu::new(0).encode(w)?;
2062            req.encode(w)
2063        })?;
2064        let payload = self
2065            .await_complex_ack_payload_or_error(
2066                address,
2067                &tx,
2068                invoke_id,
2069                SERVICE_READ_PROPERTY_MULTIPLE,
2070                self.response_timeout,
2071            )
2072            .await?;
2073        let mut pr = Reader::new(&payload);
2074        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2075        let mut out = Vec::new();
2076        for access in parsed.results {
2077            if access.object_id != object_id {
2078                continue;
2079            }
2080            for item in access.results {
2081                out.push((item.property_id, into_client_value(item.value)?));
2082            }
2083        }
2084        Ok(out)
2085    }
2086
2087    /// Send a WritePropertyMultiple request to set several properties of one object in a
2088    /// single round-trip.
2089    pub async fn write_property_multiple(
2090        &self,
2091        address: DataLinkAddress,
2092        object_id: ObjectId,
2093        properties: &[PropertyWriteSpec<'_>],
2094    ) -> Result<(), ClientError> {
2095        let invoke_id = self.next_invoke_id().await;
2096        let specs = [WriteAccessSpecification {
2097            object_id,
2098            properties,
2099        }];
2100        let req = WritePropertyMultipleRequest {
2101            specs: &specs,
2102            invoke_id,
2103        };
2104
2105        let tx = self.encode_with_growth(|w| {
2106            Npdu::new(0).encode(w)?;
2107            req.encode(w)
2108        })?;
2109        self.await_simple_ack_or_error(
2110            address,
2111            &tx,
2112            invoke_id,
2113            SERVICE_WRITE_PROPERTY_MULTIPLE,
2114            self.response_timeout,
2115        )
2116        .await
2117    }
2118
2119    /// Send a ConfirmedPrivateTransfer request and return the ack.
2120    pub async fn private_transfer(
2121        &self,
2122        address: DataLinkAddress,
2123        vendor_id: u32,
2124        service_number: u32,
2125        service_parameters: Option<&[u8]>,
2126    ) -> Result<PrivateTransferAck, ClientError> {
2127        let invoke_id = self.next_invoke_id().await;
2128        let req = ConfirmedPrivateTransferRequest {
2129            vendor_id,
2130            service_number,
2131            service_parameters,
2132            invoke_id,
2133        };
2134
2135        let tx = self.encode_with_growth(|w| {
2136            Npdu::new(0).encode(w)?;
2137            req.encode(w)
2138        })?;
2139        let payload = self
2140            .await_complex_ack_payload_or_error(
2141                address,
2142                &tx,
2143                invoke_id,
2144                SERVICE_CONFIRMED_PRIVATE_TRANSFER,
2145                self.response_timeout,
2146            )
2147            .await?;
2148        let mut r = Reader::new(&payload);
2149        PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
2150    }
2151
2152    /// Read multiple `(object_id, property_id)` pairs in a single ReadPropertyMultiple round-trip.
2153    ///
2154    /// All pairs must target the same device at `address`. Returns a map from each requested
2155    /// `(ObjectId, PropertyId)` to its value. Properties not returned by the device are absent
2156    /// from the map (the device may skip unknown properties rather than erroring).
2157    pub async fn read_many(
2158        &self,
2159        address: DataLinkAddress,
2160        requests: &[(ObjectId, PropertyId)],
2161    ) -> Result<HashMap<(ObjectId, PropertyId), ClientDataValue>, ClientError> {
2162        // Group by object to build ReadAccessSpecifications
2163        let mut grouped: Vec<(ObjectId, Vec<PropertyReference>)> = Vec::new();
2164        for &(oid, pid) in requests {
2165            if let Some(entry) = grouped.iter_mut().find(|(o, _)| *o == oid) {
2166                entry.1.push(PropertyReference {
2167                    property_id: pid,
2168                    array_index: None,
2169                });
2170            } else {
2171                grouped.push((
2172                    oid,
2173                    vec![PropertyReference {
2174                        property_id: pid,
2175                        array_index: None,
2176                    }],
2177                ));
2178            }
2179        }
2180
2181        let specs: Vec<ReadAccessSpecification<'_>> = grouped
2182            .iter()
2183            .map(|(oid, props)| ReadAccessSpecification {
2184                object_id: *oid,
2185                properties: props,
2186            })
2187            .collect();
2188
2189        let invoke_id = self.next_invoke_id().await;
2190        let req = ReadPropertyMultipleRequest {
2191            specs: &specs,
2192            invoke_id,
2193        };
2194        let tx = self.encode_with_growth(|w| {
2195            Npdu::new(0).encode(w)?;
2196            req.encode(w)
2197        })?;
2198        let payload = self
2199            .await_complex_ack_payload_or_error(
2200                address,
2201                &tx,
2202                invoke_id,
2203                SERVICE_READ_PROPERTY_MULTIPLE,
2204                self.response_timeout,
2205            )
2206            .await?;
2207
2208        let mut pr = Reader::new(&payload);
2209        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2210        let mut out = HashMap::new();
2211        for access in parsed.results {
2212            for item in access.results {
2213                if let Ok(v) = into_client_value(item.value) {
2214                    out.insert((access.object_id, item.property_id), v);
2215                }
2216            }
2217        }
2218        Ok(out)
2219    }
2220
2221    /// Write multiple properties across one or more objects in a single WritePropertyMultiple
2222    /// round-trip.
2223    ///
2224    /// `writes` is a slice of `(object_id, property_id, value, priority)` tuples.
2225    /// `priority` may be `None` for the relinquish-default case. All writes target the same
2226    /// device at `address`. Takes [`ClientDataValue`] (the owned form) for ergonomic use.
2227    pub async fn write_many(
2228        &self,
2229        address: DataLinkAddress,
2230        writes: &[(ObjectId, PropertyId, ClientDataValue, Option<u8>)],
2231    ) -> Result<(), ClientError> {
2232        use rustbac_core::types::{BitString, DataValue as DV};
2233
2234        fn cv_to_dv(v: &ClientDataValue) -> DV<'_> {
2235            match v {
2236                ClientDataValue::Null => DV::Null,
2237                ClientDataValue::Boolean(b) => DV::Boolean(*b),
2238                ClientDataValue::Unsigned(n) => DV::Unsigned(*n),
2239                ClientDataValue::Signed(n) => DV::Signed(*n),
2240                ClientDataValue::Real(f) => DV::Real(*f),
2241                ClientDataValue::Double(f) => DV::Double(*f),
2242                ClientDataValue::OctetString(b) => DV::OctetString(b),
2243                ClientDataValue::CharacterString(s) => DV::CharacterString(s),
2244                ClientDataValue::BitString { unused_bits, data } => DV::BitString(BitString {
2245                    unused_bits: *unused_bits,
2246                    data,
2247                }),
2248                ClientDataValue::Enumerated(n) => DV::Enumerated(*n),
2249                ClientDataValue::Date(d) => DV::Date(*d),
2250                ClientDataValue::Time(t) => DV::Time(*t),
2251                ClientDataValue::ObjectId(o) => DV::ObjectId(*o),
2252                ClientDataValue::Constructed { tag_num, values } => DV::Constructed {
2253                    tag_num: *tag_num,
2254                    values: values.iter().map(cv_to_dv).collect(),
2255                },
2256            }
2257        }
2258
2259        // Pre-convert values so DataValue borrows from the input slice lifetime.
2260        let converted: Vec<(ObjectId, PropertyId, DV<'_>, Option<u8>)> = writes
2261            .iter()
2262            .map(|(oid, pid, val, prio)| (*oid, *pid, cv_to_dv(val), *prio))
2263            .collect();
2264
2265        // Group by object to build WriteAccessSpecifications.
2266        let mut grouped: Vec<(ObjectId, Vec<PropertyWriteSpec<'_>>)> = Vec::new();
2267        for (oid, pid, val, prio) in &converted {
2268            let spec = PropertyWriteSpec {
2269                property_id: *pid,
2270                array_index: None,
2271                value: val.clone(),
2272                priority: *prio,
2273            };
2274            if let Some(entry) = grouped.iter_mut().find(|(o, _)| o == oid) {
2275                entry.1.push(spec);
2276            } else {
2277                grouped.push((*oid, vec![spec]));
2278            }
2279        }
2280
2281        let specs: Vec<WriteAccessSpecification<'_>> = grouped
2282            .iter()
2283            .map(|(oid, props)| WriteAccessSpecification {
2284                object_id: *oid,
2285                properties: props,
2286            })
2287            .collect();
2288
2289        let invoke_id = self.next_invoke_id().await;
2290        let req = WritePropertyMultipleRequest {
2291            specs: &specs,
2292            invoke_id,
2293        };
2294        let tx = self.encode_with_growth(|w| {
2295            Npdu::new(0).encode(w)?;
2296            req.encode(w)
2297        })?;
2298        self.await_simple_ack_or_error(
2299            address,
2300            &tx,
2301            invoke_id,
2302            SERVICE_WRITE_PROPERTY_MULTIPLE,
2303            self.response_timeout,
2304        )
2305        .await
2306    }
2307}
2308
2309fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
2310    let mut r = Reader::new(payload);
2311    let _npdu = Npdu::decode(&mut r)?;
2312    r.read_exact(r.remaining()).map_err(ClientError::from)
2313}
2314
2315// ─────────────────────────────────────────────────────────────────────────────
2316// Inline server dispatch
2317// ─────────────────────────────────────────────────────────────────────────────
2318
2319/// Handle an incoming BACnet service request on behalf of the client's inline server handler.
2320///
2321/// This is the core dispatch logic extracted from [`BacnetServer::handle_frame`] so that
2322/// `BacnetClient` can service requests inline while waiting for its own responses.
2323async fn dispatch_incoming_request<D: DataLink>(
2324    datalink: &D,
2325    handler: &dyn crate::server::ServiceHandler,
2326    device_id: u32,
2327    vendor_id: u16,
2328    frame: &[u8],
2329    source: DataLinkAddress,
2330) -> Result<(), ClientError> {
2331    let mut r = Reader::new(frame);
2332    let _npdu = match Npdu::decode(&mut r) {
2333        Ok(n) => n,
2334        Err(_) => return Ok(()),
2335    };
2336
2337    if r.is_empty() {
2338        return Ok(());
2339    }
2340
2341    let first = match r.peek_u8() {
2342        Ok(b) => b,
2343        Err(_) => return Ok(()),
2344    };
2345    let apdu_type = ApduType::from_u8(first >> 4);
2346
2347    match apdu_type {
2348        Some(ApduType::UnconfirmedRequest) => {
2349            let header = match UnconfirmedRequestHeader::decode(&mut r) {
2350                Ok(h) => h,
2351                Err(_) => return Ok(()),
2352            };
2353            if header.service_choice == 0x08 {
2354                // Who-Is — check optional limits then respond with I-Am.
2355                let limits = dispatch_decode_who_is_limits(&mut r);
2356                if dispatch_matches_who_is(device_id, limits) {
2357                    dispatch_send_i_am(datalink, device_id, vendor_id, source).await;
2358                }
2359            }
2360            // All other unconfirmed services are ignored.
2361        }
2362        Some(ApduType::ConfirmedRequest) => {
2363            let header = match ConfirmedRequestHeader::decode(&mut r) {
2364                Ok(h) => h,
2365                Err(_) => return Ok(()),
2366            };
2367            let invoke_id = header.invoke_id;
2368            match header.service_choice {
2369                SERVICE_READ_PROPERTY => {
2370                    dispatch_handle_read_property(datalink, handler, &mut r, invoke_id, source)
2371                        .await;
2372                }
2373                SERVICE_WRITE_PROPERTY => {
2374                    dispatch_handle_write_property(datalink, handler, &mut r, invoke_id, source)
2375                        .await;
2376                }
2377                SERVICE_READ_PROPERTY_MULTIPLE => {
2378                    dispatch_handle_read_property_multiple(
2379                        datalink, handler, &mut r, invoke_id, source,
2380                    )
2381                    .await;
2382                }
2383                SERVICE_WRITE_PROPERTY_MULTIPLE => {
2384                    dispatch_handle_write_property_multiple(
2385                        datalink, handler, &mut r, invoke_id, source,
2386                    )
2387                    .await;
2388                }
2389                SERVICE_SUBSCRIBE_COV => {
2390                    dispatch_handle_subscribe_cov(datalink, handler, &mut r, invoke_id, source)
2391                        .await;
2392                }
2393                SERVICE_CREATE_OBJECT => {
2394                    dispatch_handle_create_object(datalink, handler, &mut r, invoke_id, source)
2395                        .await;
2396                }
2397                SERVICE_DELETE_OBJECT => {
2398                    dispatch_handle_delete_object(datalink, handler, &mut r, invoke_id, source)
2399                        .await;
2400                }
2401                _ => {
2402                    // Unknown service — send Reject with UNRECOGNIZED_SERVICE (0x08).
2403                    dispatch_send_reject(datalink, invoke_id, 0x08, source).await;
2404                }
2405            }
2406        }
2407        _ => {
2408            // Not a request — ignore.
2409        }
2410    }
2411
2412    Ok(())
2413}
2414
2415// ── Inline dispatch helpers ──────────────────────────────────────────────────
2416
2417async fn dispatch_send_i_am<D: DataLink>(
2418    datalink: &D,
2419    device_id: u32,
2420    vendor_id: u16,
2421    target: DataLinkAddress,
2422) {
2423    let device_oid = ObjectId::new(ObjectType::Device, device_id);
2424    let req = IAmRequest {
2425        device_id: device_oid,
2426        max_apdu: 1476,
2427        segmentation: 3, // no-segmentation
2428        vendor_id: vendor_id as u32,
2429    };
2430    let mut buf = [0u8; 128];
2431    let mut w = Writer::new(&mut buf);
2432    if Npdu::new(0).encode(&mut w).is_err() {
2433        return;
2434    }
2435    if req.encode(&mut w).is_err() {
2436        return;
2437    }
2438    let _ = datalink.send(target, w.as_written()).await;
2439}
2440
2441async fn dispatch_handle_read_property<D: DataLink>(
2442    datalink: &D,
2443    handler: &dyn crate::server::ServiceHandler,
2444    r: &mut Reader<'_>,
2445    invoke_id: u8,
2446    source: DataLinkAddress,
2447) {
2448    let object_id = match crate::decode_ctx_object_id(r) {
2449        Ok(v) => v,
2450        Err(_) => return,
2451    };
2452    let property_id_raw = match crate::decode_ctx_unsigned(r) {
2453        Ok(v) => v,
2454        Err(_) => return,
2455    };
2456    let property_id = PropertyId::from_u32(property_id_raw);
2457
2458    // Optional array index [2].
2459    let array_index = dispatch_decode_optional_array_index(r);
2460
2461    match handler.read_property(object_id, property_id, array_index) {
2462        Ok(value) => {
2463            let borrowed = dispatch_client_value_to_borrowed(&value);
2464            let mut buf = [0u8; 1400];
2465            let mut w = Writer::new(&mut buf);
2466            if Npdu::new(0).encode(&mut w).is_err() {
2467                return;
2468            }
2469            if (ComplexAckHeader {
2470                segmented: false,
2471                more_follows: false,
2472                invoke_id,
2473                sequence_number: None,
2474                proposed_window_size: None,
2475                service_choice: SERVICE_READ_PROPERTY,
2476            })
2477            .encode(&mut w)
2478            .is_err()
2479            {
2480                return;
2481            }
2482            if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
2483                return;
2484            }
2485            if encode_ctx_unsigned(&mut w, 1, property_id.to_u32()).is_err() {
2486                return;
2487            }
2488            if (Tag::Opening { tag_num: 3 }).encode(&mut w).is_err() {
2489                return;
2490            }
2491            if encode_application_data_value(&mut w, &borrowed).is_err() {
2492                return;
2493            }
2494            if (Tag::Closing { tag_num: 3 }).encode(&mut w).is_err() {
2495                return;
2496            }
2497            let _ = datalink.send(source, w.as_written()).await;
2498        }
2499        Err(err) => {
2500            dispatch_send_error(datalink, invoke_id, SERVICE_READ_PROPERTY, err, source).await;
2501        }
2502    }
2503}
2504
2505async fn dispatch_handle_write_property<D: DataLink>(
2506    datalink: &D,
2507    handler: &dyn crate::server::ServiceHandler,
2508    r: &mut Reader<'_>,
2509    invoke_id: u8,
2510    source: DataLinkAddress,
2511) {
2512    let object_id = match crate::decode_ctx_object_id(r) {
2513        Ok(v) => v,
2514        Err(_) => return,
2515    };
2516    let property_id_raw = match crate::decode_ctx_unsigned(r) {
2517        Ok(v) => v,
2518        Err(_) => return,
2519    };
2520    let property_id = PropertyId::from_u32(property_id_raw);
2521
2522    // Optional array index [2].
2523    let next_tag = match Tag::decode(r) {
2524        Ok(t) => t,
2525        Err(_) => return,
2526    };
2527    let (array_index, value_start_tag) = match next_tag {
2528        Tag::Context { tag_num: 2, len } => {
2529            let idx = match decode_unsigned(r, len as usize) {
2530                Ok(v) => v,
2531                Err(_) => return,
2532            };
2533            let vt = match Tag::decode(r) {
2534                Ok(t) => t,
2535                Err(_) => return,
2536            };
2537            (Some(idx), vt)
2538        }
2539        other => (None, other),
2540    };
2541
2542    if value_start_tag != (Tag::Opening { tag_num: 3 }) {
2543        return;
2544    }
2545
2546    let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
2547        Ok(v) => v,
2548        Err(_) => return,
2549    };
2550
2551    match Tag::decode(r) {
2552        Ok(Tag::Closing { tag_num: 3 }) => {}
2553        _ => return,
2554    }
2555
2556    // Optional priority [4].
2557    let priority = if !r.is_empty() {
2558        match Tag::decode(r) {
2559            Ok(Tag::Context { tag_num: 4, len }) => match decode_unsigned(r, len as usize) {
2560                Ok(p) => Some(p as u8),
2561                Err(_) => return,
2562            },
2563            _ => None,
2564        }
2565    } else {
2566        None
2567    };
2568
2569    let client_val = crate::data_value_to_client(val);
2570
2571    match handler.write_property(object_id, property_id, array_index, client_val, priority) {
2572        Ok(()) => {
2573            let mut buf = [0u8; 32];
2574            let mut w = Writer::new(&mut buf);
2575            if Npdu::new(0).encode(&mut w).is_err() {
2576                return;
2577            }
2578            if (SimpleAck {
2579                invoke_id,
2580                service_choice: SERVICE_WRITE_PROPERTY,
2581            })
2582            .encode(&mut w)
2583            .is_err()
2584            {
2585                return;
2586            }
2587            let _ = datalink.send(source, w.as_written()).await;
2588        }
2589        Err(err) => {
2590            dispatch_send_error(datalink, invoke_id, SERVICE_WRITE_PROPERTY, err, source).await;
2591        }
2592    }
2593}
2594
2595async fn dispatch_handle_read_property_multiple<D: DataLink>(
2596    datalink: &D,
2597    handler: &dyn crate::server::ServiceHandler,
2598    r: &mut Reader<'_>,
2599    invoke_id: u8,
2600    source: DataLinkAddress,
2601) {
2602    type PropRefs = Vec<(PropertyId, Option<u32>)>;
2603    let mut specs: Vec<(ObjectId, PropRefs)> = Vec::new();
2604
2605    while !r.is_empty() {
2606        let object_id = match crate::decode_ctx_object_id(r) {
2607            Ok(v) => v,
2608            Err(_) => return,
2609        };
2610        match Tag::decode(r) {
2611            Ok(Tag::Opening { tag_num: 1 }) => {}
2612            _ => return,
2613        }
2614        let mut props: Vec<(PropertyId, Option<u32>)> = Vec::new();
2615        loop {
2616            let tag = match Tag::decode(r) {
2617                Ok(t) => t,
2618                Err(_) => return,
2619            };
2620            if tag == (Tag::Closing { tag_num: 1 }) {
2621                break;
2622            }
2623            let property_id = match tag {
2624                Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
2625                    Ok(v) => PropertyId::from_u32(v),
2626                    Err(_) => return,
2627                },
2628                _ => return,
2629            };
2630            let array_index = if !r.is_empty() {
2631                match dispatch_peek_context_tag(r, 1) {
2632                    Some(len) => {
2633                        match Tag::decode(r) {
2634                            Ok(_) => {}
2635                            Err(_) => return,
2636                        }
2637                        match decode_unsigned(r, len as usize) {
2638                            Ok(idx) => Some(idx),
2639                            Err(_) => return,
2640                        }
2641                    }
2642                    None => None,
2643                }
2644            } else {
2645                None
2646            };
2647            props.push((property_id, array_index));
2648        }
2649        specs.push((object_id, props));
2650    }
2651
2652    let mut buf = [0u8; 1400];
2653    let mut w = Writer::new(&mut buf);
2654    if Npdu::new(0).encode(&mut w).is_err() {
2655        return;
2656    }
2657    if (ComplexAckHeader {
2658        segmented: false,
2659        more_follows: false,
2660        invoke_id,
2661        sequence_number: None,
2662        proposed_window_size: None,
2663        service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
2664    })
2665    .encode(&mut w)
2666    .is_err()
2667    {
2668        return;
2669    }
2670
2671    for (object_id, props) in &specs {
2672        if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
2673            return;
2674        }
2675        if (Tag::Opening { tag_num: 1 }).encode(&mut w).is_err() {
2676            return;
2677        }
2678
2679        for (property_id, array_index) in props {
2680            if encode_ctx_unsigned(&mut w, 2, property_id.to_u32()).is_err() {
2681                return;
2682            }
2683            if let Some(idx) = array_index {
2684                if encode_ctx_unsigned(&mut w, 3, *idx).is_err() {
2685                    return;
2686                }
2687            }
2688            if (Tag::Opening { tag_num: 4 }).encode(&mut w).is_err() {
2689                return;
2690            }
2691
2692            match handler.read_property(*object_id, *property_id, *array_index) {
2693                Ok(value) => {
2694                    let borrowed = dispatch_client_value_to_borrowed(&value);
2695                    if encode_application_data_value(&mut w, &borrowed).is_err() {
2696                        return;
2697                    }
2698                }
2699                Err(err) => {
2700                    let (class, code) = dispatch_error_class_code(err);
2701                    if (Tag::Opening { tag_num: 5 }).encode(&mut w).is_err() {
2702                        return;
2703                    }
2704                    if encode_ctx_unsigned(&mut w, 0, class as u32).is_err() {
2705                        return;
2706                    }
2707                    if encode_ctx_unsigned(&mut w, 1, code as u32).is_err() {
2708                        return;
2709                    }
2710                    if (Tag::Closing { tag_num: 5 }).encode(&mut w).is_err() {
2711                        return;
2712                    }
2713                }
2714            }
2715
2716            if (Tag::Closing { tag_num: 4 }).encode(&mut w).is_err() {
2717                return;
2718            }
2719        }
2720
2721        if (Tag::Closing { tag_num: 1 }).encode(&mut w).is_err() {
2722            return;
2723        }
2724    }
2725
2726    let _ = datalink.send(source, w.as_written()).await;
2727}
2728
2729async fn dispatch_handle_write_property_multiple<D: DataLink>(
2730    datalink: &D,
2731    handler: &dyn crate::server::ServiceHandler,
2732    r: &mut Reader<'_>,
2733    invoke_id: u8,
2734    source: DataLinkAddress,
2735) {
2736    while !r.is_empty() {
2737        let object_id = match crate::decode_ctx_object_id(r) {
2738            Ok(v) => v,
2739            Err(_) => return,
2740        };
2741        match Tag::decode(r) {
2742            Ok(Tag::Opening { tag_num: 1 }) => {}
2743            _ => return,
2744        }
2745        loop {
2746            let tag = match Tag::decode(r) {
2747                Ok(t) => t,
2748                Err(_) => return,
2749            };
2750            if tag == (Tag::Closing { tag_num: 1 }) {
2751                break;
2752            }
2753            let property_id = match tag {
2754                Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
2755                    Ok(v) => PropertyId::from_u32(v),
2756                    Err(_) => return,
2757                },
2758                _ => return,
2759            };
2760            let array_index = if !r.is_empty() {
2761                match dispatch_peek_context_tag(r, 1) {
2762                    Some(len) => {
2763                        let _ = Tag::decode(r);
2764                        decode_unsigned(r, len as usize).ok()
2765                    }
2766                    None => None,
2767                }
2768            } else {
2769                None
2770            };
2771            match Tag::decode(r) {
2772                Ok(Tag::Opening { tag_num: 2 }) => {}
2773                _ => return,
2774            }
2775            let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
2776                Ok(v) => v,
2777                Err(_) => return,
2778            };
2779            match Tag::decode(r) {
2780                Ok(Tag::Closing { tag_num: 2 }) => {}
2781                _ => return,
2782            }
2783            let priority = if !r.is_empty() {
2784                match dispatch_peek_context_tag(r, 3) {
2785                    Some(len) => {
2786                        let _ = Tag::decode(r);
2787                        decode_unsigned(r, len as usize).ok().map(|p| p as u8)
2788                    }
2789                    None => None,
2790                }
2791            } else {
2792                None
2793            };
2794            let client_val = crate::data_value_to_client(val);
2795            if let Err(err) =
2796                handler.write_property(object_id, property_id, array_index, client_val, priority)
2797            {
2798                dispatch_send_error(
2799                    datalink,
2800                    invoke_id,
2801                    SERVICE_WRITE_PROPERTY_MULTIPLE,
2802                    err,
2803                    source,
2804                )
2805                .await;
2806                return;
2807            }
2808        }
2809    }
2810    // All properties written successfully — send SimpleAck.
2811    let mut buf = [0u8; 32];
2812    let mut w = Writer::new(&mut buf);
2813    if Npdu::new(0).encode(&mut w).is_err() {
2814        return;
2815    }
2816    if (SimpleAck {
2817        invoke_id,
2818        service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
2819    })
2820    .encode(&mut w)
2821    .is_err()
2822    {
2823        return;
2824    }
2825    let _ = datalink.send(source, w.as_written()).await;
2826}
2827
2828async fn dispatch_handle_subscribe_cov<D: DataLink>(
2829    datalink: &D,
2830    handler: &dyn crate::server::ServiceHandler,
2831    r: &mut Reader<'_>,
2832    invoke_id: u8,
2833    source: DataLinkAddress,
2834) {
2835    // subscriberProcessIdentifier [0]
2836    let subscriber_process_id = match Tag::decode(r) {
2837        Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
2838            Ok(v) => v,
2839            Err(_) => return,
2840        },
2841        _ => return,
2842    };
2843    // monitoredObjectIdentifier [1]
2844    let monitored_object_id = match crate::decode_ctx_object_id(r) {
2845        Ok(v) => v,
2846        Err(_) => return,
2847    };
2848    // issueConfirmedNotifications [2]
2849    let issue_confirmed = match Tag::decode(r) {
2850        Ok(Tag::Context { tag_num: 2, len }) => match decode_unsigned(r, len as usize) {
2851            Ok(v) => v != 0,
2852            Err(_) => return,
2853        },
2854        _ => return,
2855    };
2856    // optional lifetime [3]
2857    let lifetime = if !r.is_empty() {
2858        match dispatch_peek_context_tag(r, 3) {
2859            Some(len) => {
2860                let _ = Tag::decode(r);
2861                decode_unsigned(r, len as usize).ok()
2862            }
2863            None => None,
2864        }
2865    } else {
2866        None
2867    };
2868
2869    match handler.subscribe_cov(
2870        subscriber_process_id,
2871        monitored_object_id,
2872        issue_confirmed,
2873        lifetime,
2874    ) {
2875        Ok(()) => {
2876            let mut buf = [0u8; 32];
2877            let mut w = Writer::new(&mut buf);
2878            if Npdu::new(0).encode(&mut w).is_err() {
2879                return;
2880            }
2881            if (SimpleAck {
2882                invoke_id,
2883                service_choice: SERVICE_SUBSCRIBE_COV,
2884            })
2885            .encode(&mut w)
2886            .is_err()
2887            {
2888                return;
2889            }
2890            let _ = datalink.send(source, w.as_written()).await;
2891        }
2892        Err(err) => {
2893            dispatch_send_error(datalink, invoke_id, SERVICE_SUBSCRIBE_COV, err, source).await;
2894        }
2895    }
2896}
2897
2898async fn dispatch_handle_create_object<D: DataLink>(
2899    datalink: &D,
2900    handler: &dyn crate::server::ServiceHandler,
2901    r: &mut Reader<'_>,
2902    invoke_id: u8,
2903    source: DataLinkAddress,
2904) {
2905    // objectSpecifier [0] opening
2906    match Tag::decode(r) {
2907        Ok(Tag::Opening { tag_num: 0 }) => {}
2908        _ => return,
2909    }
2910    // objectType [0] — context-tagged enumerated
2911    let object_type_raw = match Tag::decode(r) {
2912        Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
2913            Ok(v) => v,
2914            Err(_) => return,
2915        },
2916        _ => return,
2917    };
2918    let object_type = ObjectType::from_u16(object_type_raw as u16);
2919    // objectSpecifier [0] closing
2920    match Tag::decode(r) {
2921        Ok(Tag::Closing { tag_num: 0 }) => {}
2922        _ => return,
2923    }
2924
2925    match handler.create_object(object_type) {
2926        Ok(created_id) => {
2927            let mut buf = [0u8; 64];
2928            let mut w = Writer::new(&mut buf);
2929            if Npdu::new(0).encode(&mut w).is_err() {
2930                return;
2931            }
2932            if (ComplexAckHeader {
2933                segmented: false,
2934                more_follows: false,
2935                invoke_id,
2936                sequence_number: None,
2937                proposed_window_size: None,
2938                service_choice: SERVICE_CREATE_OBJECT,
2939            })
2940            .encode(&mut w)
2941            .is_err()
2942            {
2943                return;
2944            }
2945            if encode_ctx_unsigned(&mut w, 0, created_id.raw()).is_err() {
2946                return;
2947            }
2948            let _ = datalink.send(source, w.as_written()).await;
2949        }
2950        Err(err) => {
2951            dispatch_send_error(datalink, invoke_id, SERVICE_CREATE_OBJECT, err, source).await;
2952        }
2953    }
2954}
2955
2956async fn dispatch_handle_delete_object<D: DataLink>(
2957    datalink: &D,
2958    handler: &dyn crate::server::ServiceHandler,
2959    r: &mut Reader<'_>,
2960    invoke_id: u8,
2961    source: DataLinkAddress,
2962) {
2963    let object_id = match crate::decode_ctx_object_id(r) {
2964        Ok(v) => v,
2965        Err(_) => return,
2966    };
2967
2968    match handler.delete_object(object_id) {
2969        Ok(()) => {
2970            let mut buf = [0u8; 32];
2971            let mut w = Writer::new(&mut buf);
2972            if Npdu::new(0).encode(&mut w).is_err() {
2973                return;
2974            }
2975            if (SimpleAck {
2976                invoke_id,
2977                service_choice: SERVICE_DELETE_OBJECT,
2978            })
2979            .encode(&mut w)
2980            .is_err()
2981            {
2982                return;
2983            }
2984            let _ = datalink.send(source, w.as_written()).await;
2985        }
2986        Err(err) => {
2987            dispatch_send_error(datalink, invoke_id, SERVICE_DELETE_OBJECT, err, source).await;
2988        }
2989    }
2990}
2991
2992async fn dispatch_send_error<D: DataLink>(
2993    datalink: &D,
2994    invoke_id: u8,
2995    service_choice: u8,
2996    err: crate::server::BacnetServiceError,
2997    target: DataLinkAddress,
2998) {
2999    let (class, code) = dispatch_error_class_code(err);
3000    let mut buf = [0u8; 64];
3001    let mut w = Writer::new(&mut buf);
3002    if Npdu::new(0).encode(&mut w).is_err() {
3003        return;
3004    }
3005    // Error PDU header: type=5 (Error)
3006    if w.write_u8(0x50).is_err() {
3007        return;
3008    }
3009    if w.write_u8(invoke_id).is_err() {
3010        return;
3011    }
3012    if w.write_u8(service_choice).is_err() {
3013        return;
3014    }
3015    // error-class: application Enumerated
3016    if (Tag::Application {
3017        tag: rustbac_core::encoding::tag::AppTag::Enumerated,
3018        len: 1,
3019    })
3020    .encode(&mut w)
3021    .is_err()
3022    {
3023        return;
3024    }
3025    if w.write_u8(class).is_err() {
3026        return;
3027    }
3028    // error-code: application Enumerated
3029    if (Tag::Application {
3030        tag: rustbac_core::encoding::tag::AppTag::Enumerated,
3031        len: 1,
3032    })
3033    .encode(&mut w)
3034    .is_err()
3035    {
3036        return;
3037    }
3038    if w.write_u8(code).is_err() {
3039        return;
3040    }
3041    let _ = datalink.send(target, w.as_written()).await;
3042}
3043
3044async fn dispatch_send_reject<D: DataLink>(
3045    datalink: &D,
3046    invoke_id: u8,
3047    reason: u8,
3048    target: DataLinkAddress,
3049) {
3050    let mut buf = [0u8; 16];
3051    let mut w = Writer::new(&mut buf);
3052    if Npdu::new(0).encode(&mut w).is_err() {
3053        return;
3054    }
3055    // Reject PDU: type=6 (Reject)
3056    if w.write_u8(0x60).is_err() {
3057        return;
3058    }
3059    if w.write_u8(invoke_id).is_err() {
3060        return;
3061    }
3062    if w.write_u8(reason).is_err() {
3063        return;
3064    }
3065    let _ = datalink.send(target, w.as_written()).await;
3066}
3067
3068/// Decode Who-Is optional [0] low-limit and [1] high-limit (dispatch-local copy).
3069fn dispatch_decode_who_is_limits(r: &mut Reader<'_>) -> Option<(u32, u32)> {
3070    if r.is_empty() {
3071        return None;
3072    }
3073    let tag0 = Tag::decode(r).ok()?;
3074    let low = match tag0 {
3075        Tag::Context { tag_num: 0, len } => decode_unsigned(r, len as usize).ok()?,
3076        _ => return None,
3077    };
3078    let tag1 = Tag::decode(r).ok()?;
3079    let high = match tag1 {
3080        Tag::Context { tag_num: 1, len } => decode_unsigned(r, len as usize).ok()?,
3081        _ => return None,
3082    };
3083    Some((low, high))
3084}
3085
3086fn dispatch_error_class_code(err: crate::server::BacnetServiceError) -> (u8, u8) {
3087    use crate::server::BacnetServiceError;
3088    match err {
3089        BacnetServiceError::UnknownObject => (1, 31),
3090        BacnetServiceError::UnknownProperty => (2, 32),
3091        BacnetServiceError::WriteAccessDenied => (2, 40),
3092        BacnetServiceError::InvalidDataType => (2, 9),
3093        BacnetServiceError::ServiceNotSupported => (5, 53),
3094    }
3095}
3096
3097fn dispatch_matches_who_is(device_id: u32, limits: Option<(u32, u32)>) -> bool {
3098    match limits {
3099        None => true,
3100        Some((low, high)) => device_id >= low && device_id <= high,
3101    }
3102}
3103
3104fn dispatch_decode_optional_array_index(r: &mut Reader<'_>) -> Option<u32> {
3105    if r.is_empty() {
3106        return None;
3107    }
3108    let tag = Tag::decode(r).ok()?;
3109    match tag {
3110        Tag::Context { tag_num: 2, len } => decode_unsigned(r, len as usize).ok(),
3111        _ => None,
3112    }
3113}
3114
3115fn dispatch_peek_context_tag(r: &mut Reader<'_>, tag_num: u8) -> Option<u32> {
3116    let first = r.peek_u8().ok()?;
3117    let is_context = (first & 0x08) != 0 && (first & 0x07) < 6;
3118    if !is_context {
3119        return None;
3120    }
3121    let this_tag_num = first >> 4;
3122    if this_tag_num != tag_num {
3123        return None;
3124    }
3125    let short_len = first & 0x07;
3126    if short_len < 5 {
3127        Some(short_len as u32)
3128    } else {
3129        None
3130    }
3131}
3132
3133fn dispatch_client_value_to_borrowed(val: &ClientDataValue) -> DataValue<'_> {
3134    match val {
3135        ClientDataValue::Null => DataValue::Null,
3136        ClientDataValue::Boolean(v) => DataValue::Boolean(*v),
3137        ClientDataValue::Unsigned(v) => DataValue::Unsigned(*v),
3138        ClientDataValue::Signed(v) => DataValue::Signed(*v),
3139        ClientDataValue::Real(v) => DataValue::Real(*v),
3140        ClientDataValue::Double(v) => DataValue::Double(*v),
3141        ClientDataValue::OctetString(v) => DataValue::OctetString(v),
3142        ClientDataValue::CharacterString(v) => DataValue::CharacterString(v),
3143        ClientDataValue::BitString { unused_bits, data } => {
3144            DataValue::BitString(rustbac_core::types::BitString {
3145                unused_bits: *unused_bits,
3146                data,
3147            })
3148        }
3149        ClientDataValue::Enumerated(v) => DataValue::Enumerated(*v),
3150        ClientDataValue::Date(v) => DataValue::Date(*v),
3151        ClientDataValue::Time(v) => DataValue::Time(*v),
3152        ClientDataValue::ObjectId(v) => DataValue::ObjectId(*v),
3153        ClientDataValue::Constructed { tag_num, values } => DataValue::Constructed {
3154            tag_num: *tag_num,
3155            values: values
3156                .iter()
3157                .map(dispatch_client_value_to_borrowed)
3158                .collect(),
3159        },
3160    }
3161}
3162
3163fn remote_service_error(err: BacnetError) -> ClientError {
3164    ClientError::RemoteServiceError {
3165        service_choice: err.service_choice,
3166        error_class_raw: err.error_class,
3167        error_code_raw: err.error_code,
3168        error_class: err.error_class.and_then(ErrorClass::from_u32),
3169        error_code: err.error_code.and_then(ErrorCode::from_u32),
3170    }
3171}
3172
3173fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
3174    Ok(match value {
3175        DataValue::Null => ClientDataValue::Null,
3176        DataValue::Boolean(v) => ClientDataValue::Boolean(v),
3177        DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
3178        DataValue::Signed(v) => ClientDataValue::Signed(v),
3179        DataValue::Real(v) => ClientDataValue::Real(v),
3180        DataValue::Double(v) => ClientDataValue::Double(v),
3181        DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
3182        DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
3183        DataValue::BitString(v) => ClientDataValue::BitString {
3184            unused_bits: v.unused_bits,
3185            data: v.data.to_vec(),
3186        },
3187        DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
3188        DataValue::Date(v) => ClientDataValue::Date(v),
3189        DataValue::Time(v) => ClientDataValue::Time(v),
3190        DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
3191        DataValue::Constructed { tag_num, values } => {
3192            let mut children = Vec::with_capacity(values.len());
3193            for child in values {
3194                children.push(into_client_value(child)?);
3195            }
3196            ClientDataValue::Constructed {
3197                tag_num,
3198                values: children,
3199            }
3200        }
3201    })
3202}
3203
3204fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
3205    value
3206        .into_iter()
3207        .map(|item| AlarmSummaryItem {
3208            object_id: item.object_id,
3209            alarm_state_raw: item.alarm_state,
3210            alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3211                item.alarm_state,
3212            ),
3213            acknowledged_transitions: ClientBitString {
3214                unused_bits: item.acknowledged_transitions.unused_bits,
3215                data: item.acknowledged_transitions.data.to_vec(),
3216            },
3217        })
3218        .collect()
3219}
3220
3221fn into_client_enrollment_summary(
3222    value: Vec<CoreEnrollmentSummaryItem>,
3223) -> Vec<EnrollmentSummaryItem> {
3224    value
3225        .into_iter()
3226        .map(|item| EnrollmentSummaryItem {
3227            object_id: item.object_id,
3228            event_type: item.event_type,
3229            event_state_raw: item.event_state,
3230            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3231                item.event_state,
3232            ),
3233            priority: item.priority,
3234            notification_class: item.notification_class,
3235        })
3236        .collect()
3237}
3238
3239fn into_client_event_information(
3240    value: Vec<CoreEventSummaryItem<'_>>,
3241) -> Vec<EventInformationItem> {
3242    value
3243        .into_iter()
3244        .map(|item| EventInformationItem {
3245            object_id: item.object_id,
3246            event_state_raw: item.event_state,
3247            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3248                item.event_state,
3249            ),
3250            acknowledged_transitions: ClientBitString {
3251                unused_bits: item.acknowledged_transitions.unused_bits,
3252                data: item.acknowledged_transitions.data.to_vec(),
3253            },
3254            notify_type: item.notify_type,
3255            event_enable: ClientBitString {
3256                unused_bits: item.event_enable.unused_bits,
3257                data: item.event_enable.data.to_vec(),
3258            },
3259            event_priorities: item.event_priorities,
3260        })
3261        .collect()
3262}
3263
3264fn into_client_cov_notification(
3265    source: DataLinkAddress,
3266    confirmed: bool,
3267    value: CovNotificationRequest<'_>,
3268) -> Result<CovNotification, ClientError> {
3269    let mut values = Vec::with_capacity(value.values.len());
3270    for property in value.values {
3271        values.push(CovPropertyValue {
3272            property_id: property.property_id,
3273            array_index: property.array_index,
3274            value: into_client_value(property.value)?,
3275            priority: property.priority,
3276        });
3277    }
3278
3279    Ok(CovNotification {
3280        source,
3281        confirmed,
3282        subscriber_process_id: value.subscriber_process_id,
3283        initiating_device_id: value.initiating_device_id,
3284        monitored_object_id: value.monitored_object_id,
3285        time_remaining_seconds: value.time_remaining_seconds,
3286        values,
3287    })
3288}
3289
3290fn into_client_event_notification(
3291    source: DataLinkAddress,
3292    confirmed: bool,
3293    value: EventNotificationRequest<'_>,
3294) -> EventNotification {
3295    EventNotification {
3296        source,
3297        confirmed,
3298        process_id: value.process_id,
3299        initiating_device_id: value.initiating_device_id,
3300        event_object_id: value.event_object_id,
3301        timestamp: value.timestamp,
3302        notification_class: value.notification_class,
3303        priority: value.priority,
3304        event_type: value.event_type,
3305        message_text: value.message_text.map(str::to_string),
3306        notify_type: value.notify_type,
3307        ack_required: value.ack_required,
3308        from_state_raw: value.from_state,
3309        from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3310            value.from_state,
3311        ),
3312        to_state_raw: value.to_state,
3313        to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
3314    }
3315}
3316
3317fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
3318    let mut items = Vec::with_capacity(value.items.len());
3319    for item in value.items {
3320        items.push(into_client_value(item)?);
3321    }
3322    Ok(ReadRangeResult {
3323        object_id: value.object_id,
3324        property_id: value.property_id,
3325        array_index: value.array_index,
3326        result_flags: ClientBitString {
3327            unused_bits: value.result_flags.unused_bits,
3328            data: value.result_flags.data.to_vec(),
3329        },
3330        item_count: value.item_count,
3331        items,
3332    })
3333}
3334
3335fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
3336    match value.access_method {
3337        AtomicReadFileAckAccess::Stream {
3338            file_start_position,
3339            file_data,
3340        } => AtomicReadFileResult::Stream {
3341            end_of_file: value.end_of_file,
3342            file_start_position,
3343            file_data: file_data.to_vec(),
3344        },
3345        AtomicReadFileAckAccess::Record {
3346            file_start_record,
3347            returned_record_count,
3348            file_record_data,
3349        } => AtomicReadFileResult::Record {
3350            end_of_file: value.end_of_file,
3351            file_start_record,
3352            returned_record_count,
3353            file_record_data: file_record_data
3354                .into_iter()
3355                .map(|record| record.to_vec())
3356                .collect(),
3357        },
3358    }
3359}
3360
3361fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
3362    match value {
3363        AtomicWriteFileAck::Stream {
3364            file_start_position,
3365        } => AtomicWriteFileResult::Stream {
3366            file_start_position,
3367        },
3368        AtomicWriteFileAck::Record { file_start_record } => {
3369            AtomicWriteFileResult::Record { file_start_record }
3370        }
3371    }
3372}
3373
3374#[cfg(test)]
3375mod tests {
3376    use super::BacnetClient;
3377    use crate::{
3378        AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
3379        EnrollmentSummaryItem, EventInformationItem, EventNotification,
3380    };
3381    use rustbac_core::apdu::{
3382        ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
3383        UnconfirmedRequestHeader,
3384    };
3385    use rustbac_core::encoding::{
3386        primitives::{
3387            decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
3388            encode_ctx_object_id, encode_ctx_unsigned,
3389        },
3390        reader::Reader,
3391        tag::{AppTag, Tag},
3392        writer::Writer,
3393    };
3394    use rustbac_core::npdu::Npdu;
3395    use rustbac_core::services::acknowledge_alarm::{
3396        AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
3397    };
3398    use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
3399    use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
3400    use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
3401    use rustbac_core::services::cov_notification::{
3402        SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3403    };
3404    use rustbac_core::services::device_management::{
3405        DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
3406        SERVICE_REINITIALIZE_DEVICE,
3407    };
3408    use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
3409    use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
3410    use rustbac_core::services::event_notification::{
3411        SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3412    };
3413    use rustbac_core::services::list_element::{
3414        AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
3415        SERVICE_REMOVE_LIST_ELEMENT,
3416    };
3417    use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
3418    use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
3419    use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
3420    use rustbac_core::services::read_range::SERVICE_READ_RANGE;
3421    use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
3422    use rustbac_core::services::subscribe_cov_property::{
3423        SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
3424    };
3425    use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
3426    use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
3427    use rustbac_core::services::write_property_multiple::{
3428        PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
3429    };
3430    use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
3431    use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
3432    use std::collections::VecDeque;
3433    use std::sync::Arc;
3434    use std::time::Duration;
3435    use tokio::sync::Mutex;
3436
3437    #[derive(Debug, Default)]
3438    struct MockState {
3439        sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
3440        recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
3441    }
3442
3443    #[derive(Debug, Clone)]
3444    struct MockDataLink {
3445        state: Arc<MockState>,
3446    }
3447
3448    impl MockDataLink {
3449        fn new() -> (Self, Arc<MockState>) {
3450            let state = Arc::new(MockState::default());
3451            (
3452                Self {
3453                    state: state.clone(),
3454                },
3455                state,
3456            )
3457        }
3458    }
3459
3460    impl DataLink for MockDataLink {
3461        async fn send(
3462            &self,
3463            address: DataLinkAddress,
3464            payload: &[u8],
3465        ) -> Result<(), DataLinkError> {
3466            self.state
3467                .sent
3468                .lock()
3469                .await
3470                .push((address, payload.to_vec()));
3471            Ok(())
3472        }
3473
3474        async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
3475            let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
3476                return Err(DataLinkError::InvalidFrame);
3477            };
3478            if payload.len() > buf.len() {
3479                return Err(DataLinkError::FrameTooLarge);
3480            }
3481            buf[..payload.len()].copy_from_slice(&payload);
3482            Ok((payload.len(), addr))
3483        }
3484    }
3485
3486    fn with_npdu(apdu: &[u8]) -> Vec<u8> {
3487        let mut out = [0u8; 512];
3488        let mut w = Writer::new(&mut out);
3489        Npdu::new(0).encode(&mut w).unwrap();
3490        w.write_all(apdu).unwrap();
3491        w.as_written().to_vec()
3492    }
3493
3494    fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
3495        let mut apdu_buf = [0u8; 256];
3496        let mut w = Writer::new(&mut apdu_buf);
3497        ComplexAckHeader {
3498            segmented: false,
3499            more_follows: false,
3500            invoke_id,
3501            sequence_number: None,
3502            proposed_window_size: None,
3503            service_choice: SERVICE_READ_RANGE,
3504        }
3505        .encode(&mut w)
3506        .unwrap();
3507        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3508        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3509        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3510        w.write_u8(5).unwrap();
3511        w.write_u8(0b1110_0000).unwrap();
3512        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3513        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3514        encode_app_real(&mut w, 42.0).unwrap();
3515        encode_app_real(&mut w, 43.0).unwrap();
3516        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3517        w.as_written().to_vec()
3518    }
3519
3520    fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
3521        let mut apdu_buf = [0u8; 256];
3522        let mut w = Writer::new(&mut apdu_buf);
3523        ComplexAckHeader {
3524            segmented: false,
3525            more_follows: false,
3526            invoke_id,
3527            sequence_number: None,
3528            proposed_window_size: None,
3529            service_choice: SERVICE_ATOMIC_READ_FILE,
3530        }
3531        .encode(&mut w)
3532        .unwrap();
3533        Tag::Application {
3534            tag: AppTag::Boolean,
3535            len: if eof { 1 } else { 0 },
3536        }
3537        .encode(&mut w)
3538        .unwrap();
3539        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3540        Tag::Application {
3541            tag: AppTag::SignedInt,
3542            len: 1,
3543        }
3544        .encode(&mut w)
3545        .unwrap();
3546        w.write_u8(0).unwrap();
3547        Tag::Application {
3548            tag: AppTag::OctetString,
3549            len: data.len() as u32,
3550        }
3551        .encode(&mut w)
3552        .unwrap();
3553        w.write_all(data).unwrap();
3554        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3555        w.as_written().to_vec()
3556    }
3557
3558    fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
3559        let mut apdu_buf = [0u8; 256];
3560        let mut w = Writer::new(&mut apdu_buf);
3561        ComplexAckHeader {
3562            segmented: false,
3563            more_follows: false,
3564            invoke_id,
3565            sequence_number: None,
3566            proposed_window_size: None,
3567            service_choice: SERVICE_ATOMIC_READ_FILE,
3568        }
3569        .encode(&mut w)
3570        .unwrap();
3571        Tag::Application {
3572            tag: AppTag::Boolean,
3573            len: 0,
3574        }
3575        .encode(&mut w)
3576        .unwrap();
3577        Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
3578        Tag::Application {
3579            tag: AppTag::SignedInt,
3580            len: 1,
3581        }
3582        .encode(&mut w)
3583        .unwrap();
3584        w.write_u8(7).unwrap();
3585        Tag::Application {
3586            tag: AppTag::UnsignedInt,
3587            len: 1,
3588        }
3589        .encode(&mut w)
3590        .unwrap();
3591        w.write_u8(2).unwrap();
3592        Tag::Application {
3593            tag: AppTag::OctetString,
3594            len: 2,
3595        }
3596        .encode(&mut w)
3597        .unwrap();
3598        w.write_all(&[0x01, 0x02]).unwrap();
3599        Tag::Application {
3600            tag: AppTag::OctetString,
3601            len: 3,
3602        }
3603        .encode(&mut w)
3604        .unwrap();
3605        w.write_all(&[0x03, 0x04, 0x05]).unwrap();
3606        Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
3607        w.as_written().to_vec()
3608    }
3609
3610    fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
3611        let mut apdu_buf = [0u8; 64];
3612        let mut w = Writer::new(&mut apdu_buf);
3613        ComplexAckHeader {
3614            segmented: false,
3615            more_follows: false,
3616            invoke_id,
3617            sequence_number: None,
3618            proposed_window_size: None,
3619            service_choice: SERVICE_ATOMIC_WRITE_FILE,
3620        }
3621        .encode(&mut w)
3622        .unwrap();
3623        Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
3624        w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
3625        w.as_written().to_vec()
3626    }
3627
3628    fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
3629        let mut apdu_buf = [0u8; 64];
3630        let mut w = Writer::new(&mut apdu_buf);
3631        ComplexAckHeader {
3632            segmented: false,
3633            more_follows: false,
3634            invoke_id,
3635            sequence_number: None,
3636            proposed_window_size: None,
3637            service_choice: SERVICE_ATOMIC_WRITE_FILE,
3638        }
3639        .encode(&mut w)
3640        .unwrap();
3641        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
3642        w.write_u8(start_record as u8).unwrap();
3643        w.as_written().to_vec()
3644    }
3645
3646    fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
3647        let mut apdu_buf = [0u8; 64];
3648        let mut w = Writer::new(&mut apdu_buf);
3649        ComplexAckHeader {
3650            segmented: false,
3651            more_follows: false,
3652            invoke_id,
3653            sequence_number: None,
3654            proposed_window_size: None,
3655            service_choice: SERVICE_CREATE_OBJECT,
3656        }
3657        .encode(&mut w)
3658        .unwrap();
3659        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3660        w.as_written().to_vec()
3661    }
3662
3663    fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
3664        let mut apdu_buf = [0u8; 128];
3665        let mut w = Writer::new(&mut apdu_buf);
3666        ComplexAckHeader {
3667            segmented: false,
3668            more_follows: false,
3669            invoke_id,
3670            sequence_number: None,
3671            proposed_window_size: None,
3672            service_choice: SERVICE_GET_ALARM_SUMMARY,
3673        }
3674        .encode(&mut w)
3675        .unwrap();
3676        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3677        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3678        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3679        w.write_u8(5).unwrap();
3680        w.write_u8(0b1110_0000).unwrap();
3681
3682        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
3683        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
3684        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3685        w.write_u8(5).unwrap();
3686        w.write_u8(0b1100_0000).unwrap();
3687        w.as_written().to_vec()
3688    }
3689
3690    fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
3691        let mut apdu_buf = [0u8; 160];
3692        let mut w = Writer::new(&mut apdu_buf);
3693        ComplexAckHeader {
3694            segmented: false,
3695            more_follows: false,
3696            invoke_id,
3697            sequence_number: None,
3698            proposed_window_size: None,
3699            service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
3700        }
3701        .encode(&mut w)
3702        .unwrap();
3703        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3704        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3705        encode_ctx_unsigned(&mut w, 2, 2).unwrap();
3706        encode_ctx_unsigned(&mut w, 3, 200).unwrap();
3707        encode_ctx_unsigned(&mut w, 4, 10).unwrap();
3708
3709        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
3710        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
3711        encode_ctx_unsigned(&mut w, 2, 0).unwrap();
3712        encode_ctx_unsigned(&mut w, 3, 20).unwrap();
3713        encode_ctx_unsigned(&mut w, 4, 11).unwrap();
3714        w.as_written().to_vec()
3715    }
3716
3717    fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
3718        let mut apdu_buf = [0u8; 256];
3719        let mut w = Writer::new(&mut apdu_buf);
3720        ComplexAckHeader {
3721            segmented: false,
3722            more_follows: false,
3723            invoke_id,
3724            sequence_number: None,
3725            proposed_window_size: None,
3726            service_choice: SERVICE_GET_EVENT_INFORMATION,
3727        }
3728        .encode(&mut w)
3729        .unwrap();
3730        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3731        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3732        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
3733        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3734        w.write_u8(5).unwrap();
3735        w.write_u8(0b1110_0000).unwrap();
3736        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3737        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3738        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3739        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3740        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3741        encode_ctx_unsigned(&mut w, 4, 0).unwrap();
3742        Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
3743        w.write_u8(5).unwrap();
3744        w.write_u8(0b1100_0000).unwrap();
3745        Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
3746        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3747        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
3748        encode_ctx_unsigned(&mut w, 2, 3).unwrap();
3749        Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
3750        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3751        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
3752        w.write_u8(0).unwrap();
3753        w.as_written().to_vec()
3754    }
3755
3756    #[tokio::test]
3757    async fn who_has_object_name_collects_i_have() {
3758        let (dl, state) = MockDataLink::new();
3759        let client = BacnetClient::with_datalink(dl);
3760        let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
3761
3762        let mut apdu = [0u8; 128];
3763        let mut w = Writer::new(&mut apdu);
3764        UnconfirmedRequestHeader {
3765            service_choice: SERVICE_I_HAVE,
3766        }
3767        .encode(&mut w)
3768        .unwrap();
3769        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
3770        encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3771        encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
3772
3773        state
3774            .recv
3775            .lock()
3776            .await
3777            .push_back((with_npdu(w.as_written()), addr));
3778
3779        let results = client
3780            .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
3781            .await
3782            .unwrap();
3783        assert_eq!(results.len(), 1);
3784        assert_eq!(results[0].address, addr);
3785        assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
3786        assert_eq!(
3787            results[0].object_id,
3788            ObjectId::new(ObjectType::AnalogInput, 7)
3789        );
3790        assert_eq!(results[0].object_name, "Zone Temp");
3791
3792        let sent = state.sent.lock().await;
3793        assert_eq!(sent.len(), 1);
3794        let mut r = Reader::new(&sent[0].1);
3795        let _npdu = Npdu::decode(&mut r).unwrap();
3796        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
3797        assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
3798    }
3799
3800    #[tokio::test]
3801    async fn device_communication_control_handles_simple_ack() {
3802        let (dl, state) = MockDataLink::new();
3803        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3804        let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
3805
3806        let mut apdu = [0u8; 32];
3807        let mut w = Writer::new(&mut apdu);
3808        SimpleAck {
3809            invoke_id: 1,
3810            service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
3811        }
3812        .encode(&mut w)
3813        .unwrap();
3814        state
3815            .recv
3816            .lock()
3817            .await
3818            .push_back((with_npdu(w.as_written()), addr));
3819
3820        client
3821            .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
3822            .await
3823            .unwrap();
3824
3825        let sent = state.sent.lock().await;
3826        assert_eq!(sent.len(), 1);
3827        let mut r = Reader::new(&sent[0].1);
3828        let _npdu = Npdu::decode(&mut r).unwrap();
3829        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3830        assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
3831    }
3832
3833    #[tokio::test]
3834    async fn reinitialize_device_handles_simple_ack() {
3835        let (dl, state) = MockDataLink::new();
3836        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3837        let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
3838
3839        let mut apdu = [0u8; 32];
3840        let mut w = Writer::new(&mut apdu);
3841        SimpleAck {
3842            invoke_id: 1,
3843            service_choice: SERVICE_REINITIALIZE_DEVICE,
3844        }
3845        .encode(&mut w)
3846        .unwrap();
3847        state
3848            .recv
3849            .lock()
3850            .await
3851            .push_back((with_npdu(w.as_written()), addr));
3852
3853        client
3854            .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
3855            .await
3856            .unwrap();
3857
3858        let sent = state.sent.lock().await;
3859        assert_eq!(sent.len(), 1);
3860        let mut r = Reader::new(&sent[0].1);
3861        let _npdu = Npdu::decode(&mut r).unwrap();
3862        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3863        assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
3864    }
3865
3866    #[tokio::test]
3867    async fn time_synchronize_sends_unconfirmed_request() {
3868        let (dl, state) = MockDataLink::new();
3869        let client = BacnetClient::with_datalink(dl);
3870        let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
3871
3872        client
3873            .time_synchronize(
3874                addr,
3875                Date {
3876                    year_since_1900: 126,
3877                    month: 2,
3878                    day: 7,
3879                    weekday: 6,
3880                },
3881                Time {
3882                    hour: 10,
3883                    minute: 11,
3884                    second: 12,
3885                    hundredths: 13,
3886                },
3887                false,
3888            )
3889            .await
3890            .unwrap();
3891
3892        let sent = state.sent.lock().await;
3893        assert_eq!(sent.len(), 1);
3894        let mut r = Reader::new(&sent[0].1);
3895        let _npdu = Npdu::decode(&mut r).unwrap();
3896        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
3897        assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
3898    }
3899
3900    #[tokio::test]
3901    async fn get_alarm_summary_decodes_complex_ack() {
3902        let (dl, state) = MockDataLink::new();
3903        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3904        let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
3905
3906        state
3907            .recv
3908            .lock()
3909            .await
3910            .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
3911
3912        let summaries = client.get_alarm_summary(addr).await.unwrap();
3913        assert_eq!(summaries.len(), 2);
3914        assert_eq!(
3915            summaries[0],
3916            AlarmSummaryItem {
3917                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
3918                alarm_state_raw: 1,
3919                alarm_state: Some(EventState::Fault),
3920                acknowledged_transitions: crate::ClientBitString {
3921                    unused_bits: 5,
3922                    data: vec![0b1110_0000],
3923                },
3924            }
3925        );
3926        assert_eq!(
3927            summaries[1],
3928            AlarmSummaryItem {
3929                object_id: ObjectId::new(ObjectType::BinaryInput, 2),
3930                alarm_state_raw: 0,
3931                alarm_state: Some(EventState::Normal),
3932                acknowledged_transitions: crate::ClientBitString {
3933                    unused_bits: 5,
3934                    data: vec![0b1100_0000],
3935                },
3936            }
3937        );
3938
3939        let sent = state.sent.lock().await;
3940        assert_eq!(sent.len(), 1);
3941        let mut r = Reader::new(&sent[0].1);
3942        let _npdu = Npdu::decode(&mut r).unwrap();
3943        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3944        assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
3945    }
3946
3947    #[tokio::test]
3948    async fn get_enrollment_summary_decodes_complex_ack() {
3949        let (dl, state) = MockDataLink::new();
3950        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3951        let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
3952
3953        state
3954            .recv
3955            .lock()
3956            .await
3957            .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
3958
3959        let summaries = client.get_enrollment_summary(addr).await.unwrap();
3960        assert_eq!(summaries.len(), 2);
3961        assert_eq!(
3962            summaries[0],
3963            EnrollmentSummaryItem {
3964                object_id: ObjectId::new(ObjectType::AnalogInput, 7),
3965                event_type: 1,
3966                event_state_raw: 2,
3967                event_state: Some(EventState::Offnormal),
3968                priority: 200,
3969                notification_class: 10,
3970            }
3971        );
3972        assert_eq!(
3973            summaries[1],
3974            EnrollmentSummaryItem {
3975                object_id: ObjectId::new(ObjectType::BinaryInput, 8),
3976                event_type: 0,
3977                event_state_raw: 0,
3978                event_state: Some(EventState::Normal),
3979                priority: 20,
3980                notification_class: 11,
3981            }
3982        );
3983
3984        let sent = state.sent.lock().await;
3985        assert_eq!(sent.len(), 1);
3986        let mut r = Reader::new(&sent[0].1);
3987        let _npdu = Npdu::decode(&mut r).unwrap();
3988        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3989        assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
3990    }
3991
3992    #[tokio::test]
3993    async fn get_event_information_decodes_complex_ack() {
3994        let (dl, state) = MockDataLink::new();
3995        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3996        let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
3997
3998        state
3999            .recv
4000            .lock()
4001            .await
4002            .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
4003
4004        let result = client.get_event_information(addr, None).await.unwrap();
4005        assert!(!result.more_events);
4006        assert_eq!(result.summaries.len(), 1);
4007        assert_eq!(
4008            result.summaries[0],
4009            EventInformationItem {
4010                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
4011                event_state_raw: 2,
4012                event_state: Some(EventState::Offnormal),
4013                acknowledged_transitions: crate::ClientBitString {
4014                    unused_bits: 5,
4015                    data: vec![0b1110_0000],
4016                },
4017                notify_type: 0,
4018                event_enable: crate::ClientBitString {
4019                    unused_bits: 5,
4020                    data: vec![0b1100_0000],
4021                },
4022                event_priorities: [1, 2, 3],
4023            }
4024        );
4025    }
4026
4027    #[tokio::test]
4028    async fn acknowledge_alarm_handles_simple_ack() {
4029        let (dl, state) = MockDataLink::new();
4030        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4031        let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
4032
4033        let mut apdu = [0u8; 32];
4034        let mut w = Writer::new(&mut apdu);
4035        SimpleAck {
4036            invoke_id: 1,
4037            service_choice: SERVICE_ACKNOWLEDGE_ALARM,
4038        }
4039        .encode(&mut w)
4040        .unwrap();
4041        state
4042            .recv
4043            .lock()
4044            .await
4045            .push_back((with_npdu(w.as_written()), addr));
4046
4047        client
4048            .acknowledge_alarm(
4049                addr,
4050                AcknowledgeAlarmRequest {
4051                    acknowledging_process_id: 10,
4052                    event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
4053                    event_state_acknowledged: EventState::Offnormal,
4054                    event_time_stamp: TimeStamp::SequenceNumber(42),
4055                    acknowledgment_source: "operator",
4056                    time_of_acknowledgment: TimeStamp::DateTime {
4057                        date: Date {
4058                            year_since_1900: 126,
4059                            month: 2,
4060                            day: 7,
4061                            weekday: 6,
4062                        },
4063                        time: Time {
4064                            hour: 10,
4065                            minute: 11,
4066                            second: 12,
4067                            hundredths: 13,
4068                        },
4069                    },
4070                    invoke_id: 0,
4071                },
4072            )
4073            .await
4074            .unwrap();
4075
4076        let sent = state.sent.lock().await;
4077        assert_eq!(sent.len(), 1);
4078        let mut r = Reader::new(&sent[0].1);
4079        let _npdu = Npdu::decode(&mut r).unwrap();
4080        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4081        assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
4082    }
4083
4084    #[tokio::test]
4085    async fn create_object_by_type_decodes_complex_ack() {
4086        let (dl, state) = MockDataLink::new();
4087        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4088        let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
4089        let created = ObjectId::new(ObjectType::AnalogValue, 42);
4090
4091        state
4092            .recv
4093            .lock()
4094            .await
4095            .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
4096
4097        let result = client
4098            .create_object_by_type(addr, ObjectType::AnalogValue)
4099            .await
4100            .unwrap();
4101        assert_eq!(result, created);
4102
4103        let sent = state.sent.lock().await;
4104        let mut r = Reader::new(&sent[0].1);
4105        let _npdu = Npdu::decode(&mut r).unwrap();
4106        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4107        assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
4108    }
4109
4110    #[tokio::test]
4111    async fn delete_object_handles_simple_ack() {
4112        let (dl, state) = MockDataLink::new();
4113        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4114        let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
4115
4116        let mut apdu = [0u8; 32];
4117        let mut w = Writer::new(&mut apdu);
4118        SimpleAck {
4119            invoke_id: 1,
4120            service_choice: SERVICE_DELETE_OBJECT,
4121        }
4122        .encode(&mut w)
4123        .unwrap();
4124        state
4125            .recv
4126            .lock()
4127            .await
4128            .push_back((with_npdu(w.as_written()), addr));
4129
4130        client
4131            .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
4132            .await
4133            .unwrap();
4134    }
4135
4136    #[tokio::test]
4137    async fn add_list_element_handles_simple_ack() {
4138        let (dl, state) = MockDataLink::new();
4139        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4140        let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
4141
4142        let mut apdu = [0u8; 32];
4143        let mut w = Writer::new(&mut apdu);
4144        SimpleAck {
4145            invoke_id: 1,
4146            service_choice: SERVICE_ADD_LIST_ELEMENT,
4147        }
4148        .encode(&mut w)
4149        .unwrap();
4150        state
4151            .recv
4152            .lock()
4153            .await
4154            .push_back((with_npdu(w.as_written()), addr));
4155
4156        let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
4157        client
4158            .add_list_element(
4159                addr,
4160                AddListElementRequest {
4161                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
4162                    property_id: PropertyId::Proprietary(512),
4163                    array_index: None,
4164                    elements: &values,
4165                    invoke_id: 0,
4166                },
4167            )
4168            .await
4169            .unwrap();
4170    }
4171
4172    #[tokio::test]
4173    async fn remove_list_element_handles_simple_ack() {
4174        let (dl, state) = MockDataLink::new();
4175        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4176        let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
4177
4178        let mut apdu = [0u8; 32];
4179        let mut w = Writer::new(&mut apdu);
4180        SimpleAck {
4181            invoke_id: 1,
4182            service_choice: SERVICE_REMOVE_LIST_ELEMENT,
4183        }
4184        .encode(&mut w)
4185        .unwrap();
4186        state
4187            .recv
4188            .lock()
4189            .await
4190            .push_back((with_npdu(w.as_written()), addr));
4191
4192        let values = [DataValue::Unsigned(1)];
4193        client
4194            .remove_list_element(
4195                addr,
4196                RemoveListElementRequest {
4197                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
4198                    property_id: PropertyId::Proprietary(513),
4199                    array_index: None,
4200                    elements: &values,
4201                    invoke_id: 0,
4202                },
4203            )
4204            .await
4205            .unwrap();
4206    }
4207
4208    #[tokio::test]
4209    async fn atomic_read_file_stream_decodes_complex_ack() {
4210        let (dl, state) = MockDataLink::new();
4211        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4212        let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
4213        let file_object = ObjectId::new(ObjectType::File, 2);
4214
4215        state.recv.lock().await.push_back((
4216            with_npdu(&atomic_read_file_stream_ack_apdu(
4217                1,
4218                true,
4219                &[0xAA, 0xBB, 0xCC],
4220            )),
4221            addr,
4222        ));
4223
4224        let result = client
4225            .atomic_read_file_stream(addr, file_object, 0, 3)
4226            .await
4227            .unwrap();
4228
4229        assert_eq!(
4230            result,
4231            AtomicReadFileResult::Stream {
4232                end_of_file: true,
4233                file_start_position: 0,
4234                file_data: vec![0xAA, 0xBB, 0xCC],
4235            }
4236        );
4237
4238        let sent = state.sent.lock().await;
4239        assert_eq!(sent.len(), 1);
4240        let mut r = Reader::new(&sent[0].1);
4241        let _npdu = Npdu::decode(&mut r).unwrap();
4242        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4243        assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
4244    }
4245
4246    #[tokio::test]
4247    async fn atomic_read_file_record_decodes_complex_ack() {
4248        let (dl, state) = MockDataLink::new();
4249        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4250        let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
4251        let file_object = ObjectId::new(ObjectType::File, 5);
4252
4253        state
4254            .recv
4255            .lock()
4256            .await
4257            .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
4258
4259        let result = client
4260            .atomic_read_file_record(addr, file_object, 7, 2)
4261            .await
4262            .unwrap();
4263
4264        assert_eq!(
4265            result,
4266            AtomicReadFileResult::Record {
4267                end_of_file: false,
4268                file_start_record: 7,
4269                returned_record_count: 2,
4270                file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
4271            }
4272        );
4273    }
4274
4275    #[tokio::test]
4276    async fn atomic_write_file_stream_decodes_complex_ack() {
4277        let (dl, state) = MockDataLink::new();
4278        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4279        let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
4280        let file_object = ObjectId::new(ObjectType::File, 3);
4281
4282        state
4283            .recv
4284            .lock()
4285            .await
4286            .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
4287
4288        let result = client
4289            .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
4290            .await
4291            .unwrap();
4292
4293        assert_eq!(
4294            result,
4295            AtomicWriteFileResult::Stream {
4296                file_start_position: 128
4297            }
4298        );
4299    }
4300
4301    #[tokio::test]
4302    async fn atomic_write_file_record_decodes_complex_ack() {
4303        let (dl, state) = MockDataLink::new();
4304        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4305        let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
4306        let file_object = ObjectId::new(ObjectType::File, 9);
4307        let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
4308
4309        state
4310            .recv
4311            .lock()
4312            .await
4313            .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
4314
4315        let result = client
4316            .atomic_write_file_record(addr, file_object, 7, &records)
4317            .await
4318            .unwrap();
4319
4320        assert_eq!(
4321            result,
4322            AtomicWriteFileResult::Record {
4323                file_start_record: 7
4324            }
4325        );
4326    }
4327
4328    #[tokio::test]
4329    async fn read_properties_decodes_complex_ack() {
4330        let (dl, state) = MockDataLink::new();
4331        let client = BacnetClient::with_datalink(dl);
4332        let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
4333        let object_id = ObjectId::new(ObjectType::Device, 1);
4334
4335        let mut apdu_buf = [0u8; 256];
4336        let mut w = Writer::new(&mut apdu_buf);
4337        ComplexAckHeader {
4338            segmented: false,
4339            more_follows: false,
4340            invoke_id: 1,
4341            sequence_number: None,
4342            proposed_window_size: None,
4343            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4344        }
4345        .encode(&mut w)
4346        .unwrap();
4347        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4348        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4349            .encode(&mut w)
4350            .unwrap();
4351        encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
4352        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4353            .encode(&mut w)
4354            .unwrap();
4355        encode_app_real(&mut w, 55.5).unwrap();
4356        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4357            .encode(&mut w)
4358            .unwrap();
4359        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4360            .encode(&mut w)
4361            .unwrap();
4362
4363        state
4364            .recv
4365            .lock()
4366            .await
4367            .push_back((with_npdu(w.as_written()), addr));
4368
4369        let values = client
4370            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4371            .await
4372            .unwrap();
4373        assert_eq!(values.len(), 1);
4374        assert_eq!(values[0].0, PropertyId::PresentValue);
4375        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
4376
4377        let sent = state.sent.lock().await;
4378        assert_eq!(sent.len(), 1);
4379        let mut r = Reader::new(&sent[0].1);
4380        let _npdu = Npdu::decode(&mut r).unwrap();
4381        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4382        assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
4383    }
4384
4385    #[tokio::test]
4386    async fn read_property_multiple_reassembles_segmented_complex_ack() {
4387        let (dl, state) = MockDataLink::new();
4388        let client = BacnetClient::with_datalink(dl);
4389        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
4390        let object_id = ObjectId::new(ObjectType::Device, 1);
4391
4392        let mut payload_buf = [0u8; 256];
4393        let mut pw = Writer::new(&mut payload_buf);
4394        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
4395        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4396            .encode(&mut pw)
4397            .unwrap();
4398        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
4399        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4400            .encode(&mut pw)
4401            .unwrap();
4402        encode_app_real(&mut pw, 66.0).unwrap();
4403        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4404            .encode(&mut pw)
4405            .unwrap();
4406        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4407            .encode(&mut pw)
4408            .unwrap();
4409        let payload = pw.as_written();
4410        let split = payload.len() / 2;
4411
4412        let mut apdu1 = [0u8; 256];
4413        let mut w1 = Writer::new(&mut apdu1);
4414        ComplexAckHeader {
4415            segmented: true,
4416            more_follows: true,
4417            invoke_id: 1,
4418            sequence_number: Some(0),
4419            proposed_window_size: Some(1),
4420            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4421        }
4422        .encode(&mut w1)
4423        .unwrap();
4424        w1.write_all(&payload[..split]).unwrap();
4425
4426        let mut apdu2 = [0u8; 256];
4427        let mut w2 = Writer::new(&mut apdu2);
4428        ComplexAckHeader {
4429            segmented: true,
4430            more_follows: false,
4431            invoke_id: 1,
4432            sequence_number: Some(1),
4433            proposed_window_size: Some(1),
4434            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4435        }
4436        .encode(&mut w2)
4437        .unwrap();
4438        w2.write_all(&payload[split..]).unwrap();
4439
4440        state
4441            .recv
4442            .lock()
4443            .await
4444            .push_back((with_npdu(w1.as_written()), addr));
4445        state
4446            .recv
4447            .lock()
4448            .await
4449            .push_back((with_npdu(w2.as_written()), addr));
4450
4451        let values = client
4452            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4453            .await
4454            .unwrap();
4455        assert_eq!(values.len(), 1);
4456        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
4457
4458        let sent = state.sent.lock().await;
4459        assert!(sent.len() >= 3);
4460
4461        let mut saw_segment_ack = 0usize;
4462        for (_, frame) in sent.iter().skip(1) {
4463            let mut r = Reader::new(frame);
4464            let _npdu = Npdu::decode(&mut r).unwrap();
4465            let apdu = r.read_exact(r.remaining()).unwrap();
4466            if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
4467                let mut sr = Reader::new(apdu);
4468                let sack = SegmentAck::decode(&mut sr).unwrap();
4469                assert_eq!(sack.invoke_id, 1);
4470                saw_segment_ack += 1;
4471            }
4472        }
4473        assert!(saw_segment_ack >= 1);
4474    }
4475
4476    #[tokio::test]
4477    async fn read_property_multiple_tolerates_duplicate_segment() {
4478        let (dl, state) = MockDataLink::new();
4479        let client = BacnetClient::with_datalink(dl);
4480        let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
4481        let object_id = ObjectId::new(ObjectType::Device, 1);
4482
4483        let mut payload_buf = [0u8; 256];
4484        let mut pw = Writer::new(&mut payload_buf);
4485        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
4486        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4487            .encode(&mut pw)
4488            .unwrap();
4489        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
4490        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4491            .encode(&mut pw)
4492            .unwrap();
4493        encode_app_real(&mut pw, 66.0).unwrap();
4494        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4495            .encode(&mut pw)
4496            .unwrap();
4497        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4498            .encode(&mut pw)
4499            .unwrap();
4500        let payload = pw.as_written();
4501        let split = payload.len() / 2;
4502
4503        let mut apdu1 = [0u8; 256];
4504        let mut w1 = Writer::new(&mut apdu1);
4505        ComplexAckHeader {
4506            segmented: true,
4507            more_follows: true,
4508            invoke_id: 1,
4509            sequence_number: Some(0),
4510            proposed_window_size: Some(1),
4511            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4512        }
4513        .encode(&mut w1)
4514        .unwrap();
4515        w1.write_all(&payload[..split]).unwrap();
4516
4517        let mut dup = [0u8; 256];
4518        let mut wd = Writer::new(&mut dup);
4519        ComplexAckHeader {
4520            segmented: true,
4521            more_follows: true,
4522            invoke_id: 1,
4523            sequence_number: Some(0),
4524            proposed_window_size: Some(1),
4525            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4526        }
4527        .encode(&mut wd)
4528        .unwrap();
4529        wd.write_all(&payload[..split]).unwrap();
4530
4531        let mut apdu2 = [0u8; 256];
4532        let mut w2 = Writer::new(&mut apdu2);
4533        ComplexAckHeader {
4534            segmented: true,
4535            more_follows: false,
4536            invoke_id: 1,
4537            sequence_number: Some(1),
4538            proposed_window_size: Some(1),
4539            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4540        }
4541        .encode(&mut w2)
4542        .unwrap();
4543        w2.write_all(&payload[split..]).unwrap();
4544
4545        {
4546            let mut recv = state.recv.lock().await;
4547            recv.push_back((with_npdu(w1.as_written()), addr));
4548            recv.push_back((with_npdu(wd.as_written()), addr));
4549            recv.push_back((with_npdu(w2.as_written()), addr));
4550        }
4551
4552        let values = client
4553            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4554            .await
4555            .unwrap();
4556        assert_eq!(values.len(), 1);
4557        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
4558    }
4559
4560    #[tokio::test]
4561    async fn write_properties_handles_simple_ack() {
4562        let (dl, state) = MockDataLink::new();
4563        let client = BacnetClient::with_datalink(dl);
4564        let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
4565        let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
4566
4567        let mut apdu_buf = [0u8; 32];
4568        let mut w = Writer::new(&mut apdu_buf);
4569        SimpleAck {
4570            invoke_id: 1,
4571            service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4572        }
4573        .encode(&mut w)
4574        .unwrap();
4575        state
4576            .recv
4577            .lock()
4578            .await
4579            .push_back((with_npdu(w.as_written()), addr));
4580
4581        let writes = [PropertyWriteSpec {
4582            property_id: PropertyId::PresentValue,
4583            array_index: None,
4584            value: DataValue::Real(12.5),
4585            priority: Some(8),
4586        }];
4587        client
4588            .write_property_multiple(addr, object_id, &writes)
4589            .await
4590            .unwrap();
4591
4592        let sent = state.sent.lock().await;
4593        assert_eq!(sent.len(), 1);
4594        let mut r = Reader::new(&sent[0].1);
4595        let _npdu = Npdu::decode(&mut r).unwrap();
4596        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4597        assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4598    }
4599
4600    #[tokio::test]
4601    async fn subscribe_cov_handles_simple_ack() {
4602        let (dl, state) = MockDataLink::new();
4603        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4604        let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
4605
4606        let mut apdu_buf = [0u8; 32];
4607        let mut w = Writer::new(&mut apdu_buf);
4608        SimpleAck {
4609            invoke_id: 1,
4610            service_choice: SERVICE_SUBSCRIBE_COV,
4611        }
4612        .encode(&mut w)
4613        .unwrap();
4614        state
4615            .recv
4616            .lock()
4617            .await
4618            .push_back((with_npdu(w.as_written()), addr));
4619
4620        client
4621            .subscribe_cov(
4622                addr,
4623                SubscribeCovRequest {
4624                    subscriber_process_id: 10,
4625                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
4626                    issue_confirmed_notifications: Some(false),
4627                    lifetime_seconds: Some(300),
4628                    invoke_id: 0,
4629                },
4630            )
4631            .await
4632            .unwrap();
4633
4634        let sent = state.sent.lock().await;
4635        assert_eq!(sent.len(), 1);
4636        let mut r = Reader::new(&sent[0].1);
4637        let _npdu = Npdu::decode(&mut r).unwrap();
4638        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4639        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
4640    }
4641
4642    #[tokio::test]
4643    async fn subscribe_cov_property_handles_simple_ack() {
4644        let (dl, state) = MockDataLink::new();
4645        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4646        let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
4647
4648        let mut apdu_buf = [0u8; 32];
4649        let mut w = Writer::new(&mut apdu_buf);
4650        SimpleAck {
4651            invoke_id: 1,
4652            service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
4653        }
4654        .encode(&mut w)
4655        .unwrap();
4656        state
4657            .recv
4658            .lock()
4659            .await
4660            .push_back((with_npdu(w.as_written()), addr));
4661
4662        client
4663            .subscribe_cov_property(
4664                addr,
4665                SubscribeCovPropertyRequest {
4666                    subscriber_process_id: 22,
4667                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
4668                    issue_confirmed_notifications: Some(true),
4669                    lifetime_seconds: Some(120),
4670                    monitored_property_id: PropertyId::PresentValue,
4671                    monitored_property_array_index: None,
4672                    cov_increment: Some(0.1),
4673                    invoke_id: 0,
4674                },
4675            )
4676            .await
4677            .unwrap();
4678
4679        let sent = state.sent.lock().await;
4680        assert_eq!(sent.len(), 1);
4681        let mut r = Reader::new(&sent[0].1);
4682        let _npdu = Npdu::decode(&mut r).unwrap();
4683        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4684        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
4685    }
4686
4687    #[tokio::test]
4688    async fn read_range_by_position_decodes_complex_ack() {
4689        let (dl, state) = MockDataLink::new();
4690        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4691        let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
4692        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4693
4694        let mut apdu_buf = [0u8; 256];
4695        let mut w = Writer::new(&mut apdu_buf);
4696        ComplexAckHeader {
4697            segmented: false,
4698            more_follows: false,
4699            invoke_id: 1,
4700            sequence_number: None,
4701            proposed_window_size: None,
4702            service_choice: SERVICE_READ_RANGE,
4703        }
4704        .encode(&mut w)
4705        .unwrap();
4706        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
4707        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4708        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
4709        w.write_u8(5).unwrap();
4710        w.write_u8(0b1110_0000).unwrap();
4711        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
4712        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
4713        encode_app_real(&mut w, 42.0).unwrap();
4714        encode_app_real(&mut w, 43.0).unwrap();
4715        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
4716
4717        state
4718            .recv
4719            .lock()
4720            .await
4721            .push_back((with_npdu(w.as_written()), addr));
4722
4723        let result = client
4724            .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
4725            .await
4726            .unwrap();
4727        assert_eq!(result.object_id, object_id);
4728        assert_eq!(result.item_count, 2);
4729        assert_eq!(result.items.len(), 2);
4730        assert!(matches!(
4731            result.items[0],
4732            ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
4733        ));
4734    }
4735
4736    #[tokio::test]
4737    async fn read_range_by_sequence_number_encodes_range_selector() {
4738        let (dl, state) = MockDataLink::new();
4739        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4740        let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
4741        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4742
4743        state
4744            .recv
4745            .lock()
4746            .await
4747            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
4748
4749        let _ = client
4750            .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
4751            .await
4752            .unwrap();
4753
4754        let sent = state.sent.lock().await;
4755        let mut r = Reader::new(&sent[0].1);
4756        let _npdu = Npdu::decode(&mut r).unwrap();
4757        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4758        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
4759        match Tag::decode(&mut r).unwrap() {
4760            Tag::Context { tag_num: 0, len: 4 } => {
4761                let _ = r.read_exact(4).unwrap();
4762            }
4763            other => panic!("unexpected object id tag: {other:?}"),
4764        }
4765        match Tag::decode(&mut r).unwrap() {
4766            Tag::Context { tag_num: 1, len } => {
4767                let _ = decode_unsigned(&mut r, len as usize).unwrap();
4768            }
4769            other => panic!("unexpected property tag: {other:?}"),
4770        }
4771        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
4772        match Tag::decode(&mut r).unwrap() {
4773            Tag::Application {
4774                tag: AppTag::UnsignedInt,
4775                len,
4776            } => {
4777                assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
4778            }
4779            other => panic!("unexpected ref seq tag: {other:?}"),
4780        }
4781        match Tag::decode(&mut r).unwrap() {
4782            Tag::Application {
4783                tag: AppTag::SignedInt,
4784                len,
4785            } => {
4786                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
4787            }
4788            other => panic!("unexpected count tag: {other:?}"),
4789        }
4790        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
4791    }
4792
4793    #[tokio::test]
4794    async fn read_range_by_time_encodes_range_selector() {
4795        let (dl, state) = MockDataLink::new();
4796        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4797        let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
4798        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4799        let date = Date {
4800            year_since_1900: 126,
4801            month: 2,
4802            day: 7,
4803            weekday: 6,
4804        };
4805        let time = Time {
4806            hour: 10,
4807            minute: 11,
4808            second: 12,
4809            hundredths: 13,
4810        };
4811
4812        state
4813            .recv
4814            .lock()
4815            .await
4816            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
4817
4818        let _ = client
4819            .read_range_by_time(
4820                addr,
4821                object_id,
4822                PropertyId::PresentValue,
4823                None,
4824                (date, time),
4825                2,
4826            )
4827            .await
4828            .unwrap();
4829
4830        let sent = state.sent.lock().await;
4831        let mut r = Reader::new(&sent[0].1);
4832        let _npdu = Npdu::decode(&mut r).unwrap();
4833        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4834        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
4835        match Tag::decode(&mut r).unwrap() {
4836            Tag::Context { tag_num: 0, len: 4 } => {
4837                let _ = r.read_exact(4).unwrap();
4838            }
4839            other => panic!("unexpected object id tag: {other:?}"),
4840        }
4841        match Tag::decode(&mut r).unwrap() {
4842            Tag::Context { tag_num: 1, len } => {
4843                let _ = decode_unsigned(&mut r, len as usize).unwrap();
4844            }
4845            other => panic!("unexpected property tag: {other:?}"),
4846        }
4847        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
4848        match Tag::decode(&mut r).unwrap() {
4849            Tag::Application {
4850                tag: AppTag::Date,
4851                len: 4,
4852            } => {
4853                let raw = r.read_exact(4).unwrap();
4854                assert_eq!(
4855                    raw,
4856                    &[date.year_since_1900, date.month, date.day, date.weekday]
4857                );
4858            }
4859            other => panic!("unexpected date tag: {other:?}"),
4860        }
4861        match Tag::decode(&mut r).unwrap() {
4862            Tag::Application {
4863                tag: AppTag::Time,
4864                len: 4,
4865            } => {
4866                let raw = r.read_exact(4).unwrap();
4867                assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
4868            }
4869            other => panic!("unexpected time tag: {other:?}"),
4870        }
4871        match Tag::decode(&mut r).unwrap() {
4872            Tag::Application {
4873                tag: AppTag::SignedInt,
4874                len,
4875            } => {
4876                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
4877            }
4878            other => panic!("unexpected count tag: {other:?}"),
4879        }
4880        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
4881    }
4882
4883    #[tokio::test]
4884    async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
4885        let (dl, state) = MockDataLink::new();
4886        let client = BacnetClient::with_datalink(dl);
4887        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
4888
4889        let mut apdu = [0u8; 256];
4890        let mut w = Writer::new(&mut apdu);
4891        UnconfirmedRequestHeader {
4892            service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
4893        }
4894        .encode(&mut w)
4895        .unwrap();
4896        encode_ctx_unsigned(&mut w, 0, 17).unwrap();
4897        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4898        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
4899        encode_ctx_unsigned(&mut w, 3, 60).unwrap();
4900        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
4901        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
4902        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
4903        encode_app_real(&mut w, 73.25).unwrap();
4904        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
4905        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
4906
4907        state
4908            .recv
4909            .lock()
4910            .await
4911            .push_back((with_npdu(w.as_written()), addr));
4912
4913        let notification = client
4914            .recv_cov_notification(Duration::from_secs(1))
4915            .await
4916            .unwrap()
4917            .unwrap();
4918        assert!(!notification.confirmed);
4919        assert_eq!(notification.subscriber_process_id, 17);
4920        assert_eq!(notification.values.len(), 1);
4921        assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
4922        assert!(matches!(
4923            notification.values[0].value,
4924            ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
4925        ));
4926
4927        let sent = state.sent.lock().await;
4928        assert!(sent.is_empty());
4929    }
4930
4931    #[tokio::test]
4932    async fn recv_confirmed_cov_notification_sends_simple_ack() {
4933        let (dl, state) = MockDataLink::new();
4934        let client = BacnetClient::with_datalink(dl);
4935        let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
4936
4937        let mut apdu = [0u8; 256];
4938        let mut w = Writer::new(&mut apdu);
4939        ConfirmedRequestHeader {
4940            segmented: false,
4941            more_follows: false,
4942            segmented_response_accepted: false,
4943            max_segments: 0,
4944            max_apdu: 5,
4945            invoke_id: 9,
4946            sequence_number: None,
4947            proposed_window_size: None,
4948            service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
4949        }
4950        .encode(&mut w)
4951        .unwrap();
4952        encode_ctx_unsigned(&mut w, 0, 18).unwrap();
4953        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4954        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
4955        encode_ctx_unsigned(&mut w, 3, 120).unwrap();
4956        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
4957        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
4958        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
4959        encode_app_real(&mut w, 55.0).unwrap();
4960        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
4961        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
4962
4963        state
4964            .recv
4965            .lock()
4966            .await
4967            .push_back((with_npdu(w.as_written()), addr));
4968
4969        let notification = client
4970            .recv_cov_notification(Duration::from_secs(1))
4971            .await
4972            .unwrap()
4973            .unwrap();
4974        assert!(notification.confirmed);
4975        assert_eq!(notification.values.len(), 1);
4976
4977        let sent = state.sent.lock().await;
4978        assert_eq!(sent.len(), 1);
4979        let mut r = Reader::new(&sent[0].1);
4980        let _npdu = Npdu::decode(&mut r).unwrap();
4981        let ack = SimpleAck::decode(&mut r).unwrap();
4982        assert_eq!(ack.invoke_id, 9);
4983        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
4984    }
4985
4986    #[tokio::test]
4987    async fn recv_unconfirmed_event_notification_returns_decoded_value() {
4988        let (dl, state) = MockDataLink::new();
4989        let client = BacnetClient::with_datalink(dl);
4990        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4991
4992        let mut apdu = [0u8; 256];
4993        let mut w = Writer::new(&mut apdu);
4994        UnconfirmedRequestHeader {
4995            service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
4996        }
4997        .encode(&mut w)
4998        .unwrap();
4999        encode_ctx_unsigned(&mut w, 0, 99).unwrap();
5000        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5001        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
5002        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5003        encode_ctx_unsigned(&mut w, 1, 55).unwrap();
5004        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5005        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
5006        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
5007        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
5008        encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
5009        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
5010        Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
5011        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
5012        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
5013        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
5014        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
5015        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
5016        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
5017        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
5018
5019        state
5020            .recv
5021            .lock()
5022            .await
5023            .push_back((with_npdu(w.as_written()), addr));
5024
5025        let notification: EventNotification = client
5026            .recv_event_notification(Duration::from_secs(1))
5027            .await
5028            .unwrap()
5029            .unwrap();
5030        assert!(!notification.confirmed);
5031        assert_eq!(notification.process_id, 99);
5032        assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
5033        assert_eq!(notification.ack_required, Some(true));
5034        assert_eq!(notification.from_state, Some(EventState::Offnormal));
5035        assert_eq!(notification.to_state, Some(EventState::Normal));
5036        assert_eq!(notification.notify_type, 0);
5037
5038        let sent = state.sent.lock().await;
5039        assert!(sent.is_empty());
5040    }
5041
5042    #[tokio::test]
5043    async fn recv_confirmed_event_notification_sends_simple_ack() {
5044        let (dl, state) = MockDataLink::new();
5045        let client = BacnetClient::with_datalink(dl);
5046        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
5047
5048        let mut apdu = [0u8; 256];
5049        let mut w = Writer::new(&mut apdu);
5050        ConfirmedRequestHeader {
5051            segmented: false,
5052            more_follows: false,
5053            segmented_response_accepted: false,
5054            max_segments: 0,
5055            max_apdu: 5,
5056            invoke_id: 11,
5057            sequence_number: None,
5058            proposed_window_size: None,
5059            service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
5060        }
5061        .encode(&mut w)
5062        .unwrap();
5063        encode_ctx_unsigned(&mut w, 0, 100).unwrap();
5064        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5065        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
5066        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5067        encode_ctx_unsigned(&mut w, 1, 56).unwrap();
5068        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5069        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
5070        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
5071        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
5072        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
5073        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
5074        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
5075        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
5076        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
5077        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
5078        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
5079        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
5080
5081        state
5082            .recv
5083            .lock()
5084            .await
5085            .push_back((with_npdu(w.as_written()), addr));
5086
5087        let notification = client
5088            .recv_event_notification(Duration::from_secs(1))
5089            .await
5090            .unwrap()
5091            .unwrap();
5092        assert!(notification.confirmed);
5093
5094        let sent = state.sent.lock().await;
5095        assert_eq!(sent.len(), 1);
5096        let mut r = Reader::new(&sent[0].1);
5097        let _npdu = Npdu::decode(&mut r).unwrap();
5098        let ack = SimpleAck::decode(&mut r).unwrap();
5099        assert_eq!(ack.invoke_id, 11);
5100        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
5101    }
5102
5103    #[tokio::test]
5104    async fn write_property_multiple_segments_large_request() {
5105        let (dl, state) = MockDataLink::new();
5106        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
5107        let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
5108        let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
5109
5110        {
5111            let mut recv = state.recv.lock().await;
5112            for seq in 0u8..=254 {
5113                let mut apdu = [0u8; 16];
5114                let mut w = Writer::new(&mut apdu);
5115                SegmentAck {
5116                    negative_ack: false,
5117                    sent_by_server: true,
5118                    invoke_id: 1,
5119                    sequence_number: seq,
5120                    actual_window_size: 1,
5121                }
5122                .encode(&mut w)
5123                .unwrap();
5124                recv.push_back((with_npdu(w.as_written()), addr));
5125            }
5126
5127            let mut apdu = [0u8; 16];
5128            let mut w = Writer::new(&mut apdu);
5129            SimpleAck {
5130                invoke_id: 1,
5131                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5132            }
5133            .encode(&mut w)
5134            .unwrap();
5135            recv.push_back((with_npdu(w.as_written()), addr));
5136        }
5137
5138        let writes: Vec<PropertyWriteSpec> = (0..180)
5139            .map(|_| PropertyWriteSpec {
5140                property_id: PropertyId::Description,
5141                array_index: None,
5142                value: DataValue::CharacterString(
5143                    "rustbac segmented write test payload................................................................",
5144                ),
5145                priority: None,
5146            })
5147            .collect();
5148
5149        client
5150            .write_property_multiple(addr, object_id, &writes)
5151            .await
5152            .unwrap();
5153
5154        let sent = state.sent.lock().await;
5155        assert!(sent.len() > 1);
5156
5157        let mut seqs = Vec::new();
5158        let mut saw_more_follows = false;
5159        let mut saw_last = false;
5160        for (_, frame) in sent.iter() {
5161            let mut r = Reader::new(frame);
5162            let _npdu = Npdu::decode(&mut r).unwrap();
5163            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5164            assert!(hdr.segmented);
5165            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
5166            if hdr.more_follows {
5167                saw_more_follows = true;
5168            } else {
5169                saw_last = true;
5170            }
5171            seqs.push(hdr.sequence_number.unwrap());
5172        }
5173
5174        assert!(saw_more_follows);
5175        assert!(saw_last);
5176        for (idx, seq) in seqs.iter().enumerate() {
5177            assert_eq!(*seq as usize, idx);
5178        }
5179    }
5180
5181    #[tokio::test]
5182    async fn write_property_multiple_uses_configured_segment_window() {
5183        let (dl, state) = MockDataLink::new();
5184        let client = BacnetClient::with_datalink(dl)
5185            .with_response_timeout(Duration::from_secs(1))
5186            .with_segmented_request_window_size(4);
5187        let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
5188        let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
5189
5190        {
5191            let mut recv = state.recv.lock().await;
5192            for seq in 0u8..=254 {
5193                let mut apdu = [0u8; 16];
5194                let mut w = Writer::new(&mut apdu);
5195                SegmentAck {
5196                    negative_ack: false,
5197                    sent_by_server: true,
5198                    invoke_id: 1,
5199                    sequence_number: seq,
5200                    actual_window_size: 4,
5201                }
5202                .encode(&mut w)
5203                .unwrap();
5204                recv.push_back((with_npdu(w.as_written()), addr));
5205            }
5206
5207            let mut apdu = [0u8; 16];
5208            let mut w = Writer::new(&mut apdu);
5209            SimpleAck {
5210                invoke_id: 1,
5211                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5212            }
5213            .encode(&mut w)
5214            .unwrap();
5215            recv.push_back((with_npdu(w.as_written()), addr));
5216        }
5217
5218        let writes: Vec<PropertyWriteSpec> = (0..180)
5219            .map(|_| PropertyWriteSpec {
5220                property_id: PropertyId::Description,
5221                array_index: None,
5222                value: DataValue::CharacterString(
5223                    "rustbac segmented write test payload................................................................",
5224                ),
5225                priority: None,
5226            })
5227            .collect();
5228
5229        client
5230            .write_property_multiple(addr, object_id, &writes)
5231            .await
5232            .unwrap();
5233
5234        let sent = state.sent.lock().await;
5235        assert!(sent.len() > 4);
5236        for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
5237            let mut r = Reader::new(frame);
5238            let _npdu = Npdu::decode(&mut r).unwrap();
5239            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5240            assert!(hdr.segmented);
5241            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
5242            assert_eq!(hdr.sequence_number, Some(idx as u8));
5243            assert_eq!(hdr.proposed_window_size, Some(4));
5244        }
5245    }
5246
5247    #[tokio::test]
5248    async fn write_property_multiple_adapts_window_to_peer_ack_window() {
5249        let (dl, state) = MockDataLink::new();
5250        let client = BacnetClient::with_datalink(dl)
5251            .with_response_timeout(Duration::from_secs(1))
5252            .with_segmented_request_window_size(4);
5253        let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
5254        let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
5255
5256        {
5257            let mut recv = state.recv.lock().await;
5258            for seq in 0u8..=254 {
5259                let mut apdu = [0u8; 16];
5260                let mut w = Writer::new(&mut apdu);
5261                SegmentAck {
5262                    negative_ack: false,
5263                    sent_by_server: true,
5264                    invoke_id: 1,
5265                    sequence_number: seq,
5266                    actual_window_size: 2,
5267                }
5268                .encode(&mut w)
5269                .unwrap();
5270                recv.push_back((with_npdu(w.as_written()), addr));
5271            }
5272
5273            let mut apdu = [0u8; 16];
5274            let mut w = Writer::new(&mut apdu);
5275            SimpleAck {
5276                invoke_id: 1,
5277                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5278            }
5279            .encode(&mut w)
5280            .unwrap();
5281            recv.push_back((with_npdu(w.as_written()), addr));
5282        }
5283
5284        let writes: Vec<PropertyWriteSpec> = (0..180)
5285            .map(|_| PropertyWriteSpec {
5286                property_id: PropertyId::Description,
5287                array_index: None,
5288                value: DataValue::CharacterString(
5289                    "rustbac segmented write test payload................................................................",
5290                ),
5291                priority: None,
5292            })
5293            .collect();
5294
5295        client
5296            .write_property_multiple(addr, object_id, &writes)
5297            .await
5298            .unwrap();
5299
5300        let sent = state.sent.lock().await;
5301        let mut saw_adapted_window = false;
5302        for (_, frame) in sent.iter() {
5303            let mut r = Reader::new(frame);
5304            let _npdu = Npdu::decode(&mut r).unwrap();
5305            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5306            if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
5307                saw_adapted_window = true;
5308                break;
5309            }
5310        }
5311        assert!(saw_adapted_window);
5312    }
5313
5314    #[tokio::test]
5315    async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
5316        let (dl, state) = MockDataLink::new();
5317        let client = BacnetClient::with_datalink(dl)
5318            .with_response_timeout(Duration::from_secs(1))
5319            .with_segmented_request_window_size(1)
5320            .with_segmented_request_retries(1);
5321        let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
5322        let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
5323
5324        {
5325            let mut recv = state.recv.lock().await;
5326
5327            let mut nack_apdu = [0u8; 16];
5328            let mut nack_w = Writer::new(&mut nack_apdu);
5329            SegmentAck {
5330                negative_ack: true,
5331                sent_by_server: true,
5332                invoke_id: 1,
5333                sequence_number: 0,
5334                actual_window_size: 1,
5335            }
5336            .encode(&mut nack_w)
5337            .unwrap();
5338            recv.push_back((with_npdu(nack_w.as_written()), addr));
5339
5340            for seq in 0u8..=254 {
5341                let mut apdu = [0u8; 16];
5342                let mut w = Writer::new(&mut apdu);
5343                SegmentAck {
5344                    negative_ack: false,
5345                    sent_by_server: true,
5346                    invoke_id: 1,
5347                    sequence_number: seq,
5348                    actual_window_size: 1,
5349                }
5350                .encode(&mut w)
5351                .unwrap();
5352                recv.push_back((with_npdu(w.as_written()), addr));
5353            }
5354
5355            let mut apdu = [0u8; 16];
5356            let mut w = Writer::new(&mut apdu);
5357            SimpleAck {
5358                invoke_id: 1,
5359                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5360            }
5361            .encode(&mut w)
5362            .unwrap();
5363            recv.push_back((with_npdu(w.as_written()), addr));
5364        }
5365
5366        let writes: Vec<PropertyWriteSpec> = (0..180)
5367            .map(|_| PropertyWriteSpec {
5368                property_id: PropertyId::Description,
5369                array_index: None,
5370                value: DataValue::CharacterString(
5371                    "rustbac segmented write test payload................................................................",
5372                ),
5373                priority: None,
5374            })
5375            .collect();
5376
5377        client
5378            .write_property_multiple(addr, object_id, &writes)
5379            .await
5380            .unwrap();
5381
5382        let sent = state.sent.lock().await;
5383        let mut seq0_frames = 0usize;
5384        for (_, frame) in sent.iter() {
5385            let mut r = Reader::new(frame);
5386            let _npdu = Npdu::decode(&mut r).unwrap();
5387            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5388            if hdr.sequence_number == Some(0) {
5389                seq0_frames += 1;
5390            }
5391        }
5392        assert!(seq0_frames >= 2);
5393    }
5394
5395    #[tokio::test]
5396    async fn read_property_ignores_invalid_frames_until_valid_response() {
5397        let (dl, state) = MockDataLink::new();
5398        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
5399        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
5400        let state_for_task = state.clone();
5401
5402        tokio::spawn(async move {
5403            tokio::time::sleep(Duration::from_millis(20)).await;
5404            let mut apdu = [0u8; 128];
5405            let mut w = Writer::new(&mut apdu);
5406            ComplexAckHeader {
5407                segmented: false,
5408                more_follows: false,
5409                invoke_id: 1,
5410                sequence_number: None,
5411                proposed_window_size: None,
5412                service_choice: SERVICE_READ_PROPERTY,
5413            }
5414            .encode(&mut w)
5415            .unwrap();
5416            encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5417            encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
5418            Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5419            encode_app_real(&mut w, 77.0).unwrap();
5420            Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5421            state_for_task
5422                .recv
5423                .lock()
5424                .await
5425                .push_back((with_npdu(w.as_written()), addr));
5426        });
5427
5428        let value = client
5429            .read_property(
5430                addr,
5431                ObjectId::new(ObjectType::Device, 1),
5432                PropertyId::PresentValue,
5433            )
5434            .await
5435            .unwrap();
5436        assert!(matches!(
5437            value,
5438            ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
5439        ));
5440    }
5441
5442    #[tokio::test]
5443    async fn read_property_maps_reject() {
5444        let (dl, state) = MockDataLink::new();
5445        let client = BacnetClient::with_datalink(dl);
5446        let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
5447
5448        let mut apdu = [0u8; 8];
5449        let mut w = Writer::new(&mut apdu);
5450        w.write_u8((ApduType::Reject as u8) << 4).unwrap();
5451        w.write_u8(1).unwrap(); // invoke id
5452        w.write_u8(2).unwrap(); // reason
5453        state
5454            .recv
5455            .lock()
5456            .await
5457            .push_back((with_npdu(w.as_written()), addr));
5458
5459        let err = client
5460            .read_property(
5461                addr,
5462                ObjectId::new(ObjectType::Device, 1),
5463                PropertyId::ObjectName,
5464            )
5465            .await
5466            .unwrap_err();
5467        assert!(matches!(
5468            err,
5469            crate::ClientError::RemoteReject { reason: 2 }
5470        ));
5471    }
5472
5473    #[tokio::test]
5474    async fn read_property_maps_remote_error_details() {
5475        let (dl, state) = MockDataLink::new();
5476        let client = BacnetClient::with_datalink(dl);
5477        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
5478
5479        let mut apdu = [0u8; 16];
5480        let mut w = Writer::new(&mut apdu);
5481        w.write_u8((ApduType::Error as u8) << 4).unwrap();
5482        w.write_u8(1).unwrap(); // invoke id
5483        w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
5484            .unwrap();
5485        Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
5486        w.write_u8(2).unwrap(); // property class
5487        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
5488        w.write_u8(32).unwrap(); // unknownProperty
5489
5490        state
5491            .recv
5492            .lock()
5493            .await
5494            .push_back((with_npdu(w.as_written()), addr));
5495
5496        let err = client
5497            .read_property(
5498                addr,
5499                ObjectId::new(ObjectType::Device, 1),
5500                PropertyId::ObjectName,
5501            )
5502            .await
5503            .unwrap_err();
5504        assert!(matches!(
5505            err,
5506            crate::ClientError::RemoteServiceError {
5507                service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
5508                error_class_raw: Some(2),
5509                error_code_raw: Some(32),
5510                error_class: Some(rustbac_core::types::ErrorClass::Property),
5511                error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
5512            }
5513        ));
5514    }
5515
5516    #[tokio::test]
5517    async fn write_property_maps_abort() {
5518        let (dl, state) = MockDataLink::new();
5519        let client = BacnetClient::with_datalink(dl);
5520        let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
5521
5522        let mut apdu = [0u8; 8];
5523        let mut w = Writer::new(&mut apdu);
5524        w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); // server abort
5525        w.write_u8(1).unwrap(); // invoke id
5526        w.write_u8(9).unwrap(); // reason
5527        state
5528            .recv
5529            .lock()
5530            .await
5531            .push_back((with_npdu(w.as_written()), addr));
5532
5533        let req = rustbac_core::services::write_property::WritePropertyRequest {
5534            object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
5535            property_id: PropertyId::PresentValue,
5536            value: DataValue::Real(10.0),
5537            priority: Some(8),
5538            ..Default::default()
5539        };
5540        let err = client.write_property(addr, req).await.unwrap_err();
5541        assert!(matches!(
5542            err,
5543            crate::ClientError::RemoteAbort {
5544                reason: 9,
5545                server: true
5546            }
5547        ));
5548    }
5549
5550    #[tokio::test]
5551    async fn read_property_multiple_returns_owned_string() {
5552        let (dl, state) = MockDataLink::new();
5553        let client = BacnetClient::with_datalink(dl);
5554        let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
5555        let object_id = ObjectId::new(ObjectType::Device, 1);
5556
5557        let mut apdu_buf = [0u8; 256];
5558        let mut w = Writer::new(&mut apdu_buf);
5559        ComplexAckHeader {
5560            segmented: false,
5561            more_follows: false,
5562            invoke_id: 1,
5563            sequence_number: None,
5564            proposed_window_size: None,
5565            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
5566        }
5567        .encode(&mut w)
5568        .unwrap();
5569        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
5570        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
5571            .encode(&mut w)
5572            .unwrap();
5573        encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
5574        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
5575            .encode(&mut w)
5576            .unwrap();
5577        rustbac_core::services::value_codec::encode_application_data_value(
5578            &mut w,
5579            &DataValue::CharacterString("AHU-1"),
5580        )
5581        .unwrap();
5582        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
5583            .encode(&mut w)
5584            .unwrap();
5585        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
5586            .encode(&mut w)
5587            .unwrap();
5588
5589        state
5590            .recv
5591            .lock()
5592            .await
5593            .push_back((with_npdu(w.as_written()), addr));
5594
5595        let values = client
5596            .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
5597            .await
5598            .unwrap();
5599        assert_eq!(values.len(), 1);
5600        assert_eq!(values[0].0, PropertyId::ObjectName);
5601        assert!(matches!(
5602            &values[0].1,
5603            ClientDataValue::CharacterString(s) if s == "AHU-1"
5604        ));
5605    }
5606
5607    #[tokio::test]
5608    async fn new_sc_rejects_invalid_endpoint() {
5609        let err = BacnetClient::new_sc("not a url").await.unwrap_err();
5610        assert!(matches!(err, crate::ClientError::DataLink(_)));
5611    }
5612}