Skip to main content

rustbac_client/
client.rs

1use crate::{
2    AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3    ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4    DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5    EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9    AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10    SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{
13    primitives::{decode_unsigned, encode_ctx_unsigned},
14    reader::Reader,
15    tag::Tag,
16    writer::Writer,
17};
18use rustbac_core::npdu::Npdu;
19use rustbac_core::services::acknowledge_alarm::{
20    AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
21};
22use rustbac_core::services::alarm_summary::{
23    AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
24    SERVICE_GET_ALARM_SUMMARY,
25};
26use rustbac_core::services::atomic_read_file::{
27    AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
28};
29use rustbac_core::services::atomic_write_file::{
30    AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
31};
32use rustbac_core::services::cov_notification::{
33    CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
34    SERVICE_UNCONFIRMED_COV_NOTIFICATION,
35};
36use rustbac_core::services::device_management::{
37    DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
38    ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
39};
40use rustbac_core::services::enrollment_summary::{
41    EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
42    GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
43};
44use rustbac_core::services::event_information::{
45    EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
46    SERVICE_GET_EVENT_INFORMATION,
47};
48use rustbac_core::services::event_notification::{
49    EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
50    SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
51};
52use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
53use rustbac_core::services::list_element::{
54    AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
55    SERVICE_REMOVE_LIST_ELEMENT,
56};
57use rustbac_core::services::object_management::{
58    CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
59    SERVICE_DELETE_OBJECT,
60};
61use rustbac_core::services::private_transfer::{
62    ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
63    SERVICE_CONFIRMED_PRIVATE_TRANSFER,
64};
65use rustbac_core::services::read_property::{
66    ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
67};
68use rustbac_core::services::read_property_multiple::{
69    PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
70    ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
71};
72use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
73use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
74use rustbac_core::services::subscribe_cov_property::{
75    SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
76};
77use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
78use rustbac_core::services::value_codec::encode_application_data_value;
79use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
80use rustbac_core::services::who_is::WhoIsRequest;
81use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
82use rustbac_core::services::write_property_multiple::{
83    PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
84    SERVICE_WRITE_PROPERTY_MULTIPLE,
85};
86use rustbac_core::types::{
87    DataValue, Date, ErrorClass, ErrorCode, ObjectId, ObjectType, PropertyId, Time,
88};
89use rustbac_core::EncodeError;
90use rustbac_datalink::bip::transport::{
91    BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
92};
93use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
94use std::collections::{HashMap, HashSet};
95use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
96use std::sync::RwLock;
97use std::time::Duration;
98use tokio::sync::Mutex;
99use tokio::task::JoinHandle;
100use tokio::time::{timeout, Instant};
101
/// Floor, in bytes, applied to the per-segment payload size when a confirmed request is
/// split into segments.
const MIN_SEGMENT_DATA_LEN: usize = 32;

/// Ceiling, in bytes, on the payload accumulated while reassembling a ComplexAck (1 MiB).
const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1 << 20;
104
105/// High-level async BACnet client.
106///
107/// `BacnetClient<D>` wraps any [`DataLink`] transport and exposes ergonomic methods for common
108/// BACnet operations: reading and writing properties, device and object discovery, COV
109/// subscriptions, alarm/event services, file I/O, and device management.
110///
111/// The invoke-id counter and the per-request I/O lock are stored behind `Mutex`es, so the
112/// client is intentionally not `Clone` — wrap it in an `Arc` to share it across tasks.
113///
114/// # Construction
115///
116/// - UDP/IP: [`BacnetClient::new()`] (local broadcast) or [`BacnetClient::new_foreign()`]
117///   (register as a foreign device with a BBMD).
118/// - BACnet/SC (WebSocket): [`BacnetClient::new_sc()`].
119/// - Custom transport: [`BacnetClient::with_datalink()`].
120pub struct BacnetClient<D: DataLink> {
121    datalink: D,
122    invoke_id: Mutex<u8>,
123    request_io_lock: Mutex<()>,
124    response_timeout: Duration,
125    segmented_request_window_size: u8,
126    segmented_request_retries: u8,
127    segment_ack_timeout: Duration,
128    /// Peer max-APDU sizes in bytes, populated from I-Am responses via `who_is`.
129    capability_cache: std::sync::Arc<RwLock<HashMap<DataLinkAddress, usize>>>,
130    /// Optional server handler for inline request dispatch.
131    server_handler: Option<std::sync::Arc<dyn crate::server::ServiceHandler>>,
132    /// Device instance number used for I-Am responses when serving inline.
133    server_device_id: u32,
134    /// Vendor ID used for I-Am responses when serving inline.
135    #[allow(unused)]
136    server_vendor_id: u16,
137}
138
139impl<D: DataLink + std::fmt::Debug> std::fmt::Debug for BacnetClient<D> {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("BacnetClient")
142            .field("datalink", &self.datalink)
143            .field("invoke_id", &self.invoke_id)
144            .field("response_timeout", &self.response_timeout)
145            .field(
146                "segmented_request_window_size",
147                &self.segmented_request_window_size,
148            )
149            .field("segmented_request_retries", &self.segmented_request_retries)
150            .field("segment_ack_timeout", &self.segment_ack_timeout)
151            .field(
152                "server_handler",
153                &self.server_handler.as_ref().map(|_| "..."),
154            )
155            .field("server_device_id", &self.server_device_id)
156            .field("server_vendor_id", &self.server_vendor_id)
157            .finish()
158    }
159}
160
161/// Handle for a background task that periodically re-registers this client as a BACnet
162/// foreign device with the BBMD.
163///
164/// Dropping this value aborts the renewal task automatically. Call [`ForeignDeviceRenewal::stop`]
165/// to abort it explicitly.
166#[derive(Debug)]
167pub struct ForeignDeviceRenewal {
168    task: JoinHandle<()>,
169}
170
171impl ForeignDeviceRenewal {
172    /// Abort the background renewal task immediately.
173    pub fn stop(self) {
174        self.task.abort();
175    }
176}
177
178impl Drop for ForeignDeviceRenewal {
179    fn drop(&mut self) {
180        self.task.abort();
181    }
182}
183
184impl BacnetClient<BacnetIpTransport> {
185    /// Create a UDP/IP BACnet client bound to an ephemeral port on all interfaces.
186    ///
187    /// Returns [`ClientError::DataLink`] if the socket cannot be bound.
188    pub async fn new() -> Result<Self, ClientError> {
189        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
190        let datalink = BacnetIpTransport::bind(bind_addr).await?;
191        Ok(Self {
192            datalink,
193            invoke_id: Mutex::new(1),
194            request_io_lock: Mutex::new(()),
195            response_timeout: Duration::from_secs(3),
196            segmented_request_window_size: 16,
197            segmented_request_retries: 2,
198            segment_ack_timeout: Duration::from_millis(500),
199            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
200            server_handler: None,
201            server_device_id: 0,
202            server_vendor_id: 0,
203        })
204    }
205
206    /// Create a UDP/IP BACnet client registered as a foreign device with the BBMD at
207    /// `bbmd_addr`.
208    ///
209    /// `ttl_seconds` is the time-to-live for the foreign-device registration. Returns
210    /// [`ClientError::DataLink`] if the socket cannot be bound or the initial registration
211    /// request fails.
212    pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
213        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
214        let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
215        datalink.register_foreign_device(ttl_seconds).await?;
216        Ok(Self {
217            datalink,
218            invoke_id: Mutex::new(1),
219            request_io_lock: Mutex::new(()),
220            response_timeout: Duration::from_secs(3),
221            segmented_request_window_size: 16,
222            segmented_request_retries: 2,
223            segment_ack_timeout: Duration::from_millis(500),
224            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
225            server_handler: None,
226            server_device_id: 0,
227            server_vendor_id: 0,
228        })
229    }
230
231    /// Re-register this client as a foreign device with the BBMD, using `ttl_seconds` as the new
232    /// time-to-live.
233    pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
234        let _io = self.request_io_lock.lock().await;
235        self.datalink.register_foreign_device(ttl_seconds).await?;
236        Ok(())
237    }
238
239    /// Read the Broadcast Distribution Table (BDT) from the BBMD.
240    pub async fn read_broadcast_distribution_table(
241        &self,
242    ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
243        let _io = self.request_io_lock.lock().await;
244        self.datalink
245            .read_broadcast_distribution_table()
246            .await
247            .map_err(ClientError::from)
248    }
249
250    /// Write (replace) the Broadcast Distribution Table on the BBMD with the given `entries`.
251    pub async fn write_broadcast_distribution_table(
252        &self,
253        entries: &[BroadcastDistributionEntry],
254    ) -> Result<(), ClientError> {
255        let _io = self.request_io_lock.lock().await;
256        self.datalink
257            .write_broadcast_distribution_table(entries)
258            .await?;
259        Ok(())
260    }
261
262    /// Read the Foreign Device Table (FDT) from the BBMD.
263    pub async fn read_foreign_device_table(
264        &self,
265    ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
266        let _io = self.request_io_lock.lock().await;
267        self.datalink
268            .read_foreign_device_table()
269            .await
270            .map_err(ClientError::from)
271    }
272
273    /// Delete the foreign device entry for `address` from the BBMD's Foreign Device Table.
274    pub async fn delete_foreign_device_table_entry(
275        &self,
276        address: SocketAddrV4,
277    ) -> Result<(), ClientError> {
278        let _io = self.request_io_lock.lock().await;
279        self.datalink
280            .delete_foreign_device_table_entry(address)
281            .await?;
282        Ok(())
283    }
284
285    /// Spawn a background task that re-registers this client as a foreign device at roughly
286    /// 75 % of `ttl_seconds` to keep the registration alive indefinitely.
287    ///
288    /// Returns a [`ForeignDeviceRenewal`] handle; dropping it (or calling `.stop()`) cancels
289    /// the task. Returns an error if `ttl_seconds` is 0.
290    pub fn start_foreign_device_renewal(
291        &self,
292        ttl_seconds: u16,
293    ) -> Result<ForeignDeviceRenewal, ClientError> {
294        if ttl_seconds == 0 {
295            return Err(EncodeError::InvalidLength.into());
296        }
297
298        let datalink = self.datalink.clone();
299        let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
300        let interval = Duration::from_secs(refresh_seconds.max(1));
301        let task = tokio::spawn(async move {
302            loop {
303                tokio::time::sleep(interval).await;
304                if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
305                    log::warn!("foreign device renewal send failed: {err}");
306                }
307            }
308        });
309        Ok(ForeignDeviceRenewal { task })
310    }
311}
312
313impl BacnetClient<BacnetScTransport> {
314    /// Create a BACnet/SC client by connecting to the WebSocket hub at `endpoint`
315    /// (e.g. `"wss://hub.example.com:47808/bacnet"`).
316    pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
317        let datalink = BacnetScTransport::connect(endpoint).await?;
318        Ok(Self::with_datalink(datalink))
319    }
320}
321
322impl<D: DataLink> BacnetClient<D> {
323    /// Create a client wrapping an already-constructed `datalink`.
324    ///
325    /// Use this when you need a custom or pre-configured [`DataLink`] implementation.
326    pub fn with_datalink(datalink: D) -> Self {
327        Self {
328            datalink,
329            invoke_id: Mutex::new(1),
330            request_io_lock: Mutex::new(()),
331            response_timeout: Duration::from_secs(3),
332            segmented_request_window_size: 16,
333            segmented_request_retries: 2,
334            segment_ack_timeout: Duration::from_millis(500),
335            capability_cache: std::sync::Arc::new(RwLock::new(HashMap::new())),
336            server_handler: None,
337            server_device_id: 0,
338            server_vendor_id: 0,
339        }
340    }
341
342    /// Override the per-request response timeout (default: 3 s).
343    pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
344        self.response_timeout = timeout;
345        self
346    }
347
348    /// Override the segmented-request window size (number of segments sent before waiting
349    /// for an ACK). Clamped to a minimum of 1. Default: 16.
350    pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
351        self.segmented_request_window_size = window_size.max(1);
352        self
353    }
354
355    /// Override the number of times a segment window is retried after a negative ACK or
356    /// segment-ACK timeout before giving up. Default: 2.
357    pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
358        self.segmented_request_retries = retries;
359        self
360    }
361
362    /// Override the timeout used when waiting for a segment ACK from the remote device
363    /// during a segmented confirmed request. Clamped to a minimum of 1 ms. Default: 500 ms.
364    pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
365        self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
366        self
367    }
368
369    /// Attach a [`ServiceHandler`](crate::server::ServiceHandler) so that incoming service
370    /// requests (e.g. ReadProperty, WriteProperty, Who-Is) are dispatched inline while the
371    /// client waits for responses.  This avoids the need for a separate
372    /// [`BacnetServer`](crate::server::BacnetServer) sharing the same transport.
373    ///
374    /// `device_id` and `vendor_id` are used for I-Am responses.
375    pub fn with_server_handler(
376        mut self,
377        handler: std::sync::Arc<dyn crate::server::ServiceHandler>,
378        device_id: u32,
379        vendor_id: u16,
380    ) -> Self {
381        self.server_handler = Some(handler);
382        self.server_device_id = device_id;
383        self.server_vendor_id = vendor_id;
384        self
385    }
386
387    /// Non-blocking poll that receives and dispatches a single incoming server request.
388    ///
389    /// Call this periodically when the client is idle (not waiting for a response) to service
390    /// requests such as ReadProperty or Who-Is from other BACnet devices.
391    ///
392    /// Returns `Ok(())` regardless of whether a request was received or dispatched.
393    /// Returns `Err(ClientError::Timeout)` if no server handler has been configured.
394    pub async fn poll_server(&self) -> Result<(), ClientError> {
395        let handler = self.server_handler.as_ref().ok_or(ClientError::Timeout)?;
396        let _io_lock = self.request_io_lock.lock().await;
397        let mut buf = [0u8; 1500];
398        match tokio::time::timeout(Duration::from_millis(50), self.datalink.recv(&mut buf)).await {
399            Ok(Ok((n, src))) => {
400                let _ = dispatch_incoming_request(
401                    &self.datalink,
402                    handler.as_ref(),
403                    self.server_device_id,
404                    self.server_vendor_id,
405                    &buf[..n],
406                    src,
407                )
408                .await;
409                Ok(())
410            }
411            _ => Ok(()),
412        }
413    }
414
415    async fn next_invoke_id(&self) -> u8 {
416        let mut lock = self.invoke_id.lock().await;
417        let id = *lock;
418        *lock = lock.wrapping_add(1);
419        if *lock == 0 {
420            *lock = 1;
421        }
422        id
423    }
424
425    async fn send_segment_ack(
426        &self,
427        address: DataLinkAddress,
428        invoke_id: u8,
429        sequence_number: u8,
430        window_size: u8,
431    ) -> Result<(), ClientError> {
432        let mut tx = [0u8; 64];
433        let mut w = Writer::new(&mut tx);
434        Npdu::new(0).encode(&mut w)?;
435        SegmentAck {
436            negative_ack: false,
437            sent_by_server: false,
438            invoke_id,
439            sequence_number,
440            actual_window_size: window_size,
441        }
442        .encode(&mut w)?;
443        self.datalink.send(address, w.as_written()).await?;
444        Ok(())
445    }
446
447    async fn recv_ignoring_invalid_frame(
448        &self,
449        buf: &mut [u8],
450        deadline: Instant,
451    ) -> Result<(usize, DataLinkAddress), ClientError> {
452        loop {
453            let remaining = deadline.saturating_duration_since(Instant::now());
454            if remaining.is_zero() {
455                return Err(ClientError::Timeout);
456            }
457
458            match timeout(remaining, self.datalink.recv(buf)).await {
459                Err(_) => return Err(ClientError::Timeout),
460                Ok(Err(DataLinkError::InvalidFrame)) => continue,
461                Ok(Err(e)) => return Err(e.into()),
462                Ok(Ok(v)) => return Ok(v),
463            }
464        }
465    }
466
467    async fn send_simple_ack(
468        &self,
469        address: DataLinkAddress,
470        invoke_id: u8,
471        service_choice: u8,
472    ) -> Result<(), ClientError> {
473        let mut tx = [0u8; 64];
474        let mut w = Writer::new(&mut tx);
475        Npdu::new(0).encode(&mut w)?;
476        SimpleAck {
477            invoke_id,
478            service_choice,
479        }
480        .encode(&mut w)?;
481        self.datalink.send(address, w.as_written()).await?;
482        Ok(())
483    }
484
485    fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
486    where
487        F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
488    {
489        for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
490            let mut buf = vec![0u8; size];
491            let mut w = Writer::new(&mut buf);
492            match encode(&mut w) {
493                Ok(()) => {
494                    let written_len = w.as_written().len();
495                    buf.truncate(written_len);
496                    return Ok(buf);
497                }
498                Err(EncodeError::BufferTooSmall) => continue,
499                Err(e) => return Err(e.into()),
500            }
501        }
502        Err(ClientError::SegmentedRequestTooLarge)
503    }
504
505    const fn max_apdu_octets(max_apdu_code: u8) -> usize {
506        match max_apdu_code & 0x0f {
507            0 => 50,
508            1 => 128,
509            2 => 206,
510            3 => 480,
511            4 => 1024,
512            5 => 1476,
513            _ => 480,
514        }
515    }
516
517    async fn await_segment_ack(
518        &self,
519        address: DataLinkAddress,
520        invoke_id: u8,
521        service_choice: u8,
522        expected_sequence: u8,
523        deadline: Instant,
524    ) -> Result<SegmentAck, ClientError> {
525        loop {
526            let remaining = deadline.saturating_duration_since(Instant::now());
527            if remaining.is_zero() {
528                return Err(ClientError::Timeout);
529            }
530
531            let mut rx = [0u8; 1500];
532            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
533            let (n, src) = match recv {
534                Err(_) => return Err(ClientError::Timeout),
535                Ok(Err(DataLinkError::InvalidFrame)) => continue,
536                Ok(Err(e)) => return Err(e.into()),
537                Ok(Ok(v)) => v,
538            };
539            if src != address {
540                continue;
541            }
542
543            let Ok(apdu) = extract_apdu(&rx[..n]) else {
544                continue;
545            };
546            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
547            match ApduType::from_u8(first >> 4) {
548                Some(ApduType::SegmentAck) => {
549                    let mut r = Reader::new(apdu);
550                    let ack = SegmentAck::decode(&mut r)?;
551                    if ack.invoke_id != invoke_id || !ack.sent_by_server {
552                        continue;
553                    }
554                    if ack.negative_ack {
555                        return Err(ClientError::SegmentNegativeAck {
556                            sequence_number: ack.sequence_number,
557                        });
558                    }
559                    if ack.sequence_number == expected_sequence {
560                        return Ok(ack);
561                    }
562                }
563                Some(ApduType::Error) => {
564                    let mut r = Reader::new(apdu);
565                    let err = BacnetError::decode(&mut r)?;
566                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
567                        return Err(remote_service_error(err));
568                    }
569                }
570                Some(ApduType::Reject) => {
571                    let mut r = Reader::new(apdu);
572                    let rej = RejectPdu::decode(&mut r)?;
573                    if rej.invoke_id == invoke_id {
574                        return Err(ClientError::RemoteReject { reason: rej.reason });
575                    }
576                }
577                Some(ApduType::Abort) => {
578                    let mut r = Reader::new(apdu);
579                    let abort = AbortPdu::decode(&mut r)?;
580                    if abort.invoke_id == invoke_id {
581                        return Err(ClientError::RemoteAbort {
582                            reason: abort.reason,
583                            server: abort.server,
584                        });
585                    }
586                }
587                _ => continue,
588            }
589        }
590    }
591
592    async fn send_confirmed_request(
593        &self,
594        address: DataLinkAddress,
595        frame: &[u8],
596        deadline: Instant,
597    ) -> Result<(), ClientError> {
598        let mut pr = Reader::new(frame);
599        let _npdu = Npdu::decode(&mut pr)?;
600        let npdu_len = frame.len() - pr.remaining();
601        let npdu_bytes = &frame[..npdu_len];
602        let apdu = &frame[npdu_len..];
603
604        let mut ar = Reader::new(apdu);
605        let header = ConfirmedRequestHeader::decode(&mut ar)?;
606        let service_payload = ar.read_exact(ar.remaining())?;
607
608        // Use the peer's max-APDU if we learned it from a prior I-Am; fall back to
609        // the code declared in the request header (our own capability advertisement).
610        let peer_max_apdu = self
611            .capability_cache
612            .read()
613            .ok()
614            .and_then(|c| c.get(&address).copied())
615            .unwrap_or_else(|| Self::max_apdu_octets(header.max_apdu));
616        let segment_data_len = peer_max_apdu.saturating_sub(5).max(MIN_SEGMENT_DATA_LEN);
617        let segment_count = service_payload.len().div_ceil(segment_data_len);
618
619        if segment_count <= 1 {
620            self.datalink.send(address, frame).await?;
621            return Ok(());
622        }
623
624        if segment_count > usize::from(u8::MAX) + 1 {
625            return Err(ClientError::SegmentedRequestTooLarge);
626        }
627
628        let configured_window_size = self.segmented_request_window_size.max(1);
629        let mut window_size = configured_window_size;
630        let mut peer_window_ceiling = configured_window_size;
631        let mut batch_start = 0usize;
632        while batch_start < segment_count {
633            let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
634            let expected_sequence = (batch_end - 1) as u8;
635
636            let mut frames = Vec::with_capacity(batch_end - batch_start);
637            for segment_index in batch_start..batch_end {
638                let seq = segment_index as u8;
639                let more_follows = segment_index + 1 < segment_count;
640                let start = segment_index * segment_data_len;
641                let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
642                let segment = &service_payload[start..end];
643
644                let seg_header = ConfirmedRequestHeader {
645                    segmented: true,
646                    more_follows,
647                    segmented_response_accepted: header.segmented_response_accepted,
648                    max_segments: header.max_segments,
649                    max_apdu: header.max_apdu,
650                    invoke_id: header.invoke_id,
651                    sequence_number: Some(seq),
652                    proposed_window_size: Some(window_size),
653                    service_choice: header.service_choice,
654                };
655
656                let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
657                let written_len = {
658                    let mut w = Writer::new(&mut tx);
659                    w.write_all(npdu_bytes)?;
660                    seg_header.encode(&mut w)?;
661                    w.write_all(segment)?;
662                    w.as_written().len()
663                };
664                tx.truncate(written_len);
665                frames.push(tx);
666            }
667
668            let mut retries_remaining = self.segmented_request_retries;
669            loop {
670                for frame in &frames {
671                    self.datalink.send(address, frame).await?;
672                }
673
674                if batch_end == segment_count {
675                    break;
676                }
677
678                let remaining = deadline.saturating_duration_since(Instant::now());
679                if remaining.is_zero() {
680                    return Err(ClientError::Timeout);
681                }
682                let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
683                match self
684                    .await_segment_ack(
685                        address,
686                        header.invoke_id,
687                        header.service_choice,
688                        expected_sequence,
689                        ack_wait_deadline,
690                    )
691                    .await
692                {
693                    Ok(ack) => {
694                        peer_window_ceiling =
695                            peer_window_ceiling.min(ack.actual_window_size.max(1));
696                        window_size = window_size
697                            .saturating_add(1)
698                            .min(configured_window_size)
699                            .min(peer_window_ceiling)
700                            .max(1);
701                        break;
702                    }
703                    Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
704                        if retries_remaining > 0 =>
705                    {
706                        retries_remaining -= 1;
707                        window_size = window_size.saturating_div(2).max(1);
708                        continue;
709                    }
710                    Err(e) => return Err(e),
711                }
712            }
713
714            batch_start = batch_end;
715        }
716
717        Ok(())
718    }
719
720    async fn collect_complex_ack_payload(
721        &self,
722        address: DataLinkAddress,
723        invoke_id: u8,
724        service_choice: u8,
725        first_header: ComplexAckHeader,
726        first_payload: &[u8],
727        deadline: Instant,
728    ) -> Result<Vec<u8>, ClientError> {
729        let mut payload = first_payload.to_vec();
730        if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
731            return Err(ClientError::ResponseTooLarge {
732                limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
733            });
734        }
735        if !first_header.segmented {
736            return Ok(payload);
737        }
738
739        let mut last_seq = first_header
740            .sequence_number
741            .ok_or(ClientError::UnsupportedResponse)?;
742        let mut window_size = first_header.proposed_window_size.unwrap_or(1);
743        self.send_segment_ack(address, invoke_id, last_seq, window_size)
744            .await?;
745        let mut more_follows = first_header.more_follows;
746
747        while more_follows {
748            let mut rx = [0u8; 1500];
749            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
750            if src != address {
751                // Try to dispatch as an incoming server request
752                if let Some(ref handler) = self.server_handler {
753                    let _ = dispatch_incoming_request(
754                        &self.datalink,
755                        handler.as_ref(),
756                        self.server_device_id,
757                        self.server_vendor_id,
758                        &rx[..n],
759                        src,
760                    )
761                    .await;
762                }
763                continue;
764            }
765
766            let Ok(apdu) = extract_apdu(&rx[..n]) else {
767                continue;
768            };
769            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
770            match ApduType::from_u8(first >> 4) {
771                Some(ApduType::ComplexAck) => {
772                    let mut r = Reader::new(apdu);
773                    let seg = ComplexAckHeader::decode(&mut r)?;
774                    if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
775                        continue;
776                    }
777                    if !seg.segmented {
778                        return Err(ClientError::UnsupportedResponse);
779                    }
780                    let seq = seg
781                        .sequence_number
782                        .ok_or(ClientError::UnsupportedResponse)?;
783                    if seq == last_seq {
784                        // Duplicate segment: acknowledge again and continue waiting.
785                        self.send_segment_ack(address, invoke_id, last_seq, window_size)
786                            .await?;
787                        continue;
788                    }
789                    if seq != last_seq.wrapping_add(1) {
790                        continue;
791                    }
792
793                    let seg_payload = r.read_exact(r.remaining())?;
794                    if payload.len().saturating_add(seg_payload.len())
795                        > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
796                    {
797                        return Err(ClientError::ResponseTooLarge {
798                            limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
799                        });
800                    }
801                    payload.extend_from_slice(seg_payload);
802
803                    last_seq = seq;
804                    more_follows = seg.more_follows;
805                    window_size = seg.proposed_window_size.unwrap_or(window_size);
806                    self.send_segment_ack(address, invoke_id, last_seq, window_size)
807                        .await?;
808                }
809                Some(ApduType::Error) => {
810                    let mut r = Reader::new(apdu);
811                    let err = BacnetError::decode(&mut r)?;
812                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
813                        return Err(remote_service_error(err));
814                    }
815                }
816                Some(ApduType::Reject) => {
817                    let mut r = Reader::new(apdu);
818                    let rej = RejectPdu::decode(&mut r)?;
819                    if rej.invoke_id == invoke_id {
820                        return Err(ClientError::RemoteReject { reason: rej.reason });
821                    }
822                }
823                Some(ApduType::Abort) => {
824                    let mut r = Reader::new(apdu);
825                    let abort = AbortPdu::decode(&mut r)?;
826                    if abort.invoke_id == invoke_id {
827                        return Err(ClientError::RemoteAbort {
828                            reason: abort.reason,
829                            server: abort.server,
830                        });
831                    }
832                }
833                _ => {
834                    // Try to dispatch as an incoming server request
835                    if let Some(ref handler) = self.server_handler {
836                        let _ = dispatch_incoming_request(
837                            &self.datalink,
838                            handler.as_ref(),
839                            self.server_device_id,
840                            self.server_vendor_id,
841                            &rx[..n],
842                            src,
843                        )
844                        .await;
845                    }
846                    continue;
847                }
848            }
849        }
850
851        Ok(payload)
852    }
853
854    /// Broadcast a Who-Is request and collect I-Am replies for the duration of `wait`.
855    ///
856    /// `range` constrains the device-instance range as `(low, high)`; `None` performs a
857    /// global Who-Is. Duplicate devices (same object id) are deduplicated. Returns
858    /// [`ClientError::DataLink`] on send/receive failure.
859    pub async fn who_is(
860        &self,
861        range: Option<(u32, u32)>,
862        wait: Duration,
863    ) -> Result<Vec<DiscoveredDevice>, ClientError> {
864        // Unconfirmed broadcast — no request_io_lock needed; the recv loop
865        // filters on service_choice so it coexists with concurrent confirmed requests.
866        let req = match range {
867            Some((low, high)) => WhoIsRequest {
868                low_limit: Some(low),
869                high_limit: Some(high),
870            },
871            None => WhoIsRequest::global(),
872        };
873
874        let mut tx = [0u8; 128];
875        let mut w = Writer::new(&mut tx);
876        Npdu::new(0).encode(&mut w)?;
877        req.encode(&mut w)?;
878
879        self.datalink
880            .send(
881                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
882                w.as_written(),
883            )
884            .await?;
885
886        let mut devices = Vec::new();
887        let mut seen = HashSet::new();
888        let deadline = tokio::time::Instant::now() + wait;
889
890        while tokio::time::Instant::now() < deadline {
891            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
892            let mut rx = [0u8; 1500];
893            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
894            match recv {
895                Ok(Ok((n, src))) => {
896                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
897                        continue;
898                    };
899                    let mut r = Reader::new(apdu);
900                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
901                        continue;
902                    };
903                    if unconfirmed.service_choice != SERVICE_I_AM {
904                        continue;
905                    }
906                    let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
907                        continue;
908                    };
909                    if seen.insert(i_am.device_id) {
910                        devices.push(DiscoveredDevice {
911                            address: src,
912                            device_id: Some(i_am.device_id),
913                        });
914                        // Cache the peer's reported max-APDU so segmented
915                        // requests can be sized correctly without a separate read.
916                        if let Ok(mut cache) = self.capability_cache.write() {
917                            cache.insert(src, i_am.max_apdu as usize);
918                        }
919                    }
920                }
921                Ok(Err(DataLinkError::InvalidFrame)) => continue,
922                Ok(Err(e)) => return Err(e.into()),
923                Err(_) => break,
924            }
925        }
926
927        Ok(devices)
928    }
929
930    /// Broadcast a Who-Has request for a specific object id and collect I-Have replies for
931    /// `wait`.
932    ///
933    /// `range` constrains the responding device-instance range; `None` means any device.
934    pub async fn who_has_object_id(
935        &self,
936        range: Option<(u32, u32)>,
937        object_id: ObjectId,
938        wait: Duration,
939    ) -> Result<Vec<DiscoveredObject>, ClientError> {
940        let req = WhoHasRequest {
941            low_limit: range.map(|(low, _)| low),
942            high_limit: range.map(|(_, high)| high),
943            object: WhoHasObject::ObjectId(object_id),
944        };
945        self.who_has(req, wait).await
946    }
947
948    /// Broadcast a Who-Has request for an object by name and collect I-Have replies for `wait`.
949    ///
950    /// `range` constrains the responding device-instance range; `None` means any device.
951    pub async fn who_has_object_name(
952        &self,
953        range: Option<(u32, u32)>,
954        object_name: &str,
955        wait: Duration,
956    ) -> Result<Vec<DiscoveredObject>, ClientError> {
957        let req = WhoHasRequest {
958            low_limit: range.map(|(low, _)| low),
959            high_limit: range.map(|(_, high)| high),
960            object: WhoHasObject::ObjectName(object_name),
961        };
962        self.who_has(req, wait).await
963    }
964
965    async fn who_has(
966        &self,
967        request: WhoHasRequest<'_>,
968        wait: Duration,
969    ) -> Result<Vec<DiscoveredObject>, ClientError> {
970        // Unconfirmed broadcast — same rationale as who_is.
971        let tx = self.encode_with_growth(|w| {
972            Npdu::new(0).encode(w)?;
973            request.encode(w)
974        })?;
975        self.datalink
976            .send(
977                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
978                &tx,
979            )
980            .await?;
981
982        let mut objects = Vec::new();
983        let mut seen = HashSet::new();
984        let deadline = tokio::time::Instant::now() + wait;
985
986        while tokio::time::Instant::now() < deadline {
987            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
988            let mut rx = [0u8; 1500];
989            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
990            match recv {
991                Ok(Ok((n, src))) => {
992                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
993                        continue;
994                    };
995                    let mut r = Reader::new(apdu);
996                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
997                        continue;
998                    };
999                    if unconfirmed.service_choice != SERVICE_I_HAVE {
1000                        continue;
1001                    }
1002                    let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
1003                        continue;
1004                    };
1005                    if !seen.insert((src, i_have.object_id.raw())) {
1006                        continue;
1007                    }
1008                    objects.push(DiscoveredObject {
1009                        address: src,
1010                        device_id: i_have.device_id,
1011                        object_id: i_have.object_id,
1012                        object_name: i_have.object_name.to_string(),
1013                    });
1014                }
1015                Ok(Err(DataLinkError::InvalidFrame)) => continue,
1016                Ok(Err(e)) => return Err(e.into()),
1017                Err(_) => break,
1018            }
1019        }
1020
1021        Ok(objects)
1022    }
1023
1024    /// Send a DeviceCommunicationControl request to a device.
1025    ///
1026    /// `time_duration_seconds` sets the duration for which the state applies; `None` means
1027    /// indefinite. `enable_disable` selects the new communication state. `password` is only
1028    /// required if the device is password-protected.
1029    pub async fn device_communication_control(
1030        &self,
1031        address: DataLinkAddress,
1032        time_duration_seconds: Option<u16>,
1033        enable_disable: DeviceCommunicationState,
1034        password: Option<&str>,
1035    ) -> Result<(), ClientError> {
1036        let invoke_id = self.next_invoke_id().await;
1037        let request = DeviceCommunicationControlRequest {
1038            time_duration_seconds,
1039            enable_disable,
1040            password,
1041            invoke_id,
1042        };
1043        let tx = self.encode_with_growth(|w| {
1044            Npdu::new(0).encode(w)?;
1045            request.encode(w)
1046        })?;
1047        self.await_simple_ack_or_error(
1048            address,
1049            &tx,
1050            invoke_id,
1051            SERVICE_DEVICE_COMMUNICATION_CONTROL,
1052            self.response_timeout,
1053        )
1054        .await
1055    }
1056
1057    /// Send a ReinitializeDevice request to a device (e.g. cold-start, warm-start, or backup).
1058    ///
1059    /// `state` selects the reinitialization type. `password` is sent only if `Some`.
1060    pub async fn reinitialize_device(
1061        &self,
1062        address: DataLinkAddress,
1063        state: ReinitializeState,
1064        password: Option<&str>,
1065    ) -> Result<(), ClientError> {
1066        let invoke_id = self.next_invoke_id().await;
1067        let request = ReinitializeDeviceRequest {
1068            state,
1069            password,
1070            invoke_id,
1071        };
1072        let tx = self.encode_with_growth(|w| {
1073            Npdu::new(0).encode(w)?;
1074            request.encode(w)
1075        })?;
1076        self.await_simple_ack_or_error(
1077            address,
1078            &tx,
1079            invoke_id,
1080            SERVICE_REINITIALIZE_DEVICE,
1081            self.response_timeout,
1082        )
1083        .await
1084    }
1085
1086    /// Send a TimeSynchronization (or UTCTimeSynchronization) request to a device.
1087    ///
1088    /// Set `utc` to `true` to send the UTC variant of the request.
1089    /// This is a fire-and-forget unconfirmed service; no response is awaited.
1090    pub async fn time_synchronize(
1091        &self,
1092        address: DataLinkAddress,
1093        date: Date,
1094        time: Time,
1095        utc: bool,
1096    ) -> Result<(), ClientError> {
1097        let request = if utc {
1098            TimeSynchronizationRequest::utc(date, time)
1099        } else {
1100            TimeSynchronizationRequest::local(date, time)
1101        };
1102        let tx = self.encode_with_growth(|w| {
1103            Npdu::new(0).encode(w)?;
1104            request.encode(w)
1105        })?;
1106        self.datalink.send(address, &tx).await?;
1107        Ok(())
1108    }
1109
1110    /// Create a new object of the given type on the device, letting the device choose the
1111    /// instance number. Returns the [`ObjectId`] assigned by the device.
1112    pub async fn create_object_by_type(
1113        &self,
1114        address: DataLinkAddress,
1115        object_type: rustbac_core::types::ObjectType,
1116    ) -> Result<ObjectId, ClientError> {
1117        self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
1118            .await
1119    }
1120
1121    /// Send a CreateObject request to the device and return the [`ObjectId`] of the newly
1122    /// created object. Use this variant when you need fine-grained control over the request
1123    /// (e.g. specifying initial property values).
1124    pub async fn create_object(
1125        &self,
1126        address: DataLinkAddress,
1127        mut request: CreateObjectRequest,
1128    ) -> Result<ObjectId, ClientError> {
1129        request.invoke_id = self.next_invoke_id().await;
1130        let invoke_id = request.invoke_id;
1131        let tx = self.encode_with_growth(|w| {
1132            Npdu::new(0).encode(w)?;
1133            request.encode(w)
1134        })?;
1135        let payload = self
1136            .await_complex_ack_payload_or_error(
1137                address,
1138                &tx,
1139                invoke_id,
1140                SERVICE_CREATE_OBJECT,
1141                self.response_timeout,
1142            )
1143            .await?;
1144        let mut pr = Reader::new(&payload);
1145        let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
1146        Ok(parsed.object_id)
1147    }
1148
1149    /// Send a DeleteObject request to remove `object_id` from the device.
1150    pub async fn delete_object(
1151        &self,
1152        address: DataLinkAddress,
1153        object_id: ObjectId,
1154    ) -> Result<(), ClientError> {
1155        let invoke_id = self.next_invoke_id().await;
1156        let request = DeleteObjectRequest {
1157            object_id,
1158            invoke_id,
1159        };
1160        let tx = self.encode_with_growth(|w| {
1161            Npdu::new(0).encode(w)?;
1162            request.encode(w)
1163        })?;
1164        self.await_simple_ack_or_error(
1165            address,
1166            &tx,
1167            invoke_id,
1168            SERVICE_DELETE_OBJECT,
1169            self.response_timeout,
1170        )
1171        .await
1172    }
1173
1174    /// Send an AddListElement request to append elements to a list property on the device.
1175    pub async fn add_list_element(
1176        &self,
1177        address: DataLinkAddress,
1178        mut request: AddListElementRequest<'_>,
1179    ) -> Result<(), ClientError> {
1180        request.invoke_id = self.next_invoke_id().await;
1181        let invoke_id = request.invoke_id;
1182        let tx = self.encode_with_growth(|w| {
1183            Npdu::new(0).encode(w)?;
1184            request.encode(w)
1185        })?;
1186        self.await_simple_ack_or_error(
1187            address,
1188            &tx,
1189            invoke_id,
1190            SERVICE_ADD_LIST_ELEMENT,
1191            self.response_timeout,
1192        )
1193        .await
1194    }
1195
1196    /// Send a RemoveListElement request to delete elements from a list property on the device.
1197    pub async fn remove_list_element(
1198        &self,
1199        address: DataLinkAddress,
1200        mut request: RemoveListElementRequest<'_>,
1201    ) -> Result<(), ClientError> {
1202        request.invoke_id = self.next_invoke_id().await;
1203        let invoke_id = request.invoke_id;
1204        let tx = self.encode_with_growth(|w| {
1205            Npdu::new(0).encode(w)?;
1206            request.encode(w)
1207        })?;
1208        self.await_simple_ack_or_error(
1209            address,
1210            &tx,
1211            invoke_id,
1212            SERVICE_REMOVE_LIST_ELEMENT,
1213            self.response_timeout,
1214        )
1215        .await
1216    }
1217
1218    async fn await_simple_ack_or_error(
1219        &self,
1220        address: DataLinkAddress,
1221        tx: &[u8],
1222        invoke_id: u8,
1223        service_choice: u8,
1224        timeout_window: Duration,
1225    ) -> Result<(), ClientError> {
1226        #[cfg(feature = "tracing")]
1227        tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1228        let _io_lock = self.request_io_lock.lock().await;
1229        let deadline = tokio::time::Instant::now() + timeout_window;
1230        self.send_confirmed_request(address, tx, deadline).await?;
1231        while tokio::time::Instant::now() < deadline {
1232            let mut rx = [0u8; 1500];
1233            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1234            if src != address {
1235                // Try to dispatch as an incoming server request
1236                if let Some(ref handler) = self.server_handler {
1237                    let _ = dispatch_incoming_request(
1238                        &self.datalink,
1239                        handler.as_ref(),
1240                        self.server_device_id,
1241                        self.server_vendor_id,
1242                        &rx[..n],
1243                        src,
1244                    )
1245                    .await;
1246                }
1247                continue;
1248            }
1249            let apdu = extract_apdu(&rx[..n])?;
1250            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1251            match ApduType::from_u8(first >> 4) {
1252                Some(ApduType::SimpleAck) => {
1253                    let mut r = Reader::new(apdu);
1254                    let ack = SimpleAck::decode(&mut r)?;
1255                    if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1256                        return Ok(());
1257                    }
1258                }
1259                Some(ApduType::Error) => {
1260                    let mut r = Reader::new(apdu);
1261                    let err = BacnetError::decode(&mut r)?;
1262                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1263                        return Err(remote_service_error(err));
1264                    }
1265                }
1266                Some(ApduType::Reject) => {
1267                    let mut r = Reader::new(apdu);
1268                    let rej = RejectPdu::decode(&mut r)?;
1269                    if rej.invoke_id == invoke_id {
1270                        return Err(ClientError::RemoteReject { reason: rej.reason });
1271                    }
1272                }
1273                Some(ApduType::Abort) => {
1274                    let mut r = Reader::new(apdu);
1275                    let abort = AbortPdu::decode(&mut r)?;
1276                    if abort.invoke_id == invoke_id {
1277                        return Err(ClientError::RemoteAbort {
1278                            reason: abort.reason,
1279                            server: abort.server,
1280                        });
1281                    }
1282                }
1283                _ => {
1284                    // Try to dispatch as an incoming server request
1285                    if let Some(ref handler) = self.server_handler {
1286                        let _ = dispatch_incoming_request(
1287                            &self.datalink,
1288                            handler.as_ref(),
1289                            self.server_device_id,
1290                            self.server_vendor_id,
1291                            &rx[..n],
1292                            src,
1293                        )
1294                        .await;
1295                    }
1296                    continue;
1297                }
1298            }
1299        }
1300        #[cfg(feature = "tracing")]
1301        tracing::warn!(invoke_id = invoke_id, "request timed out");
1302        Err(ClientError::Timeout)
1303    }
1304
1305    async fn await_complex_ack_payload_or_error(
1306        &self,
1307        address: DataLinkAddress,
1308        tx: &[u8],
1309        invoke_id: u8,
1310        service_choice: u8,
1311        timeout_window: Duration,
1312    ) -> Result<Vec<u8>, ClientError> {
1313        #[cfg(feature = "tracing")]
1314        tracing::debug!(invoke_id = invoke_id, service = service_choice, target = %address, "sending confirmed request");
1315        let _io_lock = self.request_io_lock.lock().await;
1316        let deadline = tokio::time::Instant::now() + timeout_window;
1317        self.send_confirmed_request(address, tx, deadline).await?;
1318        while tokio::time::Instant::now() < deadline {
1319            let mut rx = [0u8; 1500];
1320            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1321            if src != address {
1322                // Try to dispatch as an incoming server request
1323                if let Some(ref handler) = self.server_handler {
1324                    let _ = dispatch_incoming_request(
1325                        &self.datalink,
1326                        handler.as_ref(),
1327                        self.server_device_id,
1328                        self.server_vendor_id,
1329                        &rx[..n],
1330                        src,
1331                    )
1332                    .await;
1333                }
1334                continue;
1335            }
1336
1337            let apdu = extract_apdu(&rx[..n])?;
1338            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1339            match ApduType::from_u8(first >> 4) {
1340                Some(ApduType::ComplexAck) => {
1341                    let mut r = Reader::new(apdu);
1342                    let ack = ComplexAckHeader::decode(&mut r)?;
1343                    if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1344                        continue;
1345                    }
1346                    return self
1347                        .collect_complex_ack_payload(
1348                            address,
1349                            invoke_id,
1350                            service_choice,
1351                            ack,
1352                            r.read_exact(r.remaining())?,
1353                            deadline,
1354                        )
1355                        .await;
1356                }
1357                Some(ApduType::Error) => {
1358                    let mut r = Reader::new(apdu);
1359                    let err = BacnetError::decode(&mut r)?;
1360                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1361                        return Err(remote_service_error(err));
1362                    }
1363                }
1364                Some(ApduType::Reject) => {
1365                    let mut r = Reader::new(apdu);
1366                    let rej = RejectPdu::decode(&mut r)?;
1367                    if rej.invoke_id == invoke_id {
1368                        return Err(ClientError::RemoteReject { reason: rej.reason });
1369                    }
1370                }
1371                Some(ApduType::Abort) => {
1372                    let mut r = Reader::new(apdu);
1373                    let abort = AbortPdu::decode(&mut r)?;
1374                    if abort.invoke_id == invoke_id {
1375                        return Err(ClientError::RemoteAbort {
1376                            reason: abort.reason,
1377                            server: abort.server,
1378                        });
1379                    }
1380                }
1381                _ => {
1382                    // Try to dispatch as an incoming server request
1383                    if let Some(ref handler) = self.server_handler {
1384                        let _ = dispatch_incoming_request(
1385                            &self.datalink,
1386                            handler.as_ref(),
1387                            self.server_device_id,
1388                            self.server_vendor_id,
1389                            &rx[..n],
1390                            src,
1391                        )
1392                        .await;
1393                    }
1394                    continue;
1395                }
1396            }
1397        }
1398        #[cfg(feature = "tracing")]
1399        tracing::warn!(invoke_id = invoke_id, "request timed out");
1400        Err(ClientError::Timeout)
1401    }
1402
1403    /// Send a GetAlarmSummary request and return the list of active alarms on the device.
1404    pub async fn get_alarm_summary(
1405        &self,
1406        address: DataLinkAddress,
1407    ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1408        let invoke_id = self.next_invoke_id().await;
1409        let request = GetAlarmSummaryRequest { invoke_id };
1410        let tx = self.encode_with_growth(|w| {
1411            Npdu::new(0).encode(w)?;
1412            request.encode(w)
1413        })?;
1414        let payload = self
1415            .await_complex_ack_payload_or_error(
1416                address,
1417                &tx,
1418                invoke_id,
1419                SERVICE_GET_ALARM_SUMMARY,
1420                self.response_timeout,
1421            )
1422            .await?;
1423        let mut pr = Reader::new(&payload);
1424        let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1425        Ok(into_client_alarm_summary(parsed.summaries))
1426    }
1427
1428    /// Send a GetEnrollmentSummary request and return the list of event enrollments on the device.
1429    pub async fn get_enrollment_summary(
1430        &self,
1431        address: DataLinkAddress,
1432    ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1433        let invoke_id = self.next_invoke_id().await;
1434        let request = GetEnrollmentSummaryRequest { invoke_id };
1435        let tx = self.encode_with_growth(|w| {
1436            Npdu::new(0).encode(w)?;
1437            request.encode(w)
1438        })?;
1439        let payload = self
1440            .await_complex_ack_payload_or_error(
1441                address,
1442                &tx,
1443                invoke_id,
1444                SERVICE_GET_ENROLLMENT_SUMMARY,
1445                self.response_timeout,
1446            )
1447            .await?;
1448        let mut pr = Reader::new(&payload);
1449        let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1450        Ok(into_client_enrollment_summary(parsed.summaries))
1451    }
1452
1453    /// Send a GetEventInformation request and return active event summaries from the device.
1454    ///
1455    /// Pass `last_received_object_id` to page through results; the returned
1456    /// [`EventInformationResult::more_events`] indicates whether another call is needed.
1457    pub async fn get_event_information(
1458        &self,
1459        address: DataLinkAddress,
1460        last_received_object_id: Option<ObjectId>,
1461    ) -> Result<EventInformationResult, ClientError> {
1462        let invoke_id = self.next_invoke_id().await;
1463        let request = GetEventInformationRequest {
1464            last_received_object_id,
1465            invoke_id,
1466        };
1467        let tx = self.encode_with_growth(|w| {
1468            Npdu::new(0).encode(w)?;
1469            request.encode(w)
1470        })?;
1471        let payload = self
1472            .await_complex_ack_payload_or_error(
1473                address,
1474                &tx,
1475                invoke_id,
1476                SERVICE_GET_EVENT_INFORMATION,
1477                self.response_timeout,
1478            )
1479            .await?;
1480        let mut pr = Reader::new(&payload);
1481        let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1482        Ok(EventInformationResult {
1483            summaries: into_client_event_information(parsed.summaries),
1484            more_events: parsed.more_events,
1485        })
1486    }
1487
1488    /// Send an AcknowledgeAlarm request to the device.
1489    pub async fn acknowledge_alarm(
1490        &self,
1491        address: DataLinkAddress,
1492        mut request: AcknowledgeAlarmRequest<'_>,
1493    ) -> Result<(), ClientError> {
1494        request.invoke_id = self.next_invoke_id().await;
1495        let invoke_id = request.invoke_id;
1496        let tx = self.encode_with_growth(|w| {
1497            Npdu::new(0).encode(w)?;
1498            request.encode(w)
1499        })?;
1500        self.await_simple_ack_or_error(
1501            address,
1502            &tx,
1503            invoke_id,
1504            SERVICE_ACKNOWLEDGE_ALARM,
1505            self.response_timeout,
1506        )
1507        .await
1508    }
1509
1510    /// Read a contiguous byte range from a BACnet File object using stream access.
1511    ///
1512    /// `file_start_position` is the byte offset (may be negative for end-relative access).
1513    /// `requested_octet_count` is the number of bytes to read.
1514    pub async fn atomic_read_file_stream(
1515        &self,
1516        address: DataLinkAddress,
1517        file_object_id: ObjectId,
1518        file_start_position: i32,
1519        requested_octet_count: u32,
1520    ) -> Result<AtomicReadFileResult, ClientError> {
1521        let invoke_id = self.next_invoke_id().await;
1522        let request = AtomicReadFileRequest::stream(
1523            file_object_id,
1524            file_start_position,
1525            requested_octet_count,
1526            invoke_id,
1527        );
1528        self.atomic_read_file(address, request).await
1529    }
1530
1531    /// Read a range of records from a BACnet File object using record access.
1532    ///
1533    /// `file_start_record` is the first record index (may be negative for end-relative access).
1534    /// `requested_record_count` is the number of records to read.
1535    pub async fn atomic_read_file_record(
1536        &self,
1537        address: DataLinkAddress,
1538        file_object_id: ObjectId,
1539        file_start_record: i32,
1540        requested_record_count: u32,
1541    ) -> Result<AtomicReadFileResult, ClientError> {
1542        let invoke_id = self.next_invoke_id().await;
1543        let request = AtomicReadFileRequest::record(
1544            file_object_id,
1545            file_start_record,
1546            requested_record_count,
1547            invoke_id,
1548        );
1549        self.atomic_read_file(address, request).await
1550    }
1551
1552    async fn atomic_read_file(
1553        &self,
1554        address: DataLinkAddress,
1555        request: AtomicReadFileRequest,
1556    ) -> Result<AtomicReadFileResult, ClientError> {
1557        let invoke_id = request.invoke_id;
1558        let tx = self.encode_with_growth(|w| {
1559            Npdu::new(0).encode(w)?;
1560            request.encode(w)
1561        })?;
1562        let payload = self
1563            .await_complex_ack_payload_or_error(
1564                address,
1565                &tx,
1566                invoke_id,
1567                SERVICE_ATOMIC_READ_FILE,
1568                self.response_timeout,
1569            )
1570            .await?;
1571        let mut pr = Reader::new(&payload);
1572        let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1573        Ok(into_client_atomic_read_result(parsed))
1574    }
1575
1576    /// Write `file_data` to a BACnet File object starting at `file_start_position` using stream
1577    /// access. The returned result contains the actual start position used by the device.
1578    pub async fn atomic_write_file_stream(
1579        &self,
1580        address: DataLinkAddress,
1581        file_object_id: ObjectId,
1582        file_start_position: i32,
1583        file_data: &[u8],
1584    ) -> Result<AtomicWriteFileResult, ClientError> {
1585        let invoke_id = self.next_invoke_id().await;
1586        let request = AtomicWriteFileRequest::stream(
1587            file_object_id,
1588            file_start_position,
1589            file_data,
1590            invoke_id,
1591        );
1592        self.atomic_write_file(address, request).await
1593    }
1594
1595    /// Write records to a BACnet File object starting at `file_start_record` using record access.
1596    ///
1597    /// Each element of `file_record_data` is one record's raw bytes.
1598    pub async fn atomic_write_file_record(
1599        &self,
1600        address: DataLinkAddress,
1601        file_object_id: ObjectId,
1602        file_start_record: i32,
1603        file_record_data: &[&[u8]],
1604    ) -> Result<AtomicWriteFileResult, ClientError> {
1605        let invoke_id = self.next_invoke_id().await;
1606        let request = AtomicWriteFileRequest::record(
1607            file_object_id,
1608            file_start_record,
1609            file_record_data,
1610            invoke_id,
1611        );
1612        self.atomic_write_file(address, request).await
1613    }
1614
1615    async fn atomic_write_file(
1616        &self,
1617        address: DataLinkAddress,
1618        request: AtomicWriteFileRequest<'_>,
1619    ) -> Result<AtomicWriteFileResult, ClientError> {
1620        let invoke_id = request.invoke_id;
1621        let tx = self.encode_with_growth(|w| {
1622            Npdu::new(0).encode(w)?;
1623            request.encode(w)
1624        })?;
1625        let payload = self
1626            .await_complex_ack_payload_or_error(
1627                address,
1628                &tx,
1629                invoke_id,
1630                SERVICE_ATOMIC_WRITE_FILE,
1631                self.response_timeout,
1632            )
1633            .await?;
1634        let mut pr = Reader::new(&payload);
1635        let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1636        Ok(into_client_atomic_write_result(parsed))
1637    }
1638
1639    /// Send a SubscribeCOV request to start (or renew) a COV subscription on the device.
1640    ///
1641    /// Use [`cancel_cov_subscription`](Self::cancel_cov_subscription) to unsubscribe.
1642    pub async fn subscribe_cov(
1643        &self,
1644        address: DataLinkAddress,
1645        mut request: SubscribeCovRequest,
1646    ) -> Result<(), ClientError> {
1647        request.invoke_id = self.next_invoke_id().await;
1648        let invoke_id = request.invoke_id;
1649        let tx = self.encode_with_growth(|w| {
1650            Npdu::new(0).encode(w)?;
1651            request.encode(w)
1652        })?;
1653        self.await_simple_ack_or_error(
1654            address,
1655            &tx,
1656            invoke_id,
1657            SERVICE_SUBSCRIBE_COV,
1658            self.response_timeout,
1659        )
1660        .await
1661    }
1662
1663    /// Cancel an existing COV subscription identified by `subscriber_process_id` and
1664    /// `monitored_object_id`.
1665    pub async fn cancel_cov_subscription(
1666        &self,
1667        address: DataLinkAddress,
1668        subscriber_process_id: u32,
1669        monitored_object_id: ObjectId,
1670    ) -> Result<(), ClientError> {
1671        self.subscribe_cov(
1672            address,
1673            SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1674        )
1675        .await
1676    }
1677
1678    /// Send a SubscribeCOVProperty request to subscribe to changes of a specific property.
1679    ///
1680    /// Use [`cancel_cov_property_subscription`](Self::cancel_cov_property_subscription) to
1681    /// unsubscribe.
1682    pub async fn subscribe_cov_property(
1683        &self,
1684        address: DataLinkAddress,
1685        mut request: SubscribeCovPropertyRequest,
1686    ) -> Result<(), ClientError> {
1687        request.invoke_id = self.next_invoke_id().await;
1688        let invoke_id = request.invoke_id;
1689        let tx = self.encode_with_growth(|w| {
1690            Npdu::new(0).encode(w)?;
1691            request.encode(w)
1692        })?;
1693        self.await_simple_ack_or_error(
1694            address,
1695            &tx,
1696            invoke_id,
1697            SERVICE_SUBSCRIBE_COV_PROPERTY,
1698            self.response_timeout,
1699        )
1700        .await
1701    }
1702
1703    /// Cancel an existing COV property subscription.
1704    ///
1705    /// The combination of `subscriber_process_id`, `monitored_object_id`,
1706    /// `monitored_property_id`, and `monitored_property_array_index` must match the
1707    /// subscription to cancel.
1708    pub async fn cancel_cov_property_subscription(
1709        &self,
1710        address: DataLinkAddress,
1711        subscriber_process_id: u32,
1712        monitored_object_id: ObjectId,
1713        monitored_property_id: PropertyId,
1714        monitored_property_array_index: Option<u32>,
1715    ) -> Result<(), ClientError> {
1716        self.subscribe_cov_property(
1717            address,
1718            SubscribeCovPropertyRequest::cancel(
1719                subscriber_process_id,
1720                monitored_object_id,
1721                monitored_property_id,
1722                monitored_property_array_index,
1723                0,
1724            ),
1725        )
1726        .await
1727    }
1728
1729    /// Read a range of entries from a list/log property by absolute position.
1730    ///
1731    /// `reference_index` is the 1-based starting entry index. A positive `count` reads
1732    /// forward; negative reads backward from `reference_index`.
1733    pub async fn read_range_by_position(
1734        &self,
1735        address: DataLinkAddress,
1736        object_id: ObjectId,
1737        property_id: PropertyId,
1738        array_index: Option<u32>,
1739        reference_index: i32,
1740        count: i16,
1741    ) -> Result<ReadRangeResult, ClientError> {
1742        let invoke_id = self.next_invoke_id().await;
1743        let req = ReadRangeRequest::by_position(
1744            object_id,
1745            property_id,
1746            array_index,
1747            reference_index,
1748            count,
1749            invoke_id,
1750        );
1751        self.read_range_with_request(address, req).await
1752    }
1753
1754    /// Read a range of entries from a list/log property anchored at a sequence number.
1755    ///
1756    /// `reference_sequence` is the sequence number of the anchor entry. A positive `count`
1757    /// reads forward; negative reads backward.
1758    pub async fn read_range_by_sequence_number(
1759        &self,
1760        address: DataLinkAddress,
1761        object_id: ObjectId,
1762        property_id: PropertyId,
1763        array_index: Option<u32>,
1764        reference_sequence: u32,
1765        count: i16,
1766    ) -> Result<ReadRangeResult, ClientError> {
1767        let invoke_id = self.next_invoke_id().await;
1768        let req = ReadRangeRequest::by_sequence_number(
1769            object_id,
1770            property_id,
1771            array_index,
1772            reference_sequence,
1773            count,
1774            invoke_id,
1775        );
1776        self.read_range_with_request(address, req).await
1777    }
1778
1779    /// Read a range of entries from a list/log property anchored at a timestamp.
1780    ///
1781    /// `at` is the reference date and time. A positive `count` reads forward from that
1782    /// timestamp; negative reads backward.
1783    pub async fn read_range_by_time(
1784        &self,
1785        address: DataLinkAddress,
1786        object_id: ObjectId,
1787        property_id: PropertyId,
1788        array_index: Option<u32>,
1789        at: (Date, Time),
1790        count: i16,
1791    ) -> Result<ReadRangeResult, ClientError> {
1792        let (date, time) = at;
1793        let invoke_id = self.next_invoke_id().await;
1794        let req = ReadRangeRequest::by_time(
1795            object_id,
1796            property_id,
1797            array_index,
1798            date,
1799            time,
1800            count,
1801            invoke_id,
1802        );
1803        self.read_range_with_request(address, req).await
1804    }
1805
1806    async fn read_range_with_request(
1807        &self,
1808        address: DataLinkAddress,
1809        req: ReadRangeRequest,
1810    ) -> Result<ReadRangeResult, ClientError> {
1811        let invoke_id = req.invoke_id;
1812        let tx = self.encode_with_growth(|w| {
1813            Npdu::new(0).encode(w)?;
1814            req.encode(w)
1815        })?;
1816        let payload = self
1817            .await_complex_ack_payload_or_error(
1818                address,
1819                &tx,
1820                invoke_id,
1821                SERVICE_READ_RANGE,
1822                self.response_timeout,
1823            )
1824            .await?;
1825        let mut pr = Reader::new(&payload);
1826        let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1827        into_client_read_range(parsed)
1828    }
1829
1830    /// Wait up to `wait` for a single incoming COV notification (confirmed or unconfirmed).
1831    ///
1832    /// Returns `Ok(Some(_))` when a notification arrives, `Ok(None)` on timeout, and
1833    /// `Err` on transport failure. Confirmed notifications are automatically acknowledged.
1834    /// Segmented confirmed notifications return [`ClientError::UnsupportedResponse`].
1835    pub async fn recv_cov_notification(
1836        &self,
1837        wait: Duration,
1838    ) -> Result<Option<CovNotification>, ClientError> {
1839        let _io_lock = self.request_io_lock.lock().await;
1840        let deadline = tokio::time::Instant::now() + wait;
1841
1842        while tokio::time::Instant::now() < deadline {
1843            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1844            let mut rx = [0u8; 1500];
1845            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1846            let (n, source) = match recv {
1847                Ok(Ok(v)) => v,
1848                Ok(Err(e)) => return Err(e.into()),
1849                Err(_) => break,
1850            };
1851
1852            let apdu = extract_apdu(&rx[..n])?;
1853            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1854            match ApduType::from_u8(first >> 4) {
1855                Some(ApduType::UnconfirmedRequest) => {
1856                    let mut r = Reader::new(apdu);
1857                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1858                    if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1859                        continue;
1860                    }
1861                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1862                    return Ok(Some(into_client_cov_notification(source, false, cov)?));
1863                }
1864                Some(ApduType::ConfirmedRequest) => {
1865                    let mut r = Reader::new(apdu);
1866                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1867                    if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1868                        continue;
1869                    }
1870                    if header.segmented {
1871                        return Err(ClientError::UnsupportedResponse);
1872                    }
1873
1874                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1875                    self.send_simple_ack(
1876                        source,
1877                        header.invoke_id,
1878                        SERVICE_CONFIRMED_COV_NOTIFICATION,
1879                    )
1880                    .await?;
1881                    return Ok(Some(into_client_cov_notification(source, true, cov)?));
1882                }
1883                _ => continue,
1884            }
1885        }
1886
1887        Ok(None)
1888    }
1889
1890    /// Wait up to `wait` for a single incoming event notification (confirmed or unconfirmed).
1891    ///
1892    /// Returns `Ok(Some(_))` when a notification arrives, `Ok(None)` on timeout, and
1893    /// `Err` on transport failure. Confirmed notifications are automatically acknowledged.
1894    /// Segmented confirmed notifications return [`ClientError::UnsupportedResponse`].
1895    pub async fn recv_event_notification(
1896        &self,
1897        wait: Duration,
1898    ) -> Result<Option<EventNotification>, ClientError> {
1899        let _io_lock = self.request_io_lock.lock().await;
1900        let deadline = tokio::time::Instant::now() + wait;
1901
1902        while tokio::time::Instant::now() < deadline {
1903            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1904            let mut rx = [0u8; 1500];
1905            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1906            let (n, source) = match recv {
1907                Ok(Ok(v)) => v,
1908                Ok(Err(e)) => return Err(e.into()),
1909                Err(_) => break,
1910            };
1911
1912            let apdu = extract_apdu(&rx[..n])?;
1913            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1914            match ApduType::from_u8(first >> 4) {
1915                Some(ApduType::UnconfirmedRequest) => {
1916                    let mut r = Reader::new(apdu);
1917                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1918                    if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1919                        continue;
1920                    }
1921                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1922                    return Ok(Some(into_client_event_notification(
1923                        source,
1924                        false,
1925                        notification,
1926                    )));
1927                }
1928                Some(ApduType::ConfirmedRequest) => {
1929                    let mut r = Reader::new(apdu);
1930                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1931                    if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1932                        continue;
1933                    }
1934                    if header.segmented {
1935                        return Err(ClientError::UnsupportedResponse);
1936                    }
1937                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1938                    self.send_simple_ack(
1939                        source,
1940                        header.invoke_id,
1941                        SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1942                    )
1943                    .await?;
1944                    return Ok(Some(into_client_event_notification(
1945                        source,
1946                        true,
1947                        notification,
1948                    )));
1949                }
1950                _ => continue,
1951            }
1952        }
1953
1954        Ok(None)
1955    }
1956
1957    /// Send a ReadProperty request and return the property value as a [`ClientDataValue`].
1958    ///
1959    /// Use [`read_property_multiple`](Self::read_property_multiple) to fetch several
1960    /// properties in a single round-trip.
1961    pub async fn read_property(
1962        &self,
1963        address: DataLinkAddress,
1964        object_id: ObjectId,
1965        property_id: PropertyId,
1966    ) -> Result<ClientDataValue, ClientError> {
1967        let invoke_id = self.next_invoke_id().await;
1968        let req = ReadPropertyRequest {
1969            object_id,
1970            property_id,
1971            array_index: None,
1972            invoke_id,
1973        };
1974        let tx = self.encode_with_growth(|w| {
1975            Npdu::new(0).encode(w)?;
1976            req.encode(w)
1977        })?;
1978        let payload = self
1979            .await_complex_ack_payload_or_error(
1980                address,
1981                &tx,
1982                invoke_id,
1983                SERVICE_READ_PROPERTY,
1984                self.response_timeout,
1985            )
1986            .await?;
1987        let mut pr = Reader::new(&payload);
1988        let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
1989        into_client_value(parsed.value)
1990    }
1991
1992    /// Send a WriteProperty request to set a single property on the device.
1993    pub async fn write_property(
1994        &self,
1995        address: DataLinkAddress,
1996        mut request: WritePropertyRequest<'_>,
1997    ) -> Result<(), ClientError> {
1998        request.invoke_id = self.next_invoke_id().await;
1999        let invoke_id = request.invoke_id;
2000        let tx = self.encode_with_growth(|w| {
2001            Npdu::new(0).encode(w)?;
2002            request.encode(w)
2003        })?;
2004        self.await_simple_ack_or_error(
2005            address,
2006            &tx,
2007            invoke_id,
2008            SERVICE_WRITE_PROPERTY,
2009            self.response_timeout,
2010        )
2011        .await
2012    }
2013
2014    /// Send a ReadPropertyMultiple request to fetch several properties of one object in a
2015    /// single round-trip.
2016    ///
2017    /// Returns pairs of `(PropertyId, ClientDataValue)` in the order returned by the device.
2018    pub async fn read_property_multiple(
2019        &self,
2020        address: DataLinkAddress,
2021        object_id: ObjectId,
2022        property_ids: &[PropertyId],
2023    ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
2024        let refs: Vec<PropertyReference> = property_ids
2025            .iter()
2026            .copied()
2027            .map(|property_id| PropertyReference {
2028                property_id,
2029                array_index: None,
2030            })
2031            .collect();
2032        let specs = [ReadAccessSpecification {
2033            object_id,
2034            properties: &refs,
2035        }];
2036
2037        let invoke_id = self.next_invoke_id().await;
2038        let req = ReadPropertyMultipleRequest {
2039            specs: &specs,
2040            invoke_id,
2041        };
2042
2043        let tx = self.encode_with_growth(|w| {
2044            Npdu::new(0).encode(w)?;
2045            req.encode(w)
2046        })?;
2047        let payload = self
2048            .await_complex_ack_payload_or_error(
2049                address,
2050                &tx,
2051                invoke_id,
2052                SERVICE_READ_PROPERTY_MULTIPLE,
2053                self.response_timeout,
2054            )
2055            .await?;
2056        let mut pr = Reader::new(&payload);
2057        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2058        let mut out = Vec::new();
2059        for access in parsed.results {
2060            if access.object_id != object_id {
2061                continue;
2062            }
2063            for item in access.results {
2064                out.push((item.property_id, into_client_value(item.value)?));
2065            }
2066        }
2067        Ok(out)
2068    }
2069
2070    /// Send a WritePropertyMultiple request to set several properties of one object in a
2071    /// single round-trip.
2072    pub async fn write_property_multiple(
2073        &self,
2074        address: DataLinkAddress,
2075        object_id: ObjectId,
2076        properties: &[PropertyWriteSpec<'_>],
2077    ) -> Result<(), ClientError> {
2078        let invoke_id = self.next_invoke_id().await;
2079        let specs = [WriteAccessSpecification {
2080            object_id,
2081            properties,
2082        }];
2083        let req = WritePropertyMultipleRequest {
2084            specs: &specs,
2085            invoke_id,
2086        };
2087
2088        let tx = self.encode_with_growth(|w| {
2089            Npdu::new(0).encode(w)?;
2090            req.encode(w)
2091        })?;
2092        self.await_simple_ack_or_error(
2093            address,
2094            &tx,
2095            invoke_id,
2096            SERVICE_WRITE_PROPERTY_MULTIPLE,
2097            self.response_timeout,
2098        )
2099        .await
2100    }
2101
2102    /// Send a ConfirmedPrivateTransfer request and return the ack.
2103    pub async fn private_transfer(
2104        &self,
2105        address: DataLinkAddress,
2106        vendor_id: u32,
2107        service_number: u32,
2108        service_parameters: Option<&[u8]>,
2109    ) -> Result<PrivateTransferAck, ClientError> {
2110        let invoke_id = self.next_invoke_id().await;
2111        let req = ConfirmedPrivateTransferRequest {
2112            vendor_id,
2113            service_number,
2114            service_parameters,
2115            invoke_id,
2116        };
2117
2118        let tx = self.encode_with_growth(|w| {
2119            Npdu::new(0).encode(w)?;
2120            req.encode(w)
2121        })?;
2122        let payload = self
2123            .await_complex_ack_payload_or_error(
2124                address,
2125                &tx,
2126                invoke_id,
2127                SERVICE_CONFIRMED_PRIVATE_TRANSFER,
2128                self.response_timeout,
2129            )
2130            .await?;
2131        let mut r = Reader::new(&payload);
2132        PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
2133    }
2134
2135    /// Read multiple `(object_id, property_id)` pairs in a single ReadPropertyMultiple round-trip.
2136    ///
2137    /// All pairs must target the same device at `address`. Returns a map from each requested
2138    /// `(ObjectId, PropertyId)` to its value. Properties not returned by the device are absent
2139    /// from the map (the device may skip unknown properties rather than erroring).
2140    pub async fn read_many(
2141        &self,
2142        address: DataLinkAddress,
2143        requests: &[(ObjectId, PropertyId)],
2144    ) -> Result<HashMap<(ObjectId, PropertyId), ClientDataValue>, ClientError> {
2145        // Group by object to build ReadAccessSpecifications
2146        let mut grouped: Vec<(ObjectId, Vec<PropertyReference>)> = Vec::new();
2147        for &(oid, pid) in requests {
2148            if let Some(entry) = grouped.iter_mut().find(|(o, _)| *o == oid) {
2149                entry.1.push(PropertyReference {
2150                    property_id: pid,
2151                    array_index: None,
2152                });
2153            } else {
2154                grouped.push((
2155                    oid,
2156                    vec![PropertyReference {
2157                        property_id: pid,
2158                        array_index: None,
2159                    }],
2160                ));
2161            }
2162        }
2163
2164        let specs: Vec<ReadAccessSpecification<'_>> = grouped
2165            .iter()
2166            .map(|(oid, props)| ReadAccessSpecification {
2167                object_id: *oid,
2168                properties: props,
2169            })
2170            .collect();
2171
2172        let invoke_id = self.next_invoke_id().await;
2173        let req = ReadPropertyMultipleRequest {
2174            specs: &specs,
2175            invoke_id,
2176        };
2177        let tx = self.encode_with_growth(|w| {
2178            Npdu::new(0).encode(w)?;
2179            req.encode(w)
2180        })?;
2181        let payload = self
2182            .await_complex_ack_payload_or_error(
2183                address,
2184                &tx,
2185                invoke_id,
2186                SERVICE_READ_PROPERTY_MULTIPLE,
2187                self.response_timeout,
2188            )
2189            .await?;
2190
2191        let mut pr = Reader::new(&payload);
2192        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
2193        let mut out = HashMap::new();
2194        for access in parsed.results {
2195            for item in access.results {
2196                if let Ok(v) = into_client_value(item.value) {
2197                    out.insert((access.object_id, item.property_id), v);
2198                }
2199            }
2200        }
2201        Ok(out)
2202    }
2203
2204    /// Write multiple properties across one or more objects in a single WritePropertyMultiple
2205    /// round-trip.
2206    ///
2207    /// `writes` is a slice of `(object_id, property_id, value, priority)` tuples.
2208    /// `priority` may be `None` for the relinquish-default case. All writes target the same
2209    /// device at `address`. Takes [`ClientDataValue`] (the owned form) for ergonomic use.
2210    pub async fn write_many(
2211        &self,
2212        address: DataLinkAddress,
2213        writes: &[(ObjectId, PropertyId, ClientDataValue, Option<u8>)],
2214    ) -> Result<(), ClientError> {
2215        use rustbac_core::types::{BitString, DataValue as DV};
2216
2217        fn cv_to_dv(v: &ClientDataValue) -> DV<'_> {
2218            match v {
2219                ClientDataValue::Null => DV::Null,
2220                ClientDataValue::Boolean(b) => DV::Boolean(*b),
2221                ClientDataValue::Unsigned(n) => DV::Unsigned(*n),
2222                ClientDataValue::Signed(n) => DV::Signed(*n),
2223                ClientDataValue::Real(f) => DV::Real(*f),
2224                ClientDataValue::Double(f) => DV::Double(*f),
2225                ClientDataValue::OctetString(b) => DV::OctetString(b),
2226                ClientDataValue::CharacterString(s) => DV::CharacterString(s),
2227                ClientDataValue::BitString { unused_bits, data } => DV::BitString(BitString {
2228                    unused_bits: *unused_bits,
2229                    data,
2230                }),
2231                ClientDataValue::Enumerated(n) => DV::Enumerated(*n),
2232                ClientDataValue::Date(d) => DV::Date(*d),
2233                ClientDataValue::Time(t) => DV::Time(*t),
2234                ClientDataValue::ObjectId(o) => DV::ObjectId(*o),
2235                ClientDataValue::Constructed { tag_num, values } => DV::Constructed {
2236                    tag_num: *tag_num,
2237                    values: values.iter().map(cv_to_dv).collect(),
2238                },
2239            }
2240        }
2241
2242        // Pre-convert values so DataValue borrows from the input slice lifetime.
2243        let converted: Vec<(ObjectId, PropertyId, DV<'_>, Option<u8>)> = writes
2244            .iter()
2245            .map(|(oid, pid, val, prio)| (*oid, *pid, cv_to_dv(val), *prio))
2246            .collect();
2247
2248        // Group by object to build WriteAccessSpecifications.
2249        let mut grouped: Vec<(ObjectId, Vec<PropertyWriteSpec<'_>>)> = Vec::new();
2250        for (oid, pid, val, prio) in &converted {
2251            let spec = PropertyWriteSpec {
2252                property_id: *pid,
2253                array_index: None,
2254                value: val.clone(),
2255                priority: *prio,
2256            };
2257            if let Some(entry) = grouped.iter_mut().find(|(o, _)| o == oid) {
2258                entry.1.push(spec);
2259            } else {
2260                grouped.push((*oid, vec![spec]));
2261            }
2262        }
2263
2264        let specs: Vec<WriteAccessSpecification<'_>> = grouped
2265            .iter()
2266            .map(|(oid, props)| WriteAccessSpecification {
2267                object_id: *oid,
2268                properties: props,
2269            })
2270            .collect();
2271
2272        let invoke_id = self.next_invoke_id().await;
2273        let req = WritePropertyMultipleRequest {
2274            specs: &specs,
2275            invoke_id,
2276        };
2277        let tx = self.encode_with_growth(|w| {
2278            Npdu::new(0).encode(w)?;
2279            req.encode(w)
2280        })?;
2281        self.await_simple_ack_or_error(
2282            address,
2283            &tx,
2284            invoke_id,
2285            SERVICE_WRITE_PROPERTY_MULTIPLE,
2286            self.response_timeout,
2287        )
2288        .await
2289    }
2290}
2291
2292fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
2293    let mut r = Reader::new(payload);
2294    let _npdu = Npdu::decode(&mut r)?;
2295    r.read_exact(r.remaining()).map_err(ClientError::from)
2296}
2297
2298// ─────────────────────────────────────────────────────────────────────────────
2299// Inline server dispatch
2300// ─────────────────────────────────────────────────────────────────────────────
2301
2302/// Handle an incoming BACnet service request on behalf of the client's inline server handler.
2303///
2304/// This is the core dispatch logic extracted from [`BacnetServer::handle_frame`] so that
2305/// `BacnetClient` can service requests inline while waiting for its own responses.
2306async fn dispatch_incoming_request<D: DataLink>(
2307    datalink: &D,
2308    handler: &dyn crate::server::ServiceHandler,
2309    device_id: u32,
2310    vendor_id: u16,
2311    frame: &[u8],
2312    source: DataLinkAddress,
2313) -> Result<(), ClientError> {
2314    let mut r = Reader::new(frame);
2315    let _npdu = match Npdu::decode(&mut r) {
2316        Ok(n) => n,
2317        Err(_) => return Ok(()),
2318    };
2319
2320    if r.is_empty() {
2321        return Ok(());
2322    }
2323
2324    let first = match r.peek_u8() {
2325        Ok(b) => b,
2326        Err(_) => return Ok(()),
2327    };
2328    let apdu_type = ApduType::from_u8(first >> 4);
2329
2330    match apdu_type {
2331        Some(ApduType::UnconfirmedRequest) => {
2332            let header = match UnconfirmedRequestHeader::decode(&mut r) {
2333                Ok(h) => h,
2334                Err(_) => return Ok(()),
2335            };
2336            if header.service_choice == 0x08 {
2337                // Who-Is — check optional limits then respond with I-Am.
2338                let limits = dispatch_decode_who_is_limits(&mut r);
2339                if dispatch_matches_who_is(device_id, limits) {
2340                    dispatch_send_i_am(datalink, device_id, vendor_id, source).await;
2341                }
2342            }
2343            // All other unconfirmed services are ignored.
2344        }
2345        Some(ApduType::ConfirmedRequest) => {
2346            let header = match ConfirmedRequestHeader::decode(&mut r) {
2347                Ok(h) => h,
2348                Err(_) => return Ok(()),
2349            };
2350            let invoke_id = header.invoke_id;
2351            match header.service_choice {
2352                SERVICE_READ_PROPERTY => {
2353                    dispatch_handle_read_property(datalink, handler, &mut r, invoke_id, source)
2354                        .await;
2355                }
2356                SERVICE_WRITE_PROPERTY => {
2357                    dispatch_handle_write_property(datalink, handler, &mut r, invoke_id, source)
2358                        .await;
2359                }
2360                SERVICE_READ_PROPERTY_MULTIPLE => {
2361                    dispatch_handle_read_property_multiple(
2362                        datalink, handler, &mut r, invoke_id, source,
2363                    )
2364                    .await;
2365                }
2366                SERVICE_WRITE_PROPERTY_MULTIPLE => {
2367                    dispatch_handle_write_property_multiple(
2368                        datalink, handler, &mut r, invoke_id, source,
2369                    )
2370                    .await;
2371                }
2372                SERVICE_SUBSCRIBE_COV => {
2373                    dispatch_handle_subscribe_cov(datalink, handler, &mut r, invoke_id, source)
2374                        .await;
2375                }
2376                SERVICE_CREATE_OBJECT => {
2377                    dispatch_handle_create_object(datalink, handler, &mut r, invoke_id, source)
2378                        .await;
2379                }
2380                SERVICE_DELETE_OBJECT => {
2381                    dispatch_handle_delete_object(datalink, handler, &mut r, invoke_id, source)
2382                        .await;
2383                }
2384                _ => {
2385                    // Unknown service — send Reject with UNRECOGNIZED_SERVICE (0x08).
2386                    dispatch_send_reject(datalink, invoke_id, 0x08, source).await;
2387                }
2388            }
2389        }
2390        _ => {
2391            // Not a request — ignore.
2392        }
2393    }
2394
2395    Ok(())
2396}
2397
2398// ── Inline dispatch helpers ──────────────────────────────────────────────────
2399
2400async fn dispatch_send_i_am<D: DataLink>(
2401    datalink: &D,
2402    device_id: u32,
2403    vendor_id: u16,
2404    target: DataLinkAddress,
2405) {
2406    let device_oid = ObjectId::new(ObjectType::Device, device_id);
2407    let req = IAmRequest {
2408        device_id: device_oid,
2409        max_apdu: 1476,
2410        segmentation: 3, // no-segmentation
2411        vendor_id: vendor_id as u32,
2412    };
2413    let mut buf = [0u8; 128];
2414    let mut w = Writer::new(&mut buf);
2415    if Npdu::new(0).encode(&mut w).is_err() {
2416        return;
2417    }
2418    if req.encode(&mut w).is_err() {
2419        return;
2420    }
2421    let _ = datalink.send(target, w.as_written()).await;
2422}
2423
2424async fn dispatch_handle_read_property<D: DataLink>(
2425    datalink: &D,
2426    handler: &dyn crate::server::ServiceHandler,
2427    r: &mut Reader<'_>,
2428    invoke_id: u8,
2429    source: DataLinkAddress,
2430) {
2431    let object_id = match crate::decode_ctx_object_id(r) {
2432        Ok(v) => v,
2433        Err(_) => return,
2434    };
2435    let property_id_raw = match crate::decode_ctx_unsigned(r) {
2436        Ok(v) => v,
2437        Err(_) => return,
2438    };
2439    let property_id = PropertyId::from_u32(property_id_raw);
2440
2441    // Optional array index [2].
2442    let array_index = dispatch_decode_optional_array_index(r);
2443
2444    match handler.read_property(object_id, property_id, array_index) {
2445        Ok(value) => {
2446            let borrowed = dispatch_client_value_to_borrowed(&value);
2447            let mut buf = [0u8; 1400];
2448            let mut w = Writer::new(&mut buf);
2449            if Npdu::new(0).encode(&mut w).is_err() {
2450                return;
2451            }
2452            if (ComplexAckHeader {
2453                segmented: false,
2454                more_follows: false,
2455                invoke_id,
2456                sequence_number: None,
2457                proposed_window_size: None,
2458                service_choice: SERVICE_READ_PROPERTY,
2459            })
2460            .encode(&mut w)
2461            .is_err()
2462            {
2463                return;
2464            }
2465            if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
2466                return;
2467            }
2468            if encode_ctx_unsigned(&mut w, 1, property_id.to_u32()).is_err() {
2469                return;
2470            }
2471            if (Tag::Opening { tag_num: 3 }).encode(&mut w).is_err() {
2472                return;
2473            }
2474            if encode_application_data_value(&mut w, &borrowed).is_err() {
2475                return;
2476            }
2477            if (Tag::Closing { tag_num: 3 }).encode(&mut w).is_err() {
2478                return;
2479            }
2480            let _ = datalink.send(source, w.as_written()).await;
2481        }
2482        Err(err) => {
2483            dispatch_send_error(datalink, invoke_id, SERVICE_READ_PROPERTY, err, source).await;
2484        }
2485    }
2486}
2487
2488async fn dispatch_handle_write_property<D: DataLink>(
2489    datalink: &D,
2490    handler: &dyn crate::server::ServiceHandler,
2491    r: &mut Reader<'_>,
2492    invoke_id: u8,
2493    source: DataLinkAddress,
2494) {
2495    let object_id = match crate::decode_ctx_object_id(r) {
2496        Ok(v) => v,
2497        Err(_) => return,
2498    };
2499    let property_id_raw = match crate::decode_ctx_unsigned(r) {
2500        Ok(v) => v,
2501        Err(_) => return,
2502    };
2503    let property_id = PropertyId::from_u32(property_id_raw);
2504
2505    // Optional array index [2].
2506    let next_tag = match Tag::decode(r) {
2507        Ok(t) => t,
2508        Err(_) => return,
2509    };
2510    let (array_index, value_start_tag) = match next_tag {
2511        Tag::Context { tag_num: 2, len } => {
2512            let idx = match decode_unsigned(r, len as usize) {
2513                Ok(v) => v,
2514                Err(_) => return,
2515            };
2516            let vt = match Tag::decode(r) {
2517                Ok(t) => t,
2518                Err(_) => return,
2519            };
2520            (Some(idx), vt)
2521        }
2522        other => (None, other),
2523    };
2524
2525    if value_start_tag != (Tag::Opening { tag_num: 3 }) {
2526        return;
2527    }
2528
2529    let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
2530        Ok(v) => v,
2531        Err(_) => return,
2532    };
2533
2534    match Tag::decode(r) {
2535        Ok(Tag::Closing { tag_num: 3 }) => {}
2536        _ => return,
2537    }
2538
2539    // Optional priority [4].
2540    let priority = if !r.is_empty() {
2541        match Tag::decode(r) {
2542            Ok(Tag::Context { tag_num: 4, len }) => match decode_unsigned(r, len as usize) {
2543                Ok(p) => Some(p as u8),
2544                Err(_) => return,
2545            },
2546            _ => None,
2547        }
2548    } else {
2549        None
2550    };
2551
2552    let client_val = crate::data_value_to_client(val);
2553
2554    match handler.write_property(object_id, property_id, array_index, client_val, priority) {
2555        Ok(()) => {
2556            let mut buf = [0u8; 32];
2557            let mut w = Writer::new(&mut buf);
2558            if Npdu::new(0).encode(&mut w).is_err() {
2559                return;
2560            }
2561            if (SimpleAck {
2562                invoke_id,
2563                service_choice: SERVICE_WRITE_PROPERTY,
2564            })
2565            .encode(&mut w)
2566            .is_err()
2567            {
2568                return;
2569            }
2570            let _ = datalink.send(source, w.as_written()).await;
2571        }
2572        Err(err) => {
2573            dispatch_send_error(datalink, invoke_id, SERVICE_WRITE_PROPERTY, err, source).await;
2574        }
2575    }
2576}
2577
2578async fn dispatch_handle_read_property_multiple<D: DataLink>(
2579    datalink: &D,
2580    handler: &dyn crate::server::ServiceHandler,
2581    r: &mut Reader<'_>,
2582    invoke_id: u8,
2583    source: DataLinkAddress,
2584) {
2585    type PropRefs = Vec<(PropertyId, Option<u32>)>;
2586    let mut specs: Vec<(ObjectId, PropRefs)> = Vec::new();
2587
2588    while !r.is_empty() {
2589        let object_id = match crate::decode_ctx_object_id(r) {
2590            Ok(v) => v,
2591            Err(_) => return,
2592        };
2593        match Tag::decode(r) {
2594            Ok(Tag::Opening { tag_num: 1 }) => {}
2595            _ => return,
2596        }
2597        let mut props: Vec<(PropertyId, Option<u32>)> = Vec::new();
2598        loop {
2599            let tag = match Tag::decode(r) {
2600                Ok(t) => t,
2601                Err(_) => return,
2602            };
2603            if tag == (Tag::Closing { tag_num: 1 }) {
2604                break;
2605            }
2606            let property_id = match tag {
2607                Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
2608                    Ok(v) => PropertyId::from_u32(v),
2609                    Err(_) => return,
2610                },
2611                _ => return,
2612            };
2613            let array_index = if !r.is_empty() {
2614                match dispatch_peek_context_tag(r, 1) {
2615                    Some(len) => {
2616                        match Tag::decode(r) {
2617                            Ok(_) => {}
2618                            Err(_) => return,
2619                        }
2620                        match decode_unsigned(r, len as usize) {
2621                            Ok(idx) => Some(idx),
2622                            Err(_) => return,
2623                        }
2624                    }
2625                    None => None,
2626                }
2627            } else {
2628                None
2629            };
2630            props.push((property_id, array_index));
2631        }
2632        specs.push((object_id, props));
2633    }
2634
2635    let mut buf = [0u8; 1400];
2636    let mut w = Writer::new(&mut buf);
2637    if Npdu::new(0).encode(&mut w).is_err() {
2638        return;
2639    }
2640    if (ComplexAckHeader {
2641        segmented: false,
2642        more_follows: false,
2643        invoke_id,
2644        sequence_number: None,
2645        proposed_window_size: None,
2646        service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
2647    })
2648    .encode(&mut w)
2649    .is_err()
2650    {
2651        return;
2652    }
2653
2654    for (object_id, props) in &specs {
2655        if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
2656            return;
2657        }
2658        if (Tag::Opening { tag_num: 1 }).encode(&mut w).is_err() {
2659            return;
2660        }
2661
2662        for (property_id, array_index) in props {
2663            if encode_ctx_unsigned(&mut w, 2, property_id.to_u32()).is_err() {
2664                return;
2665            }
2666            if let Some(idx) = array_index {
2667                if encode_ctx_unsigned(&mut w, 3, *idx).is_err() {
2668                    return;
2669                }
2670            }
2671            if (Tag::Opening { tag_num: 4 }).encode(&mut w).is_err() {
2672                return;
2673            }
2674
2675            match handler.read_property(*object_id, *property_id, *array_index) {
2676                Ok(value) => {
2677                    let borrowed = dispatch_client_value_to_borrowed(&value);
2678                    if encode_application_data_value(&mut w, &borrowed).is_err() {
2679                        return;
2680                    }
2681                }
2682                Err(err) => {
2683                    let (class, code) = dispatch_error_class_code(err);
2684                    if (Tag::Opening { tag_num: 5 }).encode(&mut w).is_err() {
2685                        return;
2686                    }
2687                    if encode_ctx_unsigned(&mut w, 0, class as u32).is_err() {
2688                        return;
2689                    }
2690                    if encode_ctx_unsigned(&mut w, 1, code as u32).is_err() {
2691                        return;
2692                    }
2693                    if (Tag::Closing { tag_num: 5 }).encode(&mut w).is_err() {
2694                        return;
2695                    }
2696                }
2697            }
2698
2699            if (Tag::Closing { tag_num: 4 }).encode(&mut w).is_err() {
2700                return;
2701            }
2702        }
2703
2704        if (Tag::Closing { tag_num: 1 }).encode(&mut w).is_err() {
2705            return;
2706        }
2707    }
2708
2709    let _ = datalink.send(source, w.as_written()).await;
2710}
2711
2712async fn dispatch_handle_write_property_multiple<D: DataLink>(
2713    datalink: &D,
2714    handler: &dyn crate::server::ServiceHandler,
2715    r: &mut Reader<'_>,
2716    invoke_id: u8,
2717    source: DataLinkAddress,
2718) {
2719    while !r.is_empty() {
2720        let object_id = match crate::decode_ctx_object_id(r) {
2721            Ok(v) => v,
2722            Err(_) => return,
2723        };
2724        match Tag::decode(r) {
2725            Ok(Tag::Opening { tag_num: 1 }) => {}
2726            _ => return,
2727        }
2728        loop {
2729            let tag = match Tag::decode(r) {
2730                Ok(t) => t,
2731                Err(_) => return,
2732            };
2733            if tag == (Tag::Closing { tag_num: 1 }) {
2734                break;
2735            }
2736            let property_id = match tag {
2737                Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
2738                    Ok(v) => PropertyId::from_u32(v),
2739                    Err(_) => return,
2740                },
2741                _ => return,
2742            };
2743            let array_index = if !r.is_empty() {
2744                match dispatch_peek_context_tag(r, 1) {
2745                    Some(len) => {
2746                        let _ = Tag::decode(r);
2747                        decode_unsigned(r, len as usize).ok()
2748                    }
2749                    None => None,
2750                }
2751            } else {
2752                None
2753            };
2754            match Tag::decode(r) {
2755                Ok(Tag::Opening { tag_num: 2 }) => {}
2756                _ => return,
2757            }
2758            let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
2759                Ok(v) => v,
2760                Err(_) => return,
2761            };
2762            match Tag::decode(r) {
2763                Ok(Tag::Closing { tag_num: 2 }) => {}
2764                _ => return,
2765            }
2766            let priority = if !r.is_empty() {
2767                match dispatch_peek_context_tag(r, 3) {
2768                    Some(len) => {
2769                        let _ = Tag::decode(r);
2770                        decode_unsigned(r, len as usize).ok().map(|p| p as u8)
2771                    }
2772                    None => None,
2773                }
2774            } else {
2775                None
2776            };
2777            let client_val = crate::data_value_to_client(val);
2778            if let Err(err) =
2779                handler.write_property(object_id, property_id, array_index, client_val, priority)
2780            {
2781                dispatch_send_error(
2782                    datalink,
2783                    invoke_id,
2784                    SERVICE_WRITE_PROPERTY_MULTIPLE,
2785                    err,
2786                    source,
2787                )
2788                .await;
2789                return;
2790            }
2791        }
2792    }
2793    // All properties written successfully — send SimpleAck.
2794    let mut buf = [0u8; 32];
2795    let mut w = Writer::new(&mut buf);
2796    if Npdu::new(0).encode(&mut w).is_err() {
2797        return;
2798    }
2799    if (SimpleAck {
2800        invoke_id,
2801        service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
2802    })
2803    .encode(&mut w)
2804    .is_err()
2805    {
2806        return;
2807    }
2808    let _ = datalink.send(source, w.as_written()).await;
2809}
2810
2811async fn dispatch_handle_subscribe_cov<D: DataLink>(
2812    datalink: &D,
2813    handler: &dyn crate::server::ServiceHandler,
2814    r: &mut Reader<'_>,
2815    invoke_id: u8,
2816    source: DataLinkAddress,
2817) {
2818    // subscriberProcessIdentifier [0]
2819    let subscriber_process_id = match Tag::decode(r) {
2820        Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
2821            Ok(v) => v,
2822            Err(_) => return,
2823        },
2824        _ => return,
2825    };
2826    // monitoredObjectIdentifier [1]
2827    let monitored_object_id = match crate::decode_ctx_object_id(r) {
2828        Ok(v) => v,
2829        Err(_) => return,
2830    };
2831    // issueConfirmedNotifications [2]
2832    let issue_confirmed = match Tag::decode(r) {
2833        Ok(Tag::Context { tag_num: 2, len }) => match decode_unsigned(r, len as usize) {
2834            Ok(v) => v != 0,
2835            Err(_) => return,
2836        },
2837        _ => return,
2838    };
2839    // optional lifetime [3]
2840    let lifetime = if !r.is_empty() {
2841        match dispatch_peek_context_tag(r, 3) {
2842            Some(len) => {
2843                let _ = Tag::decode(r);
2844                decode_unsigned(r, len as usize).ok()
2845            }
2846            None => None,
2847        }
2848    } else {
2849        None
2850    };
2851
2852    match handler.subscribe_cov(
2853        subscriber_process_id,
2854        monitored_object_id,
2855        issue_confirmed,
2856        lifetime,
2857    ) {
2858        Ok(()) => {
2859            let mut buf = [0u8; 32];
2860            let mut w = Writer::new(&mut buf);
2861            if Npdu::new(0).encode(&mut w).is_err() {
2862                return;
2863            }
2864            if (SimpleAck {
2865                invoke_id,
2866                service_choice: SERVICE_SUBSCRIBE_COV,
2867            })
2868            .encode(&mut w)
2869            .is_err()
2870            {
2871                return;
2872            }
2873            let _ = datalink.send(source, w.as_written()).await;
2874        }
2875        Err(err) => {
2876            dispatch_send_error(datalink, invoke_id, SERVICE_SUBSCRIBE_COV, err, source).await;
2877        }
2878    }
2879}
2880
2881async fn dispatch_handle_create_object<D: DataLink>(
2882    datalink: &D,
2883    handler: &dyn crate::server::ServiceHandler,
2884    r: &mut Reader<'_>,
2885    invoke_id: u8,
2886    source: DataLinkAddress,
2887) {
2888    // objectSpecifier [0] opening
2889    match Tag::decode(r) {
2890        Ok(Tag::Opening { tag_num: 0 }) => {}
2891        _ => return,
2892    }
2893    // objectType [0] — context-tagged enumerated
2894    let object_type_raw = match Tag::decode(r) {
2895        Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
2896            Ok(v) => v,
2897            Err(_) => return,
2898        },
2899        _ => return,
2900    };
2901    let object_type = ObjectType::from_u16(object_type_raw as u16);
2902    // objectSpecifier [0] closing
2903    match Tag::decode(r) {
2904        Ok(Tag::Closing { tag_num: 0 }) => {}
2905        _ => return,
2906    }
2907
2908    match handler.create_object(object_type) {
2909        Ok(created_id) => {
2910            let mut buf = [0u8; 64];
2911            let mut w = Writer::new(&mut buf);
2912            if Npdu::new(0).encode(&mut w).is_err() {
2913                return;
2914            }
2915            if (ComplexAckHeader {
2916                segmented: false,
2917                more_follows: false,
2918                invoke_id,
2919                sequence_number: None,
2920                proposed_window_size: None,
2921                service_choice: SERVICE_CREATE_OBJECT,
2922            })
2923            .encode(&mut w)
2924            .is_err()
2925            {
2926                return;
2927            }
2928            if encode_ctx_unsigned(&mut w, 0, created_id.raw()).is_err() {
2929                return;
2930            }
2931            let _ = datalink.send(source, w.as_written()).await;
2932        }
2933        Err(err) => {
2934            dispatch_send_error(datalink, invoke_id, SERVICE_CREATE_OBJECT, err, source).await;
2935        }
2936    }
2937}
2938
2939async fn dispatch_handle_delete_object<D: DataLink>(
2940    datalink: &D,
2941    handler: &dyn crate::server::ServiceHandler,
2942    r: &mut Reader<'_>,
2943    invoke_id: u8,
2944    source: DataLinkAddress,
2945) {
2946    let object_id = match crate::decode_ctx_object_id(r) {
2947        Ok(v) => v,
2948        Err(_) => return,
2949    };
2950
2951    match handler.delete_object(object_id) {
2952        Ok(()) => {
2953            let mut buf = [0u8; 32];
2954            let mut w = Writer::new(&mut buf);
2955            if Npdu::new(0).encode(&mut w).is_err() {
2956                return;
2957            }
2958            if (SimpleAck {
2959                invoke_id,
2960                service_choice: SERVICE_DELETE_OBJECT,
2961            })
2962            .encode(&mut w)
2963            .is_err()
2964            {
2965                return;
2966            }
2967            let _ = datalink.send(source, w.as_written()).await;
2968        }
2969        Err(err) => {
2970            dispatch_send_error(datalink, invoke_id, SERVICE_DELETE_OBJECT, err, source).await;
2971        }
2972    }
2973}
2974
2975async fn dispatch_send_error<D: DataLink>(
2976    datalink: &D,
2977    invoke_id: u8,
2978    service_choice: u8,
2979    err: crate::server::BacnetServiceError,
2980    target: DataLinkAddress,
2981) {
2982    let (class, code) = dispatch_error_class_code(err);
2983    let mut buf = [0u8; 64];
2984    let mut w = Writer::new(&mut buf);
2985    if Npdu::new(0).encode(&mut w).is_err() {
2986        return;
2987    }
2988    // Error PDU header: type=5 (Error)
2989    if w.write_u8(0x50).is_err() {
2990        return;
2991    }
2992    if w.write_u8(invoke_id).is_err() {
2993        return;
2994    }
2995    if w.write_u8(service_choice).is_err() {
2996        return;
2997    }
2998    // error-class: application Enumerated
2999    if (Tag::Application {
3000        tag: rustbac_core::encoding::tag::AppTag::Enumerated,
3001        len: 1,
3002    })
3003    .encode(&mut w)
3004    .is_err()
3005    {
3006        return;
3007    }
3008    if w.write_u8(class).is_err() {
3009        return;
3010    }
3011    // error-code: application Enumerated
3012    if (Tag::Application {
3013        tag: rustbac_core::encoding::tag::AppTag::Enumerated,
3014        len: 1,
3015    })
3016    .encode(&mut w)
3017    .is_err()
3018    {
3019        return;
3020    }
3021    if w.write_u8(code).is_err() {
3022        return;
3023    }
3024    let _ = datalink.send(target, w.as_written()).await;
3025}
3026
3027async fn dispatch_send_reject<D: DataLink>(
3028    datalink: &D,
3029    invoke_id: u8,
3030    reason: u8,
3031    target: DataLinkAddress,
3032) {
3033    let mut buf = [0u8; 16];
3034    let mut w = Writer::new(&mut buf);
3035    if Npdu::new(0).encode(&mut w).is_err() {
3036        return;
3037    }
3038    // Reject PDU: type=6 (Reject)
3039    if w.write_u8(0x60).is_err() {
3040        return;
3041    }
3042    if w.write_u8(invoke_id).is_err() {
3043        return;
3044    }
3045    if w.write_u8(reason).is_err() {
3046        return;
3047    }
3048    let _ = datalink.send(target, w.as_written()).await;
3049}
3050
3051/// Decode Who-Is optional [0] low-limit and [1] high-limit (dispatch-local copy).
3052fn dispatch_decode_who_is_limits(r: &mut Reader<'_>) -> Option<(u32, u32)> {
3053    if r.is_empty() {
3054        return None;
3055    }
3056    let tag0 = Tag::decode(r).ok()?;
3057    let low = match tag0 {
3058        Tag::Context { tag_num: 0, len } => decode_unsigned(r, len as usize).ok()?,
3059        _ => return None,
3060    };
3061    let tag1 = Tag::decode(r).ok()?;
3062    let high = match tag1 {
3063        Tag::Context { tag_num: 1, len } => decode_unsigned(r, len as usize).ok()?,
3064        _ => return None,
3065    };
3066    Some((low, high))
3067}
3068
3069fn dispatch_error_class_code(err: crate::server::BacnetServiceError) -> (u8, u8) {
3070    use crate::server::BacnetServiceError;
3071    match err {
3072        BacnetServiceError::UnknownObject => (1, 31),
3073        BacnetServiceError::UnknownProperty => (2, 32),
3074        BacnetServiceError::WriteAccessDenied => (2, 40),
3075        BacnetServiceError::InvalidDataType => (2, 9),
3076        BacnetServiceError::ServiceNotSupported => (5, 53),
3077    }
3078}
3079
/// Decides whether a Who-Is request addresses this device.
///
/// Without limits every device matches; with limits the device matches when `device_id`
/// lies in the inclusive range `low..=high`.
fn dispatch_matches_who_is(device_id: u32, limits: Option<(u32, u32)>) -> bool {
    limits.map_or(true, |(low, high)| (low..=high).contains(&device_id))
}
3086
3087fn dispatch_decode_optional_array_index(r: &mut Reader<'_>) -> Option<u32> {
3088    if r.is_empty() {
3089        return None;
3090    }
3091    let tag = Tag::decode(r).ok()?;
3092    match tag {
3093        Tag::Context { tag_num: 2, len } => decode_unsigned(r, len as usize).ok(),
3094        _ => None,
3095    }
3096}
3097
3098fn dispatch_peek_context_tag(r: &mut Reader<'_>, tag_num: u8) -> Option<u32> {
3099    let first = r.peek_u8().ok()?;
3100    let is_context = (first & 0x08) != 0 && (first & 0x07) < 6;
3101    if !is_context {
3102        return None;
3103    }
3104    let this_tag_num = first >> 4;
3105    if this_tag_num != tag_num {
3106        return None;
3107    }
3108    let short_len = first & 0x07;
3109    if short_len < 5 {
3110        Some(short_len as u32)
3111    } else {
3112        None
3113    }
3114}
3115
3116fn dispatch_client_value_to_borrowed(val: &ClientDataValue) -> DataValue<'_> {
3117    match val {
3118        ClientDataValue::Null => DataValue::Null,
3119        ClientDataValue::Boolean(v) => DataValue::Boolean(*v),
3120        ClientDataValue::Unsigned(v) => DataValue::Unsigned(*v),
3121        ClientDataValue::Signed(v) => DataValue::Signed(*v),
3122        ClientDataValue::Real(v) => DataValue::Real(*v),
3123        ClientDataValue::Double(v) => DataValue::Double(*v),
3124        ClientDataValue::OctetString(v) => DataValue::OctetString(v),
3125        ClientDataValue::CharacterString(v) => DataValue::CharacterString(v),
3126        ClientDataValue::BitString { unused_bits, data } => {
3127            DataValue::BitString(rustbac_core::types::BitString {
3128                unused_bits: *unused_bits,
3129                data,
3130            })
3131        }
3132        ClientDataValue::Enumerated(v) => DataValue::Enumerated(*v),
3133        ClientDataValue::Date(v) => DataValue::Date(*v),
3134        ClientDataValue::Time(v) => DataValue::Time(*v),
3135        ClientDataValue::ObjectId(v) => DataValue::ObjectId(*v),
3136        ClientDataValue::Constructed { tag_num, values } => DataValue::Constructed {
3137            tag_num: *tag_num,
3138            values: values
3139                .iter()
3140                .map(dispatch_client_value_to_borrowed)
3141                .collect(),
3142        },
3143    }
3144}
3145
3146fn remote_service_error(err: BacnetError) -> ClientError {
3147    ClientError::RemoteServiceError {
3148        service_choice: err.service_choice,
3149        error_class_raw: err.error_class,
3150        error_code_raw: err.error_code,
3151        error_class: err.error_class.and_then(ErrorClass::from_u32),
3152        error_code: err.error_code.and_then(ErrorCode::from_u32),
3153    }
3154}
3155
3156fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
3157    Ok(match value {
3158        DataValue::Null => ClientDataValue::Null,
3159        DataValue::Boolean(v) => ClientDataValue::Boolean(v),
3160        DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
3161        DataValue::Signed(v) => ClientDataValue::Signed(v),
3162        DataValue::Real(v) => ClientDataValue::Real(v),
3163        DataValue::Double(v) => ClientDataValue::Double(v),
3164        DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
3165        DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
3166        DataValue::BitString(v) => ClientDataValue::BitString {
3167            unused_bits: v.unused_bits,
3168            data: v.data.to_vec(),
3169        },
3170        DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
3171        DataValue::Date(v) => ClientDataValue::Date(v),
3172        DataValue::Time(v) => ClientDataValue::Time(v),
3173        DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
3174        DataValue::Constructed { tag_num, values } => {
3175            let mut children = Vec::with_capacity(values.len());
3176            for child in values {
3177                children.push(into_client_value(child)?);
3178            }
3179            ClientDataValue::Constructed {
3180                tag_num,
3181                values: children,
3182            }
3183        }
3184    })
3185}
3186
3187fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
3188    value
3189        .into_iter()
3190        .map(|item| AlarmSummaryItem {
3191            object_id: item.object_id,
3192            alarm_state_raw: item.alarm_state,
3193            alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3194                item.alarm_state,
3195            ),
3196            acknowledged_transitions: ClientBitString {
3197                unused_bits: item.acknowledged_transitions.unused_bits,
3198                data: item.acknowledged_transitions.data.to_vec(),
3199            },
3200        })
3201        .collect()
3202}
3203
3204fn into_client_enrollment_summary(
3205    value: Vec<CoreEnrollmentSummaryItem>,
3206) -> Vec<EnrollmentSummaryItem> {
3207    value
3208        .into_iter()
3209        .map(|item| EnrollmentSummaryItem {
3210            object_id: item.object_id,
3211            event_type: item.event_type,
3212            event_state_raw: item.event_state,
3213            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3214                item.event_state,
3215            ),
3216            priority: item.priority,
3217            notification_class: item.notification_class,
3218        })
3219        .collect()
3220}
3221
3222fn into_client_event_information(
3223    value: Vec<CoreEventSummaryItem<'_>>,
3224) -> Vec<EventInformationItem> {
3225    value
3226        .into_iter()
3227        .map(|item| EventInformationItem {
3228            object_id: item.object_id,
3229            event_state_raw: item.event_state,
3230            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3231                item.event_state,
3232            ),
3233            acknowledged_transitions: ClientBitString {
3234                unused_bits: item.acknowledged_transitions.unused_bits,
3235                data: item.acknowledged_transitions.data.to_vec(),
3236            },
3237            notify_type: item.notify_type,
3238            event_enable: ClientBitString {
3239                unused_bits: item.event_enable.unused_bits,
3240                data: item.event_enable.data.to_vec(),
3241            },
3242            event_priorities: item.event_priorities,
3243        })
3244        .collect()
3245}
3246
3247fn into_client_cov_notification(
3248    source: DataLinkAddress,
3249    confirmed: bool,
3250    value: CovNotificationRequest<'_>,
3251) -> Result<CovNotification, ClientError> {
3252    let mut values = Vec::with_capacity(value.values.len());
3253    for property in value.values {
3254        values.push(CovPropertyValue {
3255            property_id: property.property_id,
3256            array_index: property.array_index,
3257            value: into_client_value(property.value)?,
3258            priority: property.priority,
3259        });
3260    }
3261
3262    Ok(CovNotification {
3263        source,
3264        confirmed,
3265        subscriber_process_id: value.subscriber_process_id,
3266        initiating_device_id: value.initiating_device_id,
3267        monitored_object_id: value.monitored_object_id,
3268        time_remaining_seconds: value.time_remaining_seconds,
3269        values,
3270    })
3271}
3272
3273fn into_client_event_notification(
3274    source: DataLinkAddress,
3275    confirmed: bool,
3276    value: EventNotificationRequest<'_>,
3277) -> EventNotification {
3278    EventNotification {
3279        source,
3280        confirmed,
3281        process_id: value.process_id,
3282        initiating_device_id: value.initiating_device_id,
3283        event_object_id: value.event_object_id,
3284        timestamp: value.timestamp,
3285        notification_class: value.notification_class,
3286        priority: value.priority,
3287        event_type: value.event_type,
3288        message_text: value.message_text.map(str::to_string),
3289        notify_type: value.notify_type,
3290        ack_required: value.ack_required,
3291        from_state_raw: value.from_state,
3292        from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
3293            value.from_state,
3294        ),
3295        to_state_raw: value.to_state,
3296        to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
3297    }
3298}
3299
3300fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
3301    let mut items = Vec::with_capacity(value.items.len());
3302    for item in value.items {
3303        items.push(into_client_value(item)?);
3304    }
3305    Ok(ReadRangeResult {
3306        object_id: value.object_id,
3307        property_id: value.property_id,
3308        array_index: value.array_index,
3309        result_flags: ClientBitString {
3310            unused_bits: value.result_flags.unused_bits,
3311            data: value.result_flags.data.to_vec(),
3312        },
3313        item_count: value.item_count,
3314        items,
3315    })
3316}
3317
3318fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
3319    match value.access_method {
3320        AtomicReadFileAckAccess::Stream {
3321            file_start_position,
3322            file_data,
3323        } => AtomicReadFileResult::Stream {
3324            end_of_file: value.end_of_file,
3325            file_start_position,
3326            file_data: file_data.to_vec(),
3327        },
3328        AtomicReadFileAckAccess::Record {
3329            file_start_record,
3330            returned_record_count,
3331            file_record_data,
3332        } => AtomicReadFileResult::Record {
3333            end_of_file: value.end_of_file,
3334            file_start_record,
3335            returned_record_count,
3336            file_record_data: file_record_data
3337                .into_iter()
3338                .map(|record| record.to_vec())
3339                .collect(),
3340        },
3341    }
3342}
3343
3344fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
3345    match value {
3346        AtomicWriteFileAck::Stream {
3347            file_start_position,
3348        } => AtomicWriteFileResult::Stream {
3349            file_start_position,
3350        },
3351        AtomicWriteFileAck::Record { file_start_record } => {
3352            AtomicWriteFileResult::Record { file_start_record }
3353        }
3354    }
3355}
3356
3357#[cfg(test)]
3358mod tests {
3359    use super::BacnetClient;
3360    use crate::{
3361        AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
3362        EnrollmentSummaryItem, EventInformationItem, EventNotification,
3363    };
3364    use rustbac_core::apdu::{
3365        ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
3366        UnconfirmedRequestHeader,
3367    };
3368    use rustbac_core::encoding::{
3369        primitives::{
3370            decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
3371            encode_ctx_object_id, encode_ctx_unsigned,
3372        },
3373        reader::Reader,
3374        tag::{AppTag, Tag},
3375        writer::Writer,
3376    };
3377    use rustbac_core::npdu::Npdu;
3378    use rustbac_core::services::acknowledge_alarm::{
3379        AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
3380    };
3381    use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
3382    use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
3383    use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
3384    use rustbac_core::services::cov_notification::{
3385        SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3386    };
3387    use rustbac_core::services::device_management::{
3388        DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
3389        SERVICE_REINITIALIZE_DEVICE,
3390    };
3391    use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
3392    use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
3393    use rustbac_core::services::event_notification::{
3394        SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3395    };
3396    use rustbac_core::services::list_element::{
3397        AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
3398        SERVICE_REMOVE_LIST_ELEMENT,
3399    };
3400    use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
3401    use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
3402    use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
3403    use rustbac_core::services::read_range::SERVICE_READ_RANGE;
3404    use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
3405    use rustbac_core::services::subscribe_cov_property::{
3406        SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
3407    };
3408    use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
3409    use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
3410    use rustbac_core::services::write_property_multiple::{
3411        PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
3412    };
3413    use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
3414    use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
3415    use std::collections::VecDeque;
3416    use std::sync::Arc;
3417    use std::time::Duration;
3418    use tokio::sync::Mutex;
3419
3420    #[derive(Debug, Default)]
3421    struct MockState {
3422        sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
3423        recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
3424    }
3425
3426    #[derive(Debug, Clone)]
3427    struct MockDataLink {
3428        state: Arc<MockState>,
3429    }
3430
3431    impl MockDataLink {
3432        fn new() -> (Self, Arc<MockState>) {
3433            let state = Arc::new(MockState::default());
3434            (
3435                Self {
3436                    state: state.clone(),
3437                },
3438                state,
3439            )
3440        }
3441    }
3442
3443    impl DataLink for MockDataLink {
3444        async fn send(
3445            &self,
3446            address: DataLinkAddress,
3447            payload: &[u8],
3448        ) -> Result<(), DataLinkError> {
3449            self.state
3450                .sent
3451                .lock()
3452                .await
3453                .push((address, payload.to_vec()));
3454            Ok(())
3455        }
3456
3457        async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
3458            let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
3459                return Err(DataLinkError::InvalidFrame);
3460            };
3461            if payload.len() > buf.len() {
3462                return Err(DataLinkError::FrameTooLarge);
3463            }
3464            buf[..payload.len()].copy_from_slice(&payload);
3465            Ok((payload.len(), addr))
3466        }
3467    }
3468
3469    fn with_npdu(apdu: &[u8]) -> Vec<u8> {
3470        let mut out = [0u8; 512];
3471        let mut w = Writer::new(&mut out);
3472        Npdu::new(0).encode(&mut w).unwrap();
3473        w.write_all(apdu).unwrap();
3474        w.as_written().to_vec()
3475    }
3476
3477    fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
3478        let mut apdu_buf = [0u8; 256];
3479        let mut w = Writer::new(&mut apdu_buf);
3480        ComplexAckHeader {
3481            segmented: false,
3482            more_follows: false,
3483            invoke_id,
3484            sequence_number: None,
3485            proposed_window_size: None,
3486            service_choice: SERVICE_READ_RANGE,
3487        }
3488        .encode(&mut w)
3489        .unwrap();
3490        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3491        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3492        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3493        w.write_u8(5).unwrap();
3494        w.write_u8(0b1110_0000).unwrap();
3495        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3496        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3497        encode_app_real(&mut w, 42.0).unwrap();
3498        encode_app_real(&mut w, 43.0).unwrap();
3499        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3500        w.as_written().to_vec()
3501    }
3502
3503    fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
3504        let mut apdu_buf = [0u8; 256];
3505        let mut w = Writer::new(&mut apdu_buf);
3506        ComplexAckHeader {
3507            segmented: false,
3508            more_follows: false,
3509            invoke_id,
3510            sequence_number: None,
3511            proposed_window_size: None,
3512            service_choice: SERVICE_ATOMIC_READ_FILE,
3513        }
3514        .encode(&mut w)
3515        .unwrap();
3516        Tag::Application {
3517            tag: AppTag::Boolean,
3518            len: if eof { 1 } else { 0 },
3519        }
3520        .encode(&mut w)
3521        .unwrap();
3522        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3523        Tag::Application {
3524            tag: AppTag::SignedInt,
3525            len: 1,
3526        }
3527        .encode(&mut w)
3528        .unwrap();
3529        w.write_u8(0).unwrap();
3530        Tag::Application {
3531            tag: AppTag::OctetString,
3532            len: data.len() as u32,
3533        }
3534        .encode(&mut w)
3535        .unwrap();
3536        w.write_all(data).unwrap();
3537        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3538        w.as_written().to_vec()
3539    }
3540
3541    fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
3542        let mut apdu_buf = [0u8; 256];
3543        let mut w = Writer::new(&mut apdu_buf);
3544        ComplexAckHeader {
3545            segmented: false,
3546            more_follows: false,
3547            invoke_id,
3548            sequence_number: None,
3549            proposed_window_size: None,
3550            service_choice: SERVICE_ATOMIC_READ_FILE,
3551        }
3552        .encode(&mut w)
3553        .unwrap();
3554        Tag::Application {
3555            tag: AppTag::Boolean,
3556            len: 0,
3557        }
3558        .encode(&mut w)
3559        .unwrap();
3560        Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
3561        Tag::Application {
3562            tag: AppTag::SignedInt,
3563            len: 1,
3564        }
3565        .encode(&mut w)
3566        .unwrap();
3567        w.write_u8(7).unwrap();
3568        Tag::Application {
3569            tag: AppTag::UnsignedInt,
3570            len: 1,
3571        }
3572        .encode(&mut w)
3573        .unwrap();
3574        w.write_u8(2).unwrap();
3575        Tag::Application {
3576            tag: AppTag::OctetString,
3577            len: 2,
3578        }
3579        .encode(&mut w)
3580        .unwrap();
3581        w.write_all(&[0x01, 0x02]).unwrap();
3582        Tag::Application {
3583            tag: AppTag::OctetString,
3584            len: 3,
3585        }
3586        .encode(&mut w)
3587        .unwrap();
3588        w.write_all(&[0x03, 0x04, 0x05]).unwrap();
3589        Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
3590        w.as_written().to_vec()
3591    }
3592
3593    fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
3594        let mut apdu_buf = [0u8; 64];
3595        let mut w = Writer::new(&mut apdu_buf);
3596        ComplexAckHeader {
3597            segmented: false,
3598            more_follows: false,
3599            invoke_id,
3600            sequence_number: None,
3601            proposed_window_size: None,
3602            service_choice: SERVICE_ATOMIC_WRITE_FILE,
3603        }
3604        .encode(&mut w)
3605        .unwrap();
3606        Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
3607        w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
3608        w.as_written().to_vec()
3609    }
3610
3611    fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
3612        let mut apdu_buf = [0u8; 64];
3613        let mut w = Writer::new(&mut apdu_buf);
3614        ComplexAckHeader {
3615            segmented: false,
3616            more_follows: false,
3617            invoke_id,
3618            sequence_number: None,
3619            proposed_window_size: None,
3620            service_choice: SERVICE_ATOMIC_WRITE_FILE,
3621        }
3622        .encode(&mut w)
3623        .unwrap();
3624        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
3625        w.write_u8(start_record as u8).unwrap();
3626        w.as_written().to_vec()
3627    }
3628
3629    fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
3630        let mut apdu_buf = [0u8; 64];
3631        let mut w = Writer::new(&mut apdu_buf);
3632        ComplexAckHeader {
3633            segmented: false,
3634            more_follows: false,
3635            invoke_id,
3636            sequence_number: None,
3637            proposed_window_size: None,
3638            service_choice: SERVICE_CREATE_OBJECT,
3639        }
3640        .encode(&mut w)
3641        .unwrap();
3642        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3643        w.as_written().to_vec()
3644    }
3645
3646    fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
3647        let mut apdu_buf = [0u8; 128];
3648        let mut w = Writer::new(&mut apdu_buf);
3649        ComplexAckHeader {
3650            segmented: false,
3651            more_follows: false,
3652            invoke_id,
3653            sequence_number: None,
3654            proposed_window_size: None,
3655            service_choice: SERVICE_GET_ALARM_SUMMARY,
3656        }
3657        .encode(&mut w)
3658        .unwrap();
3659        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3660        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3661        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3662        w.write_u8(5).unwrap();
3663        w.write_u8(0b1110_0000).unwrap();
3664
3665        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
3666        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
3667        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3668        w.write_u8(5).unwrap();
3669        w.write_u8(0b1100_0000).unwrap();
3670        w.as_written().to_vec()
3671    }
3672
3673    fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
3674        let mut apdu_buf = [0u8; 160];
3675        let mut w = Writer::new(&mut apdu_buf);
3676        ComplexAckHeader {
3677            segmented: false,
3678            more_follows: false,
3679            invoke_id,
3680            sequence_number: None,
3681            proposed_window_size: None,
3682            service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
3683        }
3684        .encode(&mut w)
3685        .unwrap();
3686        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3687        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3688        encode_ctx_unsigned(&mut w, 2, 2).unwrap();
3689        encode_ctx_unsigned(&mut w, 3, 200).unwrap();
3690        encode_ctx_unsigned(&mut w, 4, 10).unwrap();
3691
3692        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
3693        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
3694        encode_ctx_unsigned(&mut w, 2, 0).unwrap();
3695        encode_ctx_unsigned(&mut w, 3, 20).unwrap();
3696        encode_ctx_unsigned(&mut w, 4, 11).unwrap();
3697        w.as_written().to_vec()
3698    }
3699
3700    fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
3701        let mut apdu_buf = [0u8; 256];
3702        let mut w = Writer::new(&mut apdu_buf);
3703        ComplexAckHeader {
3704            segmented: false,
3705            more_follows: false,
3706            invoke_id,
3707            sequence_number: None,
3708            proposed_window_size: None,
3709            service_choice: SERVICE_GET_EVENT_INFORMATION,
3710        }
3711        .encode(&mut w)
3712        .unwrap();
3713        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3714        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3715        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
3716        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
3717        w.write_u8(5).unwrap();
3718        w.write_u8(0b1110_0000).unwrap();
3719        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3720        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3721        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
3722        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3723        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3724        encode_ctx_unsigned(&mut w, 4, 0).unwrap();
3725        Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
3726        w.write_u8(5).unwrap();
3727        w.write_u8(0b1100_0000).unwrap();
3728        Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
3729        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3730        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
3731        encode_ctx_unsigned(&mut w, 2, 3).unwrap();
3732        Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
3733        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3734        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
3735        w.write_u8(0).unwrap();
3736        w.as_written().to_vec()
3737    }
3738
3739    #[tokio::test]
3740    async fn who_has_object_name_collects_i_have() {
3741        let (dl, state) = MockDataLink::new();
3742        let client = BacnetClient::with_datalink(dl);
3743        let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
3744
3745        let mut apdu = [0u8; 128];
3746        let mut w = Writer::new(&mut apdu);
3747        UnconfirmedRequestHeader {
3748            service_choice: SERVICE_I_HAVE,
3749        }
3750        .encode(&mut w)
3751        .unwrap();
3752        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
3753        encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3754        encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
3755
3756        state
3757            .recv
3758            .lock()
3759            .await
3760            .push_back((with_npdu(w.as_written()), addr));
3761
3762        let results = client
3763            .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
3764            .await
3765            .unwrap();
3766        assert_eq!(results.len(), 1);
3767        assert_eq!(results[0].address, addr);
3768        assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
3769        assert_eq!(
3770            results[0].object_id,
3771            ObjectId::new(ObjectType::AnalogInput, 7)
3772        );
3773        assert_eq!(results[0].object_name, "Zone Temp");
3774
3775        let sent = state.sent.lock().await;
3776        assert_eq!(sent.len(), 1);
3777        let mut r = Reader::new(&sent[0].1);
3778        let _npdu = Npdu::decode(&mut r).unwrap();
3779        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
3780        assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
3781    }
3782
3783    #[tokio::test]
3784    async fn device_communication_control_handles_simple_ack() {
3785        let (dl, state) = MockDataLink::new();
3786        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3787        let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
3788
3789        let mut apdu = [0u8; 32];
3790        let mut w = Writer::new(&mut apdu);
3791        SimpleAck {
3792            invoke_id: 1,
3793            service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
3794        }
3795        .encode(&mut w)
3796        .unwrap();
3797        state
3798            .recv
3799            .lock()
3800            .await
3801            .push_back((with_npdu(w.as_written()), addr));
3802
3803        client
3804            .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
3805            .await
3806            .unwrap();
3807
3808        let sent = state.sent.lock().await;
3809        assert_eq!(sent.len(), 1);
3810        let mut r = Reader::new(&sent[0].1);
3811        let _npdu = Npdu::decode(&mut r).unwrap();
3812        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3813        assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
3814    }
3815
3816    #[tokio::test]
3817    async fn reinitialize_device_handles_simple_ack() {
3818        let (dl, state) = MockDataLink::new();
3819        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3820        let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
3821
3822        let mut apdu = [0u8; 32];
3823        let mut w = Writer::new(&mut apdu);
3824        SimpleAck {
3825            invoke_id: 1,
3826            service_choice: SERVICE_REINITIALIZE_DEVICE,
3827        }
3828        .encode(&mut w)
3829        .unwrap();
3830        state
3831            .recv
3832            .lock()
3833            .await
3834            .push_back((with_npdu(w.as_written()), addr));
3835
3836        client
3837            .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
3838            .await
3839            .unwrap();
3840
3841        let sent = state.sent.lock().await;
3842        assert_eq!(sent.len(), 1);
3843        let mut r = Reader::new(&sent[0].1);
3844        let _npdu = Npdu::decode(&mut r).unwrap();
3845        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3846        assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
3847    }
3848
3849    #[tokio::test]
3850    async fn time_synchronize_sends_unconfirmed_request() {
3851        let (dl, state) = MockDataLink::new();
3852        let client = BacnetClient::with_datalink(dl);
3853        let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
3854
3855        client
3856            .time_synchronize(
3857                addr,
3858                Date {
3859                    year_since_1900: 126,
3860                    month: 2,
3861                    day: 7,
3862                    weekday: 6,
3863                },
3864                Time {
3865                    hour: 10,
3866                    minute: 11,
3867                    second: 12,
3868                    hundredths: 13,
3869                },
3870                false,
3871            )
3872            .await
3873            .unwrap();
3874
3875        let sent = state.sent.lock().await;
3876        assert_eq!(sent.len(), 1);
3877        let mut r = Reader::new(&sent[0].1);
3878        let _npdu = Npdu::decode(&mut r).unwrap();
3879        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
3880        assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
3881    }
3882
3883    #[tokio::test]
3884    async fn get_alarm_summary_decodes_complex_ack() {
3885        let (dl, state) = MockDataLink::new();
3886        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3887        let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
3888
3889        state
3890            .recv
3891            .lock()
3892            .await
3893            .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
3894
3895        let summaries = client.get_alarm_summary(addr).await.unwrap();
3896        assert_eq!(summaries.len(), 2);
3897        assert_eq!(
3898            summaries[0],
3899            AlarmSummaryItem {
3900                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
3901                alarm_state_raw: 1,
3902                alarm_state: Some(EventState::Fault),
3903                acknowledged_transitions: crate::ClientBitString {
3904                    unused_bits: 5,
3905                    data: vec![0b1110_0000],
3906                },
3907            }
3908        );
3909        assert_eq!(
3910            summaries[1],
3911            AlarmSummaryItem {
3912                object_id: ObjectId::new(ObjectType::BinaryInput, 2),
3913                alarm_state_raw: 0,
3914                alarm_state: Some(EventState::Normal),
3915                acknowledged_transitions: crate::ClientBitString {
3916                    unused_bits: 5,
3917                    data: vec![0b1100_0000],
3918                },
3919            }
3920        );
3921
3922        let sent = state.sent.lock().await;
3923        assert_eq!(sent.len(), 1);
3924        let mut r = Reader::new(&sent[0].1);
3925        let _npdu = Npdu::decode(&mut r).unwrap();
3926        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3927        assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
3928    }
3929
3930    #[tokio::test]
3931    async fn get_enrollment_summary_decodes_complex_ack() {
3932        let (dl, state) = MockDataLink::new();
3933        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3934        let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
3935
3936        state
3937            .recv
3938            .lock()
3939            .await
3940            .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
3941
3942        let summaries = client.get_enrollment_summary(addr).await.unwrap();
3943        assert_eq!(summaries.len(), 2);
3944        assert_eq!(
3945            summaries[0],
3946            EnrollmentSummaryItem {
3947                object_id: ObjectId::new(ObjectType::AnalogInput, 7),
3948                event_type: 1,
3949                event_state_raw: 2,
3950                event_state: Some(EventState::Offnormal),
3951                priority: 200,
3952                notification_class: 10,
3953            }
3954        );
3955        assert_eq!(
3956            summaries[1],
3957            EnrollmentSummaryItem {
3958                object_id: ObjectId::new(ObjectType::BinaryInput, 8),
3959                event_type: 0,
3960                event_state_raw: 0,
3961                event_state: Some(EventState::Normal),
3962                priority: 20,
3963                notification_class: 11,
3964            }
3965        );
3966
3967        let sent = state.sent.lock().await;
3968        assert_eq!(sent.len(), 1);
3969        let mut r = Reader::new(&sent[0].1);
3970        let _npdu = Npdu::decode(&mut r).unwrap();
3971        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3972        assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
3973    }
3974
3975    #[tokio::test]
3976    async fn get_event_information_decodes_complex_ack() {
3977        let (dl, state) = MockDataLink::new();
3978        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3979        let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
3980
3981        state
3982            .recv
3983            .lock()
3984            .await
3985            .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
3986
3987        let result = client.get_event_information(addr, None).await.unwrap();
3988        assert!(!result.more_events);
3989        assert_eq!(result.summaries.len(), 1);
3990        assert_eq!(
3991            result.summaries[0],
3992            EventInformationItem {
3993                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
3994                event_state_raw: 2,
3995                event_state: Some(EventState::Offnormal),
3996                acknowledged_transitions: crate::ClientBitString {
3997                    unused_bits: 5,
3998                    data: vec![0b1110_0000],
3999                },
4000                notify_type: 0,
4001                event_enable: crate::ClientBitString {
4002                    unused_bits: 5,
4003                    data: vec![0b1100_0000],
4004                },
4005                event_priorities: [1, 2, 3],
4006            }
4007        );
4008    }
4009
4010    #[tokio::test]
4011    async fn acknowledge_alarm_handles_simple_ack() {
4012        let (dl, state) = MockDataLink::new();
4013        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4014        let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
4015
4016        let mut apdu = [0u8; 32];
4017        let mut w = Writer::new(&mut apdu);
4018        SimpleAck {
4019            invoke_id: 1,
4020            service_choice: SERVICE_ACKNOWLEDGE_ALARM,
4021        }
4022        .encode(&mut w)
4023        .unwrap();
4024        state
4025            .recv
4026            .lock()
4027            .await
4028            .push_back((with_npdu(w.as_written()), addr));
4029
4030        client
4031            .acknowledge_alarm(
4032                addr,
4033                AcknowledgeAlarmRequest {
4034                    acknowledging_process_id: 10,
4035                    event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
4036                    event_state_acknowledged: EventState::Offnormal,
4037                    event_time_stamp: TimeStamp::SequenceNumber(42),
4038                    acknowledgment_source: "operator",
4039                    time_of_acknowledgment: TimeStamp::DateTime {
4040                        date: Date {
4041                            year_since_1900: 126,
4042                            month: 2,
4043                            day: 7,
4044                            weekday: 6,
4045                        },
4046                        time: Time {
4047                            hour: 10,
4048                            minute: 11,
4049                            second: 12,
4050                            hundredths: 13,
4051                        },
4052                    },
4053                    invoke_id: 0,
4054                },
4055            )
4056            .await
4057            .unwrap();
4058
4059        let sent = state.sent.lock().await;
4060        assert_eq!(sent.len(), 1);
4061        let mut r = Reader::new(&sent[0].1);
4062        let _npdu = Npdu::decode(&mut r).unwrap();
4063        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4064        assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
4065    }
4066
4067    #[tokio::test]
4068    async fn create_object_by_type_decodes_complex_ack() {
4069        let (dl, state) = MockDataLink::new();
4070        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4071        let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
4072        let created = ObjectId::new(ObjectType::AnalogValue, 42);
4073
4074        state
4075            .recv
4076            .lock()
4077            .await
4078            .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
4079
4080        let result = client
4081            .create_object_by_type(addr, ObjectType::AnalogValue)
4082            .await
4083            .unwrap();
4084        assert_eq!(result, created);
4085
4086        let sent = state.sent.lock().await;
4087        let mut r = Reader::new(&sent[0].1);
4088        let _npdu = Npdu::decode(&mut r).unwrap();
4089        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4090        assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
4091    }
4092
4093    #[tokio::test]
4094    async fn delete_object_handles_simple_ack() {
4095        let (dl, state) = MockDataLink::new();
4096        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4097        let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
4098
4099        let mut apdu = [0u8; 32];
4100        let mut w = Writer::new(&mut apdu);
4101        SimpleAck {
4102            invoke_id: 1,
4103            service_choice: SERVICE_DELETE_OBJECT,
4104        }
4105        .encode(&mut w)
4106        .unwrap();
4107        state
4108            .recv
4109            .lock()
4110            .await
4111            .push_back((with_npdu(w.as_written()), addr));
4112
4113        client
4114            .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
4115            .await
4116            .unwrap();
4117    }
4118
4119    #[tokio::test]
4120    async fn add_list_element_handles_simple_ack() {
4121        let (dl, state) = MockDataLink::new();
4122        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4123        let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
4124
4125        let mut apdu = [0u8; 32];
4126        let mut w = Writer::new(&mut apdu);
4127        SimpleAck {
4128            invoke_id: 1,
4129            service_choice: SERVICE_ADD_LIST_ELEMENT,
4130        }
4131        .encode(&mut w)
4132        .unwrap();
4133        state
4134            .recv
4135            .lock()
4136            .await
4137            .push_back((with_npdu(w.as_written()), addr));
4138
4139        let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
4140        client
4141            .add_list_element(
4142                addr,
4143                AddListElementRequest {
4144                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
4145                    property_id: PropertyId::Proprietary(512),
4146                    array_index: None,
4147                    elements: &values,
4148                    invoke_id: 0,
4149                },
4150            )
4151            .await
4152            .unwrap();
4153    }
4154
4155    #[tokio::test]
4156    async fn remove_list_element_handles_simple_ack() {
4157        let (dl, state) = MockDataLink::new();
4158        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4159        let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
4160
4161        let mut apdu = [0u8; 32];
4162        let mut w = Writer::new(&mut apdu);
4163        SimpleAck {
4164            invoke_id: 1,
4165            service_choice: SERVICE_REMOVE_LIST_ELEMENT,
4166        }
4167        .encode(&mut w)
4168        .unwrap();
4169        state
4170            .recv
4171            .lock()
4172            .await
4173            .push_back((with_npdu(w.as_written()), addr));
4174
4175        let values = [DataValue::Unsigned(1)];
4176        client
4177            .remove_list_element(
4178                addr,
4179                RemoveListElementRequest {
4180                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
4181                    property_id: PropertyId::Proprietary(513),
4182                    array_index: None,
4183                    elements: &values,
4184                    invoke_id: 0,
4185                },
4186            )
4187            .await
4188            .unwrap();
4189    }
4190
4191    #[tokio::test]
4192    async fn atomic_read_file_stream_decodes_complex_ack() {
4193        let (dl, state) = MockDataLink::new();
4194        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4195        let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
4196        let file_object = ObjectId::new(ObjectType::File, 2);
4197
4198        state.recv.lock().await.push_back((
4199            with_npdu(&atomic_read_file_stream_ack_apdu(
4200                1,
4201                true,
4202                &[0xAA, 0xBB, 0xCC],
4203            )),
4204            addr,
4205        ));
4206
4207        let result = client
4208            .atomic_read_file_stream(addr, file_object, 0, 3)
4209            .await
4210            .unwrap();
4211
4212        assert_eq!(
4213            result,
4214            AtomicReadFileResult::Stream {
4215                end_of_file: true,
4216                file_start_position: 0,
4217                file_data: vec![0xAA, 0xBB, 0xCC],
4218            }
4219        );
4220
4221        let sent = state.sent.lock().await;
4222        assert_eq!(sent.len(), 1);
4223        let mut r = Reader::new(&sent[0].1);
4224        let _npdu = Npdu::decode(&mut r).unwrap();
4225        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4226        assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
4227    }
4228
4229    #[tokio::test]
4230    async fn atomic_read_file_record_decodes_complex_ack() {
4231        let (dl, state) = MockDataLink::new();
4232        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4233        let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
4234        let file_object = ObjectId::new(ObjectType::File, 5);
4235
4236        state
4237            .recv
4238            .lock()
4239            .await
4240            .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
4241
4242        let result = client
4243            .atomic_read_file_record(addr, file_object, 7, 2)
4244            .await
4245            .unwrap();
4246
4247        assert_eq!(
4248            result,
4249            AtomicReadFileResult::Record {
4250                end_of_file: false,
4251                file_start_record: 7,
4252                returned_record_count: 2,
4253                file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
4254            }
4255        );
4256    }
4257
4258    #[tokio::test]
4259    async fn atomic_write_file_stream_decodes_complex_ack() {
4260        let (dl, state) = MockDataLink::new();
4261        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4262        let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
4263        let file_object = ObjectId::new(ObjectType::File, 3);
4264
4265        state
4266            .recv
4267            .lock()
4268            .await
4269            .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
4270
4271        let result = client
4272            .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
4273            .await
4274            .unwrap();
4275
4276        assert_eq!(
4277            result,
4278            AtomicWriteFileResult::Stream {
4279                file_start_position: 128
4280            }
4281        );
4282    }
4283
4284    #[tokio::test]
4285    async fn atomic_write_file_record_decodes_complex_ack() {
4286        let (dl, state) = MockDataLink::new();
4287        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4288        let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
4289        let file_object = ObjectId::new(ObjectType::File, 9);
4290        let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
4291
4292        state
4293            .recv
4294            .lock()
4295            .await
4296            .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
4297
4298        let result = client
4299            .atomic_write_file_record(addr, file_object, 7, &records)
4300            .await
4301            .unwrap();
4302
4303        assert_eq!(
4304            result,
4305            AtomicWriteFileResult::Record {
4306                file_start_record: 7
4307            }
4308        );
4309    }
4310
4311    #[tokio::test]
4312    async fn read_properties_decodes_complex_ack() {
4313        let (dl, state) = MockDataLink::new();
4314        let client = BacnetClient::with_datalink(dl);
4315        let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
4316        let object_id = ObjectId::new(ObjectType::Device, 1);
4317
4318        let mut apdu_buf = [0u8; 256];
4319        let mut w = Writer::new(&mut apdu_buf);
4320        ComplexAckHeader {
4321            segmented: false,
4322            more_follows: false,
4323            invoke_id: 1,
4324            sequence_number: None,
4325            proposed_window_size: None,
4326            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4327        }
4328        .encode(&mut w)
4329        .unwrap();
4330        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4331        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4332            .encode(&mut w)
4333            .unwrap();
4334        encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
4335        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4336            .encode(&mut w)
4337            .unwrap();
4338        encode_app_real(&mut w, 55.5).unwrap();
4339        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4340            .encode(&mut w)
4341            .unwrap();
4342        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4343            .encode(&mut w)
4344            .unwrap();
4345
4346        state
4347            .recv
4348            .lock()
4349            .await
4350            .push_back((with_npdu(w.as_written()), addr));
4351
4352        let values = client
4353            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4354            .await
4355            .unwrap();
4356        assert_eq!(values.len(), 1);
4357        assert_eq!(values[0].0, PropertyId::PresentValue);
4358        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
4359
4360        let sent = state.sent.lock().await;
4361        assert_eq!(sent.len(), 1);
4362        let mut r = Reader::new(&sent[0].1);
4363        let _npdu = Npdu::decode(&mut r).unwrap();
4364        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4365        assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
4366    }
4367
4368    #[tokio::test]
4369    async fn read_property_multiple_reassembles_segmented_complex_ack() {
4370        let (dl, state) = MockDataLink::new();
4371        let client = BacnetClient::with_datalink(dl);
4372        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
4373        let object_id = ObjectId::new(ObjectType::Device, 1);
4374
4375        let mut payload_buf = [0u8; 256];
4376        let mut pw = Writer::new(&mut payload_buf);
4377        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
4378        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4379            .encode(&mut pw)
4380            .unwrap();
4381        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
4382        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4383            .encode(&mut pw)
4384            .unwrap();
4385        encode_app_real(&mut pw, 66.0).unwrap();
4386        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4387            .encode(&mut pw)
4388            .unwrap();
4389        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4390            .encode(&mut pw)
4391            .unwrap();
4392        let payload = pw.as_written();
4393        let split = payload.len() / 2;
4394
4395        let mut apdu1 = [0u8; 256];
4396        let mut w1 = Writer::new(&mut apdu1);
4397        ComplexAckHeader {
4398            segmented: true,
4399            more_follows: true,
4400            invoke_id: 1,
4401            sequence_number: Some(0),
4402            proposed_window_size: Some(1),
4403            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4404        }
4405        .encode(&mut w1)
4406        .unwrap();
4407        w1.write_all(&payload[..split]).unwrap();
4408
4409        let mut apdu2 = [0u8; 256];
4410        let mut w2 = Writer::new(&mut apdu2);
4411        ComplexAckHeader {
4412            segmented: true,
4413            more_follows: false,
4414            invoke_id: 1,
4415            sequence_number: Some(1),
4416            proposed_window_size: Some(1),
4417            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4418        }
4419        .encode(&mut w2)
4420        .unwrap();
4421        w2.write_all(&payload[split..]).unwrap();
4422
4423        state
4424            .recv
4425            .lock()
4426            .await
4427            .push_back((with_npdu(w1.as_written()), addr));
4428        state
4429            .recv
4430            .lock()
4431            .await
4432            .push_back((with_npdu(w2.as_written()), addr));
4433
4434        let values = client
4435            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4436            .await
4437            .unwrap();
4438        assert_eq!(values.len(), 1);
4439        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
4440
4441        let sent = state.sent.lock().await;
4442        assert!(sent.len() >= 3);
4443
4444        let mut saw_segment_ack = 0usize;
4445        for (_, frame) in sent.iter().skip(1) {
4446            let mut r = Reader::new(frame);
4447            let _npdu = Npdu::decode(&mut r).unwrap();
4448            let apdu = r.read_exact(r.remaining()).unwrap();
4449            if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
4450                let mut sr = Reader::new(apdu);
4451                let sack = SegmentAck::decode(&mut sr).unwrap();
4452                assert_eq!(sack.invoke_id, 1);
4453                saw_segment_ack += 1;
4454            }
4455        }
4456        assert!(saw_segment_ack >= 1);
4457    }
4458
4459    #[tokio::test]
4460    async fn read_property_multiple_tolerates_duplicate_segment() {
4461        let (dl, state) = MockDataLink::new();
4462        let client = BacnetClient::with_datalink(dl);
4463        let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
4464        let object_id = ObjectId::new(ObjectType::Device, 1);
4465
4466        let mut payload_buf = [0u8; 256];
4467        let mut pw = Writer::new(&mut payload_buf);
4468        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
4469        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4470            .encode(&mut pw)
4471            .unwrap();
4472        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
4473        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4474            .encode(&mut pw)
4475            .unwrap();
4476        encode_app_real(&mut pw, 66.0).unwrap();
4477        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4478            .encode(&mut pw)
4479            .unwrap();
4480        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4481            .encode(&mut pw)
4482            .unwrap();
4483        let payload = pw.as_written();
4484        let split = payload.len() / 2;
4485
4486        let mut apdu1 = [0u8; 256];
4487        let mut w1 = Writer::new(&mut apdu1);
4488        ComplexAckHeader {
4489            segmented: true,
4490            more_follows: true,
4491            invoke_id: 1,
4492            sequence_number: Some(0),
4493            proposed_window_size: Some(1),
4494            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4495        }
4496        .encode(&mut w1)
4497        .unwrap();
4498        w1.write_all(&payload[..split]).unwrap();
4499
4500        let mut dup = [0u8; 256];
4501        let mut wd = Writer::new(&mut dup);
4502        ComplexAckHeader {
4503            segmented: true,
4504            more_follows: true,
4505            invoke_id: 1,
4506            sequence_number: Some(0),
4507            proposed_window_size: Some(1),
4508            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4509        }
4510        .encode(&mut wd)
4511        .unwrap();
4512        wd.write_all(&payload[..split]).unwrap();
4513
4514        let mut apdu2 = [0u8; 256];
4515        let mut w2 = Writer::new(&mut apdu2);
4516        ComplexAckHeader {
4517            segmented: true,
4518            more_follows: false,
4519            invoke_id: 1,
4520            sequence_number: Some(1),
4521            proposed_window_size: Some(1),
4522            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4523        }
4524        .encode(&mut w2)
4525        .unwrap();
4526        w2.write_all(&payload[split..]).unwrap();
4527
4528        {
4529            let mut recv = state.recv.lock().await;
4530            recv.push_back((with_npdu(w1.as_written()), addr));
4531            recv.push_back((with_npdu(wd.as_written()), addr));
4532            recv.push_back((with_npdu(w2.as_written()), addr));
4533        }
4534
4535        let values = client
4536            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
4537            .await
4538            .unwrap();
4539        assert_eq!(values.len(), 1);
4540        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
4541    }
4542
4543    #[tokio::test]
4544    async fn write_properties_handles_simple_ack() {
4545        let (dl, state) = MockDataLink::new();
4546        let client = BacnetClient::with_datalink(dl);
4547        let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
4548        let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
4549
4550        let mut apdu_buf = [0u8; 32];
4551        let mut w = Writer::new(&mut apdu_buf);
4552        SimpleAck {
4553            invoke_id: 1,
4554            service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
4555        }
4556        .encode(&mut w)
4557        .unwrap();
4558        state
4559            .recv
4560            .lock()
4561            .await
4562            .push_back((with_npdu(w.as_written()), addr));
4563
4564        let writes = [PropertyWriteSpec {
4565            property_id: PropertyId::PresentValue,
4566            array_index: None,
4567            value: DataValue::Real(12.5),
4568            priority: Some(8),
4569        }];
4570        client
4571            .write_property_multiple(addr, object_id, &writes)
4572            .await
4573            .unwrap();
4574
4575        let sent = state.sent.lock().await;
4576        assert_eq!(sent.len(), 1);
4577        let mut r = Reader::new(&sent[0].1);
4578        let _npdu = Npdu::decode(&mut r).unwrap();
4579        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4580        assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
4581    }
4582
4583    #[tokio::test]
4584    async fn subscribe_cov_handles_simple_ack() {
4585        let (dl, state) = MockDataLink::new();
4586        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4587        let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
4588
4589        let mut apdu_buf = [0u8; 32];
4590        let mut w = Writer::new(&mut apdu_buf);
4591        SimpleAck {
4592            invoke_id: 1,
4593            service_choice: SERVICE_SUBSCRIBE_COV,
4594        }
4595        .encode(&mut w)
4596        .unwrap();
4597        state
4598            .recv
4599            .lock()
4600            .await
4601            .push_back((with_npdu(w.as_written()), addr));
4602
4603        client
4604            .subscribe_cov(
4605                addr,
4606                SubscribeCovRequest {
4607                    subscriber_process_id: 10,
4608                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
4609                    issue_confirmed_notifications: Some(false),
4610                    lifetime_seconds: Some(300),
4611                    invoke_id: 0,
4612                },
4613            )
4614            .await
4615            .unwrap();
4616
4617        let sent = state.sent.lock().await;
4618        assert_eq!(sent.len(), 1);
4619        let mut r = Reader::new(&sent[0].1);
4620        let _npdu = Npdu::decode(&mut r).unwrap();
4621        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4622        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
4623    }
4624
4625    #[tokio::test]
4626    async fn subscribe_cov_property_handles_simple_ack() {
4627        let (dl, state) = MockDataLink::new();
4628        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4629        let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
4630
4631        let mut apdu_buf = [0u8; 32];
4632        let mut w = Writer::new(&mut apdu_buf);
4633        SimpleAck {
4634            invoke_id: 1,
4635            service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
4636        }
4637        .encode(&mut w)
4638        .unwrap();
4639        state
4640            .recv
4641            .lock()
4642            .await
4643            .push_back((with_npdu(w.as_written()), addr));
4644
4645        client
4646            .subscribe_cov_property(
4647                addr,
4648                SubscribeCovPropertyRequest {
4649                    subscriber_process_id: 22,
4650                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
4651                    issue_confirmed_notifications: Some(true),
4652                    lifetime_seconds: Some(120),
4653                    monitored_property_id: PropertyId::PresentValue,
4654                    monitored_property_array_index: None,
4655                    cov_increment: Some(0.1),
4656                    invoke_id: 0,
4657                },
4658            )
4659            .await
4660            .unwrap();
4661
4662        let sent = state.sent.lock().await;
4663        assert_eq!(sent.len(), 1);
4664        let mut r = Reader::new(&sent[0].1);
4665        let _npdu = Npdu::decode(&mut r).unwrap();
4666        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4667        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
4668    }
4669
4670    #[tokio::test]
4671    async fn read_range_by_position_decodes_complex_ack() {
4672        let (dl, state) = MockDataLink::new();
4673        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4674        let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
4675        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4676
4677        let mut apdu_buf = [0u8; 256];
4678        let mut w = Writer::new(&mut apdu_buf);
4679        ComplexAckHeader {
4680            segmented: false,
4681            more_follows: false,
4682            invoke_id: 1,
4683            sequence_number: None,
4684            proposed_window_size: None,
4685            service_choice: SERVICE_READ_RANGE,
4686        }
4687        .encode(&mut w)
4688        .unwrap();
4689        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
4690        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4691        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
4692        w.write_u8(5).unwrap();
4693        w.write_u8(0b1110_0000).unwrap();
4694        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
4695        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
4696        encode_app_real(&mut w, 42.0).unwrap();
4697        encode_app_real(&mut w, 43.0).unwrap();
4698        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
4699
4700        state
4701            .recv
4702            .lock()
4703            .await
4704            .push_back((with_npdu(w.as_written()), addr));
4705
4706        let result = client
4707            .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
4708            .await
4709            .unwrap();
4710        assert_eq!(result.object_id, object_id);
4711        assert_eq!(result.item_count, 2);
4712        assert_eq!(result.items.len(), 2);
4713        assert!(matches!(
4714            result.items[0],
4715            ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
4716        ));
4717    }
4718
4719    #[tokio::test]
4720    async fn read_range_by_sequence_number_encodes_range_selector() {
4721        let (dl, state) = MockDataLink::new();
4722        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4723        let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
4724        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4725
4726        state
4727            .recv
4728            .lock()
4729            .await
4730            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
4731
4732        let _ = client
4733            .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
4734            .await
4735            .unwrap();
4736
4737        let sent = state.sent.lock().await;
4738        let mut r = Reader::new(&sent[0].1);
4739        let _npdu = Npdu::decode(&mut r).unwrap();
4740        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4741        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
4742        match Tag::decode(&mut r).unwrap() {
4743            Tag::Context { tag_num: 0, len: 4 } => {
4744                let _ = r.read_exact(4).unwrap();
4745            }
4746            other => panic!("unexpected object id tag: {other:?}"),
4747        }
4748        match Tag::decode(&mut r).unwrap() {
4749            Tag::Context { tag_num: 1, len } => {
4750                let _ = decode_unsigned(&mut r, len as usize).unwrap();
4751            }
4752            other => panic!("unexpected property tag: {other:?}"),
4753        }
4754        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
4755        match Tag::decode(&mut r).unwrap() {
4756            Tag::Application {
4757                tag: AppTag::UnsignedInt,
4758                len,
4759            } => {
4760                assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
4761            }
4762            other => panic!("unexpected ref seq tag: {other:?}"),
4763        }
4764        match Tag::decode(&mut r).unwrap() {
4765            Tag::Application {
4766                tag: AppTag::SignedInt,
4767                len,
4768            } => {
4769                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
4770            }
4771            other => panic!("unexpected count tag: {other:?}"),
4772        }
4773        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
4774    }
4775
4776    #[tokio::test]
4777    async fn read_range_by_time_encodes_range_selector() {
4778        let (dl, state) = MockDataLink::new();
4779        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4780        let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
4781        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
4782        let date = Date {
4783            year_since_1900: 126,
4784            month: 2,
4785            day: 7,
4786            weekday: 6,
4787        };
4788        let time = Time {
4789            hour: 10,
4790            minute: 11,
4791            second: 12,
4792            hundredths: 13,
4793        };
4794
4795        state
4796            .recv
4797            .lock()
4798            .await
4799            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
4800
4801        let _ = client
4802            .read_range_by_time(
4803                addr,
4804                object_id,
4805                PropertyId::PresentValue,
4806                None,
4807                (date, time),
4808                2,
4809            )
4810            .await
4811            .unwrap();
4812
4813        let sent = state.sent.lock().await;
4814        let mut r = Reader::new(&sent[0].1);
4815        let _npdu = Npdu::decode(&mut r).unwrap();
4816        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4817        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
4818        match Tag::decode(&mut r).unwrap() {
4819            Tag::Context { tag_num: 0, len: 4 } => {
4820                let _ = r.read_exact(4).unwrap();
4821            }
4822            other => panic!("unexpected object id tag: {other:?}"),
4823        }
4824        match Tag::decode(&mut r).unwrap() {
4825            Tag::Context { tag_num: 1, len } => {
4826                let _ = decode_unsigned(&mut r, len as usize).unwrap();
4827            }
4828            other => panic!("unexpected property tag: {other:?}"),
4829        }
4830        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
4831        match Tag::decode(&mut r).unwrap() {
4832            Tag::Application {
4833                tag: AppTag::Date,
4834                len: 4,
4835            } => {
4836                let raw = r.read_exact(4).unwrap();
4837                assert_eq!(
4838                    raw,
4839                    &[date.year_since_1900, date.month, date.day, date.weekday]
4840                );
4841            }
4842            other => panic!("unexpected date tag: {other:?}"),
4843        }
4844        match Tag::decode(&mut r).unwrap() {
4845            Tag::Application {
4846                tag: AppTag::Time,
4847                len: 4,
4848            } => {
4849                let raw = r.read_exact(4).unwrap();
4850                assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
4851            }
4852            other => panic!("unexpected time tag: {other:?}"),
4853        }
4854        match Tag::decode(&mut r).unwrap() {
4855            Tag::Application {
4856                tag: AppTag::SignedInt,
4857                len,
4858            } => {
4859                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
4860            }
4861            other => panic!("unexpected count tag: {other:?}"),
4862        }
4863        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
4864    }
4865
4866    #[tokio::test]
4867    async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
4868        let (dl, state) = MockDataLink::new();
4869        let client = BacnetClient::with_datalink(dl);
4870        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
4871
4872        let mut apdu = [0u8; 256];
4873        let mut w = Writer::new(&mut apdu);
4874        UnconfirmedRequestHeader {
4875            service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
4876        }
4877        .encode(&mut w)
4878        .unwrap();
4879        encode_ctx_unsigned(&mut w, 0, 17).unwrap();
4880        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4881        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
4882        encode_ctx_unsigned(&mut w, 3, 60).unwrap();
4883        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
4884        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
4885        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
4886        encode_app_real(&mut w, 73.25).unwrap();
4887        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
4888        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
4889
4890        state
4891            .recv
4892            .lock()
4893            .await
4894            .push_back((with_npdu(w.as_written()), addr));
4895
4896        let notification = client
4897            .recv_cov_notification(Duration::from_secs(1))
4898            .await
4899            .unwrap()
4900            .unwrap();
4901        assert!(!notification.confirmed);
4902        assert_eq!(notification.subscriber_process_id, 17);
4903        assert_eq!(notification.values.len(), 1);
4904        assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
4905        assert!(matches!(
4906            notification.values[0].value,
4907            ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
4908        ));
4909
4910        let sent = state.sent.lock().await;
4911        assert!(sent.is_empty());
4912    }
4913
4914    #[tokio::test]
4915    async fn recv_confirmed_cov_notification_sends_simple_ack() {
4916        let (dl, state) = MockDataLink::new();
4917        let client = BacnetClient::with_datalink(dl);
4918        let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
4919
4920        let mut apdu = [0u8; 256];
4921        let mut w = Writer::new(&mut apdu);
4922        ConfirmedRequestHeader {
4923            segmented: false,
4924            more_follows: false,
4925            segmented_response_accepted: false,
4926            max_segments: 0,
4927            max_apdu: 5,
4928            invoke_id: 9,
4929            sequence_number: None,
4930            proposed_window_size: None,
4931            service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
4932        }
4933        .encode(&mut w)
4934        .unwrap();
4935        encode_ctx_unsigned(&mut w, 0, 18).unwrap();
4936        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4937        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
4938        encode_ctx_unsigned(&mut w, 3, 120).unwrap();
4939        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
4940        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
4941        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
4942        encode_app_real(&mut w, 55.0).unwrap();
4943        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
4944        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
4945
4946        state
4947            .recv
4948            .lock()
4949            .await
4950            .push_back((with_npdu(w.as_written()), addr));
4951
4952        let notification = client
4953            .recv_cov_notification(Duration::from_secs(1))
4954            .await
4955            .unwrap()
4956            .unwrap();
4957        assert!(notification.confirmed);
4958        assert_eq!(notification.values.len(), 1);
4959
4960        let sent = state.sent.lock().await;
4961        assert_eq!(sent.len(), 1);
4962        let mut r = Reader::new(&sent[0].1);
4963        let _npdu = Npdu::decode(&mut r).unwrap();
4964        let ack = SimpleAck::decode(&mut r).unwrap();
4965        assert_eq!(ack.invoke_id, 9);
4966        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
4967    }
4968
4969    #[tokio::test]
4970    async fn recv_unconfirmed_event_notification_returns_decoded_value() {
4971        let (dl, state) = MockDataLink::new();
4972        let client = BacnetClient::with_datalink(dl);
4973        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4974
4975        let mut apdu = [0u8; 256];
4976        let mut w = Writer::new(&mut apdu);
4977        UnconfirmedRequestHeader {
4978            service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
4979        }
4980        .encode(&mut w)
4981        .unwrap();
4982        encode_ctx_unsigned(&mut w, 0, 99).unwrap();
4983        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4984        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
4985        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4986        encode_ctx_unsigned(&mut w, 1, 55).unwrap();
4987        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4988        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
4989        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
4990        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
4991        encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
4992        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
4993        Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
4994        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
4995        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
4996        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
4997        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
4998        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
4999        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
5000        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
5001
5002        state
5003            .recv
5004            .lock()
5005            .await
5006            .push_back((with_npdu(w.as_written()), addr));
5007
5008        let notification: EventNotification = client
5009            .recv_event_notification(Duration::from_secs(1))
5010            .await
5011            .unwrap()
5012            .unwrap();
5013        assert!(!notification.confirmed);
5014        assert_eq!(notification.process_id, 99);
5015        assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
5016        assert_eq!(notification.ack_required, Some(true));
5017        assert_eq!(notification.from_state, Some(EventState::Offnormal));
5018        assert_eq!(notification.to_state, Some(EventState::Normal));
5019        assert_eq!(notification.notify_type, 0);
5020
5021        let sent = state.sent.lock().await;
5022        assert!(sent.is_empty());
5023    }
5024
5025    #[tokio::test]
5026    async fn recv_confirmed_event_notification_sends_simple_ack() {
5027        let (dl, state) = MockDataLink::new();
5028        let client = BacnetClient::with_datalink(dl);
5029        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
5030
5031        let mut apdu = [0u8; 256];
5032        let mut w = Writer::new(&mut apdu);
5033        ConfirmedRequestHeader {
5034            segmented: false,
5035            more_follows: false,
5036            segmented_response_accepted: false,
5037            max_segments: 0,
5038            max_apdu: 5,
5039            invoke_id: 11,
5040            sequence_number: None,
5041            proposed_window_size: None,
5042            service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
5043        }
5044        .encode(&mut w)
5045        .unwrap();
5046        encode_ctx_unsigned(&mut w, 0, 100).unwrap();
5047        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5048        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
5049        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5050        encode_ctx_unsigned(&mut w, 1, 56).unwrap();
5051        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5052        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
5053        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
5054        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
5055        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
5056        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
5057        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
5058        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
5059        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
5060        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
5061        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
5062        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
5063
5064        state
5065            .recv
5066            .lock()
5067            .await
5068            .push_back((with_npdu(w.as_written()), addr));
5069
5070        let notification = client
5071            .recv_event_notification(Duration::from_secs(1))
5072            .await
5073            .unwrap()
5074            .unwrap();
5075        assert!(notification.confirmed);
5076
5077        let sent = state.sent.lock().await;
5078        assert_eq!(sent.len(), 1);
5079        let mut r = Reader::new(&sent[0].1);
5080        let _npdu = Npdu::decode(&mut r).unwrap();
5081        let ack = SimpleAck::decode(&mut r).unwrap();
5082        assert_eq!(ack.invoke_id, 11);
5083        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
5084    }
5085
5086    #[tokio::test]
5087    async fn write_property_multiple_segments_large_request() {
5088        let (dl, state) = MockDataLink::new();
5089        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
5090        let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
5091        let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
5092
5093        {
5094            let mut recv = state.recv.lock().await;
5095            for seq in 0u8..=254 {
5096                let mut apdu = [0u8; 16];
5097                let mut w = Writer::new(&mut apdu);
5098                SegmentAck {
5099                    negative_ack: false,
5100                    sent_by_server: true,
5101                    invoke_id: 1,
5102                    sequence_number: seq,
5103                    actual_window_size: 1,
5104                }
5105                .encode(&mut w)
5106                .unwrap();
5107                recv.push_back((with_npdu(w.as_written()), addr));
5108            }
5109
5110            let mut apdu = [0u8; 16];
5111            let mut w = Writer::new(&mut apdu);
5112            SimpleAck {
5113                invoke_id: 1,
5114                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5115            }
5116            .encode(&mut w)
5117            .unwrap();
5118            recv.push_back((with_npdu(w.as_written()), addr));
5119        }
5120
5121        let writes: Vec<PropertyWriteSpec> = (0..180)
5122            .map(|_| PropertyWriteSpec {
5123                property_id: PropertyId::Description,
5124                array_index: None,
5125                value: DataValue::CharacterString(
5126                    "rustbac segmented write test payload................................................................",
5127                ),
5128                priority: None,
5129            })
5130            .collect();
5131
5132        client
5133            .write_property_multiple(addr, object_id, &writes)
5134            .await
5135            .unwrap();
5136
5137        let sent = state.sent.lock().await;
5138        assert!(sent.len() > 1);
5139
5140        let mut seqs = Vec::new();
5141        let mut saw_more_follows = false;
5142        let mut saw_last = false;
5143        for (_, frame) in sent.iter() {
5144            let mut r = Reader::new(frame);
5145            let _npdu = Npdu::decode(&mut r).unwrap();
5146            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5147            assert!(hdr.segmented);
5148            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
5149            if hdr.more_follows {
5150                saw_more_follows = true;
5151            } else {
5152                saw_last = true;
5153            }
5154            seqs.push(hdr.sequence_number.unwrap());
5155        }
5156
5157        assert!(saw_more_follows);
5158        assert!(saw_last);
5159        for (idx, seq) in seqs.iter().enumerate() {
5160            assert_eq!(*seq as usize, idx);
5161        }
5162    }
5163
5164    #[tokio::test]
5165    async fn write_property_multiple_uses_configured_segment_window() {
5166        let (dl, state) = MockDataLink::new();
5167        let client = BacnetClient::with_datalink(dl)
5168            .with_response_timeout(Duration::from_secs(1))
5169            .with_segmented_request_window_size(4);
5170        let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
5171        let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
5172
5173        {
5174            let mut recv = state.recv.lock().await;
5175            for seq in 0u8..=254 {
5176                let mut apdu = [0u8; 16];
5177                let mut w = Writer::new(&mut apdu);
5178                SegmentAck {
5179                    negative_ack: false,
5180                    sent_by_server: true,
5181                    invoke_id: 1,
5182                    sequence_number: seq,
5183                    actual_window_size: 4,
5184                }
5185                .encode(&mut w)
5186                .unwrap();
5187                recv.push_back((with_npdu(w.as_written()), addr));
5188            }
5189
5190            let mut apdu = [0u8; 16];
5191            let mut w = Writer::new(&mut apdu);
5192            SimpleAck {
5193                invoke_id: 1,
5194                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5195            }
5196            .encode(&mut w)
5197            .unwrap();
5198            recv.push_back((with_npdu(w.as_written()), addr));
5199        }
5200
5201        let writes: Vec<PropertyWriteSpec> = (0..180)
5202            .map(|_| PropertyWriteSpec {
5203                property_id: PropertyId::Description,
5204                array_index: None,
5205                value: DataValue::CharacterString(
5206                    "rustbac segmented write test payload................................................................",
5207                ),
5208                priority: None,
5209            })
5210            .collect();
5211
5212        client
5213            .write_property_multiple(addr, object_id, &writes)
5214            .await
5215            .unwrap();
5216
5217        let sent = state.sent.lock().await;
5218        assert!(sent.len() > 4);
5219        for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
5220            let mut r = Reader::new(frame);
5221            let _npdu = Npdu::decode(&mut r).unwrap();
5222            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5223            assert!(hdr.segmented);
5224            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
5225            assert_eq!(hdr.sequence_number, Some(idx as u8));
5226            assert_eq!(hdr.proposed_window_size, Some(4));
5227        }
5228    }
5229
5230    #[tokio::test]
5231    async fn write_property_multiple_adapts_window_to_peer_ack_window() {
5232        let (dl, state) = MockDataLink::new();
5233        let client = BacnetClient::with_datalink(dl)
5234            .with_response_timeout(Duration::from_secs(1))
5235            .with_segmented_request_window_size(4);
5236        let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
5237        let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
5238
5239        {
5240            let mut recv = state.recv.lock().await;
5241            for seq in 0u8..=254 {
5242                let mut apdu = [0u8; 16];
5243                let mut w = Writer::new(&mut apdu);
5244                SegmentAck {
5245                    negative_ack: false,
5246                    sent_by_server: true,
5247                    invoke_id: 1,
5248                    sequence_number: seq,
5249                    actual_window_size: 2,
5250                }
5251                .encode(&mut w)
5252                .unwrap();
5253                recv.push_back((with_npdu(w.as_written()), addr));
5254            }
5255
5256            let mut apdu = [0u8; 16];
5257            let mut w = Writer::new(&mut apdu);
5258            SimpleAck {
5259                invoke_id: 1,
5260                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5261            }
5262            .encode(&mut w)
5263            .unwrap();
5264            recv.push_back((with_npdu(w.as_written()), addr));
5265        }
5266
5267        let writes: Vec<PropertyWriteSpec> = (0..180)
5268            .map(|_| PropertyWriteSpec {
5269                property_id: PropertyId::Description,
5270                array_index: None,
5271                value: DataValue::CharacterString(
5272                    "rustbac segmented write test payload................................................................",
5273                ),
5274                priority: None,
5275            })
5276            .collect();
5277
5278        client
5279            .write_property_multiple(addr, object_id, &writes)
5280            .await
5281            .unwrap();
5282
5283        let sent = state.sent.lock().await;
5284        let mut saw_adapted_window = false;
5285        for (_, frame) in sent.iter() {
5286            let mut r = Reader::new(frame);
5287            let _npdu = Npdu::decode(&mut r).unwrap();
5288            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5289            if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
5290                saw_adapted_window = true;
5291                break;
5292            }
5293        }
5294        assert!(saw_adapted_window);
5295    }
5296
5297    #[tokio::test]
5298    async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
5299        let (dl, state) = MockDataLink::new();
5300        let client = BacnetClient::with_datalink(dl)
5301            .with_response_timeout(Duration::from_secs(1))
5302            .with_segmented_request_window_size(1)
5303            .with_segmented_request_retries(1);
5304        let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
5305        let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
5306
5307        {
5308            let mut recv = state.recv.lock().await;
5309
5310            let mut nack_apdu = [0u8; 16];
5311            let mut nack_w = Writer::new(&mut nack_apdu);
5312            SegmentAck {
5313                negative_ack: true,
5314                sent_by_server: true,
5315                invoke_id: 1,
5316                sequence_number: 0,
5317                actual_window_size: 1,
5318            }
5319            .encode(&mut nack_w)
5320            .unwrap();
5321            recv.push_back((with_npdu(nack_w.as_written()), addr));
5322
5323            for seq in 0u8..=254 {
5324                let mut apdu = [0u8; 16];
5325                let mut w = Writer::new(&mut apdu);
5326                SegmentAck {
5327                    negative_ack: false,
5328                    sent_by_server: true,
5329                    invoke_id: 1,
5330                    sequence_number: seq,
5331                    actual_window_size: 1,
5332                }
5333                .encode(&mut w)
5334                .unwrap();
5335                recv.push_back((with_npdu(w.as_written()), addr));
5336            }
5337
5338            let mut apdu = [0u8; 16];
5339            let mut w = Writer::new(&mut apdu);
5340            SimpleAck {
5341                invoke_id: 1,
5342                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
5343            }
5344            .encode(&mut w)
5345            .unwrap();
5346            recv.push_back((with_npdu(w.as_written()), addr));
5347        }
5348
5349        let writes: Vec<PropertyWriteSpec> = (0..180)
5350            .map(|_| PropertyWriteSpec {
5351                property_id: PropertyId::Description,
5352                array_index: None,
5353                value: DataValue::CharacterString(
5354                    "rustbac segmented write test payload................................................................",
5355                ),
5356                priority: None,
5357            })
5358            .collect();
5359
5360        client
5361            .write_property_multiple(addr, object_id, &writes)
5362            .await
5363            .unwrap();
5364
5365        let sent = state.sent.lock().await;
5366        let mut seq0_frames = 0usize;
5367        for (_, frame) in sent.iter() {
5368            let mut r = Reader::new(frame);
5369            let _npdu = Npdu::decode(&mut r).unwrap();
5370            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
5371            if hdr.sequence_number == Some(0) {
5372                seq0_frames += 1;
5373            }
5374        }
5375        assert!(seq0_frames >= 2);
5376    }
5377
5378    #[tokio::test]
5379    async fn read_property_ignores_invalid_frames_until_valid_response() {
5380        let (dl, state) = MockDataLink::new();
5381        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
5382        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
5383        let state_for_task = state.clone();
5384
5385        tokio::spawn(async move {
5386            tokio::time::sleep(Duration::from_millis(20)).await;
5387            let mut apdu = [0u8; 128];
5388            let mut w = Writer::new(&mut apdu);
5389            ComplexAckHeader {
5390                segmented: false,
5391                more_follows: false,
5392                invoke_id: 1,
5393                sequence_number: None,
5394                proposed_window_size: None,
5395                service_choice: SERVICE_READ_PROPERTY,
5396            }
5397            .encode(&mut w)
5398            .unwrap();
5399            encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
5400            encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
5401            Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
5402            encode_app_real(&mut w, 77.0).unwrap();
5403            Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
5404            state_for_task
5405                .recv
5406                .lock()
5407                .await
5408                .push_back((with_npdu(w.as_written()), addr));
5409        });
5410
5411        let value = client
5412            .read_property(
5413                addr,
5414                ObjectId::new(ObjectType::Device, 1),
5415                PropertyId::PresentValue,
5416            )
5417            .await
5418            .unwrap();
5419        assert!(matches!(
5420            value,
5421            ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
5422        ));
5423    }
5424
5425    #[tokio::test]
5426    async fn read_property_maps_reject() {
5427        let (dl, state) = MockDataLink::new();
5428        let client = BacnetClient::with_datalink(dl);
5429        let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
5430
5431        let mut apdu = [0u8; 8];
5432        let mut w = Writer::new(&mut apdu);
5433        w.write_u8((ApduType::Reject as u8) << 4).unwrap();
5434        w.write_u8(1).unwrap(); // invoke id
5435        w.write_u8(2).unwrap(); // reason
5436        state
5437            .recv
5438            .lock()
5439            .await
5440            .push_back((with_npdu(w.as_written()), addr));
5441
5442        let err = client
5443            .read_property(
5444                addr,
5445                ObjectId::new(ObjectType::Device, 1),
5446                PropertyId::ObjectName,
5447            )
5448            .await
5449            .unwrap_err();
5450        assert!(matches!(
5451            err,
5452            crate::ClientError::RemoteReject { reason: 2 }
5453        ));
5454    }
5455
5456    #[tokio::test]
5457    async fn read_property_maps_remote_error_details() {
5458        let (dl, state) = MockDataLink::new();
5459        let client = BacnetClient::with_datalink(dl);
5460        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
5461
5462        let mut apdu = [0u8; 16];
5463        let mut w = Writer::new(&mut apdu);
5464        w.write_u8((ApduType::Error as u8) << 4).unwrap();
5465        w.write_u8(1).unwrap(); // invoke id
5466        w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
5467            .unwrap();
5468        Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
5469        w.write_u8(2).unwrap(); // property class
5470        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
5471        w.write_u8(32).unwrap(); // unknownProperty
5472
5473        state
5474            .recv
5475            .lock()
5476            .await
5477            .push_back((with_npdu(w.as_written()), addr));
5478
5479        let err = client
5480            .read_property(
5481                addr,
5482                ObjectId::new(ObjectType::Device, 1),
5483                PropertyId::ObjectName,
5484            )
5485            .await
5486            .unwrap_err();
5487        assert!(matches!(
5488            err,
5489            crate::ClientError::RemoteServiceError {
5490                service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
5491                error_class_raw: Some(2),
5492                error_code_raw: Some(32),
5493                error_class: Some(rustbac_core::types::ErrorClass::Property),
5494                error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
5495            }
5496        ));
5497    }
5498
5499    #[tokio::test]
5500    async fn write_property_maps_abort() {
5501        let (dl, state) = MockDataLink::new();
5502        let client = BacnetClient::with_datalink(dl);
5503        let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
5504
5505        let mut apdu = [0u8; 8];
5506        let mut w = Writer::new(&mut apdu);
5507        w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); // server abort
5508        w.write_u8(1).unwrap(); // invoke id
5509        w.write_u8(9).unwrap(); // reason
5510        state
5511            .recv
5512            .lock()
5513            .await
5514            .push_back((with_npdu(w.as_written()), addr));
5515
5516        let req = rustbac_core::services::write_property::WritePropertyRequest {
5517            object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
5518            property_id: PropertyId::PresentValue,
5519            value: DataValue::Real(10.0),
5520            priority: Some(8),
5521            ..Default::default()
5522        };
5523        let err = client.write_property(addr, req).await.unwrap_err();
5524        assert!(matches!(
5525            err,
5526            crate::ClientError::RemoteAbort {
5527                reason: 9,
5528                server: true
5529            }
5530        ));
5531    }
5532
5533    #[tokio::test]
5534    async fn read_property_multiple_returns_owned_string() {
5535        let (dl, state) = MockDataLink::new();
5536        let client = BacnetClient::with_datalink(dl);
5537        let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
5538        let object_id = ObjectId::new(ObjectType::Device, 1);
5539
5540        let mut apdu_buf = [0u8; 256];
5541        let mut w = Writer::new(&mut apdu_buf);
5542        ComplexAckHeader {
5543            segmented: false,
5544            more_follows: false,
5545            invoke_id: 1,
5546            sequence_number: None,
5547            proposed_window_size: None,
5548            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
5549        }
5550        .encode(&mut w)
5551        .unwrap();
5552        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
5553        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
5554            .encode(&mut w)
5555            .unwrap();
5556        encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
5557        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
5558            .encode(&mut w)
5559            .unwrap();
5560        rustbac_core::services::value_codec::encode_application_data_value(
5561            &mut w,
5562            &DataValue::CharacterString("AHU-1"),
5563        )
5564        .unwrap();
5565        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
5566            .encode(&mut w)
5567            .unwrap();
5568        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
5569            .encode(&mut w)
5570            .unwrap();
5571
5572        state
5573            .recv
5574            .lock()
5575            .await
5576            .push_back((with_npdu(w.as_written()), addr));
5577
5578        let values = client
5579            .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
5580            .await
5581            .unwrap();
5582        assert_eq!(values.len(), 1);
5583        assert_eq!(values[0].0, PropertyId::ObjectName);
5584        assert!(matches!(
5585            &values[0].1,
5586            ClientDataValue::CharacterString(s) if s == "AHU-1"
5587        ));
5588    }
5589
5590    #[tokio::test]
5591    async fn new_sc_rejects_invalid_endpoint() {
5592        let err = BacnetClient::new_sc("not a url").await.unwrap_err();
5593        assert!(matches!(err, crate::ClientError::DataLink(_)));
5594    }
5595}